In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Normalizer, StandardScaler
import random
import time

TOPIC = "canzoni"
SERVER = 'localhost:9092'

# Inizializzare SparkSession
spark = SparkSession \
        .builder \
        .appName("Emergify Recomendation System") \
        .master("local[*]") \
        .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

#Spark Streaming
df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", SERVER) \
        .option("subscribe", TOPIC) \
        .option("startingOffsets", "latest") \
        .load()

df1 = df.selectExpr("CAST(value AS STRING)", "timestamp") 

def_schema = "order_id INT, id STRING, name STRING, popularity INT, duration_ms DOUBLE, " \
             + "artists STRING, id_artists STRING, release_date STRING, " \
             + "danceability DOUBLE,energy DOUBLE, key INT, loudness DOUBLE, " \
             + "mode INT,speechiness DOUBLE," \
             + "acousticness DOUBLE, instrumentalness DOUBLE, liveness DOUBLE, " \
             + "valence DOUBLE, tempo DOUBLE, time_signature DOUBLE"
df2 = df1.select(from_csv(col("value"),def_schema).alias("song"), "timestamp")

In [2]:
#Creare View 
df3 = df2.select("song.*", "timestamp")  
df3.createOrReplaceTempView("df3_View");
df3.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- duration_ms: double (nullable = true)
 |-- artists: string (nullable = true)
 |-- id_artists: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [3]:
song_find = spark.sql("SELECT * FROM df3_View")

#Write in memoria
songs_write = song_find \
        .writeStream \
        .trigger(processingTime='5 seconds') \
        .outputMode("append") \
        .option("truncate", "false") \
        .format("memory") \
        .queryName("Table7") \
        .start()

songs_write.awaitTermination(1)

False

In [8]:
spark_songs = spark.sql("SELECT * FROM Table7")
#spark_songs.show()
spark_songs.select('order_id','id','name','popularity','duration_ms','artists').show(3)

+--------+--------------------+---------+----------+-----------+--------------------+
|order_id|                  id|     name|popularity|duration_ms|             artists|
+--------+--------------------+---------+----------+-----------+--------------------+
|      38|5kRJFL34WexICZFj5...|Limitless|         1|   177005.0|AuroraAnthonyLoui...|
|      39|1h5nhUpNi7naqZIdb...|   No Cap|         1|   184634.0|              XoSuav|
|      40|31ezfRnEbz4QjTZsv...| Big Tyme|         0|   225903.0|  CZARnicholasSahara|
+--------+--------------------+---------+----------+-----------+--------------------+
only showing top 3 rows



In [9]:
spark_songs.count()

95

In [10]:
#SPOTIFY API

import spotipy
import spotipy.util
import seaborn as sns
import pandas as pd
import os
import numpy as np
import pandas as pd
import ujson


os.environ["SPOTIPY_CLIENT_ID"] = '06e71ff0e72e43a686c928e968376472'
os.environ["SPOTIPY_CLIENT_SECRET"] = '8b9f20e887de4b4baa4f3a35748d0c34'
os.environ["SPOTIPY_REDIRECT_URI"] = 'http://localhost:7777/callback'


scope = 'user-library-read'
username = 'iNSERIRE USERNAME'

token = spotipy.util.prompt_for_user_token(username, scope)
spotipy_obj = spotipy.Spotify(auth=token)
saved_tracks = spotipy_obj.current_user_saved_tracks(limit=50) 
n_tracks = saved_tracks['total']
print('Totale canzoni: %d ' % n_tracks)

Totale canzoni: 923 


In [11]:
def select_features(track_response):
    return {        
        'id': str(track_response['track']['id']),
        'name': str(track_response['track']['name']),
        'artists': [artist['name'] for artist in track_response['track']['artists']],
        'popularity': track_response['track']['popularity']
    }

tracks = [select_features(track) for track in saved_tracks['items']]

while saved_tracks['next']:
    saved_tracks = spotipy_obj.next(saved_tracks)
    tracks.extend([select_features(track) for track in saved_tracks['items']])

#Creare dataframe in pandas
tracks_df = pd.DataFrame(tracks)
pd.set_option('display.max_rows', len(tracks))
tracks_df['artists'] = tracks_df['artists'].apply(lambda artists: artists[0])

In [12]:
#Aggiungere audio features
audio_features = {}

