In [1]:
import findspark
# Initialise findspark before any pyspark module is imported (the pyspark
# imports follow in the next cell).
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
# Create (or reuse) the Spark session, requesting 20g of driver memory.
# NOTE(review): presumably this only takes effect if no driver JVM is running yet — confirm.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "20g")
    .getOrCreate()
)
from pyspark.conf import SparkConf
from pyspark.sql.types import * 
import pyspark.sql.functions as F
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler,StandardScaler
from pyspark.ml import Pipeline
from sklearn.metrics import confusion_matrix

In [None]:
# Load the raw track list; the first CSV row supplies the column names.
# No schema is given, so the columns are cast explicitly in a later cell.
# NOTE(review): default quote/escape settings are used — confirm that fields
# containing commas or quotes are parsed into the right columns.
csv_reader = spark.read.option("header", True)
dataset = csv_reader.csv('spotify song list.csv')
dataset.show()

In [None]:
# Keep only the columns used for modelling: three text descriptors, the
# `explicit` flag (the prediction target) and the numeric audio/metadata fields.
selected_fields = [
    'name', 'album', 'artists',
    'explicit',
    'energy', 'valence', 'tempo', 'duration_ms', 'time_signature', 'year',
]
df = dataset.select(*selected_fields)
cols = df.columns
df.printSchema()

In [None]:
from pyspark.sql.types import StringType, FloatType, IntegerType, BooleanType

# Target Spark type for every selected column, in output order.
# NOTE(review): tempo is cast to an integer while duration_ms is cast to a
# float — confirm that this pairing is intended.
column_types = {
    'name': StringType(),
    'album': StringType(),
    'artists': StringType(),
    'explicit': BooleanType(),
    'energy': FloatType(),
    'valence': FloatType(),
    'tempo': IntegerType(),
    'duration_ms': FloatType(),
    'time_signature': IntegerType(),
    'year': IntegerType(),
}

# Re-select every column with its cast applied.
df = df.select([df[field].cast(dtype) for field, dtype in column_types.items()])

df.printSchema()
df.show()

In [6]:
import pyspark.sql.functions as F
# Encode the explicit flag as an integer: false -> 0, true -> 1.
# A plain cast is used instead of `when(explicit == 'false', 0).otherwise(1)` because:
#   * NULL flags (missing / unparseable values) stay NULL instead of silently
#     being labelled as explicit (1);
#   * the cell is idempotent — re-running it on the already converted 0/1
#     column leaves the values unchanged instead of turning every row into 1.
df = df.withColumn('explicit', F.col('explicit').cast('boolean').cast('int'))

In [None]:
# Preview the frame after the `explicit` column has been rewritten.
df.show()

In [None]:
# Show the 10 rows with the smallest `year` values as a sanity check on that column.
year_ascending = df["year"].asc()
df.orderBy(year_ascending).show(10)

In [10]:
# Pipeline stages, in execution order:
#   1. per text column: StringIndexer -> OneHotEncoder
#   2. StringIndexer turning `explicit` into the `label` column
#   3. VectorAssembler joining the one-hot vectors with the numeric columns
#   4. StandardScaler producing the final `features` column
# Rows with invalid values are skipped by the indexers and by the assembler.
categoricalColumns = ['name', 'album', 'artists']
numericCols = ['energy', 'valence', 'tempo', 'duration_ms', 'time_signature', 'year']


def _index_and_encode(column):
    """Return the [StringIndexer, OneHotEncoder] pair for one text column."""
    indexer = StringIndexer(
        inputCol=column,
        outputCol=column + 'Index',
        handleInvalid='skip',
    )
    one_hot = OneHotEncoder(
        inputCols=[indexer.getOutputCol()],
        outputCols=[column + "classVec"],
    )
    return [indexer, one_hot]


stages = [
    stage
    for column in categoricalColumns
    for stage in _index_and_encode(column)
]

label_stringIdx = StringIndexer(inputCol='explicit', outputCol='label', handleInvalid='skip')

assemblerInputs = [column + "classVec" for column in categoricalColumns] + numericCols
assembler = VectorAssembler(
    inputCols=assemblerInputs,
    outputCol="vectorized_features",
    handleInvalid='skip',
)

scaler = StandardScaler(inputCol="vectorized_features", outputCol="features")

