# Data Exploration and Machine Learning with Spark

### Imports

In [0]:
import pandas as pd
import matplotlib.pyplot as plt

from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.types import FloatType, DateType

from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from sklearn.metrics import confusion_matrix

%matplotlib inline

### Data

In [0]:
df = spark.read.format("csv")\
    .option("header", "true")\
    .option("delimiter", ",")\
    .load("dbfs:/FileStore/shared_uploads/amine.ait-amalik@ut-capitole.fr/tracks.csv")

In [0]:
df.printSchema()
df.show(n=5)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- duration_ms: string (nullable = true)
 |-- explicit: string (nullable = true)
 |-- artists: string (nullable = true)
 |-- id_artists: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- danceability: string (nullable = true)
 |-- energy: string (nullable = true)
 |-- key: string (nullable = true)
 |-- loudness: string (nullable = true)
 |-- mode: string (nullable = true)
 |-- speechiness: string (nullable = true)
 |-- acousticness: string (nullable = true)
 |-- instrumentalness: string (nullable = true)
 |-- liveness: string (nullable = true)
 |-- valence: string (nullable = true)
 |-- tempo: string (nullable = true)
 |-- time_signature: string (nullable = true)

+--------------------+--------------------+----------+-----------+--------+-------------------+--------------------+------------+------------+------+---+--------+----+-----------+-----

## I. Data Cleaning

#### Convert some variables to Float

In [0]:
df = df.withColumn("danceability", fn.col("danceability").cast(FloatType()))\
    .withColumn("tempo", fn.col("tempo").cast(FloatType()))\
    .withColumn("loudness", fn.col("loudness").cast(FloatType()))\
    .withColumn("duration_ms", fn.col("duration_ms").cast(FloatType()))

#### Keep only interesting columns

In [0]:
df = df.select("id", "name", "popularity", "duration_ms", "artists",
          "release_date", "danceability", "loudness", "tempo")

df.printSchema()
df.show(n=5)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- duration_ms: float (nullable = true)
 |-- artists: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- danceability: float (nullable = true)
 |-- loudness: float (nullable = true)
 |-- tempo: float (nullable = true)

+--------------------+--------------------+----------+-----------+-------------------+------------+------------+--------+-------+
|                  id|                name|popularity|duration_ms|            artists|release_date|danceability|loudness|  tempo|
+--------------------+--------------------+----------+-----------+-------------------+------------+------------+--------+-------+
|35iwgR4jXetI318WE...|               Carve|         6|   126903.0|            ['Uli']|  1922-02-22|       0.645| -13.338|104.851|
|021ht4sdgPcrDgSk7...|Capítulo 2.16 - B...|         0|    98200.0|['Fernando Pessoa']|  1922-06-01|       0.695| -22.13

#### Drop NAs

In [0]:
print(f"df shape: {df.count()} rows and {len(df.columns)} columns")

df shape: 586672 rows and 9 columns


In [0]:
clean_df = df.na.drop()

In [0]:
print(f"clean_df shape: {clean_df.count()} rows and {len(clean_df.columns)} columns")

clean_df shape: 584116 rows and 9 columns


#### Dummy variable indicating if release date > 2000

In [0]:
clean_df = clean_df.withColumn(
    "after_1980",
    fn.when(
        (fn.col("release_date") > 1980), 1
    ).otherwise(0)
)

clean_df.show(n=5)

+--------------------+--------------------+----------+-----------+-------------------+------------+------------+--------+-------+----------+
|                  id|                name|popularity|duration_ms|            artists|release_date|danceability|loudness|  tempo|after_1980|
+--------------------+--------------------+----------+-----------+-------------------+------------+------------+--------+-------+----------+
|35iwgR4jXetI318WE...|               Carve|         6|   126903.0|            ['Uli']|  1922-02-22|       0.645| -13.338|104.851|         0|
|021ht4sdgPcrDgSk7...|Capítulo 2.16 - B...|         0|    98200.0|['Fernando Pessoa']|  1922-06-01|       0.695| -22.136|102.009|         0|
|07A5yehtSnoedViJA...|Vivo para Querert...|         0|   181640.0|['Ignacio Corsini']|  1922-03-21|       0.434|  -21.18|130.418|         0|
|08FmqUhxtyLTn6pAh...|El Prisionero - R...|         0|   176907.0|['Ignacio Corsini']|  1922-03-21|       0.321| -27.961| 169.98|         0|
|08y9GfoqCWfO

#### Variable indicating the year

In [0]:
clean_df = clean_df.withColumn(
    "year",
    fn.year("release_date")
)

clean_df.show(n=5)

