In [3]:
from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder \
    .appName("SpotifyClustering") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:9000") \
    .getOrCreate()


Let's define a function for checking missing values, no matter the type

In [4]:
def is_missing(colname, dtype):
    c = F.col(colname)
    dt = dtype.lower()

    if dt in ("double", "float"):
        # null OR NaN
        return c.isNull() | F.isnan(c)
    elif dt in ("int", "integer", "bigint", "long", "short"):
        return c.isNull()
    elif dt == "boolean":
        # booleans can't be NaN or empty-string; only null is "missing"
        return c.isNull()
    elif "string" in dt:
        # null OR empty/whitespace-only string
        return c.isNull() | (F.trim(c) == "")
    else:
        # fallback: treat only null as missing
        return c.isNull()

Now let's run the inspection again

In [5]:
path = "hdfs://namenode:9000/spotify/dataset.csv"

# 1) Read with header, no schema (but robust CSV options)
df0 = (spark.read
       .option("header", True)
       .option("multiLine", True)
       .option("quote", '"')
       .option("escape", '"')
       .option("mode", "PERMISSIVE")
       .csv(path))

print(df0.columns)      
#df0.show(3, truncate=False)

# 2) Cast columns by NAME (not by position)
cast_to = {
    "popularity": "int",
    "duration_ms": "int",
    "explicit": "boolean",   
    "danceability": "double",
    "energy": "double",
    "key": "int",
    "loudness": "double",
    "mode": "int",
    "speechiness": "double",
    "acousticness": "double",
    "instrumentalness": "double",
    "liveness": "double",
    "valence": "double",
    "tempo": "double",
    "time_signature": "int",
    "track_genre": "string",
}

df = df0.select(*[
    F.col(c).cast(cast_to[c]).alias(c) if c in cast_to else F.col(c)
    for c in df0.columns
])



