In [26]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.clustering import GaussianMixture, GaussianMixtureModel
import numpy as np

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [2]:
# creating spark app
spark = SparkSession.builder.appName("gmm_implementation").getOrCreate()

#reading in the CSV file 
# local_file = 'numpy_array_for_modeling_with_cathegorical_columns.csv'
cloud_fle = 'gs://term-project-fall-202511162025/running_big_data/big_data_ready_for_modeling.csv'
df = spark.read.csv(local_file,
                    header=True, inferSchema = True)

df.printSchema()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/02 13:11:14 WARN Utils: Your hostname, Bens-MacBook-Air-7.local, resolves to a loopback address: 127.0.0.1; using 172.20.60.48 instead (on interface en0)
25/12/02 13:11:14 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/02 13:11:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


root
 |-- title: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- album: string (nullable = true)
 |-- af_danceability: double (nullable = true)
 |-- af_energy: double (nullable = true)
 |-- af_loudness: double (nullable = true)
 |-- af_speechiness: double (nullable = true)
 |-- af_acousticness: double (nullable = true)
 |-- af_instrumentalness: double (nullable = true)
 |-- af_valence: double (nullable = true)
 |-- af_tempo: double (nullable = true)
 |-- language_id: double (nullable = true)



In [3]:
columns_to_clean = ['title','artist', 'album']
# we are removing quotations from the titles
df = df.select(
    *[
        F.regexp_replace(c, r"[\"']", "").alias(c)
        if c in columns_to_clean else c
        for c in df.columns
    ]
)

In [4]:
input_columns =  [
    "af_danceability",
    "af_energy",
    "af_loudness",
    "af_speechiness",
    "af_acousticness",
    "af_instrumentalness",
    "af_valence",
    "af_tempo",
    "language_id"
]
vector_assembler = VectorAssembler(inputCols = input_columns, 
                            outputCol = 'vectorized_features')
vector_dataframe = vector_assembler.transform(df) # df is our data without being vectorized
transformed_vectors = vector_dataframe.select('vectorized_features') # selecting the vectorized features so we can use them in a model
transformed_vectors.show(5)

