In [1]:
%load_ext autoreload
%autoreload 2

# Importar bibliotecas padrão
import time
from datetime import timedelta
import os

# Carregar variáveis de ambiente
from dotenv import load_dotenv
load_dotenv(os.path.join('config', '.env'))

# Importar bibliotecas para PostgreSQL
import psycopg2

# Importar bibliotecas para Spotify API
import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
import src.spotify_data as spotify_data

# Importar bibliotecas para PySpark
from pyspark.sql import SparkSession


jana = False
start_time_geral = time.time()

## Configuração de conexão Pyspark

In [2]:
# Configurar SparkSession
spark = SparkSession.builder \
    .appName("PostgreSQL to Spark") \
    .config("spark.jars", "/caminho/para/postgresql-<versao>.jar") \
    .getOrCreate()


In [3]:
if jana:
    db_config = {
        'host': os.getenv('DB_HOST'),
        'port': os.getenv('DB_PORT'),
        'dbname': os.getenv('DB_NAME_JANA'),
        'user': os.getenv('DB_USER'),
        'password': os.getenv('DB_PASSWORD')
    }
else:
    db_config = {
        'host': os.getenv('DB_HOST'),
        'port': os.getenv('DB_PORT'),
        'dbname': os.getenv('DB_NAME'),
        'user': os.getenv('DB_USER'),
        'password': os.getenv('DB_PASSWORD')
    }

# URL de conexão JDBC
jdbc_url = f"jdbc:postgresql://{db_config['host']}:{db_config['port']}/{db_config['dbname']}"

# Propriedades de conexão
connection_properties = {
    "user": db_config['user'],
    "password": db_config['password'],
    "driver": "org.postgresql.Driver"
}

## Configuração de conexão ao Spotify

In [4]:
# Carregar variáveis de ambiente do arquivo .env na pasta config 
if jana:
    client_id = os.getenv('CLIENT_ID_JANA') 
    client_secret = os.getenv('CLIENT_SECRET_JANA')
else:
    client_id = os.getenv('CLIENT_ID') 
    client_secret = os.getenv('CLIENT_SECRET')

# Configuração da autenticação
client_credentials_manager = SpotifyClientCredentials(client_id=client_id, client_secret=client_secret)
sp = spotipy.Spotify(client_credentials_manager=client_credentials_manager)

## Coleta de dados: Banco de dados

In [5]:
start_time = time.time()

In [6]:
df = spark.read.jdbc(url=jdbc_url, table="all_tracks_registry", properties=connection_properties)
df.show(4)


+-------------------+--------------------+---------+------------+--------------+--------------------------+---------------------------------+--------------------------------+--------------------+------------+-----------------+-------------------+------------+----------+-------+-------+-------+-----------------+--------------+
|                 ts|            platform|ms_played|conn_country|       ip_addr|master_metadata_track_name|master_metadata_album_artist_name|master_metadata_album_album_name|   spotify_track_uri|episode_name|episode_show_name|spotify_episode_uri|reason_start|reason_end|shuffle|skipped|offline|offline_timestamp|incognito_mode|
+-------------------+--------------------+---------+------------+--------------+--------------------------+---------------------------------+--------------------------------+--------------------+------------+-----------------+-------------------+------------+----------+-------+-------+-------+-----------------+--------------+
|2019-11-11 00:1

In [7]:
df.count()

74704

In [8]:
df.printSchema()

root
 |-- ts: timestamp (nullable = true)
 |-- platform: string (nullable = true)
 |-- ms_played: integer (nullable = true)
 |-- conn_country: string (nullable = true)
 |-- ip_addr: string (nullable = true)
 |-- master_metadata_track_name: string (nullable = true)
 |-- master_metadata_album_artist_name: string (nullable = true)
 |-- master_metadata_album_album_name: string (nullable = true)
 |-- spotify_track_uri: string (nullable = true)
 |-- episode_name: string (nullable = true)
 |-- episode_show_name: string (nullable = true)
 |-- spotify_episode_uri: string (nullable = true)
 |-- reason_start: string (nullable = true)
 |-- reason_end: string (nullable = true)
 |-- shuffle: boolean (nullable = true)
 |-- skipped: boolean (nullable = true)
 |-- offline: boolean (nullable = true)
 |-- offline_timestamp: timestamp (nullable = true)
 |-- incognito_mode: boolean (nullable = true)



