<a href="https://colab.research.google.com/github/Mathvivas/Artificial-Intelligence/blob/main/SistemaRecomendacaoPySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Usando PySpark

In [1]:
# Importando a Classe SparkSession que é por onde há a interação com o Spark
from pyspark.sql import SparkSession

In [2]:
# Build (or reuse) the Spark session that every later cell goes through.
sessao = (
    SparkSession.builder
    .appName("Recomendador PySpark")
    .getOrCreate()
)
# Bare trailing expression so the notebook renders the session summary.
sessao

## Leitura dos Dados

In [3]:
from pyspark import SparkFiles

In [4]:
# Remote CSV with the songs dataset.
url_dados = 'https://github.com/IgorNascAlves/dados/blob/main/dados_musicas.csv?raw=true'
# Register the URL with the Spark context so the file is fetched for this job.
sessao.sparkContext.addFile(url_dados)
# Path of the fetched copy inside the session, expressed as a file:// URI.
path_dados_file = 'file://{}'.format(SparkFiles.get('dados_musicas.csv'))

In [5]:
# The songs file is ';'-separated and has a header row; column types are inferred.
dados = sessao.read.csv(
    path_dados_file,
    sep=';',
    header=True,
    inferSchema=True,
)

In [6]:
dados.show()

+------------------+----+------------+--------------+------------------+-----------+------------------+--------+--------------------+--------------------+---+--------+-------------------+----+--------------------+----------+-----------+------------------+--------------------+
|           valence|year|acousticness|       artists|      danceability|duration_ms|            energy|explicit|                  id|    instrumentalness|key|liveness|           loudness|mode|                name|popularity|speechiness|             tempo|        artists_song|
+------------------+----+------------+--------------+------------------+-----------+------------------+--------+--------------------+--------------------+---+--------+-------------------+----+--------------------+----------+-----------+------------------+--------------------+
|             0.285|2000|     0.00239|      Coldplay|             0.429|     266773|0.6609999999999999|       0|3AJwUDP919kvQ9Qco...|             1.21E-4| 11|   0.234|  

In [7]:
dados.printSchema()

root
 |-- valence: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- acousticness: double (nullable = true)
 |-- artists: string (nullable = true)
 |-- danceability: double (nullable = true)
 |-- duration_ms: integer (nullable = true)
 |-- energy: double (nullable = true)
 |-- explicit: integer (nullable = true)
 |-- id: string (nullable = true)
 |-- instrumentalness: double (nullable = true)
 |-- key: integer (nullable = true)
 |-- liveness: double (nullable = true)
 |-- loudness: double (nullable = true)
 |-- mode: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- popularity: integer (nullable = true)
 |-- speechiness: double (nullable = true)
 |-- tempo: double (nullable = true)
 |-- artists_song: string (nullable = true)



In [8]:
dados.count()

20311

In [9]:
len(dados.columns)

19

In [10]:
import pyspark.sql.functions as f

In [11]:
# One aggregate per column: count only the rows in which that column is NULL.
dados.agg(
    *(f.count(f.when(f.isnull(coluna), 1)).alias(coluna)
      for coluna in dados.columns)
).show()

+-------+----+------------+-------+------------+-----------+------+--------+---+----------------+---+--------+--------+----+----+----------+-----------+-----+------------+
|valence|year|acousticness|artists|danceability|duration_ms|energy|explicit| id|instrumentalness|key|liveness|loudness|mode|name|popularity|speechiness|tempo|artists_song|
+-------+----+------------+-------+------------+-----------+------+--------+---+----------------+---+--------+--------+----+----+----------+-----------+-----+------------+
|      0|   0|           0|      0|           0|          0|     0|       0|  0|               0|  0|       0|       0|   0|   0|         0|          0|    0|           0|
+-------+----+------------+-------+------------+-----------+------+--------+---+----------------+---+--------+--------+----+----+----------+-----------+-----+------------+



In [12]:
# Distinct release years, ordered by Spark before they are collected.
dados.select('year').distinct().orderBy('year').collect()

[Row(year=2000),
 Row(year=2001),
 Row(year=2002),
 Row(year=2003),
 Row(year=2004),
 Row(year=2005),
 Row(year=2006),
 Row(year=2007),
 Row(year=2008),
 Row(year=2009),
 Row(year=2010),
 Row(year=2011),
 Row(year=2012),
 Row(year=2013),
 Row(year=2014),
 Row(year=2015),
 Row(year=2016),
 Row(year=2017),
 Row(year=2018),
 Row(year=2019),
 Row(year=2020)]

## Análise dos Dados

