## **Preparando Ambiente**: Iniciando spark

In [1]:
import findspark

findspark.init()

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Recomendação de Músicas com Spark").getOrCreate()
spark

## **Carregando os Dados**: Análise exploratória dos Dados

In [3]:
from pyspark import SparkFiles

In [4]:
path = "data/dados_musicas.csv"

spark.sparkContext.addFile(path)

In [5]:
data = spark.read.csv(SparkFiles.get("dados_musicas.csv"), sep=";", header=True, inferSchema=True)

In [6]:
data.limit(5).show()

+------------------+----+------------+------------+------------+-----------+------------------+--------+--------------------+----------------+---+--------+-------------------+----+-------------------+----------+-----------+-------+--------------------+
|           valence|year|acousticness|     artists|danceability|duration_ms|            energy|explicit|                  id|instrumentalness|key|liveness|           loudness|mode|               name|popularity|speechiness|  tempo|        artists_song|
+------------------+----+------------+------------+------------+-----------+------------------+--------+--------------------+----------------+---+--------+-------------------+----+-------------------+----------+-----------+-------+--------------------+
|             0.285|2000|     0.00239|    Coldplay|       0.429|     266773|0.6609999999999999|       0|3AJwUDP919kvQ9Qco...|         1.21E-4| 11|   0.234|             -7.227|   1|             Yellow|        84|     0.0281|173.372|   Coldpla

In [7]:
data.printSchema()

