In [3]:
# Imports
import os
import time
import random
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Normalizer, StandardScaler

In [5]:
# Dirección del servidor Kafka
SERVER = 'localhost:9092'

In [6]:
# Nombre del tema
TOPIC = "ericpassos"

In [7]:
# Conectores Spark para Apache Spark
spark_jars =  ("{},{},{},{},{}".format(os.getcwd() + "/jars/spark-sql-kafka-0-10_2.12-3.2.1.jar",  
                                       os.getcwd() + "/jars/kafka-clients-2.1.1.jar", 
                                       os.getcwd() + "/jars/spark-streaming-kafka-0-10-assembly_2.12-3.3.2.jar", 
                                       os.getcwd() + "/jars/commons-pool2-2.8.0.jar",  
                                       os.getcwd() + "/jars/spark-token-provider-kafka-0-10_2.12-3.1.2.jar"))

In [10]:
# Inicializar sesión de Spark
spark = SparkSession \
        .builder \
        .config("spark.jars", spark_jars) \
        .appName("eric-passos") \
        .getOrCreate()

In [11]:
spark.sparkContext.setLogLevel("ERROR")

In [12]:
# Usamos Spark para leer el flujo de datos de Kafka y guardalo en un dataframe
df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", SERVER) \
        .option("subscribe", TOPIC) \
        .option("startingOffsets", "latest") \
        .load()

In [13]:
# Seleccionamos la columna timestamp como una string y la guardamos en un nuevo marco de datos
df1 = df.selectExpr("CAST(value AS STRING)", "timestamp") 

In [14]:
# Definimos el schema com el nombre de cada columna y el tipo de datos
def_schema = "order_id INT, id STRING, name STRING, popularity INT, duration_ms DOUBLE, " \
             + "artists STRING, id_artists STRING, release_date STRING, " \
             + "danceability DOUBLE,energy DOUBLE, key INT, loudness DOUBLE, " \
             + "mode INT,speechiness DOUBLE," \
             + "acousticness DOUBLE, instrumentalness DOUBLE, liveness DOUBLE, " \
             + "valence DOUBLE, tempo DOUBLE, time_signature DOUBLE"

In [15]:
# Seleccionamos el flujo de datos de acuerdo con el schema y lo guardamos en un nuevo marco de datos
df2 = df1.select(from_csv(col("value"), def_schema).alias("song"), "timestamp")

In [16]:
# Creamos una view en la memoria de Spark y visualizamos el esquema
df3 = df2.select("song.*", "timestamp")  
df3.createOrReplaceTempView("df3_View");
df3.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- duration_ms: double (nullable = true)
 |-- artists: string (nullable = true)
 |-- id_artists: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- danceability: double (nullable = true)
 |-- energy: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- liveness: double (nullable = true)
 |-- valence: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- time_signature: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [17]:
# Seleccionamos los datos con las canciones del stream
musicas_stream = spark.sql("SELECT * FROM df3_View")

In [18]:
# Todavía no podemos obtener una vista previa porque tenemos que generar la transmisión Spark Streaming
# musicas_stream.show()

In [19]:
# Creamos el flujo de datos en Spark Streaming
musicas_stream_spark = musicas_stream \
        .writeStream \
        .trigger(processingTime = '5 seconds') \
        .outputMode("append") \
        .option("truncate", "false") \
        .format("memory") \
        .queryName("tabela_spark") \
        .start()

musicas_stream_spark.awaitTermination(1)

False

In [20]:
# Seleccionamos las canciones de la tabla de stream de Spark
spark_songs = spark.sql("SELECT * FROM tabela_spark")

In [22]:
# Ahora podemos visualizar la transmisión en tiempo real como una tabla Spark
spark_songs.show(5)

+--------+--------------------+----------+----------+-----------+------------+--------------------+------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------------+
|order_id|                  id|      name|popularity|duration_ms|     artists|          id_artists|release_date|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|           timestamp|
+--------+--------------------+----------+----------+-----------+------------+--------------------+------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------------+
|    2822|4pouRHxJycYQsSFBB...|     Young|         9|   198513.0|  BeachYouth|      ynePiZgwbyifcL|  2016-05-27|       0.584| 0.679|  2|  -7.782|   1|      0.032|     3.53E-4|         8.03E-4|   0.275|  0.666|125.118|           4.0|2023-0

