In [0]:
dbutils.fs.ls('/FileStore/tables/')

In [0]:
caminho_data = '/FileStore/tables/data.csv'

In [0]:
df_data = spark.read.csv(caminho_data, inferSchema=True, header=True)

In [0]:
display(df_data)

In [0]:
df_data = df_data.pandas_api()

In [0]:
type(df_data)

In [0]:
df_data.head()

In [0]:
df_data.dtypes

In [0]:
colunas_float = ['acousticness', 'danceability', 'energy', 'instrumentalness', 'liveness', 'loudness', 'speechiness', 'tempo', 'valence']
colunas_int = ['duration_ms', 'mode', 'key', 'explicit', 'popularity']

In [0]:
df_data[colunas_float] = df_data[colunas_float].astype(float)
df_data[colunas_int] = df_data[colunas_int].astype(int)

In [0]:
df_data.info()

In [0]:
df_data.head()

In [0]:
# Strip the list punctuation (square brackets and single quotes) from the artists
# column, then use ';' instead of ',' as the separator between artists.
# The pattern is a raw string so the regex escapes reach the regex engine intact
# (in a normal string "\[" and "\]" are invalid escape sequences), and `regex` is
# passed explicitly so the intent does not depend on the library default.
df_data['artists'] = df_data.artists.str.replace(r"\[|\]|'", "", regex=True)
df_data['artists'] = df_data.artists.str.replace(",", ";", regex=False)

In [0]:
df_data.head()

In [0]:
dbutils.fs.mkdirs('dbfs:/FileStore/data_processed')

In [0]:
df_data.to_parquet('dbfs:/FileStore/data_processed/data.parquet')

In [0]:
dbutils.fs.ls('dbfs:/FileStore/data_processed')

In [0]:
import pyspark.pandas as  ps

In [0]:
df = ps.read_parquet('dbfs:/FileStore/data_processed/data.parquet')
display(df)

In [0]:
df.describe()

In [0]:
len(df.year.unique())

In [0]:
df.year.value_counts().sort_index().plot('bar')

In [0]:
df['decade'] = df.year.apply(lambda year: f'{(year//10)*10}s')
df.head()

In [0]:
df_data_2 = df[['decade']]
df_data_2['qtd'] = 1

In [0]:
df_data_2 = df_data_2.groupby('decade').sum()
df_data_2

In [0]:
df_data_2.sort_index().plot.bar(y='qtd')

In [0]:
dbutils.fs.ls('/FileStore/tables/')

In [0]:
paths = ['dbfs:/FileStore/tables/data_w_genres.csv',
        'dbfs:/FileStore/tables/data_by_year.csv',
        'dbfs:/FileStore/tables/data_by_genres.csv',
        'dbfs:/FileStore/tables/data_by_artist.csv']

In [0]:
import os

# Convert every auxiliary CSV listed in `paths` to Parquet in the processed-data folder.
# A loop-local name (`df_csv`) is used on purpose: reusing `df_data` here would
# overwrite the cleaned main dataset that later cells (dropna, `artists_song`,
# feature selection, to_spark) still read from.
for path in paths:
    df_csv = spark.read.csv(path, inferSchema=True, header=True)
    # basename keeps the '.csv' suffix, so files are written as '<file>.csv.parquet',
    # which is the naming the later read_parquet cells use.
    name = os.path.basename(path)
    df_csv.pandas_api().to_parquet(f'/FileStore/data_processed/{name}.parquet')

In [0]:
dbutils.fs.ls('/FileStore/data_processed/')

In [0]:
df_year = ps.read_parquet('dbfs:/FileStore/data_processed/data_by_year.csv.parquet')
display(df_year)

In [0]:
df_year.plot.line(x='year', y='duration_ms')

In [0]:
df_year.plot.line(x='year', y=['acousticness', 'danceability', 'energy',
       'instrumentalness', 'liveness', 'speechiness', 'valence'])

In [0]:
df_year['decade'] = df_year['year'].apply(lambda row: f'{(row//10)*10}s')
df_year['qtd'] = 1

In [0]:
# Per-decade summary: mean of every audio feature, plus the sum of the 'qtd' counter column.
colunas_media = ['acousticness', 'danceability', 'duration_ms', 'energy',
                 'instrumentalness', 'liveness', 'loudness', 'speechiness',
                 'tempo', 'valence', 'popularity']
agregacoes = {coluna: 'mean' for coluna in colunas_media}
agregacoes['qtd'] = 'sum'

