# Projeto de Recomendação de Músicas com PySpark

## Índice
1. [Introdução](#introdução)
2. [Configuração do Ambiente](#configuração-do-ambiente)
3. [Carregamento e Análise Exploratória dos Dados](#carregamento-e-análise-exploratória-dos-dados)
4. [Clusterização de Gêneros Musicais](#clusterização-de-gêneros-musicais)
5. [Clusterização de Músicas](#clusterização-de-músicas)
6. [Sistema de Recomendação](#sistema-de-recomendação)
7. [Conclusão](#conclusão)

## Introdução

Este projeto implementa um sistema de recomendação de músicas utilizando PySpark. O objetivo é agrupar músicas similares com base em suas características e gêneros, e então recomendar músicas semelhantes a uma dada entrada.

## Configuração do Ambiente

Primeiramente, configuramos o ambiente PySpark. O PySpark é uma interface Python para o Apache Spark, que nos permite processar grandes volumes de dados de forma distribuída.

In [2]:
from pyspark.sql import SparkSession
import findspark

findspark.init()

spark = SparkSession.builder.appName("Recomendação de Músicas com Spark").getOrCreate()
spark

## Carregamento e Análise Exploratória dos Dados

### Carregamento dos Dados
Carregamos os dados do arquivo CSV usando o SparkContext:


In [3]:
from pyspark import SparkFiles

path = "data/dados_musicas.csv"

spark.sparkContext.addFile(path)

data = spark.read.csv(SparkFiles.get("dados_musicas.csv"), sep=";", header=True, inferSchema=True)

data.limit(5).show()
data.printSchema()

+------------------+----+------------+------------+------------+-----------+------------------+--------+--------------------+----------------+---+--------+-------------------+----+-------------------+----------+-----------+-------+--------------------+
|           valence|year|acousticness|     artists|danceability|duration_ms|            energy|explicit|                  id|instrumentalness|key|liveness|           loudness|mode|               name|popularity|speechiness|  tempo|        artists_song|
+------------------+----+------------+------------+------------+-----------+------------------+--------+--------------------+----------------+---+--------+-------------------+----+-------------------+----------+-----------+-------+--------------------+
|             0.285|2000|     0.00239|    Coldplay|       0.429|     266773|0.6609999999999999|       0|3AJwUDP919kvQ9Qco...|         1.21E-4| 11|   0.234|             -7.227|   1|             Yellow|        84|     0.0281|173.372|   Coldpla

Este código adiciona o arquivo CSV ao SparkContext e o lê como um DataFrame Spark. Usamos inferSchema=True para que o Spark tente inferir automaticamente os tipos de dados de cada coluna.

### Análise Exploratória
Realizamos uma análise exploratória para entender melhor nossos dados:

In [4]:
import pyspark.sql.functions as f

# Verificando dados nulos
data.select([
    f.sum(f.when(f.col(c).isNull(), 1).otherwise(0)).alias(c) 
    for c in data.columns
]).show()

# Analisando a distribuição dos anos
data.select("year").distinct().orderBy("year").show()

+-------+----+------------+-------+------------+-----------+------+--------+---+----------------+---+--------+--------+----+----+----------+-----------+-----+------------+
|valence|year|acousticness|artists|danceability|duration_ms|energy|explicit| id|instrumentalness|key|liveness|loudness|mode|name|popularity|speechiness|tempo|artists_song|
+-------+----+------------+-------+------------+-----------+------+--------+---+----------------+---+--------+--------+----+----+----------+-----------+-----+------------+
|      0|   0|           0|      0|           0|          0|     0|       0|  0|               0|  0|       0|       0|   0|   0|         0|          0|    0|           0|
+-------+----+------------+-------+------------+-----------+------+--------+---+----------------+---+--------+--------+----+----+----------+-----------+-----+------------+

+----+
|year|
+----+
|2000|
|2001|
|2002|
|2003|
|2004|
|2005|
|2006|
|2007|
|2008|
|2009|
|2010|
|2011|
|2012|
|2013|
|2014|
|2015|
|2016|

Aqui, verificamos a presença de valores nulos em cada coluna e analisamos a distribuição dos anos das músicas. Isso nos ajuda a entender a qualidade e a abrangência temporal dos nossos dados.

Para visualizar as características das músicas ao longo do tempo:

In [5]:
import plotly.graph_objects as go

temp = data.groupBy("year").avg().toPandas().drop(columns="avg(year)")
fig = go.Figure()

fig.add_trace(go.Line(y=temp["avg(acousticness)"], name="Acousticness"))
fig.add_trace(go.Line(y=temp["avg(danceability)"], name="Danceability"))
fig.add_trace(go.Line(y=temp["avg(energy)"], name="Energy"))
fig.add_trace(go.Line(y=temp["avg(instrumentalness)"], name="Instrumentalness"))
fig.add_trace(go.Line(y=temp["avg(liveness)"], name="Liveness"))
fig.add_trace(go.Line(y=temp["avg(speechiness)"], name="Speechiness"))
fig.add_trace(go.Line(y=temp["avg(valence)"], name="Valence"))

fig.update_layout(title_text="Distribuição das Características das Músicas por Ano")
fig.show()


plotly.graph_objs.Line is deprecated.
Please replace it with one of the following more specific types
  - plotly.graph_objs.scatter.Line
  - plotly.graph_objs.layout.shape.Line
  - etc.




Este gráfico nos mostra como as diferentes características musicais (como acústica, dançabilidade, energia) evoluíram ao longo dos anos.

Para visualizar as correlações entre as características das músicas:

In [6]:
import plotly.express as px

px.imshow(temp.corr(), text_auto=True, width=1000, height=600, title="Correlação entre as Características das Músicas")

## Clusterização de Gêneros Musicais

### Preparação dos Dados
Primeiro, carregamos os dados de gêneros musicais:

In [7]:
path = "data/dados_musicas_genero.csv"

spark.sparkContext.addFile(path)

data_genero = spark.read.csv(SparkFiles.get("dados_musicas_genero.csv"), header=True, inferSchema=True)

x_generos = data_genero.columns
x_generos.remove("genres")

Aqui, carregamos um conjunto de dados separado para gêneros musicais e preparamos as colunas que usaremos como features.

### Pipeline de Pré-processamento
Criamos um pipeline para pré-processar nossos dados:

In [8]:
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml import Pipeline

pipeline_generos = Pipeline(stages=[
    VectorAssembler(inputCols=x_generos, outputCol="features"),
    StandardScaler(inputCol="features", outputCol="scaledFeatures"),
    PCA(k=2, inputCol="scaledFeatures", outputCol="pcaFeatures")
])

pipeline_generos_model = pipeline_generos.fit(data_genero)
processed_data_generos = pipeline_generos_model.transform(data_genero)

processed_data_generos.limit(5).show()

+----+--------------------+------------------+-------------------+------------------+-------------------+--------------------+-------------------+-------------------+-------------------+------------------+-------------------+------------------+---+--------------------+--------------------+--------------------+
|mode|              genres|      acousticness|       danceability|       duration_ms|             energy|    instrumentalness|           liveness|           loudness|        speechiness|             tempo|            valence|        popularity|key|            features|      scaledFeatures|         pcaFeatures|
+----+--------------------+------------------+-------------------+------------------+-------------------+--------------------+-------------------+-------------------+-------------------+------------------+-------------------+------------------+---+--------------------+--------------------+--------------------+
|   1|21st century clas...|0.9793333333333332|0.1628833333333333

Este pipeline realiza três etapas principais:
1. Vetorização: combina todas as features em um único vetor.
2. Padronização: coloca todas as features na mesma escala.
3. PCA: reduz a dimensionalidade dos dados para facilitar a visualização e o processamento.


### Clusterização

Utilizamos o algoritmo K-Means para agrupar os gêneros:

In [9]:
from pyspark.ml.clustering import KMeans

kmeans_pca_generos = KMeans(featuresCol="pcaFeatures", predictionCol="pcaClusters", seed=1, k=10)

data_clusters_generos = kmeans_pca_generos.fit(processed_data_generos).transform(processed_data_generos)

data_clusters_generos.limit(5).show()

+----+--------------------+------------------+-------------------+------------------+-------------------+--------------------+-------------------+-------------------+-------------------+------------------+-------------------+------------------+---+--------------------+--------------------+--------------------+-----------+
|mode|              genres|      acousticness|       danceability|       duration_ms|             energy|    instrumentalness|           liveness|           loudness|        speechiness|             tempo|            valence|        popularity|key|            features|      scaledFeatures|         pcaFeatures|pcaClusters|
+----+--------------------+------------------+-------------------+------------------+-------------------+--------------------+-------------------+-------------------+-------------------+------------------+-------------------+------------------+---+--------------------+--------------------+--------------------+-----------+
|   1|21st century clas...|0

K-Means é um algoritmo de clusterização que agrupa os dados em K clusters baseados na similaridade das features.

### Visualização dos Clusters

Para visualizar os clusters de gêneros:

In [10]:
from pyspark.ml.functions import vector_to_array
import plotly.express as px

data_clusters_generos = data_clusters_generos.withColumn("PCA_1", vector_to_array("pcaFeatures")[0])
data_clusters_generos = data_clusters_generos.withColumn("PCA_2", vector_to_array("pcaFeatures")[1])

fig = px.scatter(data_clusters_generos.toPandas(), x="PCA_1", y="PCA_2", color="pcaClusters", hover_data=["genres"])
fig.show()

Este gráfico nos mostra como os diferentes gêneros musicais se agrupam com base em suas características.

## Clusterização de Músicas

A clusterização de músicas segue um processo similar ao que fizemos com os gêneros musicais, mas com algumas diferenças importantes. Vamos passar por cada etapa detalhadamente.

### Preparação dos Dados

Primeiro, precisamos selecionar as features que usaremos para a clusterização:

In [11]:
x = data.columns

# Removendo colunas que não são features numéricas
x.remove("id")
x.remove("name")
x.remove("artists")
x.remove("artists_song")

print("Features selecionadas:", x)

Features selecionadas: ['valence', 'year', 'acousticness', 'danceability', 'duration_ms', 'energy', 'explicit', 'instrumentalness', 'key', 'liveness', 'loudness', 'mode', 'popularity', 'speechiness', 'tempo']


Aqui, estamos removendo colunas que não são relevantes para a clusterização, como identificadores e nomes. As features restantes são características musicais numéricas que usaremos para agrupar as músicas.

### Pipeline de Pré-processamento

Em seguida, criamos um pipeline para pré-processar nossos dados:

In [12]:
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml import Pipeline

pipeline = Pipeline(stages = [
    VectorAssembler(inputCols=x, outputCol="features"),
    StandardScaler(inputCol="features", outputCol="scaledFeatures"),
    PCA(k=4, inputCol="scaledFeatures", outputCol="pcaFeatures")
])

pipeline_model = pipeline.fit(data)
data_processed = pipeline_model.transform(data)

print("Colunas após o pré-processamento:", data_processed.columns)
data_processed.select("features", "scaledFeatures", "pcaFeatures").show(truncate=False, n=2)

Colunas após o pré-processamento: ['valence', 'year', 'acousticness', 'artists', 'danceability', 'duration_ms', 'energy', 'explicit', 'id', 'instrumentalness', 'key', 'liveness', 'loudness', 'mode', 'name', 'popularity', 'speechiness', 'tempo', 'artists_song', 'features', 'scaledFeatures', 'pcaFeatures']
+-------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------+
|features                                                                                                           |scaledFeatures                                                                                            

Este pipeline realiza três etapas principais:
1. Vetorização: combina todas as features selecionadas em um único vetor.
2. Padronização: coloca todas as features na mesma escala, o que é importante para o algoritmo de clusterização.
3. PCA: reduz a dimensionalidade dos dados para 4 componentes principais, o que ajuda a lidar com a alta dimensionalidade e possível correlação entre as features.

Após aplicar o pipeline, mostramos as primeiras linhas do resultado para verificar se o pré-processamento foi aplicado corretamente.


### Avaliação do PCA

É importante entender quanto da variância original dos dados foi mantida após a redução de dimensionalidade:

In [13]:
pca_model = pipeline_model.stages[-1]

print(f"O PCA manteve {sum(pca_model.explainedVariance)*100:.2f}% da informação original")

O PCA manteve 52.68% da informação original


Esta informação nos ajuda a avaliar se a redução de dimensionalidade foi efetiva sem perder muita informação importante.

### Clusterização com K-Means
Agora, aplicamos o algoritmo K-Means para agrupar as músicas:

In [14]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans(featuresCol="pcaFeatures", predictionCol="clusters", seed=1, k=10)
kmeans_model = kmeans.fit(data_processed)
results = kmeans_model.transform(data_processed)

results.select("id", "artists_song", "pcaFeatures", "clusters").show(truncate=False, n=5)

+----------------------+----------------------------+---------------------------------------------------------------------------------+--------+
|id                    |artists_song                |pcaFeatures                                                                      |clusters|
+----------------------+----------------------------+---------------------------------------------------------------------------------+--------+
|3AJwUDP919kvQ9QcozQPxg|Coldplay - Yellow           |[-34.71004464775704,-165.36848784906184,-11.163498729833321,-139.11374280854437] |9       |
|0I3q5fE6wg7LIfHGngUTnV|OutKast - Ms. Jackson       |[-36.498857627749054,-167.85185045616558,-10.729023248907561,-135.90435265671297]|0       |
|60a0Rd6pjrkxjPbaKzXjfq|Linkin Park - In the End    |[-35.460300916940554,-165.99633344577887,-11.361290348241594,-138.24276881247]   |4       |
|6ZOBP3NvffbU4SZcrnt1k6|3 Doors Down - Kryptonite   |[-35.56301325520583,-165.59515096480607,-11.46022600703138,-137.5209574057004

Aqui, estamos usando K-Means com 10 clusters (k=10). O algoritmo agrupa as músicas baseado na similaridade de suas features reduzidas pelo PCA. Após o ajuste do modelo, aplicamos a transformação aos dados processados para obter as atribuições de cluster para cada música.

### Preparação para Visualização

Para visualizar os resultados em 3D, precisamos extrair as três primeiras componentes principais:

In [15]:
from pyspark.ml.functions import vector_to_array

results = results.withColumn("PCA_1", vector_to_array("pcaFeatures")[0])
results = results.withColumn("PCA_2", vector_to_array("pcaFeatures")[1])
results = results.withColumn("PCA_3", vector_to_array("pcaFeatures")[2])

results.select("artists_song", "PCA_1", "PCA_2", "PCA_3", "clusters").show(truncate=False, n=5)

+----------------------------+-------------------+-------------------+-------------------+--------+
|artists_song                |PCA_1              |PCA_2              |PCA_3              |clusters|
+----------------------------+-------------------+-------------------+-------------------+--------+
|Coldplay - Yellow           |-34.71004464775704 |-165.36848784906184|-11.163498729833321|9       |
|OutKast - Ms. Jackson       |-36.498857627749054|-167.85185045616558|-10.729023248907561|0       |
|Linkin Park - In the End    |-35.460300916940554|-165.99633344577887|-11.361290348241594|4       |
|3 Doors Down - Kryptonite   |-35.56301325520583 |-165.59515096480607|-11.46022600703138 |2       |
|Eminem - The Real Slim Shady|-36.54556974907567 |-167.37512505802482|-11.881276527236556|7       |
+----------------------------+-------------------+-------------------+-------------------+--------+
only showing top 5 rows



Aqui, extraímos as três primeiras componentes do vetor PCA para usar na visualização.

### Visualização dos Clusters

Por fim, criamos uma visualização 3D dos clusters de músicas:

In [16]:
import plotly.express as px

fig = px.scatter_3d(results.toPandas(), x="PCA_1", y="PCA_2", z="PCA_3", 
                    color="clusters", hover_data=["artists_song"])

fig.update_layout(width=1000, height=600, title_text="Clusters de Músicas")
fig.update_traces(marker_size=2)

fig.show()

Este gráfico 3D nos permite visualizar como as músicas se agrupam no espaço tridimensional das três primeiras componentes principais. Cada ponto representa uma música, e a cor indica o cluster ao qual ela pertence.

## Sistema de Recomendação

Após a clusterização das músicas, podemos criar nosso sistema de recomendação. A ideia principal é encontrar músicas similares à música de entrada dentro do mesmo cluster.

### Função de Recomendação

Agora, vamos criar nossa função principal de recomendação:


In [17]:
from scipy.spatial.distance import euclidean

def recomenda(musica_nome, n_recomendacoes=10):
    # Passo 1: Encontrar a música de entrada
    musica = results.filter(results.artists_song.contains(musica_nome))
    
    if musica.count() == 0:
        print("Música não encontrada")
        return

    elif musica.count() > 1:
        for i, row in enumerate(musica.collect()):
            print(f"{i+1} - {row['artists_song']}")
        print("\n")
        musica_id = int(input("Selecione a música desejada: "))
        musica = musica.collect()[musica_id-1]
    
    else:
        musica = musica.collect()[0]

    # Passo 2: Extrair informações da música
    music_name = musica["artists_song"]
    music_cluster = musica["clusters"]
    music_components = musica["pcaFeatures"]
    
    print(f"Música encontrada: {music_name}")
    print(f"Cluster: {music_cluster}")

    udf = f.udf(lambda row: euclidean(row, music_components))

    # Passo 3: Encontrar músicas similares
    recomendacoes = results.filter(results.artists_song != music_name) \
                           .filter(results.clusters == music_cluster) \
                           .withColumn("distance", udf("pcaFeatures")) \
                           .orderBy("distance")

    # Passo 4: Mostrar recomendações
    print(f"\nTop {n_recomendacoes} recomendações:")
    recomendacoes.select("artists_song", "distance").show(n=n_recomendacoes, truncate=False)

    return recomendacoes

Vamos analisar cada passo desta função:

1. **Encontrar a música de entrada**: 
   - Usamos `filter` para encontrar a música no DataFrame `results`.
   - Tratamos casos em que a música não é encontrada ou múltiplas músicas correspondem à entrada.

2. **Extrair informações da música**:
   - Obtemos o nome da música, seu cluster e suas componentes PCA.

3. **Encontrar músicas similares**:
   - Filtramos músicas do mesmo cluster (excluindo a própria música de entrada).
   - Calculamos a distância entre cada música e a música de entrada usando nossa função UDF.
   - Ordenamos as músicas por distância (as mais próximas primeiro).

4. **Mostrar recomendações**:
   - Exibimos as top N recomendações (por padrão, 10).

### Uso do Sistema de Recomendação

Podemos agora usar nossa função para obter recomendações:

In [25]:
# Exemplo de uso
recomenda("Lose Yourself")

1 - "Eminem - Lose Yourself - From ""8 Mile"" Soundtrack"
2 - Eminem - Lose Yourself
3 - Daft Punk - Lose Yourself to Dance (feat. Pharrell Williams)
4 - Eminem - Lose Yourself - Soundtrack Version


Música encontrada: "Eminem - Lose Yourself - From ""8 Mile"" Soundtrack"
Cluster: 6

Top 10 recomendações:
+--------------------------------------------------+-------------------+
|artists_song                                      |distance           |
+--------------------------------------------------+-------------------+
|Eminem - Lose Yourself                            |0.31637079617429625|
|Playboi Carti - Long Time - Intro                 |0.3444820452504516 |
|Nipsey Hussle - Victory Lap (feat. Stacy Barthe)  |0.45882756880661113|
|Motionless In White - Reincarnate                 |0.5562662605187816 |
|T.I. - That's All She Wrote                       |0.569085015134548  |
|Meek Mill - Litty (feat. Tory Lanez)              |0.6118976933585115 |
|Rick Ross - Stay Schemin           

DataFrame[valence: double, year: int, acousticness: double, artists: string, danceability: double, duration_ms: int, energy: double, explicit: int, id: string, instrumentalness: double, key: int, liveness: double, loudness: double, mode: int, name: string, popularity: int, speechiness: double, tempo: double, artists_song: string, features: vector, scaledFeatures: vector, pcaFeatures: vector, clusters: int, PCA_1: double, PCA_2: double, PCA_3: double, distance: string]

## Conclusão

Este projeto demonstra a aplicação de técnicas de processamento de dados em larga escala e aprendizado de máquina para criar um sistema de recomendação de músicas. Utilizamos PySpark para processar grandes volumes de dados, aplicamos técnicas de redução de dimensionalidade (PCA) e clusterização (K-Means) para agrupar músicas similares, e implementamos um sistema de recomendação baseado na distância euclidiana entre as características das músicas.

### Possíveis melhorias incluem:
1. Otimização do número de clusters
2. Incorporação de feedback do usuário
3. Utilização de técnicas mais avançadas de recomendação, como filtragem colaborativa

Este projeto serve como uma base sólida para um sistema de recomendação de músicas e demonstra habilidades em processamento de dados em larga escala, aprendizado de máquina e visualização de dados.