In [23]:
# Solo podemos ver algunas columnas, por ejemplo
spark_songs.select('order_id', 'id', 'name', 'popularity', 'duration_ms', 'artists').show(5)

+--------+--------------------+----------+----------+-----------+------------+
|order_id|                  id|      name|popularity|duration_ms|     artists|
+--------+--------------------+----------+----------+-----------+------------+
|    2822|4pouRHxJycYQsSFBB...|     Young|         9|   198513.0|  BeachYouth|
|    2823|75sG1EaHaBAzCRQns...|    Radius|         0|   146750.0|       Hater|
|    2824|43ZLh2Lyx0NMD53Sq...|     Doubt|         0|   227133.0|   Megafauna|
|    2825|7mwkbmWQty61ZCRpw...|Pool Party|         0|   255600.0|JuliaJacklin|
|    2826|33t3YCIYgAMBsPFTd...|     Sleep|        26|   260866.0|  SarahKlang|
+--------+--------------------+----------+----------+-----------+------------+
only showing top 5 rows



In [38]:
# Recuento de canciones extraídas en tiempo real
spark_songs.count()

175

Espere unos minutos antes de continuar con la ejecución para que se puedan recopilar los datos de transmisión.

> Ahora trabajemos en la extracción de datos de Spotify.

In [39]:
# https://pypi.org/project/spotipy/
!pip install -q spotipy

In [40]:
# Imports
import os
import ujson
import spotipy
import spotipy.util
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
import warnings
warnings.filterwarnings("ignore")

Lea el contenido README sobre la creación de API

In [42]:
# Aquí pones tus claves API de Spotify
os.environ["SPOTIPY_CLIENT_ID"] = 'inclua_aqui_seu_client_id'
os.environ["SPOTIPY_CLIENT_SECRET"] = 'inclua_aqui_seu_client_secret'
os.environ["SPOTIPY_REDIRECT_URI"] = 'http://localhost:7777/callback'

https://developer.spotify.com/documentation/general/guides/authorization/scopes/

In [43]:
# Ámbito de extracción de las preferencias del usuario
scope = 'user-library-read'

In [44]:
# Username do Spotify
username = 'inclua_aqui_seu_email_spotify'

In [45]:
# Creación de tokens de acceso
token = spotipy.util.prompt_for_user_token(username, scope)

In [46]:
# Crear el objeto de autenticación
spotipy_obj = spotipy.Spotify(auth = token)

In [47]:
# Extrae hasta 50 canciones de la lista de favoritos del usuario
saved_tracks = spotipy_obj.current_user_saved_tracks(limit = 50) 

In [48]:
# Número de canciones extraídas
n_tracks = saved_tracks['total']
print('Total de Tracks: %d ' % n_tracks)

Total de Tracks: 35 


In [49]:
# Función para extraer los atributos de la lista de reproducción del usuario
def select_features(track_response):
    return {        
        'id': str(track_response['track']['id']),
        'name': str(track_response['track']['name']),
        'artists': [artist['name'] for artist in track_response['track']['artists']],
        'popularity': track_response['track']['popularity']
    }

In [50]:
# Aplicar la función
tracks = [select_features(track) for track in saved_tracks['items']]

In [51]:
# Extrae los atributos de las canciones favoritas del usuario
while saved_tracks['next']:
    saved_tracks = spotipy_obj.next(saved_tracks)
    tracks.extend([select_features(track) for track in saved_tracks['items']])

In [52]:
# Creamos el dataframe de pandas
df_tracks = pd.DataFrame(tracks)
pd.set_option('display.max_rows', len(tracks))
df_tracks['artists'] = df_tracks['artists'].apply(lambda artists: artists[0])

In [53]:
df_tracks.head(10)

Unnamed: 0,id,name,artists,popularity
0,3BjD16l0CBQxUvcRI7vLWN,Advice For The Young At Heart,Tears For Fears,58
1,4RvWPyQ5RL0ao9LPZeSouE,Everybody Wants To Rule The World,Tears For Fears,85
2,4Y8vb1uy9IjM2V1hqvrAid,You'll Be In My Heart,Phil Collins,64
3,5LYJ631w9ps5h9tdvac7yP,Easy Lover,Philip Bailey,68
4,0YhwwpABBK0GzeUaB8OzIm,Night Fever,Bee Gees,0
5,2xiOdusRnZezQok1RgLNeS,You Should Be Dancing,Bee Gees,67
6,3EQpCDfP5RdF58lL3d73jn,More Than A Woman,Bee Gees,0
7,2E0kGMK29BUaVQN4zC3Vpj,How Deep Is Your Love,Bee Gees,0
8,3VZqFp0wim9TLsfsUHgHWg,"You're The First, The Last, My Everything",Barry White,33
9,1Xf1lWBSml62NG1du3Ro14,Just The Way You Are,Barry White,62