df_year_agrupado = df_year.groupby('decade').agg(agregacoes)
# Sort by decade and turn the group key back into a regular column.
df_year_2 = df_year_agrupado.sort_index().reset_index()

In [0]:
df_year_2.plot.line(x='decade', y=['acousticness', 'danceability', 'energy',
       'instrumentalness', 'liveness', 'speechiness', 'valence'])

In [0]:
df_genres = ps.read_parquet('dbfs:/FileStore/data_processed/data_by_genres.csv.parquet')
display(df_genres)

In [0]:
df_wgenres = ps.read_parquet('dbfs:/FileStore/data_processed/data_w_genres.csv.parquet')
display(df_wgenres)

In [0]:
df_artists = ps.read_parquet('dbfs:/FileStore/data_processed/data_by_artist.csv.parquet')
display(df_artists)

In [0]:
artista_ordenado = df_artists.sort_values(by='count', ascending=False)
display(artista_ordenado)

In [0]:
top_artistas = artista_ordenado.iloc[:10]
display(top_artistas)

In [0]:
plot_title = 'Top 10 Artistas'
top_artistas.plot.bar(x='count', y = 'artists', title = plot_title)

In [0]:
lista_artistas = top_artistas.artists.unique().to_list()
lista_artistas

In [0]:
artista_genero = df_wgenres.loc[df_wgenres['artists'].isin(lista_artistas)]
artista_genero = artista_genero[['genres', 'artists']]
display(artista_genero)

In [0]:
display(df_data)

In [0]:
df_data.dropna(inplace=True)

In [0]:
df_data['artists_song'] = df_data.artists + ' - ' + df_data.name

In [0]:
df_data.head()

In [0]:
df_data.info()

In [0]:
# Feature columns: every column of df_data except the ones removed below.
X = df_data.columns.to_list()
for coluna_descartada in ('artists', 'id', 'name', 'artists_song', 'release_date'):
    # list.remove raises ValueError when the column is absent.
    X.remove(coluna_descartada)
X

In [0]:
df_data = df_data.to_spark()

In [0]:
from pyspark.ml.feature import VectorAssembler

In [0]:
dados_encoded_vector = VectorAssembler(inputCols=X, outputCol='features').transform(df_data)

In [0]:
dados_encoded_vector.select('features').show()

In [0]:
from pyspark.ml.feature import StandardScaler

In [0]:
scaler = StandardScaler(inputCol='features', outputCol='features_scaled')
model_scaler = scaler.fit(dados_encoded_vector)
model_scaler

In [0]:
dados_musicas_scaler = model_scaler.transform(dados_encoded_vector)
dados_musicas_scaler

In [0]:
dados_musicas_scaler.select('features_scaled').show(5)

In [0]:
k = len(X)
k

In [0]:
from pyspark.ml.feature import PCA

In [0]:
pca = PCA(k=k, inputCol='features_scaled', outputCol='pca_features')
model_pca = pca.fit(dados_musicas_scaler)
model_pca

In [0]:
dados_musicas_pca = model_pca.transform(dados_musicas_scaler)
dados_musicas_pca

In [0]:
dados_musicas_pca.show(5)

In [0]:
model_pca.explainedVariance * 100

In [0]:
# Cumulative explained variance: entry i is the sum of the ratios of components 0..i.
lista_valores = []
for i in range(k):
    variancia_acumulada = sum(model_pca.explainedVariance[0:i+1])
    lista_valores.append(variancia_acumulada)
lista_valores

In [0]:
import numpy as np

In [0]:
k = sum(np.array(lista_valores) <= 0.7)
k

In [0]:
# Refit PCA on the scaled features keeping 6 components, project the data onto them
# and preview the first 5 projected vectors.
# NOTE(review): the component count is hard-coded to 6 rather than taken from the `k`
# computed from the cumulative explained variance in the previous cell — confirm
# the two agree for the current dataset.
pca = PCA(k=6, inputCol='features_scaled', outputCol='pca_features')
model_pca = pca.fit(dados_musicas_scaler)
dados_musicas_pca_final = model_pca.transform(dados_musicas_scaler)
dados_musicas_pca_final.select('pca_features').show(truncate=False, n=5)

# Clustering

In [0]:
from pyspark.ml import Pipeline

