# Preliminaries from tutorial examples at: 
https://www.youtube.com/watch?v=Qsx6Endfvbg

In [1]:
! pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = spark.sparkContext
sc

In [29]:
import os
import re
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql import functions as fun
from pyspark.sql import Row
from pyspark.sql.functions import expr
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.ml.tuning import *
from pyspark.ml.evaluation import *
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression
from sklearn.metrics import classification_report

In [4]:
#from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector

from pyspark.sql import functions as F
from pyspark.sql.functions import when

# Import data

In [5]:
#myPath = "/content/drive/My Drive/Project_Work/"
myPath = "/content/drive/My Drive/H516/GroupProject/bc working files/"


In [6]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [7]:
from pyspark.sql.types import StructField, StructType, StringType,LongType
custom_schema = StructType([
    StructField("track_id", StringType(), False),
    StructField("track_name", StringType(), False),
    StructField("track_artist", StringType(), False),
    StructField("track_popularity", IntegerType(), False),
    StructField("track_album_id", StringType(), False),
    StructField("track_album_name", StringType(), False),
    StructField("track_album_release_date", StringType(), False),
    StructField("playlist_name", StringType(), False),
    StructField("playlist_id", StringType(), False),
    StructField("playlist_genre", StringType(), False),
    StructField("playlist_subgenre", StringType(), False),
    StructField("danceability", FloatType(), False),
    StructField("energy",FloatType(), False),
    StructField("key", IntegerType(), False),
    StructField("loudness", FloatType(), False),
    StructField("mode", IntegerType(), False),
    StructField("speechiness", FloatType(), False),
    StructField("acousticness", FloatType(), False),
    StructField("instrumentalness", FloatType(), False),
    StructField("liveness", FloatType(), False),
    StructField("valence", FloatType(), False),
    StructField("tempo", FloatType(), False),
    StructField("duration_ms", IntegerType(), False),
    StructField("language", StringType(), False),
    StructField("genreID", IntegerType(), False),
    StructField("year", IntegerType(), False),
    StructField("minutes", FloatType(), False),
    StructField("word_count", IntegerType(), False),
    StructField("words_per_minute", FloatType(), False),
    StructField("repetition_pct", FloatType(), False),
    StructField("stopword_pct", FloatType(), False),
    StructField("profanity_pct", FloatType(), False),
    StructField("positive_pct", FloatType(), False),
    StructField("negative_pct", FloatType(), False),
    StructField("Sentiment", IntegerType(), False),
    StructField("words_only_lyrics", StringType(),False)

])
rawdataDF = spark.read.format("csv") \
    .schema(custom_schema) \
    .option("header", True) \
    .load(myPath + "spotify_with_word_counts.csv")
rawdataDF.show(3)

+--------------------+--------------------+------------+----------------+--------------------+--------------------+------------------------+--------------------+--------------------+--------------+-----------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+--------+-------+----+-------+----------+----------------+--------------+------------+-------------+------------+------------+---------+--------------------+
|            track_id|          track_name|track_artist|track_popularity|      track_album_id|    track_album_name|track_album_release_date|       playlist_name|         playlist_id|playlist_genre|playlist_subgenre|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|duration_ms|language|genreID|year|minutes|word_count|words_per_minute|repetition_pct|stopword_pct|profanity_pct|positive_pct|negative_pct|Sentiment|   words_only_lyrics|
+-------------------

