# <font color='blue'>Data Science Academy</font>
# <font color='blue'>Big Data Real-Time Analytics com Python e Spark</font>

## <font color='blue'>Mini-Projeto 7</font>

### <font color='blue'>Sistema de Recomendação em Tempo Real com Machine Learning, PySpark, Spark Streaming e Kafka</font>

![title](imagens/MP7.png)

In [None]:
# Versão da Linguagem Python
from platform import python_version
print('Versão da Linguagem Python Usada Neste Jupyter Notebook:', python_version())

In [None]:
# Para atualizar um pacote, execute o comando abaixo no terminal ou prompt de comando:
# pip install -U nome_pacote

# Para instalar a versão exata de um pacote, execute o comando abaixo no terminal ou prompt de comando:
#!pip install nome_pacote==versão_desejada

# Depois de instalar ou atualizar o pacote, reinicie o jupyter notebook.

# Instala o pacote watermark. 
# Esse pacote é usado para gravar as versões de outros pacotes usados neste jupyter notebook.
!pip install -q -U watermark

In [None]:
# https://kafka-python.readthedocs.io/en/master/
!pip install -q kafka-python

In [None]:
# Imports
import time
import random
import kafka
import numpy as np
import pandas as pd
from json import dumps
from kafka import KafkaProducer
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "Data Science Academy" --iversions

In [None]:
# Endereço do servidor Kafka
SERVER = 'localhost:9092'

In [None]:
# Nome do tópico
TOPIC = "dsaminiprojeto7"

In [None]:
# Carregamos o conjunto de dados de músicas
df_dsaminiprojeto7 = pd.read_csv("dados/dataset.csv")

In [None]:
# Ajustamos o formato de 3 colunas importantes: order_id, Artist Name(s) e Artist IDs
df_dsaminiprojeto7['order_id'] = np.arange(len(df_dsaminiprojeto7))
df_dsaminiprojeto7['Artist Name(s)'] = df_dsaminiprojeto7['Artist Name(s)'].str.replace('[^a-zA-Z]', '')
df_dsaminiprojeto7['Artist IDs'] = df_dsaminiprojeto7['Artist IDs'].str.replace('[^a-zA-Z]', '')

In [None]:
df_dsaminiprojeto7.shape

In [None]:
df_dsaminiprojeto7.head(10)

In [None]:
# Convertemos o dataframe em um dicionário de músicas
dict_musicas = df_dsaminiprojeto7.to_dict(orient = "records")

In [None]:
dict_musicas[1:3]

In [None]:
# Kafka Producer
if __name__ == "__main__":

    # Cria o producer
    producer = KafkaProducer(bootstrap_servers = SERVER, 
                             value_serializer = lambda x: x.encode('utf-8'))
    
    # Variáveis de controle
    send = []    
    send = None

    # Loop pelo dicionário de músicas
    for musica in dict_musicas:
        
        # Cria a lista com dados que serão enviados para o Kafka
        sending = []
        
        # Append de cada coluna
        sending.append(musica["order_id"])
        sending.append(musica["Spotify ID"])
        sending.append(musica["Track Name"])
        sending.append(musica["Popularity"])
        sending.append(musica["Duration (ms)"])
        sending.append(musica["Artist Name(s)"])
        sending.append(musica["Artist IDs"])
        sending.append(musica["Release Date"])
        sending.append(musica["Danceability"])
        sending.append(musica["Energy"])
        sending.append(musica["Key"])
        sending.append(musica["Loudness"])
        sending.append(musica["Mode"])
        sending.append(musica["Speechiness"])
        sending.append(musica["Acousticness"])
        sending.append(musica["Instrumentalness"])
        sending.append(musica["Liveness"])
        sending.append(musica["Valence"])
        sending.append(musica["Tempo"])
        sending.append(musica["Time Signature"])
        
        # Junta tudo
        musica = ','.join(str(v) for v in sending)

        # Envia os dados para o tópico
        print("Próxima Música:" )
        print(musica)
        producer.send(TOPIC, musica)
        time.sleep(1)

    print("Concluído")

# Fim