In [0]:
# Single pipeline chaining the three stages that were run by hand above:
#   1. assemble the columns listed in X into one 'features' vector,
#   2. standardise it into 'features_scaled',
#   3. project it onto 6 principal components ('pca_features').
pca_pipeline = Pipeline(stages=[VectorAssembler(inputCols=X, outputCol='features'),
                                StandardScaler(inputCol='features', outputCol='features_scaled'),
                                PCA(k=6, inputCol='features_scaled', outputCol='pca_features')])

In [0]:
model_pca_pipeline = pca_pipeline.fit(df_data)

In [0]:
projection = model_pca_pipeline.transform(df_data)

In [0]:
projection.select('pca_features').show()

In [0]:
from pyspark.ml.clustering import KMeans

In [0]:
kmeans = KMeans(k=50, featuresCol='pca_features', predictionCol='cluster_pca', seed=1224)

In [0]:
modelo_kmeans = kmeans.fit(projection)

In [0]:
projection_kmeans = modelo_kmeans.transform(projection) 

In [0]:
projection_kmeans.select(['pca_features','cluster_pca']).show()

In [0]:
from pyspark.ml.functions import vector_to_array

In [0]:
# Expose the first two principal components as scalar plot coordinates.
# vector_to_array yields the whole component array, so each axis has to index the
# component it wants; without the index both 'x' and 'y' would hold the full array.
pca_array = vector_to_array('pca_features')
projection_kmeans = projection_kmeans.withColumn('x', pca_array[0])\
                                     .withColumn('y', pca_array[1])

In [0]:
projection_kmeans.select(['x', 'y', 'cluster_pca', 'artists_song']).show()

In [0]:
import plotly.express as px

In [0]:
# Bring only the columns the chart uses to the driver; collecting the whole
# DataFrame would also ship the assembled, scaled and PCA vector columns.
dados_plot = projection_kmeans.select('x', 'y', 'cluster_pca', 'artists_song').toPandas()
fig = px.scatter(dados_plot, x='x', y='y', color='cluster_pca', hover_data=['artists_song'])
fig.show()

In [0]:
nome_musica = 'Taylor Swift - Blank Space'

In [0]:
cluster = projection_kmeans.filter(projection_kmeans.artists_song == nome_musica).select('cluster_pca').collect()[0][0]
cluster

In [0]:
musicas_recomendadas = projection_kmeans.filter(projection_kmeans.cluster_pca == cluster)\
                                       .select('artists_song', 'id', 'pca_features')
musicas_recomendadas.show()

In [0]:
componenetes_musica = musicas_recomendadas.filter(musicas_recomendadas.artists_song == nome_musica)\
                                          .select('pca_features').collect()[0][0]
componenetes_musica                             

In [0]:
from scipy.spatial.distance import euclidean

In [0]:
def calcula_distance(value):
    """Return the Euclidean distance between `value` and the reference song.

    The reference vector is read from the notebook-level `componenetes_musica`.
    """
    distancia = euclidean(componenetes_musica, value)
    return distancia

In [0]:
from pyspark.sql.types import FloatType
import pyspark.sql.functions as f

In [0]:
# Wrap the Python distance function as a Spark UDF declared to return a float.
udf_calcula_distance = f.udf(calcula_distance, FloatType())
# 'Dist' holds, for each song of the cluster, its distance to the reference song.
musicas_recomendadas_dist = musicas_recomendadas.withColumn('Dist', udf_calcula_distance('pca_features'))
# Bring the 10 rows with the smallest distance to the driver, rebuild a small
# DataFrame from them and keep only the columns of interest.
recomendadas = spark.createDataFrame(musicas_recomendadas_dist.sort('Dist').take(10)).select(['artists_song', 'id', 'Dist'])
recomendadas.show()