root
 |-- valence: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- artists: string (nullable = true)
 |-- danceability: double (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- energy: double (nullable = true)
 |-- explicit: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- liveness: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- artists_song: string (nullable = true)



### **Análise exploratória:** Entendendo os dados

In [8]:
import pyspark.sql.functions as f

In [9]:
## verificando dados nulos
data.select([f.sum(f.when(f.col(c).isNull(), 1).otherwise(0)).alias(c) for c in data.columns]).show()

+-------+----+------------+-------+------------+-----------+------+--------+---+----------------+---+--------+--------+----+----+----------+-----------+-----+------------+
|valence|year|acousticness|artists|danceability|duration_ms|energy|explicit| id|instrumentalness|key|liveness|loudness|mode|name|popularity|speechiness|tempo|artists_song|
+-------+----+------------+-------+------------+-----------+------+--------+---+----------------+---+--------+--------+----+----+----------+-----------+-----+------------+
|      0|   0|           0|      0|           0|          0|     0|       0|  0|               0|  0|       0|       0|   0|   0|         0|          0|    0|           0|
+-------+----+------------+-------+------------+-----------+------+--------+---+----------------+---+--------+--------+----+----+----------+-----------+-----+------------+



In [10]:
len(data.columns)

19

In [11]:
data.select("year").distinct().orderBy("year").show()

+----+
|year|
+----+
|2000|
|2001|
|2002|
|2003|
|2004|
|2005|
|2006|
|2007|
|2008|
|2009|
|2010|
|2011|
|2012|
|2013|
|2014|
|2015|
|2016|
|2017|
|2018|
|2019|
+----+
only showing top 20 rows



### **Análise Gráfica**: Entendendo as características de cada ano

In [12]:
import plotly.graph_objects as go

In [13]:
temp = data.groupBy("year").avg().toPandas().drop(columns="avg(year)")

In [14]:
fig = go.Figure()

fig.add_trace(go.Line(y=temp["avg(acousticness)"], name="Acousticness"))
fig.add_trace(go.Line(y=temp["avg(danceability)"], name="Danceability"))
fig.add_trace(go.Line(y=temp["avg(energy)"], name="Energy"))
fig.add_trace(go.Line(y=temp["avg(instrumentalness)"], name="Instrumentalness"))
fig.add_trace(go.Line(y=temp["avg(liveness)"], name="Liveness"))
fig.add_trace(go.Line(y=temp["avg(speechiness)"], name="Speechiness"))
fig.add_trace(go.Line(y=temp["avg(valence)"], name="Valence"))

fig.update_layout(width=1000, height=600, title_text="Distribuição das Características das Músicas por Ano")
fig.show()


plotly.graph_objs.Line is deprecated.
Please replace it with one of the following more specific types
  - plotly.graph_objs.scatter.Line
  - plotly.graph_objs.layout.shape.Line
  - etc.




In [15]:
import plotly.express as px

In [16]:
px.imshow(temp.corr(), text_auto=True, width=1000, height=600, title="Correlação entre as Características das Músicas")

## **Clusterização Gêneros**: Dividindo gêneros em grupos

### **Obtenção dos dados**: Análise exploratória dos gêneros

In [17]:
path = "data/dados_musicas_genero.csv"

spark.sparkContext.addFile(path)

In [18]:
data_genero = spark.read.csv(SparkFiles.get("dados_musicas_genero.csv"), header=True, inferSchema=True)

In [19]:
data_genero.show(n=1)

+----+--------------------+------------------+-------------------+------------------+-------------------+----------------+--------+-------------------+-------------------+-------+-------------------+-----------------+---+
|mode|              genres|      acousticness|       danceability|       duration_ms|             energy|instrumentalness|liveness|           loudness|        speechiness|  tempo|            valence|       popularity|key|
+----+--------------------+------------------+-------------------+------------------+-------------------+----------------+--------+-------------------+-------------------+-------+-------------------+-----------------+---+
|   1|21st century clas...|0.9793333333333332|0.16288333333333335|160297.66666666663|0.07131666666666665|      0.60683367|  0.3616|-31.514333333333337|0.04056666666666667|75.3365|0.10378333333333334|27.83333333333333|  6|
+----+--------------------+------------------+-------------------+------------------+-------------------+-------

In [20]:
print(f"{data_genero.count()} linhas e {data_genero.select('genres').distinct().count()} gêneros")

2973 linhas e 2973 gêneros


In [21]:
x_generos = data_genero.columns
x_generos.remove("genres")
x_generos

['mode',
 'acousticness',
 'danceability',
 'duration_ms',
 'energy',
 'instrumentalness',
 'liveness',
 'loudness',
 'speechiness',
 'tempo',
 'valence',
 'popularity',
 'key']

In [22]:
raw_generos_data = data_genero

### **Vetorização**: Criando a coluna features

In [23]:
from pyspark.ml.feature import VectorAssembler

In [24]:
generos_vectorizer = VectorAssembler(inputCols=x_generos, outputCol="features")

In [25]:
data_genero = generos_vectorizer.transform(data_genero)

### **Padronização**: Colocando todos os dados na mesma escala

In [26]:
from pyspark.ml.feature import StandardScaler

In [27]:
generos_scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures").fit(data_genero)

In [28]:
data_genero = generos_scaler.transform(data_genero)

### **PCA**: Reduzindo a dimensionalidade com Análise de Componentes Principais

In [29]:
from pyspark.ml.feature import PCA

In [30]:
pca_generos = PCA(k=2, inputCol="scaledFeatures", outputCol="pcaFeatures")
pca_generos_model = pca_generos.fit(data_genero)

In [31]:
pca_generos_model.transform(data_genero).select("pcaFeatures").show(truncate=False, n=3)

+---------------------------------------+
|pcaFeatures                            |
+---------------------------------------+
|[2.507095366888567,0.4381691373769795] |
|[-0.5969679056633482,4.981612052751348]|
|[-4.158460276223561,-0.836652508107994]|
+---------------------------------------+
only showing top 3 rows



### **Pipeline**: Automatizando o processo de pré-processamento

In [32]:
from pyspark.ml import Pipeline

In [33]:
pipeline_generos = Pipeline(stages=[generos_vectorizer, generos_scaler, pca_generos])

In [34]:
pipeline_generos_model = pipeline_generos.fit(raw_generos_data)

In [35]:
pipeline_generos_model.transform(raw_generos_data).show(truncate=False, n=3)

+----+----------------------+------------------+-------------------+------------------+-------------------+------------------+--------+-------------------+-------------------+------------------+-------------------+-----------------+---+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------+
|mode|genres                |acousticness      |danceability       |duration_ms       |energy             |instrumentalness  |liveness|loudness           |speechiness        |tempo             |valence            |popularity       |key|features                                        

In [36]:
processed_data_generos = pipeline_generos_model.transform(raw_generos_data).select("features", "scaledFeatures", "pcaFeatures", "genres")

processed_data_generos.show(truncate=False, n=3)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------+----------------------+
|features                                                                                                                                                                                       |scaledFeatures                                                                                                                                                                                                                                        |pcaFeatures                            |genres          

### **KMeans**: Agrupando dados em clusters

In [37]:
from pyspark.ml.clustering import KMeans

In [38]:
kmeans_features_generos = KMeans(featuresCol="features", predictionCol="featuresClusters", seed=1, k=10).fit(processed_data_generos)
kmeans_scaled_generos = KMeans(featuresCol="scaledFeatures", predictionCol="scaledClusters", seed=1, k=10).fit(processed_data_generos)
kmeans_pca_generos = KMeans(featuresCol="pcaFeatures", predictionCol="pcaClusters", seed=1, k=10).fit(processed_data_generos)

In [39]:
data_clusters_generos = kmeans_pca_generos.transform(processed_data_generos)
data_clusters_generos = kmeans_scaled_generos.transform(data_clusters_generos)
data_clusters_generos = kmeans_features_generos.transform(data_clusters_generos)

In [40]:
data_clusters_generos.show(truncate=False, n=10)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------+----------------------+-----------+--------------+----------------+
|features                                                                                                                                                                                                                   |scaledFeatures                                                                                                                                                                                           

In [41]:
from pyspark.ml.functions import vector_to_array

In [42]:
data_clusters_generos = data_clusters_generos.withColumn("PCA_1", vector_to_array("pcaFeatures")[0])
data_clusters_generos = data_clusters_generos.withColumn("PCA_2", vector_to_array("pcaFeatures")[1])

### **Plots**: Visualizaçaõ dos Clusters

In [43]:
import plotly.express as px

In [44]:
fig = px.scatter(data_clusters_generos.toPandas(), x="PCA_1", y="PCA_2", color="pcaClusters", hover_data=["genres", "pcaClusters", "PCA_1", "PCA_2"])
fig.update_layout(width=1000, height=600, title_text="Clusters de Gêneros Musicais")

fig.show()

## **Clusterização Músicas**: Dividindo músicas em grupos

### Separando os Dados

In [45]:
x = data.columns

x

['valence',
 'year',
 'acousticness',
 'artists',
 'danceability',
 'duration_ms',
 'energy',
 'explicit',
 'id',
 'instrumentalness',
 'key',
 'liveness',
 'loudness',
 'mode',
 'name',
 'popularity',
 'speechiness',
 'tempo',
 'artists_song']

In [46]:
x.remove("id")
x.remove("name")
x.remove("artists")
x.remove("artists_song")
x

['valence',
 'year',
 'acousticness',
 'danceability',
 'duration_ms',
 'energy',
 'explicit',
 'instrumentalness',
 'key',
 'liveness',
 'loudness',
 'mode',
 'popularity',
 'speechiness',
 'tempo']

### **Pipeline**: Pré-processamento

In [47]:
pipeline = Pipeline(stages = [
    VectorAssembler(inputCols=x, outputCol="features"),
    StandardScaler(inputCol="features", outputCol="scaledFeatures"),
    PCA(k=4, inputCol="scaledFeatures", outputCol="pcaFeatures")
])

In [48]:
pipeline_model = pipeline.fit(data)

In [49]:
data_processed = pipeline_model.transform(data)

In [50]:
data_processed.show(truncate=False, n=3)

+-------+----+------------+-----------+------------+-----------+------------------+--------+----------------------+----------------+---+--------+-------------------+----+-----------+----------+-----------+-------+------------------------+-------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------+
|valence|year|acousticness|artists    |danceability|duration_ms|energy            |explicit|id                    |instrumentalness|key|liveness|loudness           |mode|name       |popularity|speechiness|tempo  |artists_song            |features                                               

In [51]:
print(f"O PCA manteve {sum(pipeline_model.stages[-1].explainedVariance)*100:.2f}% da informação original") 

O PCA manteve 52.68% da informação original


### **KMeans**: Agrupamento das músicas

In [52]:
kmeans = KMeans(featuresCol="pcaFeatures", predictionCol="clusters", seed=1, k=10)

In [53]:
kmeans_model = kmeans.fit(data_processed)

In [54]:
results = kmeans_model.transform(data_processed).select("id", "artists_song", "pcaFeatures", "clusters")

In [55]:
results.show(truncate=False, n=3)

+----------------------+------------------------+--------------------------------------------------------------------------------+--------+
|id                    |artists_song            |pcaFeatures                                                                     |clusters|
+----------------------+------------------------+--------------------------------------------------------------------------------+--------+
|3AJwUDP919kvQ9QcozQPxg|Coldplay - Yellow       |[-34.71004464775703,-165.36848784906184,-11.163498729833321,-139.11374280854437]|9       |
|0I3q5fE6wg7LIfHGngUTnV|OutKast - Ms. Jackson   |[-36.498857627749054,-167.85185045616558,-10.729023248907561,-135.904352656713] |0       |
|60a0Rd6pjrkxjPbaKzXjfq|Linkin Park - In the End|[-35.460300916940554,-165.99633344577887,-11.361290348241594,-138.24276881247]  |4       |
+----------------------+------------------------+--------------------------------------------------------------------------------+--------+
only showing top 3 r

### **Visualização**: Análise gráfica dos clusters

In [56]:
from pyspark.ml.functions import vector_to_array

In [57]:
results = results.withColumn("PCA_1", vector_to_array("pcaFeatures")[0])
results = results.withColumn("PCA_2", vector_to_array("pcaFeatures")[1])
results = results.withColumn("PCA_3", vector_to_array("pcaFeatures")[2])

results.show(truncate=False, n=3)

+----------------------+------------------------+--------------------------------------------------------------------------------+--------+-------------------+-------------------+-------------------+
|id                    |artists_song            |pcaFeatures                                                                     |clusters|PCA_1              |PCA_2              |PCA_3              |
+----------------------+------------------------+--------------------------------------------------------------------------------+--------+-------------------+-------------------+-------------------+
|3AJwUDP919kvQ9QcozQPxg|Coldplay - Yellow       |[-34.71004464775703,-165.36848784906184,-11.163498729833321,-139.11374280854437]|9       |-34.71004464775703 |-165.36848784906184|-11.163498729833321|
|0I3q5fE6wg7LIfHGngUTnV|OutKast - Ms. Jackson   |[-36.498857627749054,-167.85185045616558,-10.729023248907561,-135.904352656713] |0       |-36.498857627749054|-167.85185045616558|-10.729023248907561|


In [58]:
fig = px.scatter_3d(results.toPandas(), x="PCA_1", y="PCA_2", z="PCA_3", color="clusters", hover_data=["artists_song", "PCA_1", "PCA_2", "PCA_3"])
fig.update_layout(width=1000, height=600, title_text="Clusters de Músicas")
fig.update_traces(marker_size=2)
fig.show()

## **Recomendador**: Finalizando a recomendação

### **Obtendo dados da música input**

In [59]:
music_name = "Drake - Fireworks"

In [60]:
music = results.filter(f"artists_song = '{music_name}'")

music.show(truncate=False)

+----------------------+-----------------+-----------------------------------------------------------------------------+--------+------------------+----------------+-------------------+
|id                    |artists_song     |pcaFeatures                                                                  |clusters|PCA_1             |PCA_2           |PCA_3              |
+----------------------+-----------------+-----------------------------------------------------------------------------+--------+------------------+----------------+-------------------+
|73tgFzBug5Ifk1Retdtwk7|Drake - Fireworks|[-35.16014803814927,-167.23000280554,-10.047888963996275,-136.11389293187594]|0       |-35.16014803814927|-167.23000280554|-10.047888963996275|
+----------------------+-----------------+-----------------------------------------------------------------------------+--------+------------------+----------------+-------------------+



In [61]:
music_cluster = results.filter(f"artists_song = '{music_name}'").select("clusters").collect()[0][0]

music_cluster

0

In [62]:
music_components = results.filter(f"artists_song = '{music_name}'").select("pcaFeatures").collect()[0][0]

music_components

DenseVector([-35.1601, -167.23, -10.0479, -136.1139])

### **Criando cálculo das distâcias**

In [63]:
from scipy.spatial.distance import euclidean

In [64]:
def get_distance(row):
    return euclidean(row, music_components)

In [65]:
from pyspark.sql.types import FloatType

In [66]:
udf = f.udf(get_distance, FloatType())

### **Definindo recomendações**

In [67]:
recomendacoes = results.filter(results.artists_song != music_name).filter(results.clusters == music_cluster).withColumn("distance", udf("pcaFeatures")).orderBy("distance")

In [68]:
recomendacoes.show(truncate=False, n=10)

### **FUNÇÃO FINAL**

In [None]:
def recomenda(musica_nome):

    musica = results.filter(results.artists_song.contains(musica_nome))

    if musica.count() == 0:
        print("Música não encontrada")
        return

    if musica.count() > 1:
        for i, row in enumerate(musica.collect()):
            print(f"{i+1} - {row['artists_song']}")
        print("\n")
        musica_id = int(input("Selecione a música desejada: "))
        musica = musica.collect()[musica_id-1]
    else:
        musica = musica.collect()[0]

    music_name = musica["artists_song"]
    music_cluster = musica["clusters"]
    music_components = musica["pcaFeatures"]
    
    print("Música encontrada:")
    print(music_name)
    print("\n")

    udf = f.udf(lambda row: euclidean(row, music_components), FloatType())

    recomendacoes = results.filter(results.artists_song != music_name).filter(results.clusters == music_cluster).withColumn("distance", udf("pcaFeatures")).orderBy("distance")

    recomendacoes.show(truncate=False, n=10)

In [None]:
recomenda("Taylor Swift")

Música encontrada:
Taylor Swift - Blank Space


+----------------------+------------------------------------------------+---------------------------------------------------------------------------------+--------+-------------------+-------------------+-------------------+----------+
|id                    |artists_song                                    |pcaFeatures                                                                      |clusters|PCA_1              |PCA_2              |PCA_3              |distance  |
+----------------------+------------------------------------------------+---------------------------------------------------------------------------------+--------+-------------------+-------------------+-------------------+----------+
|2RiBogNRfulkNf7fVbPOrJ|Vance Joy - Saturday Sun                        |[-35.48704195972509,-166.99687408808364,-12.862124721717187,-138.0884906118913]  |4       |-35.48704195972509 |-166.99687408808364|-12.862124721717187|0.1639104 |
|5yFSF6q