## ⚙️ Initial Setup

In [1]:
# Install the dependencies listed in requirements.txt into the kernel's environment.
# NOTE(review): the recorded output of this cell reports a resolver conflict —
# delta-spark 3.0.0 requires pyspark<3.6.0,>=3.5.0 but pyspark 4.1.1 gets installed.
# Align the pins in requirements.txt, and restart the kernel after packages change.
%pip install -r requirements.txt

Collecting py4j==0.10.9.9 (from -r requirements.txt (line 23))
  Obtaining dependency information for py4j==0.10.9.9 from https://files.pythonhosted.org/packages/bd/db/ea0203e495be491c85af87b66e37acfd3bf756fd985f87e46fc5e3bf022c/py4j-0.10.9.9-py2.py3-none-any.whl.metadata
  Using cached py4j-0.10.9.9-py2.py3-none-any.whl.metadata (1.3 kB)
Collecting pyspark==4.1.1 (from -r requirements.txt (line 25))
  Using cached pyspark-4.1.1-py2.py3-none-any.whl
Using cached py4j-0.10.9.9-py2.py3-none-any.whl (203 kB)
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.7
    Uninstalling py4j-0.10.9.7:
      Successfully uninstalled py4j-0.10.9.7
  Attempting uninstall: pyspark
    Found existing installation: pyspark 3.5.3
    Uninstalling pyspark-3.5.3:
      Successfully uninstalled pyspark-3.5.3
Successfully installed py4j-0.10.9.9 pyspark-4.1.1
Note: you may need to restart the kernel to use updated packages.


ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
delta-spark 3.0.0 requires pyspark<3.6.0,>=3.5.0, but you have pyspark 4.1.1 which is incompatible.

[notice] A new release of pip is available: 23.2.1 -> 26.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
# Importing necessary libraries
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    concat,
    current_timestamp,
    dayofweek,
    from_utc_timestamp,
    hour,
    lit,
    lower,
    month,
    substring_index,
    sum as _sum,
    to_date,
    when,
    year,
)

In [3]:
# Checking environment (Local or Databricks).
# The Databricks runtime exports DATABRICKS_RUNTIME_VERSION. Probing for a global
# `spark` variable instead is fragile: once the local session has been created,
# re-running this cell would wrongly report Databricks.
IS_DATABRICKS = "DATABRICKS_RUNTIME_VERSION" in os.environ
if IS_DATABRICKS:
    print("‚úÖ Spark session already exists.")
else:
    print("‚ùå Spark session not found")

‚ùå Spark session not found


In [None]:
# Creating Spark session for local environment.
# Outside Databricks no session is provided, so build a local one using all cores
# and 4g of driver memory.
if not IS_DATABRICKS:
    try:
        spark = (
            SparkSession.builder
            .appName("Analytics_Spotify")
            .master("local[*]")
            .config("spark.driver.memory", "4g")
            .getOrCreate()
        )
    except Exception:
        # Session start-up failures are not ImportError (the import already
        # succeeded above). Print the hint, then re-raise so the notebook stops
        # here instead of failing later with an undefined `spark`.
        print("‚ùå Failed to create Spark session. Please ensure PySpark is installed.")
        raise
    print("‚úÖ Spark session created successfully.")

In [None]:
# Source locations and destination table names, one set per environment.
# Both environments share the same file glob and the same dataset base name.
_HISTORY_GLOB = "Streaming_History_*.json"
_TABLE_BASENAME = "streaming_history_user_spotify"

# --- Databricks ---
PATH_ORIGIN_DATABRICKS = f"/Volumes/sandbox_prd/raw_layer/files/spotify/me/extended/{_HISTORY_GLOB}"
NAME_TABLE_BRONZE_DATABRICKS = f"sandbox_prd.bronze_layer.{_TABLE_BASENAME}"
NAME_TABLE_SILVER_DATABRICKS = f"sandbox_prd.silver_layer.{_TABLE_BASENAME}"

# --- Local ---
PATH_ORIGIN_LOCAL = f"data/extended/{_HISTORY_GLOB}"
NAME_TABLE_BRONZE_LOCAL = f"bronze_{_TABLE_BASENAME}"
NAME_TABLE_SILVER_LOCAL = f"silver_{_TABLE_BASENAME}"


## 🥉 Bronze Layer
Reading JSON files and writing to Delta table.

In [0]:
# 1. Load the raw JSON export into df_input.
# Only the source path differs between environments; the reader options are shared.
if IS_DATABRICKS:
    print("Is Databricks")
    origin_path = PATH_ORIGIN_DATABRICKS
else:
    print("Is Local")
    origin_path = PATH_ORIGIN_LOCAL

json_reader = spark.read.format("json").options(multiline="true", inferSchema="true")
df_input = json_reader.load(origin_path)

In [0]:
# 2. Add ingestion metadata to every input row.
# On Databricks the source path is read from the `_metadata.file_path` column;
# locally a fixed placeholder literal is used.
source_file_col = col("_metadata.file_path") if IS_DATABRICKS else lit("local_file")