In [13]:
# Remote CSV with the per-year dataset.
url_anos_dados = 'https://github.com/IgorNascAlves/dados/blob/main/dados_musicas_ano.csv?raw=true'

sessao.sparkContext.addFile(url_anos_dados)
path_dados_file = 'file://{}'.format(SparkFiles.get('dados_musicas_ano.csv'))

# Default separator here (no `sep` argument), header row, inferred column types.
dados_anos = sessao.read.csv(
    path_dados_file,
    header=True,
    inferSchema=True,
)

In [14]:
dados_anos.show()

+----+----+------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+---+
|mode|year|      acousticness|       danceability|       duration_ms|             energy|   instrumentalness|           liveness|           loudness|        speechiness|             tempo|            valence|         popularity|key|
+----+----+------------------+-------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+-------------------+---+
|   1|1921|0.8868960000000005| 0.4185973333333336|260537.16666666663|0.23181513333333334|0.34487805886666656|            0.20571| -17.04866666666665|           0.073662|101.53149333333329|0.37932666666666665| 0.6533333333333333|  2|
|   1|1922|0.9385915492957748| 0.4820422535211267|165469.74647887325

In [15]:
# Keep only the rows for the year 2000 onwards.
dados_anos = dados_anos.where(f.col('year') >= 2000)
dados_anos.show()

+----+----+-------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+---+
|mode|year|       acousticness|      danceability|       duration_ms|            energy|   instrumentalness|           liveness|           loudness|        speechiness|             tempo|           valence|        popularity|key|
+----+----+-------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+------------------+------------------+------------------+---+
|   1|2000|0.28932270051635994| 0.590918047034764| 242724.6426380368|0.6254128323108387|0.10116776879345596| 0.1976860429447853| -8.247765848670758|0.08920541922290394| 118.9993231083843|0.5594754601226991|  46.6840490797546|  7|
|   1|2001| 0.2868424748428934|0.5833178553615969|240307.79600997505|0.626985522

In [16]:
dados_anos.count()

21

In [17]:
len(dados_anos.columns)

14

### ToPandas

In [18]:
import plotly.express as px

In [19]:
# Line chart of loudness per year, with a marker on every yearly point.
fig = px.line(
    dados_anos.toPandas(),
    x='year',
    y='loudness',
    markers=True,
    title='Variação do Loudness Conforme os Anos',
)
fig.show()

In [20]:
import plotly.graph_objects as go

In [21]:
# One line per audio feature, all sharing the 'year' axis.
fig = go.Figure()
temp = dados_anos.toPandas()
for coluna in ('acousticness', 'valence', 'danceability', 'energy',
               'instrumentalness', 'liveness', 'speechiness'):
    # The legend label is the column name with its first letter upper-cased.
    fig.add_trace(
        go.Scatter(x=temp['year'], y=temp[coluna], name=coluna.capitalize())
    )
fig.show()

### Matriz de Correlação

In [22]:
# Heat map of the pairwise correlations between the yearly columns
# ('mode' is left out); text_auto prints each coefficient inside its cell.
fig = px.imshow(
    dados_anos.toPandas().drop(columns='mode').corr(),
    text_auto=True,
)
fig.show()

In [23]:
# Remote CSV with the per-genre dataset.
url_dados_generos = 'https://github.com/IgorNascAlves/dados/blob/main/dados_musicas_genero.csv?raw=true'

sessao.sparkContext.addFile(url_dados_generos)
path_dados_file = 'file://{}'.format(SparkFiles.get('dados_musicas_genero.csv'))

# Default separator here (no `sep` argument), header row, inferred column types.
dados_generos = sessao.read.csv(
    path_dados_file,
    header=True,
    inferSchema=True,
)

In [24]:
dados_generos.show()

+----+--------------------+-------------------+-------------------+------------------+-------------------+--------------------+-------------------+-------------------+--------------------+------------------+-------------------+------------------+---+
|mode|              genres|       acousticness|       danceability|       duration_ms|             energy|    instrumentalness|           liveness|           loudness|         speechiness|             tempo|            valence|        popularity|key|
+----+--------------------+-------------------+-------------------+------------------+-------------------+--------------------+-------------------+-------------------+--------------------+------------------+-------------------+------------------+---+
|   1|21st century clas...| 0.9793333333333332|0.16288333333333335|160297.66666666663|0.07131666666666665|          0.60683367|             0.3616|-31.514333333333337| 0.04056666666666667|           75.3365|0.10378333333333334| 27.83333333333333| 

In [25]:
dados_generos.count()

2973

In [26]:
dados_generos.select('genres').distinct().count()

2973