+--------------------+--------------------+----------+-----------+-------------------+------------+------------+--------+-------+----------+----+
|                  id|                name|popularity|duration_ms|            artists|release_date|danceability|loudness|  tempo|after_1980|year|
+--------------------+--------------------+----------+-----------+-------------------+------------+------------+--------+-------+----------+----+
|35iwgR4jXetI318WE...|               Carve|         6|   126903.0|            ['Uli']|  1922-02-22|       0.645| -13.338|104.851|         0|1922|
|021ht4sdgPcrDgSk7...|Capítulo 2.16 - B...|         0|    98200.0|['Fernando Pessoa']|  1922-06-01|       0.695| -22.136|102.009|         0|1922|
|07A5yehtSnoedViJA...|Vivo para Querert...|         0|   181640.0|['Ignacio Corsini']|  1922-03-21|       0.434|  -21.18|130.418|         0|1922|
|08FmqUhxtyLTn6pAh...|El Prisionero - R...|         0|   176907.0|['Ignacio Corsini']|  1922-03-21|       0.321| -27.961| 16

#### Dummy variable indicating if popularity is above 50

In [0]:
clean_df = clean_df.withColumn(
    "popular_af",
    fn.when(
        (fn.col("popularity") > 50), 1
    ).otherwise(0)
)

clean_df.show(n=5)

+--------------------+--------------------+----------+-----------+-------------------+------------+------------+--------+-------+----------+----+----------+
|                  id|                name|popularity|duration_ms|            artists|release_date|danceability|loudness|  tempo|after_1980|year|popular_af|
+--------------------+--------------------+----------+-----------+-------------------+------------+------------+--------+-------+----------+----+----------+
|35iwgR4jXetI318WE...|               Carve|         6|   126903.0|            ['Uli']|  1922-02-22|       0.645| -13.338|104.851|         0|1922|         0|
|021ht4sdgPcrDgSk7...|Capítulo 2.16 - B...|         0|    98200.0|['Fernando Pessoa']|  1922-06-01|       0.695| -22.136|102.009|         0|1922|         0|
|07A5yehtSnoedViJA...|Vivo para Querert...|         0|   181640.0|['Ignacio Corsini']|  1922-03-21|       0.434|  -21.18|130.418|         0|1922|         0|
|08FmqUhxtyLTn6pAh...|El Prisionero - R...|         0|   1

#### Split into popular and unpopular (above or below 50) data sets

In [0]:
pop_df = clean_df.filter(clean_df.popularity > 50)

print(f"Number of popular hits: {pop_df.count()}")

Number of popular hits: 69430


In [0]:
unpop_df = clean_df.filter(clean_df.popularity <= 50)

print(f"Number of UNpopular hits: {unpop_df.count()}")

Number of UNpopular hits: 514065


## II. Analysis

#### A) Popularity by Year

In [0]:
popularity_by_year = clean_df.groupBy("year").agg(
    fn.min("popularity").alias("min_popularity"),\
    fn.max("popularity").alias("max_popularity"),\
    fn.mean("popularity").alias("avg_popularity")
)

print("Highest Average Popularity")
popularity_by_year.orderBy("avg_popularity", ascending=False).show(5)

print("Lowest Average Popularity")
popularity_by_year.orderBy("avg_popularity", ascending=True).show(5)

Highest Average Popularity
+----+--------------+--------------+------------------+
|year|min_popularity|max_popularity|    avg_popularity|
+----+--------------+--------------+------------------+
|2019|             0|            94|44.910613584715094|
|2020|             0|            97| 44.68210586881473|
|2017|             0|             9| 42.22243615727604|
|2018|             0|             9| 42.15595489135418|
|2016|             0|             9| 39.29119536693512|
+----+--------------+--------------+------------------+
only showing top 5 rows

Lowest Average Popularity
+----+--------------+--------------+--------------------+
|year|min_popularity|max_popularity|      avg_popularity|
+----+--------------+--------------+--------------------+
|1922|             0|             6|0.057971014492753624|
|1929|             0|             9|  0.3431111111111111|
|1924|             0|             9|  0.6129541864139021|
|1927|             0|             9|  0.6532999164578112|
|1934|      

We notice that 2019 is the year with the highest average song popularity while the worst is 1922. The older the song, the less popular so we can understand this result.   
However, there seems to be a problem because there are many years where the max popularity is extremely low (around 9). This is not normal because it is hard to believe that years like 2017, 2018 or 2016 have a max popularity of 9. We consider this to be a problem in the computation of the popularity variable, which is inherent to the original data set. No information has been given by the data set author regarding this variable.

### B) Artist Popularity