stages += [label_stringIdx, assembler, scaler]

In [None]:
# Remember the current column names so they can be re-selected next to
# `label` and `features` once the pipeline has transformed the frame.
cols = df.columns
cols

In [None]:
# Fit every preprocessing stage and apply it, keeping only the model inputs
# (`label`, `features`) plus the original columns.
# NOTE(review): the pipeline is fitted on the full frame before the train/test
# split in the following cells, so the indexers and the scaler also see the
# test rows — confirm this is acceptable for the reported metrics.
selectedCols = ['label', 'features'] + cols
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df).select(selectedCols)
df.printSchema()

In [None]:
# 80/20 split with a fixed seed so the partition is repeatable.
train, test = df.randomSplit([0.8, 0.2], seed=12345)
for caption, subset in (("Training Dataset Count: ", train),
                        ("Test Dataset Count: ", test)):
    print(caption + str(subset.count()))

In [None]:
# Two further splits (75/25 and 70/30) with the same seed, used to compare
# how the train/test ratio affects the model.
split_weights = ([0.75, 0.25], [0.7, 0.3])
(train2, test2), (train3, test3) = [
    df.randomSplit(weights, seed=12345) for weights in split_weights
]

In [None]:
from pyspark.ml.classification import LogisticRegression

# Model 1: logistic regression limited to 3 iterations, trained on the 80%
# split and scored on the remaining 20%.
lr = LogisticRegression(
    maxIter=3,
    labelCol='label',
    featuresCol='features',
)
lrModel = lr.fit(train)
pred = lrModel.transform(test)

In [None]:
# Label values, in the order used for the confusion-matrix rows and columns.
class_names=[1.0,0.0]
import itertools
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """Print a confusion matrix and draw it as an annotated heat map.

    Draws on the current matplotlib figure; the caller creates the figure
    and calls ``plt.show()``.

    Parameters
    ----------
    cm : array-like of shape (n_classes, n_classes)
        Confusion matrix; rows are true labels, columns are predictions.
    classes : sequence
        Tick labels, in the same order as the rows/columns of ``cm``.
    normalize : bool, default False
        If True, each row is divided by its sum so cells show per-class
        fractions. Rows that sum to zero are shown as all zeros.
    title : str
        Title of the plot.
    cmap : matplotlib colormap
        Colormap used for the heat map.
    """
    cm = np.asarray(cm)
    if normalize:
        row_totals = cm.sum(axis=1, keepdims=True)
        # Divide only where the row has samples; empty rows stay 0 instead of
        # becoming NaN (which would also break the text-colour threshold below).
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    # Cells darker than half of the maximum get white text for contrast.
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    # Run the layout pass last so the axis labels are taken into account.
    plt.tight_layout()

In [None]:
# Collect labels and predictions with ONE Spark job. Collecting them with two
# separate toPandas() calls recomputes the whole lineage twice and relies on
# both jobs returning the rows in the same order.
labels_and_preds = pred.select("label", "prediction").toPandas()
y_true = labels_and_preds[["label"]]
y_pred = labels_and_preds[["prediction"]]

cnf_matrix = confusion_matrix(y_true, y_pred,labels=class_names)

plt.figure()
plot_confusion_matrix(cnf_matrix, classes=class_names,
                      title='Confusion matrix')
plt.show()

In [None]:
# Share of test rows whose predicted class equals the true label.
n_scored = pred.count()
n_correct = pred.where(pred["label"] == pred["prediction"]).count()
accuracy = n_correct / float(n_scored)
print("Accuracy : ",accuracy)

In [None]:
# ROC curve of model 1 on its training data (taken from the training summary).
trainingSummary = lrModel.summary
roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'],roc['TPR'])
# FPR is plotted on the x axis and TPR on the y axis; the labels were swapped.
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training Area Under ROC: ' + str(trainingSummary.areaUnderROC))

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluator with default settings; it is reused for the other models below.
evaluator = BinaryClassificationEvaluator()
test_auc = evaluator.evaluate(pred)
print('Test Area Under ROC', test_auc)

In [None]:
# Model 1 - 80/20 train/test split (values noted from a previous run)
# Test accuracy:           0.9676762048130143
# Training area under ROC: 0.9992808974017898
# Test area under ROC:     0.9831225938810771

