In [58]:
!pip install pyspark
!pip install findspark
# Alternatively, if you want to install a specific version of pyspark:
#!pip install pyspark==3.2.1 

In [59]:
from tqdm import tqdm

import requests
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
import pyspark.sql.functions as f
from pyspark.sql.functions import split, regexp_replace, year, month, dayofmonth, to_timestamp

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator #, BinaryClassificationEvaluator 

# Basic libreries 
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Pre-processing phase
from sklearn import preprocessing
from sklearn.preprocessing import OneHotEncoder, LabelEncoder
from imblearn.over_sampling import SMOTE
from sklearn.model_selection import train_test_split

# Features Importance
from sklearn.inspection import permutation_importance

# Model
from sklearn import tree
from sklearn import svm
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier

# Hyper-Parameter Tuning
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import RandomizedSearchCV

from sklearn.model_selection import cross_val_score

# Evaluation
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report

In [60]:
# Create the session
#conf = SparkConf().set("spark.ui.port", "4050").set('spark.executor.memory', '4G').set('spark.driver.memory', '45G').set('spark.driver.maxResultSize', '10G')

# Create the context
#sc = pyspark.SparkContext(conf=conf)
#spark = SparkSession.builder.getOrCreate()

#sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
#spark = SparkSession.builder.getOrCreate()

#bucket = "train_test_data_spotify"
#spark.conf.set('temporaryGcsBucket', bucket)

spark = SparkSession \
    .builder \
    .appName("Python Spark Spotify Genre Model") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [61]:
spark

In [62]:
spotify_tracks = spark.read.load("../input/spotify-track/dataframe.csv", 
                           format="csv", 
                           sep=";", 
                           inferSchema="true", 
                           header="true")


In [63]:
spotify_tracks.cache()

In [64]:
print("The shape of the dataset is {:d} rows by {:d} columns".format(spotify_tracks.count(), len(spotify_tracks.columns)))

In [65]:
spotify_tracks.printSchema()

In [66]:
spotify_tracks.show(5)

In [67]:
spotify_tracks = spotify_tracks.dropDuplicates()

In [68]:
spotify_tracks = spotify_tracks.select(spotify_tracks.columns + [f.translate(f.col("audio_avg_pitches"), "[]", "").alias("audio_avg_pitches_list")])
spotify_tracks = spotify_tracks.withColumn("pitch_1", split(col("audio_avg_pitches_list"), ", ").getItem(0))\
                                .withColumn("pitch_2", split(col("audio_avg_pitches_list"), ", ").getItem(1))\
                                .withColumn("pitch_3", split(col("audio_avg_pitches_list"), ", ").getItem(2))\
                                .withColumn("pitch_4", split(col("audio_avg_pitches_list"), ", ").getItem(3))\
                                .withColumn("pitch_5", split(col("audio_avg_pitches_list"), ", ").getItem(4))\
                                .withColumn("pitch_6", split(col("audio_avg_pitches_list"), ", ").getItem(5))\
                                .withColumn("pitch_7", split(col("audio_avg_pitches_list"), ", ").getItem(6))\
                                .withColumn("pitch_8", split(col("audio_avg_pitches_list"), ", ").getItem(7))\
                                .withColumn("pitch_9", split(col("audio_avg_pitches_list"), ", ").getItem(8))\
                                .withColumn("pitch_10", split(col("audio_avg_pitches_list"), ", ").getItem(9))\
                                .withColumn("pitch_11", split(col("audio_avg_pitches_list"), ", ").getItem(10))\
                                .withColumn("pitch_12", split(col("audio_avg_pitches_list"), ", ").getItem(11))
spotify_tracks.printSchema()

In [69]:
spotify_tracks = spotify_tracks.select(spotify_tracks.columns + [f.translate(f.col("audio_avg_timbre"), "[]", "").alias("audio_avg_timbre_list")])
spotify_tracks = spotify_tracks.withColumn("timbre_1", split(col("audio_avg_timbre_list"), ", ").getItem(0))\
                                .withColumn("timbre_2", split(col("audio_avg_timbre_list"), ", ").getItem(1))\
                                .withColumn("timbre_3", split(col("audio_avg_timbre_list"), ", ").getItem(2))\
                                .withColumn("timbre_4", split(col("audio_avg_timbre_list"), ", ").getItem(3))\
                                .withColumn("timbre_5", split(col("audio_avg_timbre_list"), ", ").getItem(4))\
                                .withColumn("timbre_6", split(col("audio_avg_timbre_list"), ", ").getItem(5))\
                                .withColumn("timbre_7", split(col("audio_avg_timbre_list"), ", ").getItem(6))\
                                .withColumn("timbre_8", split(col("audio_avg_timbre_list"), ", ").getItem(7))\
                                .withColumn("timbre_9", split(col("audio_avg_timbre_list"), ", ").getItem(8))\
                                .withColumn("timbre_10", split(col("audio_avg_timbre_list"), ", ").getItem(9))\
                                .withColumn("timbre_11", split(col("audio_avg_timbre_list"), ", ").getItem(10))\
                                .withColumn("timbre_12", split(col("audio_avg_timbre_list"), ", ").getItem(11))