for idd in tracks_df['id'].tolist():
    audio_features[idd] = spotipy_obj.audio_features(idd)[0]
    
tracks_df['acousticness'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['acousticness'])
tracks_df['speechiness'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['speechiness'])
tracks_df['key'] = tracks_df['id'].apply(lambda idd: str(audio_features[idd]['key']))
tracks_df['liveness'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['liveness'])
tracks_df['instrumentalness'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['instrumentalness'])
tracks_df['energy'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['energy'])
tracks_df['tempo'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['tempo'])
tracks_df['loudness'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['loudness'])
tracks_df['danceability'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['danceability'])
tracks_df['valence'] = tracks_df['id'].apply(lambda idd: audio_features[idd]['valence'])

tracks_df


Unnamed: 0,id,name,artists,popularity,acousticness,speechiness,key,liveness,instrumentalness,energy,tempo,loudness,danceability,valence
0,3nOz1U41SZZ0N3fuUWr9nb,I AM WOMAN,Emmy Meli,80,0.122,0.157,10,0.125,0.0,0.471,170.084,-9.247,0.646,0.455
1,5Qnrgqy1pAm9GyNQOgyVFz,Fourth of July,Sufjan Stevens,71,0.973,0.0445,6,0.119,0.33,0.104,120.484,-22.39,0.435,0.162
2,2y1UG6Ps1h6rgKYW9kk6bC,Treehouse,toastool,72,0.632,0.0371,8,0.11,0.127,0.277,132.302,-17.888,0.703,0.139
3,7GLSAyJY25FyB7w0blIdDf,IL MIO CUORE È OCCUPATO,Pop X,40,0.629,0.12,2,0.21,0.0815,0.454,136.02,-10.962,0.882,0.554
4,36V4aP2bpDUuvBYcpku9LI,Pure Imagination,Unclenathannn,64,0.765,0.0921,7,0.0845,7.6e-05,0.0312,72.727,-15.873,0.824,0.314
5,5gVMHBodcPKPvFTxnFV5BT,Aging out of the 20th Century,Trash Panda,49,0.951,0.0387,10,0.117,0.0021,0.132,114.974,-15.643,0.55,0.241
6,1175r6ZmkUjX8dRJLz9Ohk,I Love You So,The Walters,77,0.65,0.0377,10,0.13,0.0,0.669,75.995,-4.898,0.578,0.503
7,1Dp7ASSZVVOPJ85VdLpjOX,Seventeen Going Under,Sam Fender,77,0.00471,0.0355,1,0.0813,0.00589,0.873,161.948,-4.946,0.477,0.611
8,6lAMnXLVCBPMXXWAK4lGSL,Ylang Ylang,FKJ,67,0.925,0.0326,1,0.108,0.935,0.243,69.941,-12.453,0.559,0.162
9,1AC9rHDPQC3QoC3iYPeSEI,Quanto ti vorrei,chiello,71,0.00721,0.0336,2,0.0842,2e-06,0.711,185.996,-5.769,0.476,0.756


In [13]:
#Estrarre una canzone casuale
rand_song = random. randint(0,len(tracks_df)-1)
rand_song_df = tracks_df.head(rand_song)[-1:]
rand_song_df

Unnamed: 0,id,name,artists,popularity,acousticness,speechiness,key,liveness,instrumentalness,energy,tempo,loudness,danceability,valence
321,2VxeLyX666F8uXCJ0dZF8B,Shallow,Lady Gaga,85,0.371,0.0308,7,0.231,0.0,0.385,95.799,-6.362,0.572,0.323


In [14]:
spark_songs
spark_songs = spark_songs.drop('order_id', 'mode', 'release_date', 'id_artists','time_signature', 'duration_ms','timestamp')

df_sp = spark.createDataFrame(rand_song_df)
df = spark_songs.union(df_sp)


In [15]:
#Feature Engineering

from pyspark.ml.feature import VectorAssembler
vettore=VectorAssembler(inputCols=[
 'danceability',
 'energy',
 'loudness',
 'speechiness',
 'acousticness',
 'instrumentalness',
 'liveness',
 'valence',
 'tempo'], outputCol='song_features')
assembled=vettore.setHandleInvalid("skip").transform(df)