df_bronze = df_input.select(
    "*",
    current_timestamp().alias("dt_ingestion"),
    source_file_col.alias("source_file"),
)

In [0]:
# 3. Persist the Bronze DataFrame.
# The original referenced NAME_TABLE_BRONZE, which is never defined in this
# notebook; each branch now uses the constant declared for its environment.
if IS_DATABRICKS:
    # Overwrite the Delta table, allowing the schema to evolve via mergeSchema.
    (df_bronze.write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .saveAsTable(NAME_TABLE_BRONZE_DATABRICKS)
    )
    print(f"‚úÖ Loaded table: {NAME_TABLE_BRONZE_DATABRICKS}")
else:
    # Locally, write a single-partition CSV (with header) under the local bronze name.
    (df_bronze.coalesce(1).write
        .format("csv")
        .mode("overwrite")
        .option("header", "true")
        .save(NAME_TABLE_BRONZE_LOCAL)
    )
    print(f"‚úÖ Loaded CSV: {NAME_TABLE_BRONZE_LOCAL}")

In [None]:
# NOTE(review): `erro` is not defined anywhere in the visible notebook, so this cell
# raises NameError. Presumably a deliberate stop so "Run All" halts before the
# cells below, which read Databricks tables — confirm, then replace it with an
# explicit guard or delete the cell.
erro

## 🔍 Quality Check

Performing quality checks on the Bronze layer data.

In [0]:
%sql
-- Validation 1: number of rows loaded from each source file
SELECT
    source_file,
    COUNT(*) AS total_linhas
FROM sandbox_prd.bronze_layer.streaming_history_user_spotify
GROUP BY source_file
ORDER BY source_file

In [0]:
%sql
-- Validation 2: time range covered by the data (earliest and latest `ts`) and total row count
SELECT
    MIN(ts)  AS primeira_reproducao,
    MAX(ts)  AS ultima_reproducao,
    COUNT(*) AS total_geral
FROM sandbox_prd.bronze_layer.streaming_history_user_spotify

In [0]:
# Validation 3: consistency of the main fields in the Bronze table.
# Maps each key column to the alias of its null counter.
null_counters = {
    "master_metadata_track_name": "nulos_track_name",
    "master_metadata_album_artist_name": "nulos_artist_name",
    "ts": "nulos_timestamp",
}

df_check = spark.read.table(NAME_TABLE_BRONZE_DATABRICKS)

# isNull() cast to int is 1 for nulls and 0 otherwise, so the sum is the null count.
df_check.select(
    *[
        _sum(col(column_name).isNull().cast("int")).alias(counter_alias)
        for column_name, counter_alias in null_counters.items()
    ]
).display()

## 🥈 Silver Layer

Data cleaning and transformation.

In [0]:
# Read the Bronze table back as the starting point for the Silver layer.
df_bronze = spark.read.table(NAME_TABLE_BRONZE_DATABRICKS)

# Columns that are not carried into Silver, grouped by theme.
audiobook_columns = [
    "audiobook_chapter_title",
    "audiobook_chapter_uri",
    "audiobook_title",
    "audiobook_uri",
]
session_columns = ["conn_country", "incognito_mode", "ip_addr"]
ingestion_columns = ["dt_ingestion", "source_file"]

drop_columns = audiobook_columns + session_columns + ingestion_columns

# Drop the unneeded columns.
df_silver = df_bronze.drop(*drop_columns)

In [0]:
# Hour of each stream shifted from UTC to America/Sao_Paulo; drives the
# "period of day" columns below.
v_hora_brasil = hour(from_utc_timestamp(col("ts"), "America/Sao_Paulo"))

# Frequently reused column expressions.
v_ts = col("ts")
v_reason_start = col("reason_start")
v_platform = lower(col("platform"))

# Trailing id of the Spotify URIs. Presumably the URIs look like
# `spotify:track:<id>` / `spotify:episode:<id>` — verify against the data.
# substring_index(..., ":", -1) returns the value unchanged when it has no ":",
# so this is safe for bare ids as well.
v_track_id = substring_index(col("spotify_track_uri"), ":", -1)
v_episode_id = substring_index(col("spotify_episode_uri"), ":", -1)