spotify_tracks.printSchema()

In [70]:
# String to Date but return null value 
# spotify_tracks_dt = spotify_tracks.withColumn("album_release_date", to_date(col("album_release_date"),"yyyy-MM-dd"))
# spotify_tracks_dt.select("album_release_date").show()

def to_date_(col, formats=("yyyy-MM-dd", "y")):
    # Spark 2.2 or later syntax, for < 2.2 use unix_timestamp and cast
    return coalesce(*[to_date(col, f) for f in formats])

spotify_tracks = spotify_tracks.withColumn("album_release_date_td", to_date_("album_release_date"))
spotify_tracks.select("album_release_date_td").show()

In [71]:
spotify_tracks = spotify_tracks.withColumn('day', dayofmonth(col('album_release_date_td')))
spotify_tracks = spotify_tracks.withColumn('month', month(col('album_release_date_td')))
spotify_tracks = spotify_tracks.withColumn('year', year(col('album_release_date_td')))

In [72]:
spotify_tracks.printSchema()

In [73]:
cols = ("album_release_date", "album_release_date_td", "audio_avg_pitches","audio_avg_timbre", "audio_avg_pitches_list", "audio_avg_timbre_list", "track_uri", "id")
spotify_tracks = spotify_tracks.drop(*cols)

spotify_tracks.printSchema()

In [74]:
spotify_tracks = spotify_tracks.withColumn("pitch_1", spotify_tracks.pitch_1.cast('double'))\
                                .withColumn("pitch_2", spotify_tracks.pitch_2.cast('double'))\
                                .withColumn("pitch_3", spotify_tracks.pitch_3.cast('double'))\
                                .withColumn("pitch_4", spotify_tracks.pitch_4.cast('double'))\
                                .withColumn("pitch_5", spotify_tracks.pitch_5.cast('double'))\
                                .withColumn("pitch_6", spotify_tracks.pitch_6.cast('double'))\
                                .withColumn("pitch_7", spotify_tracks.pitch_7.cast('double'))\
                                .withColumn("pitch_8", spotify_tracks.pitch_8.cast('double'))\
                                .withColumn("pitch_9", spotify_tracks.pitch_9.cast('double'))\
                                .withColumn("pitch_10", spotify_tracks.pitch_10.cast('double'))\
                                .withColumn("pitch_11", spotify_tracks.pitch_11.cast('double'))\
                                .withColumn("pitch_12", spotify_tracks.pitch_12.cast('double'))

spotify_tracks = spotify_tracks.withColumn("timbre_1", spotify_tracks.timbre_1.cast('double'))\
                                .withColumn("timbre_2", spotify_tracks.timbre_2.cast('double'))\
                                .withColumn("timbre_3", spotify_tracks.timbre_3.cast('double'))\
                                .withColumn("timbre_4", spotify_tracks.timbre_4.cast('double'))\
                                .withColumn("timbre_5", spotify_tracks.timbre_5.cast('double'))\
                                .withColumn("timbre_6", spotify_tracks.timbre_6.cast('double'))\
                                .withColumn("timbre_7", spotify_tracks.timbre_7.cast('double'))\
                                .withColumn("timbre_8", spotify_tracks.timbre_8.cast('double'))\
                                .withColumn("timbre_9", spotify_tracks.timbre_9.cast('double'))\
                                .withColumn("timbre_10", spotify_tracks.timbre_10.cast('double'))\
                                .withColumn("timbre_11", spotify_tracks.timbre_11.cast('double'))\
                                .withColumn("timbre_12", spotify_tracks.timbre_12.cast('double'))

spotify_tracks = spotify_tracks.withColumn("year", spotify_tracks.year.cast('int'))\
                                .withColumn("day", spotify_tracks.day.cast('int'))\
                                .withColumn("month", spotify_tracks.month.cast('int'))

spotify_tracks.printSchema()


In [75]:
# Let's define some constants which we will use throughout this notebook
NUMERICAL_FEATURES = []
CATEGORICAL_FEATURES = []
TARGET_VARIABLE = "track_genre"

#Get All column names and it's types
for col in spotify_tracks.dtypes:
    if col[1] == "string":
        CATEGORICAL_FEATURES.append(col[0])
    else:
        NUMERICAL_FEATURES.append(col[0])

CATEGORICAL_FEATURES.remove(TARGET_VARIABLE)
print("Categorical: ", CATEGORICAL_FEATURES, "\nNumerical: ", NUMERICAL_FEATURES)