In [9]:
end_time = time.time()
elapsed_time = end_time - start_time
# Converter o tempo decorrido para o formato HH:MM:SS 
print(f"Tempo de processamento: {str(timedelta(seconds=int(elapsed_time)))}")

Tempo de processamento: 0:00:23


## Coleta dados do Spotify e cria novos bancos

In [10]:
start_time = time.time()

In [11]:
# Funções de banco de dados Tracks e Artistics
import src.db_table_track as db_table_track
import src.db_table_artistcs as db_table_artistcs

In [12]:
# Selecionar e coletar os dados distintos 
distinct_uris = df.select('spotify_track_uri').distinct().collect() 
# Transformar em lista de valores simples 
uris_list = [row['spotify_track_uri'] for row in distinct_uris]
print(f'Quantidade de faixas: {len(uris_list)}')

Quantidade de faixas: 14403


In [15]:
n = 890
for track_uri in uris_list[n:]:
    print(f'Faixa número:{n}')
    # Primeiro Verifica se o track já existe na tabela, se FALSE, realiza requisição no spotify
    if db_table_track.track_exists(track_uri, conn=psycopg2.connect(**db_config)):
        print(f'n:{n}, TRACK Já existe no banco')
        main_artist_uri = db_table_track.get_main_artist_uri(track_uri, conn=psycopg2.connect(**db_config))

    else:
        # Realiza requisição da faixa e depois inseri no banco
        dados_faixa = spotify_data.obter_dados_faixa(sp = sp, track_uri = track_uri)
        db_table_track.insert_track_data(dados_faixa, conn=psycopg2.connect(**db_config))

        # Coleta de dados do Artista no Spotify
        main_artist_uri = dados_faixa['main_artist_URI']
        time.sleep(5)

    if db_table_artistcs.artist_exists(main_artist_uri, conn=psycopg2.connect(**db_config)):
        # Não fazer nada se o artista já existir
        print(f'n:{n}, ARTISTIC Já existe no banco')
        pass
    else:
        # Realiza requisição do artista e depois inseri no banco
        dados_artista = spotify_data.obter_dados_artista(sp=sp, artist_uri = main_artist_uri)
        db_table_artistcs.insert_artist_data(dados_artista, conn=psycopg2.connect(**db_config))
    
    n = n + 1  

Faixa número:755
n:755, TRACK Já existe no banco
n:755, ARTISTIC Já existe no banco
Faixa número:756
n:756, TRACK Já existe no banco
n:756, ARTISTIC Já existe no banco
Faixa número:757
n:757, TRACK Já existe no banco
n:757, ARTISTIC Já existe no banco
Faixa número:758
n:758, TRACK Já existe no banco
n:758, ARTISTIC Já existe no banco
Faixa número:759
n:759, TRACK Já existe no banco
n:759, ARTISTIC Já existe no banco
Faixa número:760
n:760, ARTISTIC Já existe no banco
Faixa número:761
Faixa número:762
n:762, ARTISTIC Já existe no banco
Faixa número:763
n:763, ARTISTIC Já existe no banco
Faixa número:764
n:764, ARTISTIC Já existe no banco
Faixa número:765
Faixa número:766
Faixa número:767
n:767, ARTISTIC Já existe no banco
Faixa número:768
n:768, ARTISTIC Já existe no banco
Faixa número:769
Faixa número:770
Faixa número:771
Faixa número:772
Faixa número:773
Faixa número:774
Faixa número:775
Faixa número:776
Faixa número:777
Faixa número:778
n:778, ARTISTIC Já existe no banco
Faixa número

KeyboardInterrupt: 

In [None]:
end_time = time.time()
elapsed_time = end_time - start_time
# Converter o tempo decorrido para o formato HH:MM:SS 
print(f"Tempo de processamento: {str(timedelta(seconds=int(elapsed_time)))}")


In [None]:
# import pyspark.sql.functions as F
# df = df.withColumn('ano-mes-dia', F.to_date(F.col("ts")))
# df = df.withColumn("ano-mes", F.concat_ws("-", F.year(df["ts"]), F.month(df["ts"])))
# df = df.withColumn("ano_mes_str", F.date_format(df["ts"], "yyyy-MM")) 
# df = df.withColumn("ano_mes_date", F.to_date(df["ano_mes_str"], "yyyy-MM"))
# df.show(5, truncate=False)