In [27]:
len(dados_generos.columns)

14

In [28]:
from pyspark.ml.feature import VectorAssembler

In [29]:
# Feature columns: every column of the genre table except the 'genres' label.
X = list(dados_generos.columns)
del X[X.index('genres')]
X

['mode',
 'acousticness',
 'danceability',
 'duration_ms',
 'energy',
 'instrumentalness',
 'liveness',
 'loudness',
 'speechiness',
 'tempo',
 'valence',
 'popularity',
 'key']

In [30]:
# Pack the numeric columns listed in X into one 'features' vector per genre and
# keep only that vector together with the genre label.
dados_generos_vector = (
    VectorAssembler(inputCols=X, outputCol='features')
    .transform(dados_generos)
    .select('features', 'genres')
)
dados_generos_vector.show(n=5, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+
|features                                                                                                                                                                                                                   |genres                |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+
|[1.0,0.9793333333333332,0.16288333333333335,160297.66666666663,0.07131666666666665,0.60683367,0.3616,-31.514333333333337,0.04056666666666667,75.3365,0.10378333333333334,27.83333333333333,6.0]                            |21st century classical|
|[1.0,0.49478,0.2993

In [31]:
from pyspark.ml.feature import StandardScaler

In [32]:
# Fit a StandardScaler (library defaults) on the genre vectors, then add the
# rescaled vector as 'scaled_features'.
scaler = (
    StandardScaler()
    .setInputCol('features')
    .setOutputCol('scaled_features')
)
scaler_model = scaler.fit(dados_generos_vector)
dados_generos_scaler = scaler_model.transform(dados_generos_vector)
dados_generos_scaler.show(n=5, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                                   |genres                |scaled_features                                                                                                                                                                                                                                       |
+---------------

### PCA

In [33]:
from pyspark.ml.feature import PCA

In [34]:
# Project the scaled genre vectors onto their first two principal components.
pca = (
    PCA()
    .setK(2)
    .setInputCol('scaled_features')
    .setOutputCol('pca_features')
)
model_pca = pca.fit(dados_generos_scaler)
dados_generos_pca = model_pca.transform(dados_generos_scaler)

In [35]:
dados_generos_pca.select('pca_features').show(truncate=False)

+------------------------------------------+
|pca_features                              |
+------------------------------------------+
|[-2.5070953668885663,-0.43816913737698293]|
|[0.596967905663348,-4.981612052751353]    |
|[4.158460276223559,0.83665250810799]      |
|[2.3873448785122156,0.4877989015663361]   |
|[2.650121837167907,0.575681976882043]     |
|[1.4965091203367622,-1.8644183183717828]  |
|[3.9235207721573224,-0.28518350023528705] |
|[4.611011109831113,0.6783790472312338]    |
|[2.8376900630842288,0.57129937165805]     |
|[2.706690139892782,1.2593788079708255]    |
|[4.698331383924286,-1.2765569680619455]   |
|[3.375987496679866,-0.7560741064307498]   |
|[5.608998877066019,-1.042731164439324]    |
|[-0.2954946352117699,0.2763864586236268]  |
|[2.572559106287041,1.3169815431109746]    |
|[3.400822802049343,-0.5073029625781936]   |
|[4.366720316263417,0.33648270597710517]   |
|[2.7254698167723985,-0.5058604987046403]  |
|[4.958112358381603,-1.2627579957290722]   |
|[3.693495

In [36]:
from pyspark.ml import Pipeline

### Criando um Pipeline

In [37]:
# Same three steps as the manual cells above (assemble -> scale -> 2-component
# PCA), chained as a single Pipeline.
pca_pipeline = Pipeline().setStages([
    VectorAssembler(outputCol='features', inputCols=X),
    StandardScaler(outputCol='scaled_features', inputCol='features'),
    PCA(outputCol='pca_features', inputCol='scaled_features', k=2),
])

In [38]:
pca_pipeline_model = pca_pipeline.fit(dados_generos)

In [39]:
dados_generos_pca = pca_pipeline_model.transform(dados_generos)

In [40]:
dados_generos_pca.show()

+----+--------------------+-------------------+-------------------+------------------+-------------------+--------------------+-------------------+-------------------+--------------------+------------------+-------------------+------------------+---+--------------------+--------------------+--------------------+
|mode|              genres|       acousticness|       danceability|       duration_ms|             energy|    instrumentalness|           liveness|           loudness|         speechiness|             tempo|            valence|        popularity|key|            features|     scaled_features|        pca_features|
+----+--------------------+-------------------+-------------------+------------------+-------------------+--------------------+-------------------+-------------------+--------------------+------------------+-------------------+------------------+---+--------------------+--------------------+--------------------+
|   1|21st century clas...| 0.9793333333333332|0.162883333

In [41]:
dados_generos_pca.select('pca_features').show(truncate=False)

+------------------------------------------+
|pca_features                              |
+------------------------------------------+
|[-2.5070953668885663,-0.43816913737698293]|
|[0.596967905663348,-4.981612052751353]    |
|[4.158460276223559,0.83665250810799]      |
|[2.3873448785122156,0.4877989015663361]   |
|[2.650121837167907,0.575681976882043]     |
|[1.4965091203367622,-1.8644183183717828]  |
|[3.9235207721573224,-0.28518350023528705] |
|[4.611011109831113,0.6783790472312338]    |
|[2.8376900630842288,0.57129937165805]     |
|[2.706690139892782,1.2593788079708255]    |
|[4.698331383924286,-1.2765569680619455]   |
|[3.375987496679866,-0.7560741064307498]   |
|[5.608998877066019,-1.042731164439324]    |
|[-0.2954946352117699,0.2763864586236268]  |
|[2.572559106287041,1.3169815431109746]    |
|[3.400822802049343,-0.5073029625781936]   |
|[4.366720316263417,0.33648270597710517]   |
|[2.7254698167723985,-0.5058604987046403]  |
|[4.958112358381603,-1.2627579957290722]   |
|[3.693495

### Agrupamento com KMEANS

In [42]:
from pyspark.ml.clustering import KMeans

In [43]:
# Fixed seed so the K-Means initialisation is repeatable between runs.
SEED = 1224
# Five clusters over the 2-D PCA projection; assignments go to 'cluster_pca'.
kmeans = KMeans(
    k=5,
    seed=SEED,
    featuresCol='pca_features',
    predictionCol='cluster_pca',
)

In [44]:
model_kmeans = kmeans.fit(dados_generos_pca)

In [45]:
predictions_kmeans = model_kmeans.transform(dados_generos_pca)

In [46]:
predictions_kmeans.select('pca_features', 'cluster_pca').show(truncate=False)

+------------------------------------------+-----------+
|pca_features                              |cluster_pca|
+------------------------------------------+-----------+
|[-2.5070953668885663,-0.43816913737698293]|2          |
|[0.596967905663348,-4.981612052751353]    |2          |
|[4.158460276223559,0.83665250810799]      |4          |
|[2.3873448785122156,0.4877989015663361]   |0          |
|[2.650121837167907,0.575681976882043]     |0          |
|[1.4965091203367622,-1.8644183183717828]  |2          |
|[3.9235207721573224,-0.28518350023528705] |4          |
|[4.611011109831113,0.6783790472312338]    |1          |
|[2.8376900630842288,0.57129937165805]     |4          |
|[2.706690139892782,1.2593788079708255]    |0          |
|[4.698331383924286,-1.2765569680619455]   |3          |
|[3.375987496679866,-0.7560741064307498]   |4          |
|[5.608998877066019,-1.042731164439324]    |1          |
|[-0.2954946352117699,0.2763864586236268]  |2          |
|[2.572559106287041,1.316981543

### Plotando os Clusters

In [47]:
from pyspark.ml.functions import vector_to_array

In [48]:
# Split the 2-component PCA vector into plain 'x' / 'y' columns and keep them
# next to the cluster id and the genre name.
pca_features_xy = predictions_kmeans.select(
    vector_to_array('pca_features')[0].alias('x'),
    vector_to_array('pca_features')[1].alias('y'),
    'cluster_pca',
    'genres',
)

In [49]:
pca_features_xy.show()

+-------------------+--------------------+-----------+--------------------+
|                  x|                   y|cluster_pca|              genres|
+-------------------+--------------------+-----------+--------------------+
|-2.5070953668885663|-0.43816913737698293|          2|21st century clas...|
|  0.596967905663348|  -4.981612052751353|          2|               432hz|
|  4.158460276223559|    0.83665250810799|          4|               8-bit|
| 2.3873448785122156|  0.4877989015663361|          0|                  []|
|  2.650121837167907|   0.575681976882043|          0|          a cappella|
| 1.4965091203367622| -1.8644183183717828|          2|            abstract|
| 3.9235207721573224|-0.28518350023528705|          4|      abstract beats|
|  4.611011109831113|  0.6783790472312338|          1|    abstract hip hop|
| 2.8376900630842288|    0.57129937165805|          4|           accordeon|
|  2.706690139892782|  1.2593788079708255|          0|           accordion|
|  4.6983313

In [50]:
# Cluster ids are labels, not magnitudes. Left as integers, Plotly Express
# colours them on a continuous scale; casting to string gives one discrete
# colour (and one legend entry) per cluster. Sorting first keeps the legend in
# cluster-id order.
fig = px.scatter(
    pca_features_xy.toPandas()
                   .sort_values('cluster_pca')
                   .astype({'cluster_pca': str}),
    x='x', y='y',
    color='cluster_pca',
    hover_data=['x', 'y', 'genres'],
)
fig.show()

In [51]:
pca_pipeline_model.stages[2].explainedVariance

DenseVector([0.2975, 0.1212])

- Das 13 variáveis usadas (as 14 colunas menos `genres`), o componente x explica apenas cerca de 30% da variância dos dados e o componente y apenas cerca de 12%; somados, menos da metade. Há grande perda de informação ao reduzir para 2 componentes.

In [52]:
# Feature columns of the songs table: drop the four text columns.
X = list(dados.columns)
for coluna_texto in ('artists', 'id', 'name', 'artists_song'):
    X.remove(coluna_texto)
X

['valence',
 'year',
 'acousticness',
 'danceability',
 'duration_ms',
 'energy',
 'explicit',
 'instrumentalness',
 'key',
 'liveness',
 'loudness',
 'mode',
 'popularity',
 'speechiness',
 'tempo']

In [53]:
# Append a 'features' vector built from the columns in X; every original
# column of `dados` is kept.
dados_encoded_vector = (
    VectorAssembler(inputCols=X, outputCol='features')
    .transform(dados)
)

In [55]:
dados_encoded_vector.select('features').show(truncate=False, n=5)

+-------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                 |
+-------------------------------------------------------------------------------------------------------------------------+
|[0.285,2000.0,0.00239,0.429,266773.0,0.6609999999999999,0.0,1.21E-4,11.0,0.234,-7.227,1.0,84.0,0.0281,173.372]           |
|[0.613,2000.0,0.143,0.843,270507.0,0.8059999999999999,1.0,0.0,4.0,0.0771,-5.9460000000000015,0.0,80.0,0.269,94.948]      |
|[0.4,2000.0,0.00958,0.556,216880.0,0.8640000000000001,0.0,0.0,3.0,0.209,-5.87,0.0,84.0,0.0584,105.143]                   |
|[0.5429999999999999,2000.0,0.00664,0.545,233933.0,0.865,0.0,1.1E-5,11.0,0.168,-5.7079999999999975,0.0,78.0,0.0286,99.009]|
|[0.76,2000.0,0.0302,0.949,284200.0,0.6609999999999999,1.0,0.0,5.0,0.0454,-4.244,0.0,80.0,0.0572,104.504]                 |
+-------

In [56]:
# Fit a StandardScaler (library defaults) on the song vectors, then add the
# rescaled vector as 'features_scaled'.
scaler = (
    StandardScaler()
    .setInputCol('features')
    .setOutputCol('features_scaled')
)
model_scaler = scaler.fit(dados_encoded_vector)
dados_musicas_scaler = model_scaler.transform(dados_encoded_vector)

In [57]:
dados_musicas_scaler.select('features_scaled').show(truncate=False, n=5)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features_scaled                                                                                                                                                                                                                                                               |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[1.156596679221045,329.8170613984441,0.008403188822334736,2.5040545743834373,3.7858811737298526,2.980511298201045,0.0,6.070790766862789E-4,3.0696424994844875,1.4832857988766257,-1.

In [58]:
k = len(X)
k

15

In [59]:
# PCA keeping `k` components of the scaled song vectors.
pca = (
    PCA()
    .setK(k)
    .setInputCol('features_scaled')
    .setOutputCol('pca_features')
)
model_pca = pca.fit(dados_musicas_scaler)
dados_musicas_pca = model_pca.transform(dados_musicas_scaler)

In [61]:
sum(model_pca.explainedVariance) * 100

np.float64(100.00000000000003)

- Aqui o número de componentes é igual ao número de variáveis (k = 15), então a variância explicada soma 100%, como esperado.

In [62]:
# Cumulative explained variance: entry i is the sum of the first i+1 ratios.
lista_valores = []
for n_componentes in range(1, k + 1):
    lista_valores.append(sum(model_pca.explainedVariance[:n_componentes]))
lista_valores

[np.float64(0.21297354127107349),
 np.float64(0.3459774162295507),
 np.float64(0.43989805678254446),
 np.float64(0.5267820578269103),
 np.float64(0.6001038103117929),
 np.float64(0.666571951365239),
 np.float64(0.7282174213581728),
 np.float64(0.7845103843321023),
 np.float64(0.8350002357002614),
 np.float64(0.8812446547406769),
 np.float64(0.9151737562703048),
 np.float64(0.9435370133056039),
 np.float64(0.9699018569070161),
 np.float64(0.9900947792885582),
 np.float64(1.0000000000000002)]

In [63]:
import numpy as np

In [64]:
# Number of leading components whose cumulative explained variance is still at
# or below 70%. NOTE(review): this keeps the last count *under* the threshold,
# so the retained variance ends up below 70% (about 66.7% per the output further
# down) - confirm that is the intended cut-off.
# count_nonzero + int() yields a built-in int instead of a NumPy integer, which
# is what the PCA `k` parameter below is given.
k = int(np.count_nonzero(np.asarray(lista_valores) <= 0.7))
k

np.int64(6)

In [65]:
# Refit the PCA with the reduced number of components held in `k`.
pca = (
    PCA()
    .setK(k)
    .setInputCol('features_scaled')
    .setOutputCol('pca_features')
)
model_pca = pca.fit(dados_musicas_scaler)
dados_musicas_pca_final = model_pca.transform(dados_musicas_scaler)

In [66]:
dados_musicas_pca_final.select('pca_features').show(truncate=False, n=5)

+----------------------------------------------------------------------------------------------------------------------+
|pca_features                                                                                                          |
+----------------------------------------------------------------------------------------------------------------------+
|[-34.71004464775702,-165.3684878490617,-11.163498729833675,-139.1137428085442,14.152192300931317,-6.254422223810444]  |
|[-36.49885762774904,-167.85185045616547,-10.729023248907904,-135.90435265671286,13.38840189058717,-3.972980727830917] |
|[-35.46030091694054,-165.99633344577876,-11.361290348241944,-138.24276881246985,13.866654919305718,-4.342675920458773]|
|[-35.56301325520582,-165.59515096480598,-11.46022600703173,-137.52095740570027,15.568542803922659,-4.542073725584329] |
|[-36.54556974907565,-167.37512505802474,-11.881276527236897,-136.27798590243967,14.056847990344485,-3.583390145779214]|
+-------------------------------

In [67]:
sum(model_pca.explainedVariance) * 100

np.float64(66.6571951365239)

In [68]:
# Song pipeline: assemble -> scale -> PCA. The 6 is hard-coded here; it is the
# same value the threshold cell above printed for `k`.
pca_pipeline = Pipeline().setStages([
    VectorAssembler(outputCol='features', inputCols=X),
    StandardScaler(outputCol='features_scaled', inputCol='features'),
    PCA(outputCol='pca_features', inputCol='features_scaled', k=6),
])

In [69]:
model_pca_pipeline = pca_pipeline.fit(dados)
projection = model_pca_pipeline.transform(dados)

In [70]:
projection.select('pca_features').show(truncate=False, n=5)

+----------------------------------------------------------------------------------------------------------------------+
|pca_features                                                                                                          |
+----------------------------------------------------------------------------------------------------------------------+
|[-34.71004464775702,-165.3684878490617,-11.163498729833675,-139.1137428085442,14.152192300931317,-6.254422223810444]  |
|[-36.49885762774904,-167.85185045616547,-10.729023248907904,-135.90435265671286,13.38840189058717,-3.972980727830917] |
|[-35.46030091694054,-165.99633344577876,-11.361290348241944,-138.24276881246985,13.866654919305718,-4.342675920458773]|
|[-35.56301325520582,-165.59515096480598,-11.46022600703173,-137.52095740570027,15.568542803922659,-4.542073725584329] |
|[-36.54556974907565,-167.37512505802474,-11.881276527236897,-136.27798590243967,14.056847990344485,-3.583390145779214]|
+-------------------------------

In [71]:
# 50 clusters over the 6-component projection, seeded for repeatability.
kmeans = (
    KMeans()
    .setK(50)
    .setSeed(SEED)
    .setFeaturesCol('pca_features')
    .setPredictionCol('cluster_pca')
)
modelo_kmeans = kmeans.fit(projection)
projection_kmeans = modelo_kmeans.transform(projection)

In [73]:
projection_kmeans.select('pca_features', 'cluster_pca').show(n=10)

+--------------------+-----------+
|        pca_features|cluster_pca|
+--------------------+-----------+
|[-34.710044647757...|          8|
|[-36.498857627749...|          4|
|[-35.460300916940...|         35|
|[-35.563013255205...|         16|
|[-36.545569749075...|         34|
|[-36.713222290262...|         34|
|[-36.013246178822...|         34|
|[-36.542687712104...|         23|
|[-36.425249009784...|         16|
|[-35.872074915770...|         34|
+--------------------+-----------+
only showing top 10 rows


### Analisando Graficamente

- Não é possível plotar os 6 componentes ao mesmo tempo; serão usados apenas os dois primeiros.

In [74]:
# Expose the first two PCA components as scalar 'x' / 'y' columns for plotting.
projection_kmeans = (
    projection_kmeans
    .withColumn('x', vector_to_array('pca_features')[0])
    .withColumn('y', vector_to_array('pca_features')[1])
)
projection_kmeans.select('x', 'y', 'cluster_pca', 'artists_song').show()

+-------------------+-------------------+-----------+--------------------+
|                  x|                  y|cluster_pca|        artists_song|
+-------------------+-------------------+-----------+--------------------+
| -34.71004464775702| -165.3684878490617|          8|   Coldplay - Yellow|
| -36.49885762774904|-167.85185045616547|          4|OutKast - Ms. Jac...|
| -35.46030091694054|-165.99633344577876|         35|Linkin Park - In ...|
| -35.56301325520582|-165.59515096480598|         16|3 Doors Down - Kr...|
| -36.54556974907565|-167.37512505802474|         34|Eminem - The Real...|
|  -36.7132222902623|  -166.482084830997|         34|Disturbed - Down ...|
| -36.01324617882219|-166.63514920955333|         34| Nelly - Ride Wit Me|
| -36.54268771210487| -166.9792823483323|         23|       Eminem - Stan|
|  -36.4252490097843|-165.10535006330755|         16|*NSYNC - Bye Bye Bye|
| -35.87207491577071| -166.0309883824791|         34|Britney Spears - ...|
|-31.639065936568734|-166

In [75]:
# Send only the four plotted columns to pandas. projection_kmeans also carries
# every original song column plus the 'features', 'features_scaled' and
# 'pca_features' vectors, none of which the figure uses.
fig = px.scatter(
    projection_kmeans.select('x', 'y', 'cluster_pca', 'artists_song').toPandas(),
    x='x', y='y', color='cluster_pca',
    hover_data=['artists_song'],
)
fig.show()

## Sistema de Recomendação

In [76]:
nome_musica = 'Taylor Swift - Blank Space'

In [77]:
# Cluster id of the reference song (first matching row; IndexError if none).
cluster = (
    projection_kmeans
    .where(f.col('artists_song') == nome_musica)
    .select('cluster_pca')
    .collect()[0][0]
)
cluster

3

In [78]:
# Candidate songs: everything assigned to the same cluster as the reference.
musicas_recomendadas = (
    projection_kmeans
    .where(f.col('cluster_pca') == cluster)
    .select('artists_song', 'id', 'pca_features')
)
musicas_recomendadas.show(n=5)

+--------------------+--------------------+--------------------+
|        artists_song|                  id|        pca_features|
+--------------------+--------------------+--------------------+
|Usher - Yeah! (fe...|5rb9QrpfcKFHM1EUb...|[-36.050436933493...|
|Kelly Clarkson - ...|3xrn9i8zhNZsTtcoW...|[-35.191226411336...|
| Keyshia Cole - Love|0W4NhJhcqKCqEP2GI...|[-34.751332435196...|
|Mariah Carey - We...|3LmvfNUQtglbTryds...|[-35.144610288312...|
|Korn - Coming Undone|6p2liQLGoDaLXgND6...|[-35.272207170621...|
+--------------------+--------------------+--------------------+
only showing top 5 rows


In [79]:
# PCA vector of the reference song, pulled to the driver as a DenseVector.
componentes = (
    musicas_recomendadas
    .where(f.col('artists_song') == nome_musica)
    .select('pca_features')
    .collect()[0][0]
)
componentes

DenseVector([-35.461, -166.9658, -12.7925, -137.9457, 13.1234, -4.1374])

In [81]:
from scipy.spatial.distance import euclidean
from pyspark.sql.types import FloatType

In [82]:
def calculaDistancia(valor):
    """Return the Euclidean distance between ``valor`` and the reference vector.

    The reference is the notebook-level ``componentes`` variable, looked up each
    time the function is called.

    Args:
        valor: Sequence of numbers with the same length as ``componentes``.

    Returns:
        The distance as a built-in ``float``.
    """
    # scipy may hand back a NumPy scalar; this function is wrapped as a Spark
    # UDF, so convert to a plain Python float before returning.
    return float(euclidean(componentes, valor))

In [90]:
# Wrap the Python distance function as a Spark UDF that yields a float column.
udf_calcula_distancia = f.udf(calculaDistancia, FloatType())

# Distance from every candidate song to the reference song.
musicas_recomendadas_dist = musicas_recomendadas.withColumn(
    'Dist', udf_calcula_distancia('pca_features')
)

# Rank and truncate inside Spark. orderBy + limit returns the same ten rows as
# collecting them with take(10) and rebuilding a DataFrame on the driver, but
# without the round-trip.
recomendadas = (
    musicas_recomendadas_dist
    .orderBy('Dist')
    .limit(10)
    .select('artists_song', 'id', 'Dist')
)

recomendadas.show()

+--------------------+--------------------+-------------------+
|        artists_song|                  id|               Dist|
+--------------------+--------------------+-------------------+
|Taylor Swift - Bl...|1p80LdxRV74UKvL8g...|                0.0|
|Imagine Dragons -...|4uGY9CqDtGtaTTLg1...|0.37921473383903503|
|The All-American ...|6ihL9TjfRjadfEePz...| 0.6039153933525085|
|Darius Rucker - B...|36ISlLb12gKuCCVTY...| 0.6824935674667358|
|Old Dominion - Sn...|7I5fYc4qKJddht8Oz...| 0.7364036440849304|
|Dan + Shay - Noth...|4W38RXuQNuoTSwVsQ...| 0.7841535210609436|
|Twenty One Pilots...|7qxjGHW485TL8ciwk...| 0.8002141118049622|
|Jon Bellion - Mor...|2JXNOtb0ANe6MzgRY...| 0.8098159432411194|
|Kip Moore - More ...|73WWkaWkIRSP3MYGZ...| 0.8557121753692627|
|Calibre 50 - Corr...|0ehx1p1HMn7NMKisZ...| 0.8788507580757141|
+--------------------+--------------------+-------------------+



In [91]:
def recomendador(nome_musica, n_recomendacoes=10):
    """Print the songs closest to ``nome_musica`` inside its K-Means cluster.

    The lookup runs against the notebook-level ``projection_kmeans`` DataFrame.
    Only songs with the same ``cluster_pca`` as the reference are ranked, by
    Euclidean distance between ``pca_features`` vectors. The reference song is
    part of the ranking (distance 0), so it is printed first.

    Args:
        nome_musica: Exact ``artists_song`` value of the reference song.
        n_recomendacoes: Number of rows to print, reference song included.
            Defaults to 10.

    Returns:
        None. The ``artists_song`` column of the ranking is printed.

    Raises:
        IndexError: If no row of ``projection_kmeans`` has
            ``artists_song == nome_musica``. The exception type is the one the
            unguarded lookup used to raise; it now carries a message.
    """
    # A single Spark job fetches both the cluster id and the PCA vector of the
    # reference song.
    referencia = (
        projection_kmeans
        .filter(projection_kmeans.artists_song == nome_musica)
        .select('cluster_pca', 'pca_features')
        .first()
    )
    if referencia is None:
        raise IndexError(
            f'Song {nome_musica!r} was not found in projection_kmeans.artists_song'
        )
    cluster = referencia['cluster_pca']
    componentes = referencia['pca_features']

    musicas_recomendadas = projection_kmeans.filter(
        projection_kmeans.cluster_pca == cluster
    ).select('artists_song', 'id', 'pca_features')

    def calculaDistancia(valor):
        # Plain Python float so the UDF never returns a NumPy scalar.
        return float(euclidean(componentes, valor))

    udf_calcula_distancia = f.udf(calculaDistancia, FloatType())

    # Rank and truncate inside Spark instead of collecting the rows to the
    # driver and rebuilding a DataFrame from them.
    recomendadas = (
        musicas_recomendadas
        .withColumn('Dist', udf_calcula_distancia('pca_features'))
        .orderBy('Dist')
        .limit(n_recomendacoes)
        .select('artists_song', 'id', 'Dist')
    )

    # Pass n explicitly: show() prints 20 rows unless told otherwise.
    recomendadas.select('artists_song').show(n=n_recomendacoes, truncate=False)

In [92]:
recomendador('Taylor Swift - Blank Space')

+-----------------------------------------+
|artists_song                             |
+-----------------------------------------+
|Taylor Swift - Blank Space               |
|Imagine Dragons - Machine                |
|The All-American Rejects - Gives You Hell|
|Darius Rucker - Beers And Sunshine       |
|Old Dominion - Snapback                  |
|Dan + Shay - Nothin' Like You            |
|Twenty One Pilots - Not Today            |
|Jon Bellion - Morning In America         |
|Kip Moore - More Girls Like You          |
|Calibre 50 - Corrido De Juanito          |
+-----------------------------------------+