In [54]:
# Diccionario de atributos de audio
audio_features = {}

In [55]:
# Extraer atributos de audio
for idd in df_tracks['id'].tolist():
    audio_features[idd] = spotipy_obj.audio_features(idd)[0]

In [56]:
# Agregamos los atributos de audio al marco de datos
df_tracks['acousticness'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['acousticness'])
df_tracks['speechiness'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['speechiness'])
df_tracks['key'] = df_tracks['id'].apply(lambda idd: str(audio_features[idd]['key']))
df_tracks['liveness'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['liveness'])
df_tracks['instrumentalness'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['instrumentalness'])
df_tracks['energy'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['energy'])
df_tracks['tempo'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['tempo'])
df_tracks['loudness'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['loudness'])
df_tracks['danceability'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['danceability'])
df_tracks['valence'] = df_tracks['id'].apply(lambda idd: audio_features[idd]['valence'])

In [57]:
df_tracks.head()

Unnamed: 0,id,name,artists,popularity,acousticness,speechiness,key,liveness,instrumentalness,energy,tempo,loudness,danceability,valence
0,3BjD16l0CBQxUvcRI7vLWN,Advice For The Young At Heart,Tears For Fears,58,0.0736,0.0328,5,0.143,0.000132,0.787,122.478,-8.393,0.665,0.358
1,4RvWPyQ5RL0ao9LPZeSouE,Everybody Wants To Rule The World,Tears For Fears,85,0.347,0.0527,7,0.104,0.00389,0.795,112.067,-12.095,0.645,0.535
2,4Y8vb1uy9IjM2V1hqvrAid,You'll Be In My Heart,Phil Collins,64,0.0626,0.0299,8,0.0947,2e-06,0.748,96.59,-6.54,0.568,0.623
3,5LYJ631w9ps5h9tdvac7yP,Easy Lover,Philip Bailey,68,0.105,0.0369,1,0.0825,0.000163,0.923,128.871,-5.082,0.74,0.93
4,0YhwwpABBK0GzeUaB8OzIm,Night Fever,Bee Gees,0,0.014,0.0291,1,0.0794,0.000101,0.783,109.183,-6.341,0.696,0.768


In [58]:
# Seleccionamos al azar una canción
musica_randomica = random. randint(0,len(df_tracks)-1)
df_musica_randomica = df_tracks.head(musica_randomica)[-1:]
df_musica_randomica

Unnamed: 0,id,name,artists,popularity,acousticness,speechiness,key,liveness,instrumentalness,energy,tempo,loudness,danceability,valence
32,7KGFNruz20BVdcSzoQrSlu,After Slice,Ivory Waves,30,0.289,0.0357,6,0.0632,0.0221,0.348,122.654,-8.23,0.664,0.607


In [59]:
# Transmisión de canciones Spark
spark_songs.show(5)

+--------+--------------------+----------+----------+-----------+------------+--------------------+------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------------+
|order_id|                  id|      name|popularity|duration_ms|     artists|          id_artists|release_date|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|time_signature|           timestamp|
+--------+--------------------+----------+----------+-----------+------------+--------------------+------------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+--------------------+
|    2822|4pouRHxJycYQsSFBB...|     Young|         9|   198513.0|  BeachYouth|      ynePiZgwbyifcL|  2016-05-27|       0.584| 0.679|  2|  -7.782|   1|      0.032|     3.53E-4|         8.03E-4|   0.275|  0.666|125.118|           4.0|2023-0

In [60]:
# Ya no necesitamos estas columnas
spark_songs = spark_songs.drop('order_id', 
                               'mode', 
                               'release_date', 
                               'id_artists',
                               'time_signature', 
                               'duration_ms',
                               'timestamp')

In [61]:
# Crea el dataframe con música elegida al azar
df_sp = spark.createDataFrame(df_musica_randomica)

In [62]:
# Concatenar música en streaming de Spark con música de Spotify
df = spark_songs.union(df_sp)

In [63]:
df.show(5)