In [76]:
spotify_tracks.cache()

In [77]:
#spotify_tracks_rp = spark.createDataFrame(spotify_tracks_pd)

spotify_tracks.rdd.getNumPartitions()

In [78]:
#spotify_tracks = spotify_tracks.repartition(6)
##spotify_tracks = spotify_tracks_rp.coalesce(1)

In [79]:
#spotify_tracks.rdd.getNumPartitions()

In [80]:
# null values in each column
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
data_agg = spotify_tracks.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in spotify_tracks.columns])

data_agg.show()

In [81]:
spotify_tracks = spotify_tracks.dropna()

In [82]:
data_agg = spotify_tracks.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in spotify_tracks.columns])

data_agg.show()

In [83]:
# value counts of Batsman_Name column
spotify_tracks.groupBy('album_release_date_precision').count().show()

spotify_tracks.groupBy('track_name').count().show()

spotify_tracks.groupBy('album_name').count().show()

spotify_tracks.groupBy('artist_name').count().show()

In [84]:
# This function is responsible to implement the pipeline above for transforming categorical features into numerical ones
def to_numerical(df, numerical_features, categorical_features, target_variable):
  
    # 1. Label Encode target feature 
    stage_1= StringIndexer(inputCol=target_variable, outputCol='label')

    # 2. Label Encode Categorical features
    stage_2 = [StringIndexer(inputCol=c, outputCol="{0}_index".format(c), handleInvalid="skip") for c in categorical_features]

    # 3. OneHot Encode 
    # stage_3 = OneHotEncoder(inputCol='album_release_date_precision_index', outputCol='album_release_date_precision_oh')

    # 4. create a vector of all the features required to train the logistic regression model 
    # encoded_columns = ['track_name_index', 'album_name_index', 'artist_name_index', 'album_release_date_precision_oh']
    # stage_4 = VectorAssembler(inputCols= encoded_columns + numerical_features, outputCol='features')
    stage_4 = VectorAssembler(inputCols= [indexer.getOutputCol() for indexer in stage_2] + numerical_features, outputCol='features')

    # 4.a Create the StandardScaler
    scaler = StandardScaler(inputCol=stage_4.getOutputCol(), outputCol="std_" + stage_4.getOutputCol(), withStd=True, withMean=True)

    # 5. Populate the stages of the pipeline
    # stages = [stage_1] + stage_2 +[stage_3] + [stage_4] + [scaler]
    stages = [stage_1] + stage_2 + [stage_4] + [scaler]

    # 6. Setup the pipeline with the stages above
    pipeline = Pipeline(stages=stages)

    # 7. Transform the input dataframe accordingly
    transformer = pipeline.fit(df)
    df_transformed = transformer.transform(df)

    return df_transformed

In [85]:
# Transform the training set and get back both the transformer and the new dataset
spotify_tracks = to_numerical(spotify_tracks, NUMERICAL_FEATURES, CATEGORICAL_FEATURES, TARGET_VARIABLE)
spotify_tracks.cache()

# Select `features` and `label` (i.e., formerly `deposit`) target variable only
spotify_tracks = spotify_tracks.select(["features", "label"])
spotify_tracks.cache()

RANDOM_SEED = 42
# Randomly split our original dataset `house_df` into 80÷20 for training and test, respectively
train_set, test_set = spotify_tracks.randomSplit([0.8, 0.2], seed=RANDOM_SEED)

train_set.show(5, truncate=False)

In [86]:
# This function defines the general pipeline for logistic regression
def logistic_regression_pipeline(train):
  
    stage_5_lg = LogisticRegression(featuresCol='features',labelCol='label')

    logistic_regression_pipeline = Pipeline(stages= [stage_5_lg])

    #### LOGISTIC REGRESSION
    param_grid = ParamGridBuilder()\
    .addGrid(stage_5_lg.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(stage_5_lg.maxIter, [10, 20, 50]) \
    .build()
    # other param: .addGrid(stage_4_lg.elasticNetParam, [0.0, 0.25, 0.5, 0.75, 1.0])
    cross_val_lg = CrossValidator(estimator=logistic_regression_pipeline,
                            estimatorParamMaps=param_grid,
                            evaluator=MulticlassClassificationEvaluator().setMetricName("accuracy"), # default = "areaUnderROC", alternatively "areaUnderPR"
                            numFolds=5,
                            collectSubModels=True
                            )
    cv_model_lg = cross_val_lg.fit(spotify_tracks)

    return cv_model_lg

In [None]:
cv_model = logistic_regression_pipeline(train_set)

# Make predictions on the test set (`cv_model` contains the best model according to the result of k-fold cross validation)
# `test_df` will follow exactly the same pipeline defined above, and already fit to `train_df`
test_predictions = cv_model.transform(test_set)

test_predictions.select("features", "prediction", "label").show(5)