from pyspark.ml.feature import StandardScaler
std=StandardScaler(inputCol='song_features',outputCol='standardized')
scale=std.fit(assembled)
df=scale.transform(assembled)

In [16]:
#Clustering 

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
silhouette=[]
cls_ev = ClusteringEvaluator(predictionCol='prediction', featuresCol='standardized', \
                                metricName='silhouette', distanceMeasure='squaredEuclidean')


KMeans_algo=KMeans(featuresCol='standardized', k=3)
    
KMeans_fit=KMeans_algo.fit(df)
    
output_df =KMeans_fit.transform(df)

In [17]:
#Recommendation System

import numpy as np, pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")

class SpotifyRecommenderSystem():
    def __init__(self, data):
        self.data_ = data
    
 
    def Recomm(self, nome_canzone, amount=1):
        distances = []
        song = self.data_[(self.data_.name.str.lower() == nome_canzone.lower())].head(1).values[0]
        res_dt = self.data_[self.data_.name.str.lower() != nome_canzone.lower()]
        for i_song in tqdm(res_dt.values):
            distanza = 0
            for col in np.arange(len(res_dt.columns)):
                if not col in [0,1,2,14]:
                    distanza = distanza + np.absolute(float(song[col]) - float(i_song[col]))
            distances.append(distanza)
        res_dt['distance'] = distances
        res_dt = res_dt.sort_values('distance')
        columns = ['id','name', 'artists', 'acousticness', 'liveness', 'instrumentalness', 'energy', 'danceability', 'valence']
        return res_dt[columns][:amount]
    


In [20]:
datalabel = output_df.select('id',
 'name',
 'artists',
 'danceability',
 'energy',
 'key',
 'loudness',
 'speechiness',
 'acousticness',
 'instrumentalness',
 'liveness',
 'valence',
 'tempo',
 'prediction')



final_data = datalabel.toPandas()
final_data.drop(final_data[final_data['artists'] == '0'].index, inplace = True)
final_data.drop_duplicates(inplace=True)
final_data.drop(final_data[final_data['danceability'] == 0.0000].index, inplace = True)
final_data.drop(final_data[final_data['liveness'] == 0.000].index, inplace = True)
final_data.drop(final_data[final_data['instrumentalness'] == 0.000000].index, inplace = True)
final_data.drop(final_data[final_data['energy'] == 0.0000].index, inplace = True)
final_data.drop(final_data[final_data['danceability'] == 0.000].index, inplace = True)
final_data.drop(final_data[final_data['valence'] == 0.000].index, inplace = True)

z = final_data
value_pred = final_data.iloc[-1:]['prediction']

recommendation = SpotifyRecommenderSystem(final_data)
x = rand_song_df['name'].tolist()[0]

emergify = recommendation.Recomm(x, 10)

y = rand_song_df[['id','name', 'artists',  'acousticness', 'liveness', 'instrumentalness', 'energy', 
       'danceability', 'valence']]

emergify = pd.concat([emergify, y])
emergify.to_csv('emergify.csv')

100%|█████████████████████████████████████████████████████████████████████████████| 945/945 [00:00<00:00, 12769.84it/s]


In [21]:
df_emergify = (spark.read.format("csv").options(header="true")
    .load("emergify.csv"))
df_emergify.show()

+----+--------------------+--------------------+-------------------+------------+--------+----------------+------+------------+-------+
| _c0|                  id|                name|            artists|acousticness|liveness|instrumentalness|energy|danceability|valence|
+----+--------------------+--------------------+-------------------+------------+--------+----------------+------+------------+-------+
| 964|7khpPruHJK39VTBUQ...|            Stranger|         MildOrange|       0.412|   0.109|           0.113| 0.491|       0.334|  0.452|
| 927|66JegFJ408S5Q3psO...|         Schillerize|TonioGeugelinMoglii|       0.638|   0.329|        3.79e-06| 0.468|       0.714|  0.265|
|1015|1fbkVi5aq24ZWFUfb...|Needs (feat. Mac ...|  ZachBerroMacAyres|       0.514|  0.0672|         0.00237| 0.269|       0.638|   0.15|
|1001|0Via7nKS0iUZug2YX...|      Still No Plans|    ThebandcalledOh|       0.272|   0.663|         2.6e-05| 0.638|       0.617|  0.792|
| 871|7mRKpGZRtQuqgyuEl...|       Let's Pretend|