+--------------------+----------+----------+------------+------------+------+---+--------+-----------+------------+----------------+--------+-------+-------+
|                  id|      name|popularity|     artists|danceability|energy|key|loudness|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|
+--------------------+----------+----------+------------+------------+------+---+--------+-----------+------------+----------------+--------+-------+-------+
|4pouRHxJycYQsSFBB...|     Young|         9|  BeachYouth|       0.584| 0.679|  2|  -7.782|      0.032|     3.53E-4|         8.03E-4|   0.275|  0.666|125.118|
|75sG1EaHaBAzCRQns...|    Radius|         0|       Hater|       0.153| 0.586|  7|  -8.055|     0.0338|       0.236|           0.848|   0.377|  0.418|152.535|
|43ZLh2Lyx0NMD53Sq...|     Doubt|         0|   Megafauna|       0.264| 0.922|  2|  -5.148|     0.0758|     4.38E-4|         7.94E-5|   0.297|  0.361|159.939|
|7mwkbmWQty61ZCRpw...|Pool Party|         0|JuliaJac

## Preprocesamiento de datos

In [64]:
# Preparamos el VectorAssembler
vetor = VectorAssembler(inputCols = ['danceability',
                                     'energy',
                                     'loudness',
                                     'speechiness',
                                     'acousticness',
                                     'instrumentalness',
                                     'liveness',
                                     'valence',
                                     'tempo'], 
                        outputCol = 'song_features')

In [65]:
# Descartamos valores inválidos
assembled = vetor.setHandleInvalid("skip").transform(df)

In [66]:
# Preparamos el estandarizador
std = StandardScaler(inputCol = 'song_features', outputCol = 'standardized')

In [67]:
# Entrenamos al estandarizador
scale = std.fit(assembled)

In [68]:
# Dataframe con datos estandarizados
df = scale.transform(assembled)

In [69]:
df.show(5)