['_c0', 'track_id', 'artists', 'album_name', 'track_name', 'popularity', 'duration_ms', 'explicit', 'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'time_signature', 'track_genre']


In [6]:
row_count = df.count()

# Numeric summary (exclude boolean or cast it to int first)
num_types = {"double","float","int","integer","bigint","long","short"}
num_cols  = [c for c,t in df.dtypes if t.lower() in num_types]
df.select(*[c for c in ["popularity","duration_ms","danceability","energy"] if c in num_cols]) \
  .summary("count","mean","stddev","min","25%","50%","75%","max").show(truncate=False)

df.select(*[c for c in ["key","loudness","mode","speechiness"] if c in num_cols]) \
  .summary("count","mean","stddev","min","25%","50%","75%","max").show(truncate=False)

df.select(*[c for c in ["acousticness","instrumentalness","liveness","valence", "tempo", "time_signature"] if c in num_cols]) \
  .summary("count","mean","stddev","min","25%","50%","75%","max").show(truncate=False)


+-------+------------------+------------------+-------------------+------------------+
|summary|popularity        |duration_ms       |danceability       |energy            |
+-------+------------------+------------------+-------------------+------------------+
|count  |114000            |114000            |114000             |114000            |
|mean   |33.2385350877193  |228029.15311403509|0.5668000657894607 |0.6413827583964953|
|stddev |22.305078493372324|107297.71264491338|0.17354217360214574|0.251529068802541 |
|min    |0                 |0                 |0.0                |0.0               |
|25%    |17                |174062            |0.456              |0.472             |
|50%    |35                |212903            |0.58               |0.684             |
|75%    |50                |261500            |0.695              |0.854             |
|max    |100               |5237295           |0.985              |1.0               |
+-------+------------------+---------------

In [7]:
df_missing=df.select(
    *[ F.count(F.when(
        (F.col(c).isNull() | (F.trim(F.col(c)) == "")) if t=="string"
        else (F.col(c).isNull() | F.isnan(c)) if t in ("double","float")
        else F.col(c).isNull()
    , 1)).alias(c) for c,t in df.dtypes ]
)

df1 = df_missing.select(*(["_c0","artists", "album_name", "track_name","track_genre"]))
df2 = df_missing.select(*(["popularity","duration_ms","danceability","energy"]))
df3 = df_missing.select(*(["key","loudness","mode","speechiness"]))
df4 = df_missing.select(*(["acousticness","instrumentalness","liveness","valence", "tempo", "time_signature"]))

df1.show()
df2.show()
df3.show()
df4.show()

+---+-------+----------+----------+-----------+
|_c0|artists|album_name|track_name|track_genre|
+---+-------+----------+----------+-----------+
|  0|      1|         1|         1|          0|
+---+-------+----------+----------+-----------+

+----------+-----------+------------+------+
|popularity|duration_ms|danceability|energy|
+----------+-----------+------------+------+
|         0|          0|           0|     0|
+----------+-----------+------------+------+

+---+--------+----+-----------+
|key|loudness|mode|speechiness|
+---+--------+----+-----------+
|  0|       0|   0|          0|
+---+--------+----+-----------+

+------------+----------------+--------+-------+-----+--------------+
|acousticness|instrumentalness|liveness|valence|tempo|time_signature|
+------------+----------------+--------+-------+-----+--------------+
|           0|               0|       0|      0|    0|             0|
+------------+----------------+--------+-------+-----+--------------+



In [8]:
# i checked and the slowest tempo is like 10-30bpm so i will remove rows with tempo value under that threshold
#similarly i will remove songs that last less than 30 seconds (they are usually skits)
#i will drop the few null rows
#to make sure that the boolean "explicit" column can be used, i will cast it to int 0/1


In [9]:
#then i will check highly correlated columns to perform feature selection

In [10]:
df.show(3)

+---+--------------------+--------------------+----------------+----------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+------+--------------+-----------+
|_c0|            track_id|             artists|      album_name|      track_name|popularity|duration_ms|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence| tempo|time_signature|track_genre|
+---+--------------------+--------------------+----------------+----------------+----------+-----------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+------+--------------+-----------+
|  0|5SuOikwiRyPMVoIQD...|         Gen Hoshino|          Comedy|          Comedy|        73|     230666|   false|       0.676| 0.461|  1|  -6.746|   0|      0.143|      0.0322|         1.01E-6|   0.358|  0.715|87.917|             4|   acoustic|
|  1|4qPNDBW1i3p13qL

In [11]:
#dropping _c0 (i dont think we need it) 
print(df.columns)

if "_c0" in df.columns:
    df = df.drop("_c0")
    
print(df.columns)

['_c0', 'track_id', 'artists', 'album_name', 'track_name', 'popularity', 'duration_ms', 'explicit', 'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'time_signature', 'track_genre']
['track_id', 'artists', 'album_name', 'track_name', 'popularity', 'duration_ms', 'explicit', 'danceability', 'energy', 'key', 'loudness', 'mode', 'speechiness', 'acousticness', 'instrumentalness', 'liveness', 'valence', 'tempo', 'time_signature', 'track_genre']


In [12]:
#Casting explicit to (0/1) (more usable) (boolean columns still there)
print(df.dtypes)
if "explicit" in df.columns and dict(df.dtypes)["explicit"] == "boolean":
    df = df.withColumn("explicit_int", F.col("explicit").cast("int"))
print(df.dtypes)
    

[('track_id', 'string'), ('artists', 'string'), ('album_name', 'string'), ('track_name', 'string'), ('popularity', 'int'), ('duration_ms', 'int'), ('explicit', 'boolean'), ('danceability', 'double'), ('energy', 'double'), ('key', 'int'), ('loudness', 'double'), ('mode', 'int'), ('speechiness', 'double'), ('acousticness', 'double'), ('instrumentalness', 'double'), ('liveness', 'double'), ('valence', 'double'), ('tempo', 'double'), ('time_signature', 'int'), ('track_genre', 'string')]
[('track_id', 'string'), ('artists', 'string'), ('album_name', 'string'), ('track_name', 'string'), ('popularity', 'int'), ('duration_ms', 'int'), ('explicit', 'boolean'), ('danceability', 'double'), ('energy', 'double'), ('key', 'int'), ('loudness', 'double'), ('mode', 'int'), ('speechiness', 'double'), ('acousticness', 'double'), ('instrumentalness', 'double'), ('liveness', 'double'), ('valence', 'double'), ('tempo', 'double'), ('time_signature', 'int'), ('track_genre', 'string'), ('explicit_int', 'int')]

In [13]:
ex = df.select(["explicit", "explicit_int"]).limit(5)
ex.show()

+--------+------------+
|explicit|explicit_int|
+--------+------------+
|   false|           0|
|   false|           0|
|   false|           0|
|   false|           0|
|   false|           0|
+--------+------------+



In [14]:
#Range filters
conds = []
def add(c, expr):
    if c in df.columns: conds.append(expr)

add("danceability", F.col("danceability").between(0,1))
add("energy",       F.col("energy").between(0,1))
add("valence",      F.col("valence").between(0,1))
add("acousticness", F.col("acousticness").between(0,1))
add("instrumentalness", F.col("instrumentalness").between(0,1))
add("liveness",     F.col("liveness").between(0,1))
add("speechiness",  F.col("speechiness").between(0,1))
add("loudness",     F.col("loudness").between(-60, 5))
add("tempo",        F.col("tempo").between(30, 260))
add("duration_ms",  F.col("duration_ms").between(30000, 600000)) #30s - 10min
add("popularity",   F.col("popularity").between(0, 100))
add("key",          F.col("key").between(0, 11))
add("mode",         F.col("mode").isin(0,1))
add("time_signature", F.col("time_signature").between(1, 12))

clean = df
for c in conds:
    clean = clean.filter(c)

Let's check again the summary

In [15]:
clean.select(*[c for c in ["popularity","duration_ms","danceability","energy"] if c in num_cols]) \
  .summary("count","mean","stddev","min","25%","50%","75%","max").show(truncate=False)

clean.select(*[c for c in ["key","loudness","mode","speechiness"] if c in num_cols]) \
  .summary("count","mean","stddev","min","25%","50%","75%","max").show(truncate=False)

clean.select(*[c for c in ["acousticness","instrumentalness","liveness","valence", "tempo", "time_signature"] if c in num_cols]) \
  .summary("count","mean","stddev","min","25%","50%","75%","max").show(truncate=False)

+-------+-----------------+------------------+-------------------+-------------------+
|summary|popularity       |duration_ms       |danceability       |energy             |
+-------+-----------------+------------------+-------------------+-------------------+
|count  |113221           |113221            |113221             |113221             |
|mean   |33.28348981196068|224380.62709214722|0.5683106817639713 |0.6427303792582697 |
|stddev |22.33003832156068|79883.31406310928 |0.17180716358961456|0.25045856932708677|
|min    |0                |30080             |0.0513             |2.02E-5            |
|25%    |17               |174002            |0.457              |0.473              |
|50%    |35               |212596            |0.581              |0.686              |
|75%    |50               |260532            |0.695              |0.854              |
|max    |100              |599999            |0.985              |1.0                |
+-------+-----------------+----------------

In [16]:
clean_missing=clean.select(
    *[ F.count(F.when(
        (F.col(c).isNull() | (F.trim(F.col(c)) == "")) if t=="string"
        else (F.col(c).isNull() | F.isnan(c)) if t in ("double","float")
        else F.col(c).isNull()
    , 1)).alias(c) for c,t in clean.dtypes ]
)

c1 = clean_missing.select(*(["artists", "album_name", "track_name","track_genre"]))
c2 = clean_missing.select(*(["popularity","duration_ms","danceability","energy"]))
c3 = clean_missing.select(*(["key","loudness","mode","speechiness"]))
c4 = clean_missing.select(*(["acousticness","instrumentalness","liveness","valence", "tempo", "time_signature"]))

c1.show()
c2.show()
c3.show()
c4.show()

+-------+----------+----------+-----------+
|artists|album_name|track_name|track_genre|
+-------+----------+----------+-----------+
|      0|         0|         0|          0|
+-------+----------+----------+-----------+

+----------+-----------+------------+------+
|popularity|duration_ms|danceability|energy|
+----------+-----------+------------+------+
|         0|          0|           0|     0|
+----------+-----------+------------+------+

+---+--------+----+-----------+
|key|loudness|mode|speechiness|
+---+--------+----+-----------+
|  0|       0|   0|          0|
+---+--------+----+-----------+

+------------+----------------+--------+-------+-----+--------------+
|acousticness|instrumentalness|liveness|valence|tempo|time_signature|
+------------+----------------+--------+-------+-----+--------------+
|           0|               0|       0|      0|    0|             0|
+------------+----------------+--------+-------+-----+--------------+



No null values! 
Dataset is clean
I will now check highly correlated features 

In [17]:
num_types = {"double","float","int","integer","bigint","long","short"}
num_cols  = [c for c,t in clean.dtypes if t.lower() in num_types]

pairs = []
for i,c1 in enumerate(num_cols):
    for c2 in num_cols[i+1:]:
        rho = clean.stat.corr(c1, c2)
        pairs.append((c1, c2, float(rho) if rho is not None else None))
spark.createDataFrame(pairs, ["f1","f2","pearson_r"]).orderBy(F.abs(F.col("pearson_r")).desc_nulls_last()).show(20, truncate=False)

+----------------+----------------+--------------------+
|f1              |f2              |pearson_r           |
+----------------+----------------+--------------------+
|energy          |loudness        |0.761805614335919   |
|energy          |acousticness    |-0.7365100740062134 |
|loudness        |acousticness    |-0.5918298718748708 |
|danceability    |valence         |0.47147701999399744 |
|loudness        |instrumentalness|-0.42981385902141495|
|instrumentalness|valence         |-0.3193113416812313 |
|speechiness     |explicit_int    |0.306978232020246   |
|loudness        |valence         |0.27261413433040516 |
|energy          |valence         |0.2533012125608733  |
|danceability    |loudness        |0.24691673744916787 |
|energy          |tempo           |0.23901142467640327 |
|acousticness    |tempo           |-0.2068352579243168 |
|speechiness     |liveness        |0.20514763194825927 |
|loudness        |tempo           |0.19963588097863136 |
|energy          |liveness     

In [21]:
# We try to see if some song are there several time
pandas_clean = clean.toPandas()
print(type(pandas_clean))
nb_duplicate = pandas_clean.duplicated(subset="track_id", keep = "first").sum()
print("Number of duplicate : ", nb_duplicate)

<class 'pandas.core.frame.DataFrame'>
Number of duplicate :  24203


In [27]:
# Since we found some duplicate, we drop them
clean = clean.dropDuplicates(["track_id"])

In [30]:
clean.write.mode("overwrite").option("header", "true").csv("hdfs://namenode:9000/spotify/clean_data")

89018
