In [1]:
import os 
import random
import numpy as np 
import pandas as pd
import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Normalizer, StandardScaler, VectorAssembler, StringIndexer
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from tqdm import tqdm
import ujson
import spotipy
import spotipy.util

In [2]:
#Kafka server address
SERVER = 'localhost:9092'

In [3]:
#Topic name
TOPIC = 'playlist'

In [4]:
#Connectors between Spark and Kafka. 
#You need the connector to do the connection between the docker containing the kafka and the spark on the local machine.
#You can download the connector here: https://mvnrepository.com/
#Make sure that you are downloading the .jar that is compatible with the versions that are intalled in your machine.
spark_jars = ("{},{},{},{},{}".format(os.getcwd()+ "/jars/commons-pool2-2.12.0.jar",
                                      os.getcwd()+ "/jars/kafka-clients-3.7.0.jar",
                                      os.getcwd()+ "/jars/spark-sql-kafka-0-10_2.12-3.5.1.jar",
                                      os.getcwd()+ "/jars/spark-streaming-kafka-0-10_2.12-3.5.1.jar",
                                      os.getcwd()+ "/jars/spark-token-provider-kafka-0-10_2.12-3.5.1.jar"))

In [5]:
spark_jars

'c:\\Data_Science\\Study-Projects\\Python-Projects\\Recommender-system/jars/commons-pool2-2.12.0.jar,c:\\Data_Science\\Study-Projects\\Python-Projects\\Recommender-system/jars/kafka-clients-3.7.0.jar,c:\\Data_Science\\Study-Projects\\Python-Projects\\Recommender-system/jars/spark-sql-kafka-0-10_2.12-3.5.1.jar,c:\\Data_Science\\Study-Projects\\Python-Projects\\Recommender-system/jars/spark-streaming-kafka-0-10_2.12-3.5.1.jar,c:\\Data_Science\\Study-Projects\\Python-Projects\\Recommender-system/jars/spark-token-provider-kafka-0-10_2.12-3.5.1.jar'

## At this moment, we're gonna start the Spark Session. If you have the all the environment variables correctly set, you wont experience a erro in this step. If you have an issue with that, before you start the Spark session, run the following code: 
### os.environ['PYSPARK_PYTHON'] = sys.executable
### os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

## This must fix the error. 

In [6]:
spark = SparkSession.builder.config("spark.jars", spark_jars).appName("Recommender-System").getOrCreate()

In [7]:
spark.sparkContext.setLogLevel('ERROR')

In [8]:
#We use the Spark Streaming to read the data streaming of Kafka and save the dataframe 
df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", SERVER) \
        .option("subscribe", TOPIC) \
        .option("startingOffsets", "latest") \
        .load()

In [9]:
#Select the timestamp column and save as string in a new dataframe 
df1 = df.selectExpr("CAST(value as STRING)","timestamp")

In [10]:
#We define a schema with the name and type of each column 

def_schema = "order_id INT, id STRING, name STRING, genre STRING, popularity INT, duration_ms DOUBLE, " \
             + "artists STRING, id_artists STRING, release_date STRING, " \
             + "danceability DOUBLE,energy DOUBLE, key INT, loudness DOUBLE, " \
             + "mode INT,speechiness DOUBLE," \
             + "acousticness DOUBLE, instrumentalness DOUBLE, liveness DOUBLE, " \
             + "valence DOUBLE, tempo DOUBLE, time_signature DOUBLE"

In [11]:
def_schema

'order_id INT, id STRING, name STRING, genre STRING, popularity INT, duration_ms DOUBLE, artists STRING, id_artists STRING, release_date STRING, danceability DOUBLE,energy DOUBLE, key INT, loudness DOUBLE, mode INT,speechiness DOUBLE,acousticness DOUBLE, instrumentalness DOUBLE, liveness DOUBLE, valence DOUBLE, tempo DOUBLE, time_signature DOUBLE'

In [12]:
#Selecting the data streaming accordingly with the schema and save in a new dataframe

df2 = df1.select(from_csv(col("value"), def_schema).alias("song"), "timestamp")

In [13]:
#Creating a view in the memory of Spark and we visualize the schema 
df3 = df2.select("song.*","timestamp")
df3.createOrReplaceTempView("df3_View");
df3.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- duration_ms: double (nullable = true)
 |-- artists: string (nullable = true)
 |-- id_artists: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [14]:
#Select the data with the songs of stream 
music_stream = spark.sql("SELECT * FROM df3_View")

In [15]:
#We cannot see yet, because we have to generate the stream of the Spark Streaming 
#Let's create the stream data on the Spark Streaming 

musics_stream_spark = music_stream \
        .writeStream \
        .trigger(processingTime= '5 seconds') \
        .outputMode("append") \
        .option("truncate","false") \
        .format("memory") \
        .queryName("tabela_spark") \
        .start()

musics_stream_spark.awaitTermination(1)

#The data producer created earlier in the "Real-Time-Data-Producer" must be running and connected with kafka

False

In [16]:
# Select the songs of Spark Stream table 
spark_songs = spark.sql("SELECT * FROM tabela_spark")

In [17]:
spark_songs.show(5)

+--------+--------------------+--------------------+--------------------+----------+-----------+-------------+--------------------+--------------------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------------+
|order_id|                  id|                name|               genre|popularity|duration_ms|      artists|          id_artists|        release_date|danceability|energy| key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|           timestamp|
+--------+--------------------+--------------------+--------------------+----------+-----------+-------------+--------------------+--------------------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------------+
|      76|3FmS1F9CSpVoHIMss...|       Red Dragonfly|experimental hip hop|         0|   282624.0|  Black Taffy|0Y9vf

In [18]:
#We can see just a few columns, for example
spark_songs.select('order_id','id','name','genre','popularity','duration_ms','artists').show(5)

+--------+--------------------+--------------------+--------------------+----------+-----------+-------------+
|order_id|                  id|                name|               genre|popularity|duration_ms|      artists|
+--------+--------------------+--------------------+--------------------+----------+-----------+-------------+
|      76|3FmS1F9CSpVoHIMss...|       Red Dragonfly|experimental hip hop|         0|   282624.0|  Black Taffy|
|      77|47xKNeYXHnsFcISHz...|Searching For Memory|  african electronic|      NULL|       15.0|       281714|
|      78|5Ov9SR4f9SzJWkNQy...|Far From the Eyes...|                 nan|         0|   329639.0|   C.L.A.W.S.|
|      79|07ZIVNs0d8TQqoVfW...|       January's End|                 nan|         0|   408248.0|Nigil Caenaan|
|      80|3EXE6xuaGxSwgW9p2...|          Around You|                 nan|        30|   333559.0|Zane Schaffer|
+--------+--------------------+--------------------+--------------------+----------+-----------+-------------+
o

In [19]:
spark_songs.count()

56

## Wait a while till the data table have a few songs 

# Now that the Data Stream is running and we can collect the intel, we proceed to extract the data info of user. In this project, i will be using a playlist based on my liked musics on the Spotify 

In [20]:
#Put your Spotify API key
os.environ["SPOTIPY_CLIENT_ID"] = #YourClientID
os.environ["SPOTIPY_CLIENT_SECRET"] = #YourClientSecret
os.environ["SPOTIPY_REDIRECT_URI"] = 'http://localhost:7777/callback'

In [21]:
#Extraction scope of users preferences 
scope = 'user-library-read'

In [22]:
#Spotify e-mail used 
username = #YourEmail

In [23]:
token = spotipy.util.prompt_for_user_token(username, scope)

In [24]:
spotify = spotipy.Spotify(auth= token)

In [25]:
#Extract till 50 songs of the user's favorites list  
saved_tracks = spotify.current_user_saved_tracks(limit = 50)

In [26]:
#function to extract attributes of user's song list
def select_features(track_response):
    return{
        'id':str(track_response['track']['id']),
        'id_artists':str(track_response['track']['artists'][0]['id']),
        'name':str(track_response['track']['name']),
        'artists':[artist['name'] for artist in track_response['track']['artists']],
        'popularity':track_response['track']['popularity']
    }

In [27]:
tracks = [select_features(track) for track in saved_tracks['items']]

In [28]:
tracks