+--------------------+----------+----------+------------+------------+------+---+--------+-----------+------------+----------------+--------+-------+-------+--------------------+--------------------+
|                  id|      name|popularity|     artists|danceability|energy|key|loudness|speechiness|acousticness|instrumentalness|liveness|valence|  tempo|       song_features|        standardized|
+--------------------+----------+----------+------------+------------+------+---+--------+-----------+------------+----------------+--------+-------+-------+--------------------+--------------------+
|4pouRHxJycYQsSFBB...|     Young|         9|  BeachYouth|       0.584| 0.679|  2|  -7.782|      0.032|     3.53E-4|         8.03E-4|   0.275|  0.666|125.118|[0.584,0.679,-7.7...|[3.17071410703442...|
|75sG1EaHaBAzCRQns...|    Radius|         0|       Hater|       0.153| 0.586|  7|  -8.055|     0.0338|       0.236|           0.848|   0.377|  0.418|152.535|[0.153,0.586,-8.0...|[0.83068366160319...|


## Machine Learning con Aprendizaje no Supervisado

In [70]:
# Crear el objeto modelo
objeto_KMeans = KMeans(featuresCol = 'standardized', k = 3)

In [71]:
# Entrenar al modelo
modelo_KMeans = objeto_KMeans.fit(df)

In [72]:
# Modelo de predicciones
df_output = modelo_KMeans.transform(df)

## Sistema de Recomendación

In [73]:
# Clase
class RecoSystem():
    
    # Método constructor
    def __init__(self, data):
        self.data_ = data
    
    # Método de recomendación
    def Recomm(self, nome_musica, amount = 1):
        
        # lista de distancias
        distancias = []
        
        # Selecciona la musica
        song = self.data_[(self.data_.name.str.lower() == nome_musica.lower())].head(1).values[0]
        res_dt = self.data_[self.data_.name.str.lower() != nome_musica.lower()]
        
        # Loop para calcular distancias
        for i_song in tqdm(res_dt.values):
            
            # Inicializar la distancia
            distancia = 0
            
            # Loop para calcular a distância
            for col in np.arange(len(res_dt.columns)):
                if not col in [0,1,2,14]:
                    distancia = distancia + np.absolute(float(song[col]) - float(i_song[col]))
            
            # Añadir a la lista de distancias
            distancias.append(distancia)
        
        res_dt['distance'] = distancias
        res_dt = res_dt.sort_values('distance')
        
        columns = ['id','name', 
                   'artists', 
                   'acousticness', 
                   'liveness', 
                   'instrumentalness', 
                   'energy', 
                   'danceability', 
                   'valence']
        
        return res_dt[columns][:amount]

In [74]:
# Nombres de columnas
datalabel = df_output.select('id',
                             'name',
                             'artists',
                             'danceability',
                             'energy',
                             'key',
                             'loudness',
                             'speechiness',
                             'acousticness',
                             'instrumentalness',
                             'liveness',
                             'valence',
                             'tempo',
                             'prediction')

In [75]:
# Dataset final
df_final = datalabel.toPandas()
df_final.drop(df_final[df_final['artists'] == '0'].index, inplace = True)
df_final.drop_duplicates(inplace = True)
df_final.drop(df_final[df_final['danceability'] == 0.0000].index, inplace = True)
df_final.drop(df_final[df_final['liveness'] == 0.000].index, inplace = True)
df_final.drop(df_final[df_final['instrumentalness'] == 0.000000].index, inplace = True)
df_final.drop(df_final[df_final['energy'] == 0.0000].index, inplace = True)
df_final.drop(df_final[df_final['danceability'] == 0.000].index, inplace = True)
df_final.drop(df_final[df_final['valence'] == 0.000].index, inplace = True)

In [76]:
df_final.shape

(210, 14)

In [77]:
df_final.sample(5)

Unnamed: 0,id,name,artists,danceability,energy,key,loudness,speechiness,acousticness,instrumentalness,liveness,valence,tempo,prediction
58,4DbBrWnJSyFzhsSiUmbFUp,Ready For This - Radio Mix,WeirdTogether,0.774,0.74,2,-6.069,0.0469,0.00944,0.0219,0.292,0.413,134.979,0
142,54y28hHMODdCpjHSj6ggDN,Bluffs & Blocks,StayatHomeSon,0.452,0.658,1,-9.319,0.0457,0.462,0.00171,0.146,0.365,78.985,0
99,3pS5SUvGmrzZUYjsWo7ISz,Back and Forth,HolyOker,0.882,0.329,5,-18.496,0.123,0.235,0.928,0.095,0.628,110.03,1
201,4kBrbBGK0vLW1CihB20Qtq,100K,Antwon,0.556,0.449,7,-10.363,0.227,0.0807,0.0372,0.446,0.484,89.984,0
12,0L8Jl4sRg5JEItM01NElhT,Beast Mode - Hype Mix,SnappyJitJamminJoe,0.857,0.904,2,-4.841,0.219,0.00173,1.9e-05,0.0936,0.891,159.987,0


[Stage 129:>                                                        (0 + 1) / 1]                                                                                

In [78]:
# Crear el objeto
reco_obj = RecoSystem(df_final)

In [79]:
musica = df_musica_randomica['name'].tolist()[0]

In [80]:
print(musica)

After Slice


In [81]:
# Ejecutar la recomendación
recomendacao = reco_obj.Recomm(musica)

100%|█████████████████████████████████████████████████████████████████████████████████████████████████| 209/209 [00:00<00:00, 35689.66it/s]


In [82]:
# Extraiga música aleatoria de la lista de favoritos de Spotify
y = df_musica_randomica[['id','name', 
                         'artists',  
                         'acousticness', 
                         'liveness', 
                         'instrumentalness', 
                         'energy', 
                         'danceability', 
                         'valence']]

In [83]:
# Concatena la recomendación con la canción aleatoria de la lista de favoritos de Spotify
recomendacao = pd.concat([recomendacao, y])

In [84]:
# Guardar recomendación en disco
#recomendacao.to_csv('recomendacoes/recomendacao.csv')

In [85]:
# Cargar archivo desde disco
df_reco = (spark.read.format("csv").options(header = "true").load("recomendacoes/recomendacao.csv"))

In [86]:
# Recomendación musical
df_reco.show()

+---+--------------------+--------------------+-----------+------------+--------+----------------+------+------------+-------+
|_c0|                  id|                name|    artists|acousticness|liveness|instrumentalness|energy|danceability|valence|
+---+--------------------+--------------------+-----------+------------+--------+----------------+------+------------+-------+
|128|268dguW2tmVGU6TK4...|Banging the Neigh...| ManaIsland|       0.145|   0.196|        2.59e-05| 0.763|       0.441|  0.374|
| 32|7KGFNruz20BVdcSzo...|         After Slice|Ivory Waves|       0.289|  0.0632|          0.0221| 0.348|       0.664|  0.607|
+---+--------------------+--------------------+-----------+------------+--------+----------------+------+------------+-------+



                                                                                

# Fin