# Import Libraries

In [None]:
# Installing required packages
!pip install pyspark
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession,SQLContext

# Create Spark Context

In [None]:
spark = SparkSession \
    .builder \
    .appName("ML_Classifications") \
    .getOrCreate()



In [None]:
sc = spark.sparkContext
sqlContext = SQLContext(sc)



# Open File

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

spotify_schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("artists", StringType(), True),
    StructField("duration_ms", IntegerType(), True),
    StructField("release_date", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("acousticness", DoubleType(), True),
    StructField("danceability", DoubleType(), True),
    StructField("energy", DoubleType(), True),
    StructField("instrumentalness", DoubleType(), True),
    StructField("liveness", DoubleType(), True),
    StructField("loudness", DoubleType(), True),
    StructField("speechiness", DoubleType(), True),
    StructField("tempo", DoubleType(), True),
    StructField("valence", DoubleType(), True),
    StructField("mode", IntegerType(), True),
    StructField("key", IntegerType(), True),
    StructField("popularity", IntegerType(), True),
    StructField("explicit", IntegerType(), True),
])


In [None]:
file='spotify-data.csv'
df = spark.read.csv(file,header='true', schema=spotify_schema)

In [None]:
df.columns

['id',
 'name',
 'artists',
 'duration_ms',
 'release_date',
 'year',
 'acousticness',
 'danceability',
 'energy',
 'instrumentalness',
 'liveness',
 'loudness',
 'speechiness',
 'tempo',
 'valence',
 'mode',
 'key',
 'popularity',
 'explicit']