In [0]:
def recomendador(nome_musica):
    """Return the 10 songs closest to `nome_musica` inside its K-Means cluster.

    Parameters
    ----------
    nome_musica : str
        Value matched against the `artists_song` column of `projection_kmeans`.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns `artists_song`, `id` and `Dist`, ordered by ascending distance.
        The queried song belongs to its own cluster (distance 0), so it is
        normally part of the result.

    Raises
    ------
    IndexError
        If no row of `projection_kmeans` has `artists_song == nome_musica`.
    """
    # A single lookup returns both the cluster and the PCA coordinates of the
    # reference song (the original ran two separate Spark jobs for this).
    referencia = projection_kmeans.filter(projection_kmeans.artists_song == nome_musica)\
                                  .select('cluster_pca', 'pca_features').first()
    if referencia is None:
        # Same exception type the original `.collect()[0]` raised, with a usable message.
        raise IndexError(f"Song not found in projection_kmeans: {nome_musica!r}")
    cluster, componenetes_musica = referencia[0], referencia[1]

    # Candidate songs: everything assigned to the same cluster.
    musicas_recomendadas = projection_kmeans.filter(projection_kmeans.cluster_pca == cluster)\
                                           .select('artists_song', 'id', 'pca_features')

    def calcula_distance(value):
        # float() guarantees a plain Python float is handed back to Spark.
        return float(euclidean(componenetes_musica, value))

    udf_calcula_distance = f.udf(calcula_distance, FloatType())

    # Distance from every candidate to the reference song.
    musicas_recomendadas_dist = musicas_recomendadas.withColumn('Dist', udf_calcula_distance('pca_features'))

    # Drop the vector column before collecting, then materialise only the 10
    # nearest rows on the driver and rebuild a small DataFrame from them.
    mais_proximas = musicas_recomendadas_dist.select('artists_song', 'id', 'Dist')\
                                             .sort('Dist').take(10)
    recomendadas = spark.createDataFrame(mais_proximas)

    return recomendadas

In [0]:
df_recomedada = recomendador('Taylor Swift - Blank Space')
df_recomedada.show()

In [0]:
!pip install spotipy

In [0]:
import spotipy
from spotipy.oauth2 import SpotifyOAuth, SpotifyClientCredentials

In [0]:
# Spotify authorization scopes requested for the user-level OAuth flow.
scope = "user-library-read playlist-modify-private"

# NOTE(review): `OAuth` is not referenced again in the visible notebook — the client
# used afterwards is built from SpotifyClientCredentials; confirm whether this
# object is still needed.
# NOTE(review): the redirect URI has no scheme (e.g. 'http://'); confirm it matches
# the redirect URI registered for the Spotify application.
# client_id / client_secret are intentionally blank here; never commit real credentials.
OAuth = SpotifyOAuth(
        scope=scope,         
        redirect_uri='localhost:8888/callback',
        client_id = '',
        client_secret = '')

In [0]:
client_credentials_manager = SpotifyClientCredentials(client_id = '',
                                                      client_secret = '')

sp = spotipy.Spotify(client_credentials_manager = client_credentials_manager)

In [0]:
id = projection_kmeans.filter(projection_kmeans.artists_song == nome_musica).select('id').collect()[0][0]
id

In [0]:
sp.track(id)

In [0]:
playlist_id = df_recomedada.select('id').collect()

In [0]:
# Fetch the Spotify track metadata for every recommended id.
# A comprehension is used so the name `id` is not rebound: the original loop used
# `id` as its loop variable, shadowing the builtin and overwriting the song id
# assigned in an earlier cell.
playlist_track = [sp.track(linha[0]) for linha in playlist_id]

In [0]:
playlist_track

In [0]:
def recomendador(nome_musica):
    """Fetch Spotify metadata for the 10 nearest songs and return how many were fetched.

    Parameters
    ----------
    nome_musica : str
        Value matched against the `artists_song` column of `projection_kmeans`.

    Returns
    -------
    int
        Number of track objects returned by `sp.track`, one per recommended id.

    Raises
    ------
    IndexError
        If no row of `projection_kmeans` has `artists_song == nome_musica`.
    """
    # One lookup for both the cluster and the PCA coordinates of the reference song.
    referencia = projection_kmeans.filter(projection_kmeans.artists_song == nome_musica)\
                                  .select('cluster_pca', 'pca_features').first()
    if referencia is None:
        # Same exception type the original `.collect()[0]` raised, with a usable message.
        raise IndexError(f"Song not found in projection_kmeans: {nome_musica!r}")
    cluster, componenetes_musica = referencia[0], referencia[1]

    musicas_recomendadas = projection_kmeans.filter(projection_kmeans.cluster_pca == cluster)\
                                           .select('artists_song', 'id', 'pca_features')

    def calcula_distance(value):
        # float() guarantees a plain Python float is handed back to Spark.
        return float(euclidean(componenetes_musica, value))

    udf_calcula_distance = f.udf(calcula_distance, FloatType())

    musicas_recomendadas_dist = musicas_recomendadas.withColumn('Dist', udf_calcula_distance('pca_features'))

    # Only the ids of the 10 nearest songs are needed from here on.
    # (The original also looked up the id of the queried song into a local `id`
    # that was never read before being overwritten; that extra Spark job is removed.)
    linhas_proximas = musicas_recomendadas_dist.sort('Dist').select('id').take(10)

    # One Spotify API call per recommended id.
    playlist_track = [sp.track(linha[0]) for linha in linhas_proximas]

    return len(playlist_track)