[{'id': '56sk7jBpZV0CD31G9hEU3b',
  'id_artists': '2xiIXseIJcq3nG7C8fHeBj',
  'name': 'Animal I Have Become',
  'artists': ['Three Days Grace'],
  'popularity': 77},
 {'id': '7piw04hPQZ1OHZ9Fq9JOXR',
  'id_artists': '2xiIXseIJcq3nG7C8fHeBj',
  'name': 'Time of Dying',
  'artists': ['Three Days Grace'],
  'popularity': 68},
 {'id': '3HE50TVRquwXe9yv2HFoNL',
  'id_artists': '2xiIXseIJcq3nG7C8fHeBj',
  'name': 'Never Too Late',
  'artists': ['Three Days Grace'],
  'popularity': 71},
 {'id': '0M955bMOoilikPXwKLYpoi',
  'id_artists': '2xiIXseIJcq3nG7C8fHeBj',
  'name': 'I Hate Everything About You',
  'artists': ['Three Days Grace'],
  'popularity': 78},
 {'id': '1nltpRhEiXikwDlVn4UADk',
  'id_artists': '246dkjvS1zLTtiykXe5h60',
  'name': 'One Right Now (with The Weeknd)',
  'artists': ['Post Malone', 'The Weeknd'],
  'popularity': 71},
 {'id': '5Ja2u1FPlVxrxeMWEa2dcn',
  'id_artists': '246dkjvS1zLTtiykXe5h60',
  'name': 'When I’m Alone',
  'artists': ['Post Malone'],
  'popularity': 57},
 

In [29]:
#Extract the user's favorite song attributes
while saved_tracks['next']:
    saved_tracks = spotify.next(saved_tracks)
    tracks.extend([select_features(track) for track in saved_tracks['items']])

In [30]:
#Create pandas dataframe
df_tracks = pd.DataFrame(tracks)
pd.set_option('display.max_rows', len(tracks))
df_tracks['artists'] = df_tracks['artists'].apply(lambda artists: artists[0])


In [31]:
df_tracks.head(10)

Unnamed: 0,id,id_artists,name,artists,popularity
0,56sk7jBpZV0CD31G9hEU3b,2xiIXseIJcq3nG7C8fHeBj,Animal I Have Become,Three Days Grace,77
1,7piw04hPQZ1OHZ9Fq9JOXR,2xiIXseIJcq3nG7C8fHeBj,Time of Dying,Three Days Grace,68
2,3HE50TVRquwXe9yv2HFoNL,2xiIXseIJcq3nG7C8fHeBj,Never Too Late,Three Days Grace,71
3,0M955bMOoilikPXwKLYpoi,2xiIXseIJcq3nG7C8fHeBj,I Hate Everything About You,Three Days Grace,78
4,1nltpRhEiXikwDlVn4UADk,246dkjvS1zLTtiykXe5h60,One Right Now (with The Weeknd),Post Malone,71
5,5Ja2u1FPlVxrxeMWEa2dcn,246dkjvS1zLTtiykXe5h60,When I’m Alone,Post Malone,57
6,1D2L6MefbXon28PzIk9I3r,246dkjvS1zLTtiykXe5h60,92 Explorer,Post Malone,69
7,5VuxWXbt7XENQCtE9TzpTv,246dkjvS1zLTtiykXe5h60,Spoil My Night (feat. Swae Lee),Post Malone,66
8,7dt6x5M1jzdTEt8oCbisTK,246dkjvS1zLTtiykXe5h60,Better Now,Post Malone,82
9,0e7ipj03S05BNilyu5bRzt,246dkjvS1zLTtiykXe5h60,rockstar (feat. 21 Savage),Post Malone,84


In [32]:
#Creating dict for the audio attributes 
audio_features = {}

In [33]:
#Extract the audio attributes 
for idd in df_tracks['id'].tolist():
    audio_features[idd] = spotify.audio_features(idd)[0]

In [34]:
audio_features

{'56sk7jBpZV0CD31G9hEU3b': {'danceability': 0.553,
  'energy': 0.853,
  'key': 8,
  'loudness': -3.668,
  'mode': 1,
  'speechiness': 0.0339,
  'acousticness': 0.000684,
  'instrumentalness': 0,
  'liveness': 0.126,
  'valence': 0.506,
  'tempo': 122.023,
  'type': 'audio_features',
  'id': '56sk7jBpZV0CD31G9hEU3b',
  'uri': 'spotify:track:56sk7jBpZV0CD31G9hEU3b',
  'track_href': 'https://api.spotify.com/v1/tracks/56sk7jBpZV0CD31G9hEU3b',
  'analysis_url': 'https://api.spotify.com/v1/audio-analysis/56sk7jBpZV0CD31G9hEU3b',
  'duration_ms': 231400,
  'time_signature': 4},
 '7piw04hPQZ1OHZ9Fq9JOXR': {'danceability': 0.306,
  'energy': 0.873,
  'key': 0,
  'loudness': -3.668,
  'mode': 0,
  'speechiness': 0.0567,
  'acousticness': 0.00105,
  'instrumentalness': 0,
  'liveness': 0.52,
  'valence': 0.452,
  'tempo': 198.387,
  'type': 'audio_features',
  'id': '7piw04hPQZ1OHZ9Fq9JOXR',
  'uri': 'spotify:track:7piw04hPQZ1OHZ9Fq9JOXR',
  'track_href': 'https://api.spotify.com/v1/tracks/7piw04

In [35]:
#Creating dict to the audio genres
audio_genres = {}

In [36]:
#Extract the audio genres
for i in df_tracks['id_artists'].tolist():
    audio_genres[i] = spotify.artist(i)['genres']

In [37]:
audio_genres

{'2xiIXseIJcq3nG7C8fHeBj': ['alternative metal',
  'canadian metal',
  'canadian rock',
  'nu metal',
  'post-grunge',
  'rock'],
 '246dkjvS1zLTtiykXe5h60': ['dfw rap', 'melodic rap', 'pop', 'rap'],
 '1Xyo4u8uXC1ZmMpatF05PJ': ['canadian contemporary r&b',
  'canadian pop',
  'pop'],
 '1URnnhqYAYcrqrcwql10ft': ['atl hip hop', 'hip hop', 'rap'],
 '0Y5tJX1MQlPlqiwlOH1tJY': ['hip hop', 'rap', 'slap house'],
 '7tYKF4w9nC0nq9CsPZTHyP': ['pop', 'r&b', 'rap'],
 '4RddZ3iHvSpGV4dvATac9X': ['alternative metal',
  'nu metal',
  'rap metal',
  'rap rock',
  'rock'],
 '4DWX7u8BV0vZIQSpJQQDWU': ['alternative metal', 'nu metal', 'post-grunge'],
 '0iEtIxbK0KxaSlF7G42ZOp': ['hip hop', 'rap'],
 '4DdkRBBYG6Yk9Ka8tdJ9BW': ['atl hip hop',
  'hip hop',
  'pop rap',
  'rap',
  'trap'],
 '3TVXtAsR1Inumwj472S9r4': ['canadian hip hop',
  'canadian pop',
  'hip hop',
  'pop rap',
  'rap'],
 '7dGJo4pcD2V6oG8kP0tJRR': ['detroit hip hop', 'hip hop', 'rap'],
 '6XyY86QOPPrYVGvF9ch6wz': ['alternative metal',
  'nu meta

In [38]:
#Append the audio attributes to the dataframe
df_tracks['genre'] = df_tracks['id_artists'].apply(lambda art_id: audio_genres[art_id])
df_tracks['acousticness'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['acousticness'])
df_tracks['speechiness'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['speechiness'])
df_tracks['key'] = df_tracks['id'].apply(lambda idd: str(audio_features[idd]['key']))
df_tracks['liveness'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['liveness'])
df_tracks['instrumentalness'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['instrumentalness'])
df_tracks['energy'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['energy'])
df_tracks['tempo'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['tempo'])
df_tracks['loudness'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['loudness'])
df_tracks['danceability'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['danceability'])
df_tracks['valence'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['valence'])

In [39]:
df_tracks

Unnamed: 0,id,id_artists,name,artists,popularity,genre,acousticness,speechiness,key,liveness,instrumentalness,energy,tempo,loudness,danceability,valence
0,56sk7jBpZV0CD31G9hEU3b,2xiIXseIJcq3nG7C8fHeBj,Animal I Have Become,Three Days Grace,77,"[alternative metal, canadian metal, canadian r...",0.000684,0.0339,8,0.126,0.0,0.853,122.023,-3.668,0.553,0.506
1,7piw04hPQZ1OHZ9Fq9JOXR,2xiIXseIJcq3nG7C8fHeBj,Time of Dying,Three Days Grace,68,"[alternative metal, canadian metal, canadian r...",0.00105,0.0567,0,0.52,0.0,0.873,198.387,-3.668,0.306,0.452
2,3HE50TVRquwXe9yv2HFoNL,2xiIXseIJcq3nG7C8fHeBj,Never Too Late,Three Days Grace,71,"[alternative metal, canadian metal, canadian r...",0.00838,0.0371,2,0.163,0.0,0.778,149.864,-4.68,0.433,0.217
3,0M955bMOoilikPXwKLYpoi,2xiIXseIJcq3nG7C8fHeBj,I Hate Everything About You,Three Days Grace,78,"[alternative metal, canadian metal, canadian r...",0.00461,0.0421,6,0.139,0.0,0.83,89.342,-5.157,0.498,0.453
4,1nltpRhEiXikwDlVn4UADk,246dkjvS1zLTtiykXe5h60,One Right Now (with The Weeknd),Post Malone,71,"[dfw rap, melodic rap, pop, rap]",0.0363,0.0477,1,0.103,0.0,0.764,97.023,-5.411,0.682,0.701
5,5Ja2u1FPlVxrxeMWEa2dcn,246dkjvS1zLTtiykXe5h60,When I’m Alone,Post Malone,57,"[dfw rap, melodic rap, pop, rap]",0.0346,0.0528,10,0.155,0.0,0.656,140.107,-6.121,0.485,0.106
6,1D2L6MefbXon28PzIk9I3r,246dkjvS1zLTtiykXe5h60,92 Explorer,Post Malone,69,"[dfw rap, melodic rap, pop, rap]",0.265,0.0476,7,0.114,1e-06,0.733,146.046,-4.811,0.798,0.861
7,5VuxWXbt7XENQCtE9TzpTv,246dkjvS1zLTtiykXe5h60,Spoil My Night (feat. Swae Lee),Post Malone,66,"[dfw rap, melodic rap, pop, rap]",0.341,0.034,1,0.106,2.4e-05,0.717,160.981,-2.714,0.672,0.225
8,7dt6x5M1jzdTEt8oCbisTK,246dkjvS1zLTtiykXe5h60,Better Now,Post Malone,82,"[dfw rap, melodic rap, pop, rap]",0.331,0.04,10,0.135,0.0,0.578,145.038,-5.804,0.68,0.341
9,0e7ipj03S05BNilyu5bRzt,246dkjvS1zLTtiykXe5h60,rockstar (feat. 21 Savage),Post Malone,84,"[dfw rap, melodic rap, pop, rap]",0.124,0.0712,5,0.131,7e-05,0.52,159.801,-6.136,0.585,0.129


In [40]:
df_tracks['genre'] = [i[0] if i else None for i in df_tracks['genre']]

In [41]:
df_tracks

Unnamed: 0,id,id_artists,name,artists,popularity,genre,acousticness,speechiness,key,liveness,instrumentalness,energy,tempo,loudness,danceability,valence
0,56sk7jBpZV0CD31G9hEU3b,2xiIXseIJcq3nG7C8fHeBj,Animal I Have Become,Three Days Grace,77,alternative metal,0.000684,0.0339,8,0.126,0.0,0.853,122.023,-3.668,0.553,0.506
1,7piw04hPQZ1OHZ9Fq9JOXR,2xiIXseIJcq3nG7C8fHeBj,Time of Dying,Three Days Grace,68,alternative metal,0.00105,0.0567,0,0.52,0.0,0.873,198.387,-3.668,0.306,0.452
2,3HE50TVRquwXe9yv2HFoNL,2xiIXseIJcq3nG7C8fHeBj,Never Too Late,Three Days Grace,71,alternative metal,0.00838,0.0371,2,0.163,0.0,0.778,149.864,-4.68,0.433,0.217
3,0M955bMOoilikPXwKLYpoi,2xiIXseIJcq3nG7C8fHeBj,I Hate Everything About You,Three Days Grace,78,alternative metal,0.00461,0.0421,6,0.139,0.0,0.83,89.342,-5.157,0.498,0.453
4,1nltpRhEiXikwDlVn4UADk,246dkjvS1zLTtiykXe5h60,One Right Now (with The Weeknd),Post Malone,71,dfw rap,0.0363,0.0477,1,0.103,0.0,0.764,97.023,-5.411,0.682,0.701
5,5Ja2u1FPlVxrxeMWEa2dcn,246dkjvS1zLTtiykXe5h60,When I’m Alone,Post Malone,57,dfw rap,0.0346,0.0528,10,0.155,0.0,0.656,140.107,-6.121,0.485,0.106
6,1D2L6MefbXon28PzIk9I3r,246dkjvS1zLTtiykXe5h60,92 Explorer,Post Malone,69,dfw rap,0.265,0.0476,7,0.114,1e-06,0.733,146.046,-4.811,0.798,0.861
7,5VuxWXbt7XENQCtE9TzpTv,246dkjvS1zLTtiykXe5h60,Spoil My Night (feat. Swae Lee),Post Malone,66,dfw rap,0.341,0.034,1,0.106,2.4e-05,0.717,160.981,-2.714,0.672,0.225
8,7dt6x5M1jzdTEt8oCbisTK,246dkjvS1zLTtiykXe5h60,Better Now,Post Malone,82,dfw rap,0.331,0.04,10,0.135,0.0,0.578,145.038,-5.804,0.68,0.341
9,0e7ipj03S05BNilyu5bRzt,246dkjvS1zLTtiykXe5h60,rockstar (feat. 21 Savage),Post Malone,84,dfw rap,0.124,0.0712,5,0.131,7e-05,0.52,159.801,-6.136,0.585,0.129


In [42]:
#Let's pick up a song randomly 
random_song = random.randint(0,len(df_tracks)-1)


In [43]:
df_random_song = df_tracks[random_song:random_song+1]

In [44]:
random_song

33

In [45]:
df_random_song

Unnamed: 0,id,id_artists,name,artists,popularity,genre,acousticness,speechiness,key,liveness,instrumentalness,energy,tempo,loudness,danceability,valence
33,2nLtzopw4rPReszdYBJU6h,6XyY86QOPPrYVGvF9ch6wz,Numb,Linkin Park,88,alternative metal,0.0046,0.0381,9,0.639,0.0,0.863,110.018,-4.153,0.496,0.243


In [46]:
spark_songs.show(5)

+--------+--------------------+--------------------+--------------------+----------+-----------+-------------+--------------------+--------------------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------------+
|order_id|                  id|                name|               genre|popularity|duration_ms|      artists|          id_artists|        release_date|danceability|energy| key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|           timestamp|
+--------+--------------------+--------------------+--------------------+----------+-----------+-------------+--------------------+--------------------+------------+------+----+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------------+
|      76|3FmS1F9CSpVoHIMss...|       Red Dragonfly|experimental hip hop|         0|   282624.0|  Black Taffy|0Y9vf

In [47]:
spark_songs.columns

['order_id',
 'id',
 'name',
 'genre',
 'popularity',
 'duration_ms',
 'artists',
 'id_artists',
 'release_date',
 'danceability',
 'energy',
 'key',
 'loudness',
 'mode',
 'speechiness',
 'acousticness',
 'instrumentalness',
 'liveness',
 'valence',
 'tempo',
 'time_signature',
 'timestamp']

In [48]:
# We dont need those columns anymore
spark_songs = spark_songs.drop('order_id', 
                               'mode', 
                               'release_date', 
                               'time_signature', 
                               'duration_ms',
                               'timestamp',
                               'id_artists')

In [49]:
#create a dataframe with the randomly chosen song 
df_sp = spark.createDataFrame(df_random_song)

In [50]:
df_sp = df_sp.drop('id_artists')

In [51]:
df_sp.show()

+--------------------+----+-----------+----------+-----------------+------------+-----------+---+--------+----------------+------+-------+--------+------------+-------+
|                  id|name|    artists|popularity|            genre|acousticness|speechiness|key|liveness|instrumentalness|energy|  tempo|loudness|danceability|valence|
+--------------------+----+-----------+----------+-----------------+------------+-----------+---+--------+----------------+------+-------+--------+------------+-------+
|2nLtzopw4rPReszdY...|Numb|Linkin Park|        88|alternative metal|      0.0046|     0.0381|  9|   0.639|             0.0| 0.863|110.018|  -4.153|       0.496|  0.243|
+--------------------+----+-----------+----------+-----------------+------------+-----------+---+--------+----------------+------+-------+--------+------------+-------+



In [52]:
df_sp.columns

['id',
 'name',
 'artists',
 'popularity',
 'genre',
 'acousticness',
 'speechiness',
 'key',
 'liveness',
 'instrumentalness',
 'energy',
 'tempo',
 'loudness',
 'danceability',
 'valence']

In [53]:
spark_songs.columns

['id',
 'name',
 'genre',
 'popularity',
 'artists',
 'danceability',
 'energy',
 'key',
 'loudness',
 'speechiness',
 'acousticness',
 'instrumentalness',
 'liveness',
 'valence',
 'tempo']

In [54]:
df = spark_songs.union(df_sp)

## Data Pre-proccessing

In [55]:
#Label-encode column "genres"
string_indexer = StringIndexer(inputCol= 'genre', outputCol='genre_encoded')
model_string_indexer = string_indexer.fit(df)
df = model_string_indexer.setHandleInvalid("skip").transform(df)

In [56]:
df.columns

['id',
 'name',
 'genre',
 'popularity',
 'artists',
 'danceability',
 'energy',
 'key',
 'loudness',
 'speechiness',
 'acousticness',
 'instrumentalness',
 'liveness',
 'valence',
 'tempo',
 'genre_encoded']

In [57]:
vector = VectorAssembler(inputCols = ['danceability',
                                     'energy',
                                     'loudness',
                                     'speechiness',
                                     'acousticness',
                                     'instrumentalness',
                                     'liveness',
                                     'valence',
                                     'tempo',
                                     'genre_encoded'], 
                        outputCol = 'song_features')

In [58]:
assembled = vector.setHandleInvalid("skip").transform(df)

In [59]:
#Preparing stander 
std = StandardScaler(inputCol= 'song_features', outputCol= 'standardized')

In [60]:
#Train stander 
scale = std.fit(assembled)

In [61]:
df = scale.transform(assembled)

In [62]:
df.show(5)

+--------------------+--------------------+--------------------+----------+-------------+------------+------+---+--------+-----------+------------+----------------+--------+-------+-------+-------------+--------------------+--------------------+
|                  id|                name|               genre|popularity|      artists|danceability|energy|key|loudness|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|genre_encoded|       song_features|        standardized|
+--------------------+--------------------+--------------------+----------+-------------+------------+------+---+--------+-----------+------------+----------------+--------+-------+-------+-------------+--------------------+--------------------+
|3FmS1F9CSpVoHIMss...|       Red Dragonfly|experimental hip hop|         0|  Black Taffy|       0.181| 0.355|  5| -13.951|     0.0368|       0.171|           0.729|   0.321|  0.134|156.977|         85.0|[0.181,0.355,-13....|[1.07126440842253...|
|5Ov9SR4f9SzJWkN

In [63]:
df.columns

['id',
 'name',
 'genre',
 'popularity',
 'artists',
 'danceability',
 'energy',
 'key',
 'loudness',
 'speechiness',
 'acousticness',
 'instrumentalness',
 'liveness',
 'valence',
 'tempo',
 'genre_encoded',
 'song_features',
 'standardized']

## Machine learning with unsupervisioned learning 

In [64]:
kmeans_object = KMeans(featuresCol= 'standardized', k = 4)

In [65]:
model_kmeans = kmeans_object.fit(df)

In [66]:
df_output = model_kmeans.transform(df)

In [67]:
df_output.columns

['id',
 'name',
 'genre',
 'popularity',
 'artists',
 'danceability',
 'energy',
 'key',
 'loudness',
 'speechiness',
 'acousticness',
 'instrumentalness',
 'liveness',
 'valence',
 'tempo',
 'genre_encoded',
 'song_features',
 'standardized',
 'prediction']

# Recommender System 

## Content-based Filtering: Content-based Filtering is a recommender system technique that uses data about user preferences and attributes of items to model the likelihood a given user will like a specific item.

## In the Spark environment, the Content-based algorithm doesn't exists, so we have to build one from the beginning 

In [68]:
# Class
class RecoSystem():
    
    # Método construtor
    def __init__(self, data):
        self.data_ = data
    
    # Método de recomendação
    def Recomm(self, name_song, amount = 3):
        
        # Lista para as distâncias
        distances = []
        
        # Seleciona a música
        song = self.data_[(self.data_.name.str.lower() == name_song.lower())].values[0]
        res_dt = self.data_[self.data_.name.str.lower() != name_song.lower()]
        
        # Loop para o cálculo das distâncias
        for i_song in tqdm(res_dt.values):
            
            # Inicializa a distância
            distance = 0
            
            # Loop para calcular a distância
            for col in np.arange(len(res_dt.columns)):
                if not col in [0,1,2,3,15]:
                    distance = distance + np.absolute(float(song[col]) - float(i_song[col]))
            
            # Adiciona na lista de distâncias
            distances.append(distance)
        
        res_dt['distance'] = distances
        res_dt = res_dt.sort_values('distance')
        
        columns = ['id','name', 
                   'artists',
                   'genre',
                   'acousticness', 
                   'liveness', 
                   'instrumentalness', 
                   'energy', 
                   'danceability', 
                   'valence']
        
        return res_dt[columns][:amount]

In [69]:
datalabel = df_output.select('id',
                             'name',
                             'artists',
                             'genre',
                             'genre_encoded',
                             'danceability',
                             'energy',
                             'key',
                             'loudness',
                             'speechiness',
                             'acousticness',
                             'instrumentalness',
                             'liveness',
                             'valence',
                             'tempo',
                             'prediction')

In [70]:
df_final = datalabel.toPandas()
df_final.drop(df_final[df_final['artists'] == '0'].index, inplace = True)
df_final.drop_duplicates(inplace = True)
df_final.drop(df_final[df_final['danceability'] == 0.0000].index, inplace = True)
df_final.drop(df_final[df_final['liveness'] == 0.000].index, inplace = True)
df_final.drop(df_final[df_final['instrumentalness'] == 0.000000].index, inplace = True)
df_final.drop(df_final[df_final['energy'] == 0.0000].index, inplace = True)
df_final.drop(df_final[df_final['danceability'] == 0.000].index, inplace = True)
df_final.drop(df_final[df_final['valence'] == 0.000].index, inplace = True)
df_final.dropna(inplace = True)

In [71]:
df_final.shape

(291, 16)

In [72]:
df_final.columns

Index(['id', 'name', 'artists', 'genre', 'genre_encoded', 'danceability',
       'energy', 'key', 'loudness', 'speechiness', 'acousticness',
       'instrumentalness', 'liveness', 'valence', 'tempo', 'prediction'],
      dtype='object')

In [73]:
reco_obj = RecoSystem(df_final)

In [74]:
music = df_random_song['name'].tolist()[0]

In [75]:
print(music)

Numb


In [76]:
recommendation = reco_obj.Recomm(music)

100%|█████████████████████████████████████████████████████████████████████████████| 290/290 [00:00<00:00, 29011.79it/s]
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  res_dt['distance'] = distances


In [77]:
recommendation

Unnamed: 0,id,name,artists,genre,acousticness,liveness,instrumentalness,energy,danceability,valence
168,03NSuaEAS6MKGzjTti7zhV,Chwiac,Ebauche,drone ambient,0.836,0.0938,0.903,0.358,0.165,0.103
305,1LKRnFOCwT8NuOuIq1LnzW,Get over You,Lydia Evangeline,brighton indie,0.746,0.122,5.6e-05,0.271,0.424,0.216
113,6B8rJlMrDhN9B7BuSKRDXJ,Gateway,Shy Layers,experimental electronic,0.351,0.142,0.811,0.837,0.663,0.957


In [78]:
#Extract info about the random selected song
y = df_random_song[['id','name', 
                   'artists',
                   'genre',
                   'acousticness', 
                   'liveness', 
                   'instrumentalness', 
                   'energy', 
                   'danceability', 
                   'valence']]

In [79]:
y

Unnamed: 0,id,name,artists,genre,acousticness,liveness,instrumentalness,energy,danceability,valence
33,2nLtzopw4rPReszdYBJU6h,Numb,Linkin Park,alternative metal,0.0046,0.639,0.0,0.863,0.496,0.243


In [80]:
# Return 3 recommend songs
df_rec = pd.concat([recommendation,y])
df_rec

Unnamed: 0,id,name,artists,genre,acousticness,liveness,instrumentalness,energy,danceability,valence
168,03NSuaEAS6MKGzjTti7zhV,Chwiac,Ebauche,drone ambient,0.836,0.0938,0.903,0.358,0.165,0.103
305,1LKRnFOCwT8NuOuIq1LnzW,Get over You,Lydia Evangeline,brighton indie,0.746,0.122,5.6e-05,0.271,0.424,0.216
113,6B8rJlMrDhN9B7BuSKRDXJ,Gateway,Shy Layers,experimental electronic,0.351,0.142,0.811,0.837,0.663,0.957
33,2nLtzopw4rPReszdYBJU6h,Numb,Linkin Park,alternative metal,0.0046,0.639,0.0,0.863,0.496,0.243


# End