In [None]:
# Model 2: same estimator settings, trained on the 75% split and scored on
# the remaining 25%.
lr2 = LogisticRegression(
    maxIter=3,
    labelCol='label',
    featuresCol='features',
)
lrModel2 = lr2.fit(train2)
pred2 = lrModel2.transform(test2)

In [None]:
# Collect labels and predictions with ONE Spark job. Collecting them with two
# separate toPandas() calls recomputes the whole lineage twice and relies on
# both jobs returning the rows in the same order.
labels_and_preds2 = pred2.select("label", "prediction").toPandas()
y_true2 = labels_and_preds2[["label"]]
y_pred2 = labels_and_preds2[["prediction"]]

cnf_matrix2 = confusion_matrix(y_true2, y_pred2,labels=class_names)

plt.figure()
plot_confusion_matrix(cnf_matrix2, classes=class_names,
                      title='Confusion matrix')
plt.show()

In [None]:
# Share of test rows (75/25 split) whose predicted class equals the true label.
n_scored2 = pred2.count()
n_correct2 = pred2.where(pred2["label"] == pred2["prediction"]).count()
accuracy2 = n_correct2 / float(n_scored2)
print("Accuracy : ",accuracy2)

In [None]:
# ROC curve of model 2 on its training data (taken from the training summary).
trainingSummary2 = lrModel2.summary
roc2 = trainingSummary2.roc.toPandas()
plt.plot(roc2['FPR'],roc2['TPR'])
# FPR is plotted on the x axis and TPR on the y axis; the labels were swapped.
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training Area Under ROC: ' + str(trainingSummary2.areaUnderROC))

In [None]:
# Score model 2 on its held-out split with the evaluator created earlier.
test_auc2 = evaluator.evaluate(pred2)
print('Test Area Under ROC', test_auc2)

In [None]:
# Model 2 - 75/25 train/test split (values noted from a previous run)
# Test accuracy:           0.9674617074378462
# Training area under ROC: 0.9993084178588645
# Test area under ROC:     0.982823692357705

In [None]:
# Model 3: same estimator settings, trained on the 70% split and scored on
# the remaining 30%.
lr3 = LogisticRegression(
    maxIter=3,
    labelCol='label',
    featuresCol='features',
)
lrModel3 = lr3.fit(train3)
pred3 = lrModel3.transform(test3)

In [None]:
# Collect labels and predictions with ONE Spark job. Collecting them with two
# separate toPandas() calls recomputes the whole lineage twice and relies on
# both jobs returning the rows in the same order.
labels_and_preds3 = pred3.select("label", "prediction").toPandas()
y_true3 = labels_and_preds3[["label"]]
y_pred3 = labels_and_preds3[["prediction"]]

cnf_matrix3 = confusion_matrix(y_true3, y_pred3,labels=class_names)

plt.figure()
plot_confusion_matrix(cnf_matrix3, classes=class_names,
                      title='Confusion matrix')
plt.show()

In [None]:
# Share of test rows (70/30 split) whose predicted class equals the true label.
n_scored3 = pred3.count()
n_correct3 = pred3.where(pred3["label"] == pred3["prediction"]).count()
accuracy3 = n_correct3 / float(n_scored3)
print("Accuracy : ",accuracy3)

In [None]:
# ROC curve of model 3 on its training data (taken from the training summary).
trainingSummary3 = lrModel3.summary
roc3 = trainingSummary3.roc.toPandas()
plt.plot(roc3['FPR'],roc3['TPR'])
# FPR is plotted on the x axis and TPR on the y axis; the labels were swapped.
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training Area Under ROC: ' + str(trainingSummary3.areaUnderROC))

In [None]:
# Score model 3 on its held-out split with the evaluator created earlier.
test_auc3 = evaluator.evaluate(pred3)
print('Test Area Under ROC', test_auc3)

In [None]:
# Model 3 - 70/30 train/test split (values noted from a previous run)
# Test accuracy:           0.9673963863521301
# Training area under ROC: 0.9993391035377046
# Test area under ROC:     0.9821757322179495

In [None]:
# Precision-recall curve of model 3 on its training data
# (recall on the x axis, precision on the y axis).
pr3 = trainingSummary3.pr.toPandas()
recall_values, precision_values = pr3['recall'], pr3['precision']
plt.plot(recall_values, precision_values)
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.show()