In [1]:
import findspark
findspark.init() 

In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.ml.stat import Correlation
from pyspark.sql.types import IntegerType,BooleanType,DateType,FloatType, StringType
import matplotlib.pyplot as plt 
import seaborn as sns
import pyspark.sql.types as T


In [4]:
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

def fudf(val):
    return functools.reduce(lambda x, y:x+y, val)

In [37]:
df = spark.read.json('C:\\Users\\pasqu\\Desktop\\Università\\Distributed Data Analysis and Mining\\cleaned_dataset')

In [12]:
df.count()

583363

In [40]:
df.show()

+------------+------------------+--------------------+---------------------+------------+-----------+------+--------------------+--------------------+----------------+---+--------+--------+----+----------------+------------+-----------+--------------------+---------------------+-------+--------------+-------+
|acousticness|               age|avg_artist_followers|avg_artist_popularity|danceability|duration_ms|energy|              genres|            id_track|instrumentalness|key|liveness|loudness|mode|popularity_track|release_date|speechiness|sum_artist_followers|sum_artist_popularity|  tempo|time_signature|valence|
+------------+------------------+--------------------+---------------------+------------+-----------+------+--------------------+--------------------+----------------+---+--------+--------+----+----------------+------------+-----------+--------------------+---------------------+-------+--------------+-------+
|       0.658|41.821917808219176|              5403.5|             

# Data Transformation

Vector Assembler. A vector assembler is a transformer that converts a set of features into a single vector column often referred to as an array of features 

In [41]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["avg_artist_popularity","duration_ms", "danceability", "loudness", "speechiness","acousticness","instrumentalness","liveness","valence","tempo","time_signature","age","key", "mode","sum_artist_followers","sum_artist_popularity"],
    outputCol="features")

df = assembler.transform(df)