+--------------------+
| vectorized_features|
+--------------------+
|[0.666,0.83,-5.71...|
|[0.818,0.803,-4.2...|
|[0.849,0.759,-6.2...|
|[0.731,0.863,-5.3...|
|[0.745,0.875,-4.2...|
+--------------------+
only showing top 5 rows


In [29]:
#scaling the data for k-means since it's a distance based algorithm 
scaler = StandardScaler(inputCol = 'vectorized_features',
                       outputCol = 'scaled_vectorized_features',
                       withStd = True,
                       withMean = False)
#check the summary statistics of our resutls by fitting the standard scaler
scalerModel = scaler.fit(vector_dataframe)

#normalizing each feature to have unit standard deviation
vector_dataframe_scaled = scalerModel.transform(vector_dataframe)

#showing the scaled features
vector_dataframe_scaled.select('scaled_vectorized_features').show(5)
vector_dataframe_scaled.show(1)

+--------------------------+
|scaled_vectorized_features|
+--------------------------+
|      [7.30590220636553...|
|      [8.97331532253304...|
|      [9.31337983964615...|
|      [8.01894070998979...|
|      [8.17251823384733...|
+--------------------------+
only showing top 5 rows
+--------------------+-----------------+--------------------+---------------+---------+-----------+--------------+---------------+-------------------+----------+--------+-----------+--------------------+--------------------------+
|               title|           artist|               album|af_danceability|af_energy|af_loudness|af_speechiness|af_acousticness|af_instrumentalness|af_valence|af_tempo|language_id| vectorized_features|scaled_vectorized_features|
+--------------------+-----------------+--------------------+---------------+---------+-----------+--------------+---------------+-------------------+----------+--------+-----------+--------------------+--------------------------+
|CANT STOP THE FEE...|

In [6]:
results = [] 
for i in range(2,11): 
    gmm_model = GaussianMixture(k = i,
                                featuresCol = 'scaled_vectorized_features', 
                                predictionCol = 'gmm_predictions') 
    model = gmm_model.fit(vector_dataframe_scaled) 
    lllh = model.summary.logLikelihood 
    results.append((i,lllh)) 

print(results)

25/12/02 13:11:19 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/12/02 13:11:20 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/12/02 13:11:21 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


[(2, -1234.2195756651586), (3, -971.6766521476645), (4, -712.8857740318316), (5, -737.6924212765223), (6, 1621.8753893362273), (7, 1878.1726614338481), (8, 2453.1373187199483), (9, 379.0702216518933), (10, 2520.2154976815673)]


In [7]:
best_k = max(results, key=lambda x: x[1])[0]
print("Best K =", best_k)

Best K = 10


In [8]:
n_components = best_k

gmm = GaussianMixture(k = n_components, 
                      featuresCol = 'scaled_vectorized_features', 
                      predictionCol = 'gmm_predictions')
model = gmm.fit(vector_dataframe_scaled)
# predict where each song lies 
df_with_gmm_predictions = model.transform(vector_dataframe_scaled)

In [9]:
english_df  = df_with_gmm_predictions.filter("language_id = 0")
spanish_df  = df_with_gmm_predictions.filter("language_id = 1")

In [10]:
from pyspark.ml.functions import vector_to_array

df_json_ready = df_with_gmm_predictions.withColumn("vectorized_features", vector_to_array("vectorized_features")).withColumn("scaled_vectorized_features", vector_to_array("scaled_vectorized_features")).withColumn("probability", vector_to_array("probability"))

In [30]:
model.write().overwrite().save("models/gmm_model")
scalerModel.write().overwrite().save("models/scaler_model")

In [11]:


# for k in range(num_clusters):
    
#     # Filter cluster k
#     cluster_df = df_json_ready.filter(f"gmm_predictions = {k}")
    
#     # English
#     eng_rows = cluster_df.filter("language_id = 0").toPandas().to_dict(orient="records")
    
#     # Spanish
#     spa_rows = cluster_df.filter("language_id = 1").toPandas().to_dict(orient="records")
    
#     # Build JSON object
#     result = {
#         f"cluster_{k}": {
#             "english": eng_rows,
#             "spanish": spa_rows
#         }
#     }
    
#     # Write JSON file
#     with open(f"{output_path}/cluster_{k}.json", "w", encoding="utf-8") as f:
#         json.dump(result, f, ensure_ascii=False, indent=2)

#     print(f"Saved cluster_{k}.json")





In [12]:
import json

num_clusters = best_k   # earlier we computed best_k

#we have to create the folder before we run this

output_path = "gmm_cluster_outputs"  # folder to store JSON files

def df_to_dict_list(df):
    return [row.asDict() for row in df.collect()]   # stays distributed

for k in range(best_k):
    cluster_df = df_json_ready.filter(f"gmm_predictions = {k}")
    
    eng = df_to_dict_list(cluster_df.filter("language_id = 0"))
    spa = df_to_dict_list(cluster_df.filter("language_id = 1"))
    
    result = {
        f"cluster_{k}": {
            "english": eng,
            "spanish": spa
        }
    }
    
    with open(f"{output_path}/cluster_{k}.json", "w", encoding="utf-8") as f:
        json.dump(result, f, ensure_ascii=False, indent=2)


In [13]:
df_with_gmm_predictions.printSchema()

root
 |-- title: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- album: string (nullable = true)
 |-- af_danceability: double (nullable = true)
 |-- af_energy: double (nullable = true)
 |-- af_loudness: double (nullable = true)
 |-- af_speechiness: double (nullable = true)
 |-- af_acousticness: double (nullable = true)
 |-- af_instrumentalness: double (nullable = true)
 |-- af_valence: double (nullable = true)
 |-- af_tempo: double (nullable = true)
 |-- language_id: double (nullable = true)
 |-- vectorized_features: vector (nullable = true)
 |-- scaled_vectorized_features: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- gmm_predictions: integer (nullable = false)



In [14]:
df_with_gmm_predictions.select('gmm_predictions').show(10)

+---------------+
|gmm_predictions|
+---------------+
|              2|
|              7|
|              4|
|              0|
|              0|
|              0|
|              4|
|              0|
|              0|
|              0|
+---------------+
only showing top 10 rows


In [15]:
# check the distribution of clusters
df_with_gmm_predictions.groupBy('gmm_predictions').count().show()

+---------------+-----+
|gmm_predictions|count|
+---------------+-----+
|              1|    4|
|              6|    8|
|              3|    5|
|              5|    6|
|              9|    4|
|              4|    9|
|              8|    5|
|              7|    9|
|              2|   22|
|              0|  112|
+---------------+-----+



In [17]:
# show some language_id 0 songs in cluster 1
df_english_cluster = df_with_gmm_predictions.filter((df_with_gmm_predictions.language_id == 0) & (df_with_gmm_predictions.gmm_predictions == 3)).show(5)

+--------------------+--------------------+--------------------+---------------+---------+-----------+--------------+---------------+-------------------+----------+--------+-----------+--------------------+--------------------------+--------------------+---------------+
|               title|              artist|               album|af_danceability|af_energy|af_loudness|af_speechiness|af_acousticness|af_instrumentalness|af_valence|af_tempo|language_id| vectorized_features|scaled_vectorized_features|         probability|gmm_predictions|
+--------------------+--------------------+--------------------+---------------+---------+-----------+--------------+---------------+-------------------+----------+--------+-----------+--------------------+--------------------------+--------------------+---------------+
|Bailando - Spanis...|Enrique Iglesias,...|SEX AND LOVE (Del...|          0.723|    0.777|     -3.503|         0.108|         0.0426|            3.68E-6|     0.961|  91.017|        0.0|[0

In [18]:
df_cluster0_eng = df_with_gmm_predictions.filter((df_with_gmm_predictions.gmm_predictions == 0) & (df_with_gmm_predictions.language_id == 0))
df_cluster0_eng.show(1)

+-----+------+-----+---------------+---------+-----------+--------------+---------------+-------------------+----------+--------+-----------+-------------------+--------------------------+-----------+---------------+
|title|artist|album|af_danceability|af_energy|af_loudness|af_speechiness|af_acousticness|af_instrumentalness|af_valence|af_tempo|language_id|vectorized_features|scaled_vectorized_features|probability|gmm_predictions|
+-----+------+-----+---------------+---------+-----------+--------------+---------------+-------------------+----------+--------+-----------+-------------------+--------------------------+-----------+---------------+
+-----+------+-----+---------------+---------+-----------+--------------+---------------+-------------------+----------+--------+-----------+-------------------+--------------------------+-----------+---------------+



In [None]:
# df_cluster0_eng.coalesce(1).write.csv(
#     "output/cluster0_eng",
#     header=True,
#     mode="overwrite"
# )

AnalysisException: [UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE] The CSV datasource doesn't support the column `vectorized_features` of the type "STRUCT<type: TINYINT, size: INT, indices: ARRAY<INT>, values: ARRAY<DOUBLE>>".