# NOTE(review): year/month/dayofweek/to_date below operate on `ts` as-is, while
# the hour-based columns use America/Sao_Paulo time — confirm that mixing the
# two references is intended.
df_silver = df_silver.select(
    # Rename columns (and cast `ts` to a timestamp)
    col("episode_name").alias("nm_episode_name"),
    col("episode_show_name").alias("nm_episode_show_name"),
    col("master_metadata_album_album_name").alias("nm_album_name"),
    col("master_metadata_album_artist_name").alias("nm_artist_name"),
    col("master_metadata_track_name").alias("nm_track_name"),
    col("ms_played").alias("qt_played_ms"),
    col("offline").alias("fl_offline"),
    col("offline_timestamp").alias("ts_offline"),
    col("platform").alias("ds_platform"),
    col("reason_end").alias("ds_reason_end"),
    v_reason_start.alias("ds_reason_start"),
    col("shuffle").alias("fl_shuffle"),
    col("skipped").alias("fl_skipped"),
    v_ts.cast("Timestamp").alias("ts_streaming"),
    col("spotify_episode_uri").alias("ds_spotify_episode_uri"),
    col("spotify_track_uri").alias("ds_spotify_track_uri"),
    v_hora_brasil.alias("nr_hora_brasil"),

    # Year-month column. The month is not zero-padded (e.g. "2023-1"), so the
    # values do not sort chronologically as strings.
    concat(
        year(v_ts).cast("String"),
        lit("-"),
        month(v_ts).cast("String")
    ).alias("dt_ano_mes"),

    # Play duration in whole seconds
    (col("ms_played") / 1000).cast("Int").alias("ts_duration_seconds"),

    # Play duration in whole minutes
    (col("ms_played") / 1000 / 60).cast("Int").alias("ts_duration_minutes"),

    # Day-of-week name (dayofweek: 1 = Sunday ... 7 = Saturday)
    when(dayofweek(v_ts) == 1, "Domingo")
        .when(dayofweek(v_ts) == 2, "Segunda-feira")
        .when(dayofweek(v_ts) == 3, "Ter√ßa-feira")
        .when(dayofweek(v_ts) == 4, "Quarta-feira")
        .when(dayofweek(v_ts) == 5, "Quinta-feira")
        .when(dayofweek(v_ts) == 6, "Sexta-feira")
        .when(dayofweek(v_ts) == 7, "S√°bado")
        .alias("ds_day_of_week"),

    # Period of the day, from the Sao Paulo hour
    when(v_hora_brasil < 6, "Madrugada")
        .when(v_hora_brasil < 12, "Manh√£")
        .when(v_hora_brasil < 18, "Tarde")
        .otherwise("Noite").alias("ds_periodo_dia"),

    # Sort key for the period of the day (1 = earliest)
    when(v_hora_brasil < 6, 1)
        .when(v_hora_brasil < 12, 2)
        .when(v_hora_brasil < 18, 3)
        .otherwise(4).alias("nr_ordem_periodo"),

    # Readable label for how playback started
    when(v_reason_start == "trackdone", "Reprodu√ß√£o Autom√°tica")
        .when(v_reason_start == "clickrow", "Sele√ß√£o Manual")
        .when(v_reason_start == "appload", "Retomada App")
        .when(v_reason_start == "playbtn", "Bot√£o Play")
        .when(v_reason_start.isin("fwdbtn", "backbtn"), "Navega√ß√£o (Pular/Voltar)")
        .when(v_reason_start == "remote", "Controle Externo")
        .otherwise("Outros").alias("ds_tipo_inicio"),

    # Device type from the lower-cased platform string. Order matters: the
    # first matching substring wins.
    when(v_platform.contains("android"), "Android")
        .when(v_platform.contains("ios"), "iOS")
        .when(v_platform.contains("web"), "Web")
        .when(v_platform.contains("windows"), "Windows")
        .when(v_platform.contains("mac"), "Mac")
        .when(v_platform.contains("linux"), "Linux")
        .when(v_platform.contains("tv"), "TV")
        .when(v_platform.contains("echo_show_5"), "Echo_Show")
        .when(v_platform.contains("other"), "Outros")
        .otherwise("N√£o identificado").alias("ds_device_type"),

    # Clickable link. Tracks and episodes use the id taken from their own URI
    # (the episode branch previously concatenated spotify_track_uri).
    when(col("master_metadata_track_name").isNotNull(),
         concat(lit("https://open.spotify.com/track/"), v_track_id)
    )
    # NOTE(review): no album URI column is selected in this notebook, so this
    # branch still builds an album URL from the track URI — confirm whether it
    # should exist at all.
    .when(col("master_metadata_album_album_name").isNotNull(),
          concat(lit("https://open.spotify.com/album/"), col("spotify_track_uri"))
    )
    .when(col("episode_show_name").isNotNull(),
          concat(lit("https://open.spotify.com/episode/"), v_episode_id)
    )
    .otherwise(lit("N√£o identificado")).alias("ds_link_musica"),

    # Reference date of the stream
    to_date(v_ts).alias("dt_referencia"),
)


In [0]:
# Render the Silver DataFrame for visual inspection.
# NOTE(review): `display` is neither defined nor imported in this notebook —
# presumably the Databricks notebook built-in; confirm before running elsewhere.
display(df_silver)

In [0]:
# Persist the Silver DataFrame as a Delta table, overwriting previous contents
# and allowing the schema to evolve via mergeSchema.
# The original referenced NOME_TABELA_DESTINO_SILVER, which is never defined in
# this notebook; the declared Silver table constant is used instead.
(df_silver.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(NAME_TABLE_SILVER_DATABRICKS)
)

print(f"‚úÖ Carga conclu√≠da com sucesso em: {NAME_TABLE_SILVER_DATABRICKS}")