In [42]:
df.select('features').show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                    |
+----------------------------------------------------------------------------------------------------------------------------+
|[40.0,156067.0,0.602,-6.667,0.404,0.658,0.0,0.0972,0.65,182.229,3.0,41.821917808219176,0.0,1.0,10807.0,80.0]                |
|[43.0,220133.0,0.77,-7.306,0.172,0.543,7.96E-4,0.0684,0.898,135.573,4.0,45.824657534246576,1.0,1.0,19833.0,43.0]            |
|[68.0,250960.0,0.212,-6.69,0.14,4.8E-5,0.918,0.324,0.231,140.917,4.0,25.673972602739727,0.0,0.0,874600.0,68.0]              |
|[42.0,457040.0,0.362,-17.744,0.0398,0.144,0.827,0.117,0.257,118.853,4.0,31.813698630136987,11.0,0.0,69129.0,42.0]           |
|[68.0,282891.0,0.343,-14.937,0.0384,0.957,2.49E-4,0.661,0.101,144.533,4.0,4.071232876712329,1.0,0.0,1709414.0,

Standardizzo i valori del vettore

In [43]:
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(df)

# Normalize each feature to have unit standard deviation.
df = scalerModel.transform(df)

In [44]:
df.select('scaledFeatures').show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|scaledFeatures                                                                                                                                                                                                                                                                                                     |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[1.9235483655073684,1.2361331222982879,3.63675244594466,-1.3167426258

variance treshold per eliminare gli attributi con basssa varianza (minore di 1)

In [45]:
from pyspark.ml.feature import VarianceThresholdSelector

selector = VarianceThresholdSelector(varianceThreshold=1,featuresCol='scaledFeatures', outputCol="selectedFeatures")

df = selector.fit(df).transform(df)

print("Output: Features with variance lower than %f are removed." %
      selector.getVarianceThreshold())

Output: Features with variance lower than 0.500000 are removed.


In [46]:
df.select('selectedFeatures').show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|selectedFeatures                                                                                                                                                                                                                                                                                                   |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[1.9235483655073684,1.2361331222982879,3.63675244594466,-1.3167426258

# K-Means

In [29]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.clustering import KMeansModel

confronto silhouette score per scegliere il miglior numero di cluster

In [47]:
silhouette_score=[]
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='selectedFeatures', \
                                metricName='silhouette', distanceMeasure='squaredEuclidean')
for i in range(2,10):
    
    KMeans_algo=KMeans(featuresCol='selectedFeatures', k=i)
    
    KMeans_fit=KMeans_algo.fit(df)
    
    output=KMeans_fit.transform(df)
    
    
    
    score=evaluator.evaluate(output)
    
    silhouette_score.append(score)
    
    print("k={}, Silhouette Score:{}".format(i,score))

k=2, Silhouette Score:0.25192945476312034
k=3, Silhouette Score:0.2079730993989029
k=4, Silhouette Score:0.28406603837262556
k=5, Silhouette Score:0.14728086008732028
k=6, Silhouette Score:0.14579192678039807
k=7, Silhouette Score:0.16358366360574786
k=8, Silhouette Score:0.16628532225528309
k=9, Silhouette Score:0.15893874244955664


In [48]:
# Trains a k-means model.
kmeans = KMeans(featuresCol='selectedFeatures', k=5)
model = kmeans.fit(df)

# Make predictions
predictions = model.transform(df)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Silhouette with squared euclidean distance = -0.4910221921380032
Cluster Centers: 
[ 2.74927082  1.8035089   4.1574416  -1.46628652  0.48042448  0.6325733
  0.13280622  0.95237669  2.79014617  4.0265065   8.43808923  0.86478639
  1.58376652  1.23026469  0.54128057  1.66550693]
[ 1.58811253  1.54419543  3.48683955 -2.26118222  0.38096045  2.04412068
  0.3607295   1.06654735  2.54041791  3.88590727  8.19882541  2.24303968
  1.42579558  1.52773717  0.04306252  0.85966683]
[ 2.6056649   1.2125697   3.99958892 -3.04031154  4.80244151  1.81062079
  0.01604281  2.13303643  2.17859978  3.42998511  7.52322712  2.02313384
  1.42234312  1.4101695   0.04161935  1.50522145]
[ 2.29760602  2.03328829  2.19689077 -3.59489047  0.28029442  2.49400196
  2.05582762  1.00297966  1.08857499  3.44400096  7.67959431  2.30067611
  1.42393404  1.42482246  0.18907834  1.91785187]
[ 2.47941163  2.09129636  2.81931351 -1.69879201  0.32954357  0.90024383
  0.28587247  1.39056414  1.46703221  4.30184468  8.28287524 

In [49]:
centroidi = predictions.groupby("centers")

In [19]:
generi=predictions.groupBy("prediction","genres").count().orderBy(desc("prediction"), desc("count"))

In [20]:
generi.where(col('prediction')==0).show(100, truncate=False)

+----------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+
|prediction|genres                                                                                                                                        |count|
+----------+----------------------------------------------------------------------------------------------------------------------------------------------+-----+
|0         |[]                                                                                                                                            |16625|
|0         |[tango, vintage tango]                                                                                                                        |1405 |
|0         |[classic bollywood, desi pop, filmi, sufi]                                                                                                    |902  |
|0         |[classic tollywo

0 rock, rock classico\
1 audiolibri\
2 classica, orchestra, jazz\
3 pop di differenti paesi\
4 anni 40-50, canzoni d'autore\

# Bisecting K-means

In [18]:
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [22]:
silhouette_score=[]
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='selectedFeatures', \
                                metricName='silhouette', distanceMeasure='squaredEuclidean')
for i in range(2,10):
    
    bkm = BisectingKMeans(featuresCol='selectedFeatures').setK(i).setSeed(1)
    
    model = bkm.fit(df)
    
    output=model.transform(df)
    
    
    
    score=evaluator.evaluate(output)
    
    silhouette_score.append(score)
    
    print("k={}, Silhouette Score:{}".format(i,score))

k=2, Silhouette Score:0.4061798875030401
k=3, Silhouette Score:0.2768299970153146
k=4, Silhouette Score:0.32016102816976305
k=5, Silhouette Score:0.33751155345326717
k=6, Silhouette Score:0.3536279694111923
k=7, Silhouette Score:0.34484547632872037
k=8, Silhouette Score:0.3684353661065895
k=9, Silhouette Score:0.2743499920314114


In [23]:

# Trains a bisecting k-means model.
bkm = BisectingKMeans(featuresCol='selectedFeatures').setK(8).setSeed(1)
model = bkm.fit(df)

# Make predictions
predictions = model.transform(df)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
    print(center)

Silhouette with squared euclidean distance = -0.16939377786943563
Cluster Centers: 
[-7.1987085  -3.35752204 -0.57133909 -4.27483497]
[-7.28636726  0.56753485  0.03842125 -4.42272346]
[-7.27314776 -4.01861142  0.850197   -3.95405191]
[-7.35879516 -2.40846734  2.25294462 -5.47202315]
[-5.58646787 -3.54031179 -0.10332735 -4.69713683]
[-3.79305081 -3.66337051  0.14915578 -5.46273691]
[-5.06620105 -2.93272916  0.64628581 -2.87890654]
[-4.10043509  0.03955328  1.01909236 -0.84268415]


In [24]:
generi=predictions.groupBy("prediction","genres").count().orderBy(desc("prediction"), desc("count"))

In [33]:
generi.where(col('prediction')==7).show(100, truncate=False)

+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|prediction|genres                                                                                                                                                     |count|
+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|7         |[]                                                                                                                                                         |1656 |
|7         |[hoerspiel]                                                                                                                                                |801  |
|7         |[barnsagor]                                                                                                      

0: [pop occidentale]\
1: [audiolibri]\
2: [pop orientale]\
3: [audiolibri]\
4: [vintage, jazz]\
5: [orchestra, classica, jazz]\
6: [classica orientale, sufi]\
7: [audiolibri]