In [8]:
rawdataDF.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- track_artist: string (nullable = true)
 |-- track_popularity: integer (nullable = true)
 |-- track_album_id: string (nullable = true)
 |-- track_album_name: string (nullable = true)
 |-- track_album_release_date: string (nullable = true)
 |-- playlist_name: string (nullable = true)
 |-- playlist_id: string (nullable = true)
 |-- playlist_genre: string (nullable = true)
 |-- playlist_subgenre: string (nullable = true)
 |-- danceability: float (nullable = true)
 |-- energy: float (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: float (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: float (nullable = true)
 |-- acousticness: float (nullable = true)
 |-- instrumentalness: float (nullable = true)
 |-- liveness: float (nullable = true)
 |-- valence: float (nullable = true)
 |-- tempo: float (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- l

In [9]:
rawdataDF.createOrReplaceTempView("model_input")
rawdataDF.count()

14398

In [10]:
#model_inputDF = spark.sql("SELECT track_id, year, genreID AS label, instrumentalness, acousticness, speechiness, track_popularity FROM model_input WHERE playlist_genre is not null AND track_popularity is not null")

#for removing edm
#model_inputDF = spark.sql("SELECT a.genreID AS label, a.* FROM model_input a WHERE playlist_genre is not null AND playlist_genre <> 'edm' AND track_popularity is not null")


keyDF = rawdataDF.groupBy("track_id").pivot("key").agg(F.lit(1)).na.fill(0)
keyDF2 = keyDF.join(rawdataDF, keyDF.track_id == rawdataDF.track_id)



keyDF2 = keyDF2.withColumn('instrumentalnessFlag', when(keyDF2.instrumentalness > .5 , 1).otherwise(0))
keyDF2 = keyDF2.withColumn('livenessFlag', when(keyDF2.liveness > .5 , 1).otherwise(0))
keyDF2 = keyDF2.withColumn('acousticnessFlag', when(keyDF2.acousticness > .5 , 1).otherwise(0))
keyDF2.createOrReplaceTempView('keyDF3')

model_inputDF = spark.sql("SELECT a.genreID AS label, a.null as no_key, a.* FROM keyDF3 a WHERE playlist_genre is not null AND track_popularity is not null")
model_inputDF.show()


+-----+------+--------------------+----+---+---+---+---+---+---+---+---+---+---+---+---+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+------------------------+--------------------+--------------------+--------------+--------------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+-----------+--------+-------+----+-------+----------+----------------+--------------+------------+-------------+------------+------------+---------+--------------------+--------------------+------------+----------------+
|label|no_key|            track_id|null|  0|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11|            track_id|          track_name|        track_artist|track_popularity|      track_album_id|    track_album_name|track_album_release_date|       playlist_name|         playlist_id|playlist_genre|   playlist_subgenre|danceability|energy|key|loudness|mode|speechines

#testing different feature selections

##spotify features

In [11]:
#using only spotify features
featureNames = ['no_key','0','1','2','3','4','5','6','7','8','9','10','11','duration_ms','danceability','energy','loudness','speechiness','acousticnessFlag','livenessFlag','instrumentalnessFlag','valence','tempo', 'mode']

In [12]:
#add additional feature columns to the cleaned text that is output from the nlp pipeline
%%time
xtra_cols = VectorAssembler(inputCols= featureNames, outputCol='features')

CPU times: user 3.61 ms, sys: 0 ns, total: 3.61 ms
Wall time: 56.6 ms


In [13]:
pipeline = Pipeline(stages=[xtra_cols])

In [14]:
%%time
processed = pipeline.fit(model_inputDF).transform(model_inputDF)

CPU times: user 15.5 ms, sys: 7.78 ms, total: 23.3 ms
Wall time: 1.37 s


In [15]:
#split the data into training and test data sets
train, test = processed.randomSplit([0.8,0.2], seed=123)

In [16]:
#declare the model
LogRegression = LogisticRegression(featuresCol='features')

In [17]:

#fit the nb model to the processed model
%%time
model = LogRegression.fit(train)

CPU times: user 153 ms, sys: 21.9 ms, total: 175 ms
Wall time: 24.5 s


In [19]:
#for test features:

#get coefficients for each feature for each genre

print(featureNames)
print(model.coefficientMatrix)

target_names = ['rock','pop','r&b','rap', 'edm']

countt = 0
#print(model.coefficientMatrix.toArray())

coefMatrix = model.coefficientMatrix.toArray()

print('SPOTIFY FEATURES')
for x in coefMatrix:
  print(target_names[countt])
  emptyList = []
  bestFeature = [None,0]
  for i in range(len(x)):
    featureValue = float(round(x[i],3))
    emptyList.append((featureNames[i],  featureValue))
    if abs(featureValue) > abs(bestFeature[1]):
      bestFeature = [featureNames[i], featureValue]
  print(emptyList)
  print('best feature: ', bestFeature)
  countt=countt+1

['no_key', '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', 'duration_ms', 'danceability', 'energy', 'loudness', 'speechiness', 'acousticnessFlag', 'livenessFlag', 'instrumentalnessFlag', 'valence', 'tempo', 'mode']
DenseMatrix([[ 0.00000000e+00,  8.94492057e-02, -3.57492513e-01,
               3.04658285e-01, -3.78207580e-02,  4.85465017e-01,
              -1.07583977e-02, -3.49045271e-01,  1.69156626e-01,
              -1.67699251e-01,  3.16086476e-01, -2.91629161e-01,
              -1.71681818e-01,  2.49759959e-06, -8.31866274e+00,
               3.58461327e+00, -2.72767818e-01, -8.44588854e+00,
               1.57389738e-01, -1.93619103e-01, -1.90395868e-01,
               2.39873272e+00, -4.09551328e-03,  3.26137453e-01],
             [ 0.00000000e+00,  1.09917185e-01, -3.39359565e-02,
              -1.10130027e-01,  6.52651436e-02, -7.51126400e-03,
               2.49222085e-02,  9.70605597e-02, -1.01312867e-01,
              -1.16538106e-01,  3.92840087e-02,  4.2504

In [20]:
#create model predictions on the test data
%%time
results = model.transform(test)

CPU times: user 5.79 ms, sys: 2.62 ms, total: 8.41 ms
Wall time: 244 ms


In [21]:
y_true = results.select(['label']).rdd.collect()
y_pred = results.select(['prediction']).rdd.collect()

In [30]:
target_names = ['rock','pop','r&b','rap', 'edm']

print(classification_report(y_true, y_pred, target_names=target_names))

              precision    recall  f1-score   support

        rock       0.61      0.71      0.65       657
         pop       0.43      0.51      0.47       728
         r&b       0.55      0.46      0.50       665
         rap       0.64      0.57      0.60       528
         edm       0.51      0.41      0.46       335

    accuracy                           0.54      2913
   macro avg       0.55      0.53      0.54      2913
weighted avg       0.55      0.54      0.54      2913



##lyric features

In [31]:
#using only lyric features
featureNames = ['stopword_pct','word_count','repetition_pct','profanity_pct', 'positive_pct', 'negative_pct']

In [32]:
#add additional feature columns to the cleaned text that is output from the nlp pipeline
%%time
xtra_cols = VectorAssembler(inputCols= featureNames, outputCol='features')

CPU times: user 1.34 ms, sys: 0 ns, total: 1.34 ms
Wall time: 6.74 ms


In [33]:
pipeline = Pipeline(stages=[xtra_cols])

In [34]:
%%time
processed = pipeline.fit(model_inputDF).transform(model_inputDF)

CPU times: user 4.08 ms, sys: 2.93 ms, total: 7.01 ms
Wall time: 117 ms


In [35]:
#split the data into training and test data sets
train, test = processed.randomSplit([0.8,0.2], seed=123)

In [36]:
#declare the model, LASSO method
LogRegression = LogisticRegression(featuresCol='features')

In [37]:
#fit the nb model to the processed model
%%time
model = LogRegression.fit(train)

CPU times: user 78.1 ms, sys: 20.7 ms, total: 98.8 ms
Wall time: 11.4 s


In [38]:
#for test features:

#get coefficients for each feature for each genre

print(featureNames)
print(model.coefficientMatrix)


['stopword_pct', 'word_count', 'repetition_pct', 'profanity_pct', 'positive_pct', 'negative_pct']
DenseMatrix([[-1.71524550e-01, -2.14354284e-03,  6.14572386e-01,
              -4.42490534e+01,  5.84930223e+00, -9.64280507e-01],
             [-8.08516598e-01,  1.57901998e-03,  1.29469348e+00,
              -2.89798560e+01,  7.59513506e-01,  1.57813433e+00],
             [ 1.83530284e+00,  1.48731316e-03, -2.04250181e+00,
               5.77487556e+00, -7.59993322e+00,  5.47902761e+00],
             [ 9.15402815e-02,  3.62452417e-03, -6.83972486e+00,
               4.48801474e+01,  5.57725070e-01, -5.92876694e+00],
             [-9.46801978e-01, -4.54731446e-03,  6.97296081e+00,
               2.25738865e+01,  4.33392416e-01, -1.64114493e-01]])


In [39]:
#create model predictions on the test data
%%time
results = model.transform(test)

CPU times: user 5.83 ms, sys: 3.33 ms, total: 9.17 ms
Wall time: 166 ms


In [40]:
y_true = results.select(['label']).rdd.collect()
y_pred = results.select(['prediction']).rdd.collect()

In [41]:
target_names = ['rock','pop','r&b','rap', 'edm']

print(classification_report(y_true, y_pred, target_names=target_names))

              precision    recall  f1-score   support

        rock       0.41      0.58      0.48       657
         pop       0.36      0.52      0.43       728
         r&b       0.39      0.18      0.25       665
         rap       0.70      0.65      0.67       528
         edm       0.46      0.20      0.28       335

    accuracy                           0.44      2913
   macro avg       0.47      0.43      0.42      2913
weighted avg       0.45      0.44      0.43      2913



##combined features

In [42]:
#all features
featureNames = ['no_key','0','1','2','3','4','5','6','7','8','9','10','11','duration_ms', 'danceability','energy','loudness','speechiness','acousticness','liveness','instrumentalness','valence','tempo', 'mode', 'word_count', 'stopword_pct','repetition_pct','profanity_pct', 'positive_pct', 'negative_pct']

In [43]:
#add additional feature columns to the cleaned text that is output from the nlp pipeline
%%time
xtra_cols = VectorAssembler(inputCols= featureNames, outputCol='features')

CPU times: user 2.53 ms, sys: 724 µs, total: 3.25 ms
Wall time: 7.55 ms


In [44]:
pipeline = Pipeline(stages=[xtra_cols])

In [45]:
%%time
processed = pipeline.fit(model_inputDF).transform(model_inputDF)

CPU times: user 8.39 ms, sys: 2.1 ms, total: 10.5 ms
Wall time: 173 ms


In [46]:
#split the data into training and test data sets
train, test = processed.randomSplit([0.8,0.2], seed=123)

In [47]:
#declare the model
LogRegression = LogisticRegression(featuresCol='features')

In [48]:
#fit the nb model to the processed model
%%time
model = LogRegression.fit(train)

CPU times: user 86.7 ms, sys: 15.7 ms, total: 102 ms
Wall time: 12.4 s


In [49]:
#for test features:

#get coefficients for each feature for each genre

#print(featureNames)
#print(model.coefficientMatrix)



target_names = ['rock','pop','r&b','rap', 'edm']
countt = 0
#print(model.coefficientMatrix.toArray())


print('COMBINED MODEL')
coefMatrix = model.coefficientMatrix.toArray()
for x in coefMatrix:
  print(target_names[countt])
  emptyList = []
  bestFeature = [None,0]
  for i in range(len(x)):
    featureValue = float(round(x[i],3))
    emptyList.append((featureNames[i],  featureValue))
    if abs(featureValue) > abs(bestFeature[1]):
      bestFeature = [featureNames[i], featureValue]
  print(emptyList)
  print('best feature: ', bestFeature)
  countt=countt+1

COMBINED MODEL
rock
[('no_key', 0.0), ('0', 0.062), ('1', -0.278), ('2', 0.297), ('3', -0.065), ('4', 0.455), ('5', -0.028), ('6', -0.348), ('7', 0.191), ('8', -0.169), ('9', 0.279), ('10', -0.279), ('11', -0.181), ('duration_ms', 0.0), ('danceability', -7.755), ('energy', 3.608), ('loudness', -0.268), ('speechiness', -6.855), ('acousticness', 0.163), ('liveness', -0.315), ('instrumentalness', -0.303), ('valence', 2.454), ('tempo', -0.004), ('mode', 0.367), ('word_count', -0.002), ('stopword_pct', 0.15), ('repetition_pct', 0.981), ('profanity_pct', -5.591), ('positive_pct', 5.244), ('negative_pct', -1.254)]
best feature:  ['danceability', -7.755]
pop
[('no_key', 0.0), ('0', 0.086), ('1', -0.024), ('2', -0.094), ('3', 0.067), ('4', 0.014), ('5', 0.024), ('6', 0.101), ('7', -0.105), ('8', -0.154), ('9', 0.051), ('10', 0.041), ('11', 0.045), ('duration_ms', -0.0), ('danceability', 0.152), ('energy', -0.703), ('loudness', 0.055), ('speechiness', -2.739), ('acousticness', 0.551), ('liveness

In [50]:
#create model predictions on the test data
%%time
results = model.transform(test)

CPU times: user 9.08 ms, sys: 1.38 ms, total: 10.5 ms
Wall time: 203 ms


In [51]:
y_true = results.select(['label']).rdd.collect()
y_pred = results.select(['prediction']).rdd.collect()

In [52]:
target_names = ['rock','pop','r&b','rap', 'edm']

print(classification_report(y_true, y_pred, target_names=target_names))

              precision    recall  f1-score   support

        rock       0.63      0.72      0.67       657
         pop       0.49      0.54      0.51       728
         r&b       0.61      0.52      0.56       665
         rap       0.71      0.68      0.69       528
         edm       0.57      0.49      0.52       335

    accuracy                           0.59      2913
   macro avg       0.60      0.59      0.59      2913
weighted avg       0.60      0.59      0.59      2913



##selected features

In [53]:
#using only features that had coefficient > (abs(0.5))
featureNames = ['duration_ms','danceability','energy','loudness','speechiness','acousticness','liveness','instrumentalness','valence','tempo', 'mode','word_count','repetition_pct', 'profanity_pct', 'positive_pct', 'negative_pct']

In [54]:
#add additional feature columns to the cleaned text that is output from the nlp pipeline
%%time
xtra_cols = VectorAssembler(inputCols= featureNames, outputCol='features')

CPU times: user 1.97 ms, sys: 831 µs, total: 2.8 ms
Wall time: 11.9 ms


In [55]:
pipeline = Pipeline(stages=[xtra_cols])

In [56]:
%%time
processed = pipeline.fit(model_inputDF).transform(model_inputDF)

CPU times: user 6.87 ms, sys: 298 µs, total: 7.17 ms
Wall time: 104 ms


In [57]:
#split the data into training and test data sets
train, test = processed.randomSplit([0.8,0.2], seed=123)

In [58]:
#declare the model
LogRegression = LogisticRegression(featuresCol='features', regParam=0.001)

In [59]:
#fit the nb model to the processed model
%%time
model = LogRegression.fit(train)

CPU times: user 92.3 ms, sys: 11.1 ms, total: 103 ms
Wall time: 12.7 s


In [60]:
target_names = ['rock','pop','r&b','rap','edm']
countt = 0
#print(model.coefficientMatrix.toArray())

coefMatrix = model.coefficientMatrix.toArray()
for x in coefMatrix:
  print(target_names[countt])
  emptyList = []
  bestFeature = [None,0]
  for i in range(len(x)):
    featureValue = float(round(x[i],3))
    emptyList.append((featureNames[i],  featureValue))
    if abs(featureValue) > abs(bestFeature[1]):
      bestFeature = [featureNames[i], featureValue]
  print(emptyList)
  print('best feature: ', bestFeature)
  countt=countt+1

rock
[('duration_ms', 0.0), ('danceability', -7.609), ('energy', 3.378), ('loudness', -0.258), ('speechiness', -6.769), ('acousticness', 0.146), ('liveness', -0.226), ('instrumentalness', -0.293), ('valence', 2.407), ('tempo', -0.004), ('mode', 0.403), ('word_count', -0.002), ('repetition_pct', 0.891), ('profanity_pct', -6.323), ('positive_pct', 5.142), ('negative_pct', -1.399)]
best feature:  ['danceability', -7.609]
pop
[('duration_ms', -0.0), ('danceability', 0.153), ('energy', -0.663), ('loudness', 0.054), ('speechiness', -2.768), ('acousticness', 0.548), ('liveness', -0.286), ('instrumentalness', 1.183), ('valence', -0.105), ('tempo', 0.001), ('mode', 0.121), ('word_count', 0.002), ('repetition_pct', 0.538), ('profanity_pct', -26.147), ('positive_pct', 0.231), ('negative_pct', 1.892)]
best feature:  ['profanity_pct', -26.147]
r&b
[('duration_ms', 0.0), ('danceability', 0.434), ('energy', -3.697), ('loudness', -0.021), ('speechiness', 3.483), ('acousticness', 0.788), ('liveness', 0

In [61]:
#create model predictions on the test data
%%time
results = model.transform(test)

CPU times: user 11.5 ms, sys: 1.43 ms, total: 12.9 ms
Wall time: 198 ms


In [62]:
y_true = results.select(['label']).rdd.collect()
y_pred = results.select(['prediction']).rdd.collect()

In [63]:
target_names = ['rock','pop','r&b','rap','edm']

print(classification_report(y_true, y_pred, target_names=target_names))

              precision    recall  f1-score   support

        rock       0.63      0.73      0.68       657
         pop       0.49      0.54      0.52       728
         r&b       0.62      0.52      0.57       665
         rap       0.71      0.68      0.70       528
         edm       0.57      0.47      0.52       335

    accuracy                           0.60      2913
   macro avg       0.60      0.59      0.60      2913
weighted avg       0.60      0.60      0.60      2913

