#Exploring correlations between genre, time of day and country

This notebook loads the dataset created in the Webscraping notebook, encodes its categorical values using one hot encoding and performs K-means clustering on chosen feature sets, scoring each feature set over a range of K values. The best K for each feature set is then chosen from the output CSV file, and the resulting clusters are displayed and graphed.

#Installing packages and importing libraries

In [0]:
pip install mllib tqdm plotnine

Python interpreter will be restarted.


In [0]:
from tqdm import tqdm
from multiprocessing.pool import ThreadPool
from pyspark.sql.types import IntegerType, TimestampType, StringType, FloatType, StructType,StructField
from pyspark.sql import SparkSession
from datetime import datetime 
from datetime import time
import pyspark.sql.functions as f
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import StringIndexer, StandardScaler
import pandas as pd
from pyspark.sql.functions import col,lit, countDistinct
from pyspark.ml.feature import VectorAssembler, PCA
import numpy as np
from numpy import array
from numpy import argmax
from sklearn.preprocessing import LabelEncoder
from pyspark.sql.functions import concat_ws
import seaborn as sns
from pyspark.ml.feature import OneHotEncoder
from pyspark.sql.functions import when
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel
import matplotlib.pyplot as plt
from pyspark.ml import Pipeline
from plotnine import *



#Functions

##Time extraction and conversion from raw datetime object to hour

extractHour takes a row holding a tweet's raw datetime and returns a pair: the original datetime and the hour in which the tweet was sent.

In [0]:
def extractHour(row):
    ## row is a one-column Row holding the tweet's raw datetime
    rawtime = row[0]
    ## return the original datetime (used later as the join key) and its hour, 0 to 23
    return [rawtime, rawtime.hour]

##One Hot Encoder

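onehotencoder takes a table and a list of columns to encode. For each column it adds a StringIndexer output (the column name with the suffix _Index) and a one-hot vector (suffix _vec), and returns the table with these new columns.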

In [0]:
def onehotencoder(table, columnList):
    for column in columnList:
        ## map each category to a numeric index (the most frequent category gets index 0)
        indexer = StringIndexer(inputCol=column, outputCol=column + "_Index")
        table = indexer.fit(table).transform(table)

        ## turn the index into a sparse one-hot vector
        encoder = OneHotEncoder(inputCol=column + "_Index", outputCol=column + "_vec")
        table = encoder.fit(table).transform(table)
    
    return table
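To make the encoding concrete, here is a small plain-Python sketch of what the two stages do to a single column. It assumes Spark's defaults: StringIndexer orders labels by descending frequency (ties broken alphabetically), and OneHotEncoder uses dropLast=True, so the least frequent label becomes an all-zero vector. Spark produces sparse vectors; dense lists and made-up weekday values are used here for readability.

```python
from collections import Counter


def string_index(values):
    """Assign indices by descending frequency, breaking ties alphabetically.

    Mirrors the default ordering of Spark's StringIndexer (frequencyDesc).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector; with drop_last the last category is all zeros,
    as with OneHotEncoder's default dropLast=True."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec


# Made-up weekday column
weekdays = ["Mon", "Tue", "Mon", "Sun", "Tue", "Mon"]
mapping = string_index(weekdays)
encoded = [one_hot(mapping[d], len(mapping)) for d in weekdays]
print(mapping)     # {'Mon': 0, 'Tue': 1, 'Sun': 2}
print(encoded[3])  # Sun is the last category: [0.0, 0.0]
```

Dropping the last category keeps the encoded columns linearly independent; with nearly 300 genres it also saves one column per encoded feature.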


##Kmeans K selection and pipeline implementation

The Kmeans function takes a flag of "Test" or "Run". With "Test" it returns a list of the features tested, the K used and the silhouette score; with "Run" it returns the transformed table with a "label" column holding each row's cluster.
The stages required for K-means clustering (vector assembly, scaling and clustering) are implemented as a three-stage pipeline to keep the run time down, which matters for the large feature sets here, as there are nearly 300 genres and 48 countries.

In [0]:
def Kmeans(K,table, featureList, flag):
    
    stage_1 = VectorAssembler(inputCols=featureList,outputCol = "Rawfeatures")
    stage_2 = StandardScaler(inputCol="Rawfeatures",outputCol = 'features')
    stage_3 = KMeans(
        featuresCol = "features",
        predictionCol = "label",
        k=K,
        maxIter = 20,
    )
    
    pipeline = Pipeline(stages = [stage_1,stage_2,stage_3])
    pipeline_model = pipeline.fit(table)
    table = pipeline_model.transform(table)
    if flag == "Test":
        evaluator = ClusteringEvaluator()
        evaluator.setPredictionCol("label")

        ## results holds the feature list, the K tested and the corresponding silhouette score
        results = [featureList, K, evaluator.evaluate(table)]
        return results
    elif flag == "Run":
        return table
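The pipeline's StandardScaler stage keeps its defaults (withStd=True, withMean=False), so each assembled column is divided by its sample standard deviation without being centred. For a one-hot column whose category appears in fewer than half the rows, the rarer the category, the smaller its standard deviation and the larger its scaled value, which can give rare genres and countries more influence on the K-means distances. The plain-Python sketch below illustrates this on made-up rows; it mirrors the documented default behaviour, not Spark's actual implementation.

```python
import statistics


def scale_columns(rows):
    """Divide each column by its sample standard deviation (n - 1 denominator).

    Mirrors StandardScaler's defaults (withStd=True, withMean=False): values are
    scaled but not centred, and a zero-variance column is mapped to 0.0.
    """
    columns = list(zip(*rows))
    stds = [statistics.stdev(column) for column in columns]
    return [
        [value / std if std > 0 else 0.0 for value, std in zip(row, stds)]
        for row in rows
    ]


# Made-up one-hot rows for three genres: A is common, B and C are rare
rows = [[1, 0, 0]] * 8 + [[0, 1, 0], [0, 0, 1]]
scaled = scale_columns(rows)
print(round(scaled[0][0], 3), round(scaled[8][1], 3))  # 2.372 3.162
```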


##K list generation

In [0]:
def getKList(start, end, increment):
    return list(range(start, end, increment))
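Because getKList wraps Python's range, the end value is excluded: getKList(2, 7, 1) yields K = 2 to 6, not 2 to 7, and passing end + 1 would include it. The snippet below checks the ranges produced by the maxK values used in this notebook.

```python
def getKList(start, end, increment):
    # range() stops before `end`, so `end` itself is never included
    return list(range(start, end, increment))


# K ranges produced by the maxK values used in the K tests
for upper in [7, 20, 30]:
    ks = getKList(2, upper, 1)
    print(upper, ks[0], ks[-1], len(ks))
# 7 2 6 5
# 20 2 19 18
# 30 2 29 28
```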

#Loading Dataset

The dataset filled with genre information by the Webscraping notebook is loaded, and rows missing a genre, country or tweet datetime are dropped.

In [0]:
MMTDwithGenres = (spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/FileStore/tables/MMTDGenres")
  
)

MMTDNullsremoved = MMTDwithGenres.na.drop(subset=["Genre","country","tweet_datetime"])

#Extracting Time Info

The tweet datetime is stored as a timestamp in the raw dataset. The hour in which each tweet was sent is extracted from it, which allows clearer clustering and far more efficient one hot encoding (at most 24 categories instead of one per distinct timestamp).

In [0]:
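## collect each distinct tweet datetime once so the join below cannot duplicate tweets
times = MMTDNullsremoved.select("tweet_datetime").distinct().collect()

pool = ThreadPool(9)  ## also reused later to run the K tests in parallel

## extract the hour from each datetime using the thread pool
timeResults = pool.map(extractHour, times)

schema = StructType([
    StructField("RawTime", TimestampType(), True),
    StructField("Hour", StringType(), True),
])

timesDF = spark.createDataFrame(data=timeResults, schema=schema)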


MMTDNullsremoved = MMTDNullsremoved.join(timesDF,MMTDNullsremoved.tweet_datetime == timesDF.RawTime)
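Joining the tweets back onto timesDF by raw datetime keeps exactly one row per tweet only if each datetime appears once in timesDF. If the datetimes are collected without de-duplication, tweets that share a timestamp each match several lookup rows and are multiplied. The plain-Python sketch below shows the effect on three made-up tweets; in Spark, selecting tweet_datetime with .distinct() before collecting avoids it, and pyspark.sql.functions.hour could derive the hour column directly without collecting to the driver.

```python
from datetime import datetime

# Made-up tweets: t1 and t2 share the same timestamp
tweets = [
    ("t1", datetime(2013, 1, 7, 14, 5)),
    ("t2", datetime(2013, 1, 7, 14, 5)),
    ("t3", datetime(2013, 1, 8, 9, 30)),
]


def hour_lookup(timestamps):
    """(timestamp, hour) pairs, like the rows extractHour produces."""
    return [(ts, ts.hour) for ts in timestamps]


def inner_join(rows, lookup):
    """Equi-join each tweet to every lookup row with the same timestamp."""
    return [(tid, ts, hour) for tid, ts in rows for key, hour in lookup if key == ts]


all_times = [ts for _, ts in tweets]
joined_all = inner_join(tweets, hour_lookup(all_times))
joined_distinct = inner_join(tweets, hour_lookup(sorted(set(all_times))))

print(len(joined_all))       # 5: t1 and t2 each match two lookup rows
print(len(joined_distinct))  # 3: one row per tweet
```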


#One Hot Encoding

One hot encoding is done once, outside of the K-means pipeline, so that this expensive step is not repeated for every feature set and K value tested.

In [0]:
columnList =["Genre", "country", "Hour", "tweet_weekday"]
MMTDEncoded = onehotencoder(MMTDNullsremoved, columnList)
   


# Testing Values of K for different groupings

The code block below tests each group of feature sets over a range of K values and saves the feature set, the number of clusters K and the evaluation (silhouette) score for each run, so the best K can be selected for clustering.
The feature sets in features_with_WeekDays were tested with K values from 2 up to, but not including, 7.

Weaknesses of this section:

The run time for the three feature sets is 21 minutes. This is overshadowed by the run time of the largest K value for the feature set that clusters genre and country, which had to be reduced so that this section could run within the 2 hours allowed by Databricks.

I would like to test further feature sets, as only genre and weekday gave promising grouping and categorisation.

I would like to load the entire MMTD into this analysis to see whether any clusters can be found between continents.

In [0]:
## upper bound (exclusive) for K in each group of feature sets
maxK = [7, 20, 30]

## max grouping 7 as only 7 days in week
Genre_Weekday = ["Genre_vec", "tweet_weekday_vec"]
Time_Day = ["Hour_vec", "tweet_weekday_vec"]
Day_Country = ["country_vec", "tweet_weekday_vec"]
features_with_WeekDays = [Genre_Weekday, Time_Day, Day_Country]

## max grouping 24 as 24 hours in day
Genre_time = ["Genre_vec", "Hour_vec"]
Country_Time = ["country_vec", "Hour_vec"]
features_with_Hours = [Genre_time, Country_Time]

## max grouping 48 as 48 countries in Europe dataset
Genre_Country = ["Genre_vec", "country_vec"]
features_with_Countries = [Genre_Country]

## groups of features to be tested for correlation
featureSetstoTest = [features_with_WeekDays, features_with_Hours, features_with_Countries]

savepath = "/FileStore/tables/MMTDKscores"
dbutils.fs.rm(savepath, True)  ## clears scores before running the programme again

schema = StructType([
    StructField("Features", StringType(), True),
    StructField("Clusters", IntegerType(), True),
    StructField("Silhouette", FloatType(), True),
])

for i, featureSets in enumerate(featureSetstoTest):
    print(i, maxK[i])  ## prints the group index and its K upper bound to show progress
    for featureList in featureSets:
        Klist = getKList(2, maxK[i], 1)  ## K values to be tested, based on the smallest grouping in the feature set

        ## multithreads the K tests to improve speed
        KtestResults = pool.map(lambda K: Kmeans(K, MMTDEncoded, featureList, "Test"), Klist)

        KtestResults = spark.createDataFrame(data=KtestResults, schema=schema)
        ## append mode adds to the CSV output rather than overwriting it
        KtestResults.coalesce(1).write.option("header", True).format("csv").mode("append").save(savepath)
        print("Write Complete ", featureList)  ## prints to show progress
     



0 7
Write Complete  ['Genre_vec', 'tweet_weekday_vec']
Write Complete  ['Hour_vec', 'tweet_weekday_vec']
Write Complete  ['country_vec', 'tweet_weekday_vec']
1 20


# Display KScores

The K-means test results are saved to a CSV file so that they persist between sessions and the tests do not have to be rerun every time the notebook is run.
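Choosing the best K from the saved scores amounts to keeping, for each feature set, the K with the highest silhouette. A minimal plain-Python sketch using the csv module follows, run on a few of the [country_vec, Hour_vec] scores this notebook recorded. It assumes a single CSV with a Silhouette header; the real output directory holds one part file per append, and in the notebook the same selection could be done on the MMTDKscores DataFrame instead.

```python
import csv
import io

# A few of the rows this notebook wrote to MMTDKscores for [country_vec, Hour_vec]
scores_csv = """Features,Clusters,Silhouette
"[country_vec, Hour_vec]",2,-0.070897095
"[country_vec, Hour_vec]",3,-0.1245058
"[country_vec, Hour_vec]",8,-0.019987827
"[country_vec, Hour_vec]",10,-0.029365674
"""


def best_k_per_feature_set(text):
    """Return {feature set: (K, silhouette)}, keeping the highest silhouette per set."""
    best = {}
    for row in csv.DictReader(io.StringIO(text)):
        features = row["Features"]
        k = int(row["Clusters"])
        score = float(row["Silhouette"])
        if features not in best or score > best[features][1]:
            best[features] = (k, score)
    return best


print(best_k_per_feature_set(scores_csv))
# {'[country_vec, Hour_vec]': (8, -0.019987827)}
```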

In [0]:
MMTDKscores = (spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/FileStore/tables/MMTDKscores")
  
)

display(MMTDKscores)

Features,Clusters,Silhouette
"[country_vec, Hour_vec]",2,-0.070897095
"[country_vec, Hour_vec]",3,-0.1245058
"[country_vec, Hour_vec]",4,-0.065734886
"[country_vec, Hour_vec]",5,-0.053027634
"[country_vec, Hour_vec]",6,-0.06828506
"[country_vec, Hour_vec]",7,-0.040457904
"[country_vec, Hour_vec]",8,-0.019987827
"[country_vec, Hour_vec]",9,-0.0896247
"[country_vec, Hour_vec]",10,-0.029365674
"[country_vec, Hour_vec]",11,-0.09155995


#Kmeans Analysis

Using the K scores from the steps above, a K-means model is created for each feature set and a sample of the labelled results is plotted to show the clustering.

Genre and weekday had the highest silhouette score, 0.99, which indicates very well-separated clusters and suggests a strong relationship between the day of the week and the genre being played. The next highest was 0.5, for genre and hour.
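The silhouette score reported by ClusteringEvaluator compares, for each point, its mean distance a to the rest of its own cluster with its mean distance b to the nearest other cluster, scoring (b - a) / max(a, b) and averaging over all points. It ranges from -1 to 1: values near 1 mean tight, well-separated clusters, and values below 0, such as those recorded for [country_vec, Hour_vec], mean points sit closer to another cluster than to their own. The plain-Python sketch below uses Euclidean distance on made-up points; Spark's evaluator defaults to squared Euclidean distance (distanceMeasure="squaredEuclidean"), so its scores are not directly comparable with this textbook version.

```python
import math


def silhouette(points, labels):
    """Mean silhouette coefficient using Euclidean distance.

    For each point, a is its mean distance to the other members of its cluster
    and b is its lowest mean distance to the members of any other cluster;
    the point scores (b - a) / max(a, b). A point alone in its cluster scores 0.
    Requires at least two clusters.
    """
    clusters = {}
    for point, label in zip(points, labels):
        clusters.setdefault(label, []).append(point)

    scores = []
    for point, label in zip(points, labels):
        own = clusters[label]
        if len(own) == 1:
            scores.append(0.0)
            continue
        # The point's distance to itself is 0, so divide by the other members only
        a = sum(math.dist(point, other) for other in own) / (len(own) - 1)
        b = min(
            sum(math.dist(point, other) for other in members) / len(members)
            for other_label, members in clusters.items()
            if other_label != label
        )
        denom = max(a, b)
        scores.append((b - a) / denom if denom > 0 else 0.0)
    return sum(scores) / len(scores)


# Made-up 2-D points: two tight groups far apart on the x axis
points = [(0, 0), (0, 1), (10, 0), (10, 1)]
print(round(silhouette(points, [0, 0, 1, 1]), 2))  # 0.9: well separated
print(round(silhouette(points, [0, 1, 0, 1]), 2))  # -0.45: points nearer the other cluster
```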

In [0]:
MMTDGenre_Weekday = Kmeans(2, MMTDEncoded, Genre_Weekday, "Run")  ## creates the model and labels the dataset

MMTDGenre_Weekday_Sample = MMTDGenre_Weekday.sample(.35)  ## the data is sampled to plot more efficiently and to see the clusters more easily

data = MMTDGenre_Weekday_Sample.select("tweet_weekday", "Genre", "label")
MMTDGenre_Weekday_PD = data.toPandas()
ggplot(MMTDGenre_Weekday_PD, aes(x="tweet_weekday", y="Genre", color="label")) + geom_point()  ## colour is set to label to show the clusters generated by K-means

##Evaluator score = .99

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
File <command-980664751662336>:1
----> 1 MMTDGenre_Weekday = Kmeans(2, MMTDEncoded,Genre_time, "Run")
      3 MMTDGenre_Weekday_Sample = MMTDGenre_Time.sample(.35)## data is samples to plot more effeciently and more easily see clusters
      5 data = MMTDGenre_Weekday_Sample.select("tweet_weekday","Genre", "label")

NameError: name 'MMTDEncoded' is not defined

In [0]:
MMTDGenre_Time = Kmeans(20, MMTDEncoded,Genre_time, "Run")

MMTDGenre_Time_Sample = MMTDGenre_Time.sample(.35)

data = MMTDGenre_Time_Sample.select("Hour","Genre", "label")
MMTDGenre_Time_PD = data.toPandas()
ggplot(MMTDGenre_Time_PD, aes(x= "Hour", y = "Genre", color = "label"))+ geom_point()

##Evaluator score = .5

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
File <command-980664751662357>:1
----> 1 MMTDGenre_time = Kmeans(20, MMTDEncoded,Genre_time, "Run")
      3 MMTDGenre_Time_Sample = MMTDGenre_Time.sample(.35)## data is samples to plot more effeciently and more easily see clusters
      5 data = MMTDGenre_Time_Sample.select("Hour","Genre", "label")

NameError: name 'MMTDEncoded' is not defined

In [0]:
MMTDTime_Day = Kmeans(6, MMTDEncoded,Time_Day, "Run")

MMTDTime_Day_Sample = MMTDTime_Day.sample(.35)

data = MMTDTime_Day_Sample.select("Hour","tweet_weekday", "label")
MMTDTime_Day_PD = data.toPandas()
ggplot(MMTDTime_Day_PD, aes(x= "tweet_weekday", y = "Hour", color = "label"))+ geom_point()

##Evaluator score =

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
File <command-980664751662355>:1
----> 1 MMTDTime_Day = Kmeans(6, MMTDEncoded,Time_Day, "Run")
      3 MMTDTime_Day_Sample = MMTDGenre_Time.sample(.35)## data is samples to plot more effeciently and more easily see clusters
      5 data = MMTDTime_Day_Sample.select("Hour","tweet_weekday", "label")

NameError: name 'MMTDEncoded' is not defined

In [0]:
MMTDDay_Country = Kmeans(3, MMTDEncoded,Day_Country, "Run")

MMTDDay_Country_Sample = MMTDDay_Country.sample(.35)

data = MMTDDay_Country_Sample.select("tweet_weekday","country", "label")
MMTDDay_Country_PD = data.toPandas()
ggplot(MMTDDay_Country_PD, aes(x= "tweet_weekday", y = "country", color = "label"))+ geom_point()

##Evaluator score = .32

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
File <command-980664751662356>:1
----> 1 MMTDDay_Country = Kmeans(3, MMTDEncoded,Day_Country, "Run")
      3 MMTDDay_Country_Sample = MMTDGenre_Time.sample(.35)## data is samples to plot more effeciently and more easily see clusters
      5 data = MMTDDay_Country_Sample.select("tweet_weekday","country", "label")

NameError: name 'MMTDEncoded' is not defined

In [0]:
MMTDCountry_Time = Kmeans(21, MMTDEncoded,Country_Time, "Run")

MMTDCountry_Time_Sample = MMTDCountry_Time.sample(.35)

data = MMTDCountry_Time_Sample.select("Hour","country", "label")
MMTDCountry_Time_PD = data.toPandas()
ggplot(MMTDCountry_Time_PD, aes(x= "Hour", y = "country", color = "label"))+ geom_point()

##Evaluator score = .16

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
File <command-980664751662358>:1
----> 1 MMTDCountry_Time = Kmeans(21, MMTDEncoded,Country_Time, "Run")
      3 MMTDCountry_Time_Sample = MMTDGenre_Time.sample(.35)## data is samples to plot more effeciently and more easily see clusters
      5 data = MMTDCountry_Time_Sample.select("Hour","country", "label")

NameError: name 'MMTDEncoded' is not defined

In [0]:
MMTDGenre_Country = Kmeans(8, MMTDEncoded,Genre_Country, "Run")

MMTDGenre_Country_Sample = MMTDGenre_Country.sample(.35)

data = MMTDGenre_Country_Sample.select("Genre","country", "label", "tweet_longitude", "tweet_latitude" )
MMTDGenre_Country_PD = data.toPandas()
ggplot(MMTDGenre_Country_PD, aes(x= "Genre", y = "country", color = "label"))+ geom_point()

##Evaluator score =

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
File <command-980664751662359>:1
----> 1 MMTDGenre_Country = Kmeans(8, MMTDEncoded,Genre_Country, "Run")
      3 MMTDGenre_Country_Sample = MMTDGenre_Time.sample(.35)## data is samples to plot more effeciently and more easily see clusters
      5 data = MMTDGenre_Country_Sample.select("genre","country", "label", "tweet_longitude", "tweet_latitude" )

This graph plots each sampled tweet's longitude against its latitude, coloured by genre, so the geographic spread of genres can be seen.

In [0]:
MMTDEncoded_display = MMTDEncoded.sample(.35)

data = MMTDEncoded_display.select("tweet_longitude","tweet_latitude", "Genre")
MMTDEncoded_display_PD = data.toPandas()

ggplot(MMTDEncoded_display_PD, aes(x= "tweet_longitude", y = "tweet_latitude", color = "Genre"))+ geom_point()

---------------------------------------------------------------------------
NameError                                 Traceback (most recent call last)
File <command-980664751662361>:1
----> 1 ggplot(dataPD, aes(x= "tweet_longitude", y = "tweet_latitude", color = "label"))+ geom_point()

NameError: name 'dataPD' is not defined