In [None]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- release_date: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- key: integer (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- explicit: integer (nullable = true)



In [None]:
df.show()

+--------------------+--------------------+--------------------+-----------+------------+----+------------+------------+-------+----------------+--------+--------+-----------+-------+-------+----+---+----------+--------+
|                  id|                name|             artists|duration_ms|release_date|year|acousticness|danceability| energy|instrumentalness|liveness|loudness|speechiness|  tempo|valence|mode|key|popularity|explicit|
+--------------------+--------------------+--------------------+-----------+------------+----+------------+------------+-------+----------------+--------+--------+-----------+-------+-------+----+---+----------+--------+
|6KbQ3uYMLKb5jDxLF...|Singende Bataillo...| ['Carl Woitschach']|     158648|        1928|1928|       0.995|       0.708|  0.195|           0.563|   0.151| -12.428|     0.0506|118.469|  0.779|   1| 10|         0|       0|
|6KuQTIu1KoTTkLXKr...|Fantasiestücke, O...|['Robert Schumann...|     282133|        1928|1928|       0.994|       0.

In [None]:
df.show()

+--------------------+--------------------+--------------------+-----------+------------+----+------------+------------+-------+----------------+--------+--------+-----------+-------+-------+----+---+----------+--------+
|                  id|                name|             artists|duration_ms|release_date|year|acousticness|danceability| energy|instrumentalness|liveness|loudness|speechiness|  tempo|valence|mode|key|popularity|explicit|
+--------------------+--------------------+--------------------+-----------+------------+----+------------+------------+-------+----------------+--------+--------+-----------+-------+-------+----+---+----------+--------+
|6KbQ3uYMLKb5jDxLF...|Singende Bataillo...| ['Carl Woitschach']|     158648|        1928|1928|       0.995|       0.708|  0.195|           0.563|   0.151| -12.428|     0.0506|118.469|  0.779|   1| 10|         0|       0|
|6KuQTIu1KoTTkLXKr...|Fantasiestücke, O...|['Robert Schumann...|     282133|        1928|1928|       0.994|       0.

In [None]:
df.describe().toPandas().transpose()

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
id,169909,,,000G1xMMuwxNHmwVsBdtj1,7zzuPsjj9L3M7ikqGmjN0D
name,169909,Infinity,,!Que Vida! - Mono Version,화려하지 않은 고백 Confession Is Not Flashy
artists,169909,,,"""""Cool"""" (Allegretto)""",['黑豹']
duration_ms,168462,231288.1418480132,120689.8977337405,5108,5403500
release_date,169909,6193.66778670272,42244.4328262541,"""""'Legally Blonde' Ensemble""""]""","['Wolfgang Amadeus Mozart', 'Walter Franck', '..."
year,169217,2364.329204512549,10908.368278421962,1921,710400
acousticness,169624,197.16453702548884,7127.8832830763395,0.0,706400.0
danceability,169771,111.97785567892575,5511.097855467194,0.0,668333.0
energy,169839,70.88351548936565,4321.7813953980385,0.0,503320.0


# Data Cleansing

## Drop Null value

In [None]:
df.count()

169909

In [None]:
from pyspark.sql.functions import col, sum

null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])
null_counts.show()


+---+----+-------+-----------+------------+----+------------+------------+------+----------------+--------+--------+-----------+-----+-------+----+---+----------+--------+
| id|name|artists|duration_ms|release_date|year|acousticness|danceability|energy|instrumentalness|liveness|loudness|speechiness|tempo|valence|mode|key|popularity|explicit|
+---+----+-------+-----------+------------+----+------------+------------+------+----------------+--------+--------+-----------+-----+-------+----+---+----------+--------+
|  0|   0|      0|       1447|           0| 692|         285|         138|    70|              44|      23|       8|          8|    4|      2|1438|614|       326|     187|
+---+----+-------+-----------+------------+----+------------+------------+------+----------------+--------+--------+-----------+-----+-------+----+---+----------+--------+



In [None]:
df = df.dropna()

In [None]:
#No. of column after drop null
df.count()

168462

## Extract artist name

In [None]:
from pyspark.sql.functions import split, regexp_replace, explode


df_exploded = df.withColumn("artist_array", split(regexp_replace("artists", r"[\[\]']", ""), ", "))
df_exploded = df_exploded.withColumn("artist", explode("artist_array"))


In [None]:
df_exploded.select("artist_array", "artist").show()

+--------------------+-------------------+
|        artist_array|             artist|
+--------------------+-------------------+
|   [Carl Woitschach]|    Carl Woitschach|
|[Robert Schumann,...|    Robert Schumann|
|[Robert Schumann,...|  Vladimir Horowitz|
|[Seweryn Goszczyń...|Seweryn Goszczyński|
|  [Francisco Canaro]|   Francisco Canaro|
|[Frédéric Chopin,...|    Frédéric Chopin|
|[Frédéric Chopin,...|  Vladimir Horowitz|
|[Felix Mendelssoh...|  Felix Mendelssohn|
|[Felix Mendelssoh...|  Vladimir Horowitz|
|[Franz Liszt, Vla...|        Franz Liszt|
|[Franz Liszt, Vla...|  Vladimir Horowitz|
|   [Carl Woitschach]|    Carl Woitschach|
|[Francisco Canaro...|   Francisco Canaro|
|[Francisco Canaro...|             Charlo|
|[Seweryn Goszczyń...|Seweryn Goszczyński|
|[Sergei Rachmanin...|Sergei Rachmaninoff|
|[Sergei Rachmanin...|  Vladimir Horowitz|
|[Frédéric Chopin,...|    Frédéric Chopin|
|[Frédéric Chopin,...|  Vladimir Horowitz|
|[Samuel Barber, V...|      Samuel Barber|
+----------

# Explore Data

In [None]:
#Top 20 artist by number of song
from pyspark.sql.functions import desc
df_exploded.groupBy('artist').count().orderBy(desc('count')).show()

+--------------------+-----+
|              artist|count|
+--------------------+-----+
|    Francisco Canaro| 2234|
|    Эрнест Хемингуэй| 1215|
|     Frédéric Chopin| 1033|
|Ludwig van Beethoven|  935|
|Wolfgang Amadeus ...|  904|
|Johann Sebastian ...|  838|
|   Эрих Мария Ремарк|  781|
|       Frank Sinatra|  732|
|      Billie Holiday|  680|
|    Arturo Toscanini|  628|
|     Igor Stravinsky|  623|
|     Ignacio Corsini|  620|
|   Vladimir Horowitz|  612|
|         Johnny Cash|  589|
|New York Philharm...|  556|
|           Bob Dylan|  553|
|  The Rolling Stones|  522|
|      The Beach Boys|  503|
|     Lata Mangeshkar|  502|
|       Elvis Presley|  501|
+--------------------+-----+
only showing top 20 rows



In [None]:
#Top 20 year with highest song
df.groupBy('year').count().orderBy(desc('count')).show()

+----+-----+
|year|count|
+----+-----+
|1973| 2000|
|1978| 1999|
|1976| 1999|
|2017| 1999|
|1977| 1998|
|2018| 1998|
|1988| 1998|
|1979| 1998|
|2019| 1998|
|1982| 1998|
|1970| 1998|
|1987| 1998|
|2005| 1998|
|1984| 1998|
|2010| 1998|
|1975| 1997|
|2006| 1997|
|1992| 1997|
|1969| 1997|
|1965| 1997|
+----+-----+
only showing top 20 rows



In [None]:
# Top 20 years with least song
df.groupBy('year').count().orderBy('count').show()

+----+-----+
|year|count|
+----+-----+
|1922|   72|
|1921|  128|
|1923|  168|
|1924|  237|
|1925|  262|
|1932|  478|
|1934|  550|
|1938|  576|
|1927|  594|
|1931|  595|
|1937|  596|
|1933|  622|
|1943|  628|
|1944|  769|
|1926|  874|
|1929|  924|
|1941|  956|
|1939|  999|
|1936| 1046|
|1928| 1182|
+----+-----+
only showing top 20 rows



In [None]:
# Number of artist
df_exploded.select("artist").distinct().count()

27389

In [None]:
# Average songs of each artist
from pyspark.sql.functions import count, avg
artist_song_counts = df_exploded.groupBy("artist").agg(count("id").alias("num_songs"))
artist_song_counts.agg(avg("num_songs")).show()

+-----------------+
|   avg(num_songs)|
+-----------------+
|8.036730074117346|
+-----------------+



In [None]:
# Average songs over the year
year_song_counts = df.groupBy("year").agg(count("name").alias("num_songs"))
year_song_counts.agg(avg("num_songs")).show()

+--------------+
|avg(num_songs)|
+--------------+
|       1684.62|
+--------------+



In [None]:
# Total song
df.count()

168462

In [None]:
# Song release in recent years
df.groupBy('year').count().orderBy(desc('year')).show()

+----+-----+
|year|count|
+----+-----+
|2020| 1752|
|2019| 1998|
|2018| 1998|
|2017| 1999|
|2016| 1963|
|2015| 1929|
|2014| 1996|
|2013| 1995|
|2012| 1997|
|2011| 1993|
|2010| 1998|
|2009| 1991|
|2008| 1993|
|2007| 1981|
|2006| 1997|
|2005| 1998|
|2004| 1989|
|2003| 1994|
|2002| 1990|
|2001| 1992|
+----+-----+
only showing top 20 rows



# Save result after clean

In [None]:
df_exploded2 = df_exploded.drop('artist_array')

In [None]:
# Save result after clean
df.write.csv("spotify_clean", header=True)
df_exploded2.write.csv("spotify_artist_clean", header=True)


# Machine Learning

In [None]:
df.show()

+--------------------+--------------------+--------------------+-----------+------------+----+------------+------------+-------+----------------+--------+--------+-----------+-------+-------+----+---+----------+--------+
|                  id|                name|             artists|duration_ms|release_date|year|acousticness|danceability| energy|instrumentalness|liveness|loudness|speechiness|  tempo|valence|mode|key|popularity|explicit|
+--------------------+--------------------+--------------------+-----------+------------+----+------------+------------+-------+----------------+--------+--------+-----------+-------+-------+----+---+----------+--------+
|6KbQ3uYMLKb5jDxLF...|Singende Bataillo...| ['Carl Woitschach']|     158648|        1928|1928|       0.995|       0.708|  0.195|           0.563|   0.151| -12.428|     0.0506|118.469|  0.779|   1| 10|         0|       0|
|6KuQTIu1KoTTkLXKr...|Fantasiestücke, O...|['Robert Schumann...|     282133|        1928|1928|       0.994|       0.

#Regression to predict duration_ms

## Attribute Selection

In [None]:
df_select = df.select('year','acousticness','danceability','energy','instrumentalness','liveness','loudness','speechiness','tempo','valence','mode','key','explicit','duration_ms','popularity')

In [None]:
#Check correlation
features = df_select.columns[:-1]

for col in features:
    corr = df.stat.corr(col, "popularity")
    print(f"Correlation between {col} and popularity: {corr:.4f}")

Correlation between year and popularity: 0.8801
Correlation between acousticness and popularity: -0.5897
Correlation between danceability and popularity: 0.2142
Correlation between energy and popularity: 0.4937
Correlation between instrumentalness and popularity: -0.3030
Correlation between liveness and popularity: -0.0739
Correlation between loudness and popularity: 0.4637
Correlation between speechiness and popularity: -0.1375
Correlation between tempo and popularity: 0.1318
Correlation between valence and popularity: -0.0011
Correlation between mode and popularity: -0.0325
Correlation between key and popularity: 0.0099
Correlation between explicit and popularity: 0.2127
Correlation between duration_ms and popularity: 0.0650


In [None]:
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [None]:
df_select = df.select('year','acousticness','danceability','energy','instrumentalness','loudness','tempo','explicit','popularity')

In [None]:
featureColumns = df_select.columns[:-1]

In [None]:
featureColumns

['year',
 'acousticness',
 'danceability',
 'energy',
 'instrumentalness',
 'loudness',
 'tempo',
 'explicit']

In [None]:
(trainData, testData) = df_select.randomSplit([0.8,0.2], seed = 13234 )

In [None]:
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
scaler = StandardScaler(inputCol = 'features',outputCol='scaledFeatures',withStd=True,withMean=False)

In [None]:
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="popularity")

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler,scaler,lr])

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
paramGrid = ParamGridBuilder() \
    .addGrid(lr.fitIntercept, [False, True]) \
    .addGrid(lr.maxIter, [5, 10, 20]) \
    .build()

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="popularity", metricName="rmse"),
                          numFolds=5)
cvModel = crossval.fit(trainData)

In [None]:
predictions =  cvModel.transform(testData)

In [None]:
rmse_evaluator = RegressionEvaluator(labelCol="popularity", metricName="rmse")
r2_evaluator = RegressionEvaluator(labelCol="popularity", metricName="r2")
rmse = rmse_evaluator.evaluate(predictions)
r2 = r2_evaluator.evaluate(predictions)

# Print the results
print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"R-squared (R2): {r2}")

Root Mean Squared Error (RMSE): 10.040384945080456
R-squared (R2): 0.7813629999625085


# Classification

In [None]:
df.groupBy('mode').count().show()

+----+------+
|mode| count|
+----+------+
|   1|119326|
|   0| 49136|
+----+------+



# Attribute Selection

In [None]:
df_select = df.select('year','acousticness','danceability','energy','instrumentalness','liveness','loudness','speechiness','tempo','valence','key','duration_ms','popularity','mode','explicit')
features = df_select.columns[:-1]

for col in features:
    corr = df.stat.corr(col, "explicit")
    print(f"Correlation between {col} and explicit: {corr:.4f}")

Correlation between year and explicit: 0.2440
Correlation between acousticness and explicit: -0.2524
Correlation between danceability and explicit: 0.2410
Correlation between energy and explicit: 0.1410
Correlation between instrumentalness and explicit: -0.1387
Correlation between liveness and explicit: 0.0401
Correlation between loudness and explicit: 0.1513
Correlation between speechiness and explicit: 0.4141
Correlation between tempo and explicit: 0.0105
Correlation between valence and explicit: -0.0251
Correlation between key and explicit: 0.0086
Correlation between duration_ms and explicit: -0.0443
Correlation between popularity and explicit: 0.2127
Correlation between mode and explicit: -0.0832


In [None]:
df_select = df.select('year','acousticness','danceability','energy','instrumentalness','loudness','speechiness','popularity','explicit')

In [None]:
featureColumns = df_select.columns[:-1]

In [None]:
featureColumns

['year',
 'acousticness',
 'danceability',
 'energy',
 'instrumentalness',
 'loudness',
 'speechiness',
 'popularity']

In [None]:
(trainData, testData) = df_select.randomSplit([0.8,0.2], seed = 13234 )

In [None]:
assembler = VectorAssembler(inputCols=featureColumns, outputCol="features")
scaler = StandardScaler(inputCol = 'features',outputCol='scaledFeatures',withStd=True,withMean=False)

In [None]:
lr = LogisticRegression(featuresCol="scaledFeatures", labelCol="explicit")

In [None]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler,scaler,lr])

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.fitIntercept, [False, True]) \
    .addGrid(lr.maxIter, [5, 10,20]) \
    .build()

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator = MulticlassClassificationEvaluator(labelCol="explicit", predictionCol="prediction", metricName="accuracy"),
                          numFolds=5)
cvModel = crossval.fit(trainData)

In [None]:
predictions = cvModel.transform(testData)

In [None]:
def evaluate(result):
    predictionAndLabels = result.select("prediction", "explicit")
    metrics = ["f1", "precisionByLabel", "recallByLabel", "weightedPrecision", "weightedRecall", "accuracy"]
    for m in metrics:
        evaluator = MulticlassClassificationEvaluator(labelCol="explicit", predictionCol="prediction", metricName=m)
        print(f"{m}: {evaluator.evaluate(predictionAndLabels):.4f}")


In [None]:
evaluate(predictions)

f1: 0.9334
precisionByLabel: 0.9522
recallByLabel: 0.9834
weightedPrecision: 0.9325
weightedRecall: 0.9395
accuracy: 0.9395


In [None]:
sc.stop()