In [0]:
recomendador('Taylor Swift - Blank Space')

In [0]:
import imageio

In [0]:
import matplotlib.pyplot as plt
nome_musica = 'Taylor Swift - Blank Space'

id = projection_kmeans\
          .filter(projection_kmeans.artists_song == nome_musica)\
          .select('id').collect()[0][0]

track = sp.track(id)

url = track["album"]["images"][1]["url"]
name = track["name"]

image = imageio.imread(url)
plt.imshow(image)
plt.xlabel(name, fontsize = 10)
plt.show()

In [0]:
import matplotlib.pyplot as plt


def visualize_songs(name,url):
    """Show images in a 5-column grid, each captioned with its name.

    Parameters
    ----------
    name : sequence of str
        Captions; `name[i]` labels the image loaded from `url[i]`.
    url : sequence of str
        Image locations handed to `imageio.imread`.
    """
    columns = 5
    # Ceiling division gives exactly the rows needed. The original
    # `len(url) // columns + 1` reserved an extra empty row whenever
    # len(url) was a multiple of `columns`.
    rows = max(1, -(-len(url) // columns))

    plt.figure(figsize=(15,10))
    for i, u in enumerate(url):
        ax = plt.subplot(rows, columns, i + 1)
        plt.imshow(imageio.imread(u))
        ax.get_yaxis().set_visible(False)
        # Ticks are drawn white and tiny so only the caption below the image is visible.
        plt.xticks(color = 'w', fontsize = 0.1)
        plt.yticks(color = 'w', fontsize = 0.1)
        plt.xlabel(name[i], fontsize = 10)
        plt.grid(visible=None)
    # The layout is computed once, after all subplots exist, instead of on every iteration.
    plt.tight_layout(h_pad=0.7, w_pad=0)
    plt.show()

In [0]:
playlist_id = df_recomedada.select('id').collect()

name = []
url = []
for i in playlist_id:
    track = sp.track(i[0])
    url.append(track["album"]["images"][1]["url"])
    name.append(track["name"])

In [0]:
visualize_songs(name,url)

In [0]:
def recomendador(nome_musica):
    """Plot the album covers of the 10 songs closest to `nome_musica` in its cluster.

    Parameters
    ----------
    nome_musica : str
        Value matched against the `artists_song` column of `projection_kmeans`.

    Returns
    -------
    None
        The covers are rendered through `visualize_songs`.

    Raises
    ------
    IndexError
        If no row of `projection_kmeans` has `artists_song == nome_musica`.
    """
    # --- Compute the recommended songs ---
    # One lookup for both the cluster and the PCA coordinates of the reference song.
    referencia = projection_kmeans.filter(projection_kmeans.artists_song == nome_musica)\
                                  .select('cluster_pca', 'pca_features').first()
    if referencia is None:
        # Same exception type the original `.collect()[0]` raised, with a usable message.
        raise IndexError(f"Song not found in projection_kmeans: {nome_musica!r}")
    cluster, componenetes_musica = referencia[0], referencia[1]

    musicas_recomendadas = projection_kmeans.filter(projection_kmeans.cluster_pca == cluster)\
                                           .select('artists_song', 'id', 'pca_features')

    def calcula_distance(value):
        # float() guarantees a plain Python float is handed back to Spark.
        return float(euclidean(componenetes_musica, value))

    udf_calcula_distance = f.udf(calcula_distance, FloatType())

    musicas_recomendadas_dist = musicas_recomendadas.withColumn('Dist', udf_calcula_distance('pca_features'))

    # Only the ids of the 10 nearest songs are needed for the API calls.
    playlist_id = musicas_recomendadas_dist.sort('Dist').select('id').take(10)

    # --- Fetch track name and cover URL from the Spotify API ---
    name = []
    url = []
    for linha in playlist_id:
        track = sp.track(linha[0])
        # Index 1 picks the second entry of the album's image list.
        url.append(track["album"]["images"][1]["url"])
        name.append(track["name"])

    # --- Plot the covers ---
    # Reuses the notebook's plotting helper instead of duplicating its body here.
    visualize_songs(name, url)

In [0]:
recomendador('Taylor Swift - Blank Space')