In [0]:
popularity_by_artists = clean_df.groupBy("artists")\
    .agg({"popularity": "sum"})\
    .filter(fn.col("sum(popularity)").isNotNull())

popularity_by_artists.orderBy("sum(popularity)", ascending=False).show(5)
popularity_by_artists.orderBy("sum(popularity)", ascending=True).show(5)

+--------------------+---------------+
|             artists|sum(popularity)|
+--------------------+---------------+
|    ['Die drei ???']|       140705.0|
|['TKKG Retro-Arch...|        58554.0|
| ['Bibi Blocksberg']|        50096.0|
|['Benjamin Blümch...|        42371.0|
|   ['Bibi und Tina']|        32037.0|
+--------------------+---------------+
only showing top 5 rows

+--------------------+---------------+
|             artists|sum(popularity)|
+--------------------+---------------+
|['Georgia Mitaki'...|            0.0|
|['Carmen Miranda'...|            0.0|
|['Giorgos Papasid...|            0.0|
|['Anantadev Mukhe...|            0.0|
|['Kalyani', 'Kant...|            0.0|
+--------------------+---------------+
only showing top 5 rows



**Die drei ???** is the most popular artist apparently. After looking it up, we found out that it is actually a book series, probably transformed into a podcast or an e-book on Spotify.  
After looking up the rest of the top artists, we noticed that they were all more or less German audio books or artists for children. This shows a clear bias in this data set for the popularity variable.

In [0]:
pop_artists = clean_df.groupBy("artists")\
    .agg({"popularity": "sum"})\
    .filter(fn.col("sum(popularity)") != 0)\
    .count()

print(f"Number of artists with popularity higher than 0: {pop_artists}")

Number of artists with popularity higher than 0: 103754


In [0]:
zeropop_artists = clean_df.groupBy("artists")\
    .agg({"popularity": "sum"})\
    .filter(fn.col("sum(popularity)") == 0)\
    .count()

print(f"Number of artists with 0 popularity: {zeropop_artists}")

Number of artists with 0 popularity: 8605


# II. Machine Learning

## MLLIB Part

#### Concatenate features in Dense Vector

In [0]:
vect_ass = VectorAssembler(
    inputCols=["duration_ms", "danceability", "loudness", "tempo", "after_1980"],
    outputCol="features"
)

In [0]:
vect_ass_df = vect_ass.transform(clean_df)
vect_ass_df = vect_ass_df.select(["features", "popular_af"])\
    .withColumnRenamed("popular_af", "label")

vect_ass_df.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[126903.0,0.64499...|    0|
|[98200.0,0.694999...|    0|
|[181640.0,0.43399...|    0|
|[176907.0,0.32100...|    0|
|[163080.0,0.40200...|    0|
+--------------------+-----+
only showing top 5 rows



#### Train Test Split

In [0]:
train_data, test_data = vect_ass_df.randomSplit([0.8, 0.2])

In [0]:
print(f"All data length: {vect_ass_df.count()}")
print(f"Train data length: {train_data.count()}")
print(f"Test data length: {test_data.count()}")

All data length: 584116
Train data length: 467138
Test data length: 116978


#### Decision Tree Classifier

In [0]:
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label"
)

fitted_dt = dt.fit(train_data)
dt_test_preds = fitted_dt.transform(test_data)

In [0]:
dt_test_preds.show(5)

+--------------------+-----+------------------+--------------------+----------+
|            features|label|     rawPrediction|         probability|prediction|
+--------------------+-----+------------------+--------------------+----------+
|(5,[0,2],[4000.0,...|    0|[411715.0,55423.0]|[0.88135625875009...|       0.0|
|(5,[0,2],[10371.0...|    0|[411715.0,55423.0]|[0.88135625875009...|       0.0|
|(5,[0,2],[13600.0...|    0|[411715.0,55423.0]|[0.88135625875009...|       0.0|
|(5,[0,2],[13959.0...|    0|[411715.0,55423.0]|[0.88135625875009...|       0.0|
|(5,[0,2],[14733.0...|    0|[411715.0,55423.0]|[0.88135625875009...|       0.0|
+--------------------+-----+------------------+--------------------+----------+
only showing top 5 rows



#### Evaluation

In [0]:
area_under_roc_evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="prediction", labelCol="label", metricName="areaUnderROC"
)
area_under_pr_evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="prediction", labelCol="label", metricName="areaUnderPR"
)

dt_area_under_roc = area_under_roc_evaluator.evaluate(dt_test_preds)
dt_area_under_pr_evaluator = area_under_pr_evaluator.evaluate(dt_test_preds)

 
print("Decision Tree Prediction areaUnderROC: ", dt_area_under_roc)
print("Decision Tree Prediction areaUnderPR: ", dt_area_under_pr_evaluator)


y_dt_pred = dt_test_preds.select("prediction").collect()
y_dt_orig = dt_test_preds.select("label").collect()

dt_cm = confusion_matrix(y_dt_orig, y_dt_pred)
print("Confusion Matrix:")
print(dt_cm)

Decision Tree Prediction areaUnderROC:  0.5
Decision Tree Prediction areaUnderPR:  0.11974046401887535
Confusion Matrix:
[[102971      0]
 [ 14007      0]]


## Pipeline Part

You'll also train another model using Pipelines:

- Creating a pipeline with at least one feature extraction/manipulation and one model estimator
- Fitting the pipeline to the training data
- Applying the model to the test data and computing the errors

In [0]:
train_df, test_df = clean_df.randomSplit([0.9, 0.1])

#### Gradient Boosted Tree Model

In [0]:
gb_model = GBTClassifier(
    labelCol="popular_af"
)

#### Logistic Regression Model

In [0]:
lr_model = LogisticRegression(
    maxIter=1000,
    regParam=0.1,
    elasticNetParam=0.2,
    standardization=False,
    labelCol="popular_af",
    threshold=0.1
)

## Logistic Pipeline

In [0]:
v_ass = VectorAssembler(
    inputCols=["duration_ms", "danceability", "loudness", "tempo", "after_1980"],
    outputCol="features"
)

In [0]:
pipe = Pipeline(
    stages=[v_ass, lr_model]
)

fitted_lr = pipe.fit(train_df)

In [0]:
lr_test_preds = fitted_lr.transform(test_df)

#### Evaluation

In [0]:
auROC_evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="prediction", labelCol="popular_af", metricName="areaUnderROC"
)
auPR_evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="prediction", labelCol="popular_af", metricName="areaUnderPR"
)

lr_area_under_roc = auROC_evaluator.evaluate(lr_test_preds)
lr_area_under_pr_evaluator = auPR_evaluator.evaluate(lr_test_preds)

 
print("Logistic Regression Prediction areaUnderROC: ", lr_area_under_roc)
print("Logistic Regression Prediction areaUnderPR: ", lr_area_under_pr_evaluator)


y_lr_pred = lr_test_preds.select("prediction").collect()
y_lr_orig = lr_test_preds.select("popular_af").collect()

lr_cm = confusion_matrix(y_lr_orig, y_lr_pred)
print("Confusion Matrix:")
print(lr_cm)

Logistic Regression Prediction areaUnderROC:  0.6386561369848354
Logistic Regression Prediction areaUnderPR:  0.16466001454241766
Confusion Matrix:
[[23062 28374]
 [ 1190  5767]]


## GBT Pipeline

In [0]:
gb_pipe = Pipeline(
    stages=[v_ass, gb_model]
)

fitted_gb = pipe.fit(train_df)

In [0]:
gb_test_preds = fitted_gb.transform(test_df)

#### Evaluation

In [0]:
auROC_evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="prediction", labelCol="popular_af", metricName="areaUnderROC"
)
auPR_evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="prediction", labelCol="popular_af", metricName="areaUnderPR"
)

gb_area_under_roc = auROC_evaluator.evaluate(gb_test_preds)
gb_area_under_pr_evaluator = auPR_evaluator.evaluate(gb_test_preds)

 
print("Gradient Boosted Tree Prediction areaUnderROC: ", gb_area_under_roc)
print("Gradient Boosted Tree Prediction areaUnderPR: ", gb_area_under_pr_evaluator)


y_gb_pred = gb_test_preds.select("prediction").collect()
y_gb_orig = gb_test_preds.select("popular_af").collect()

gb_cm = confusion_matrix(y_gb_orig, y_gb_pred)
print("Confusion Matrix:")
print(gb_cm)

Gradient Boosted Tree Prediction areaUnderROC:  0.6386561369848354
Gradient Boosted Tree Prediction areaUnderPR:  0.16466001454241766
Confusion Matrix:
[[23062 28374]
 [ 1190  5767]]


### Conclusion

The 3 models did not yield satisfying results. Decision Tree yielded an area under ROC of 0.5 which corresponds to a random classifier.  
Strangely, the logistic regression and gradient boosted tree pipelines returned the exact same results: an area under ROC of 0.64 along with an area under PR of 0.16.   
These results are mostly due to a biased popularity variable, a bias which we've noticed while exploring the data previously. There is another inherent problem to our data set. We expected to see more of the famous mainstream artists that we know today such as Ariana Grande, The Weeknd or whatever other horrible music people listen to nowadays. This enforces our intuition for a large bias in this data set.