In [1]:
import os
from pathlib import Path

import yaml
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField,
    IntegerType, LongType, StringType
)

from libs.spark_session import get_spark

In [2]:
spark = get_spark("MusicEng_Discovery")
spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/05 15:56:49 WARN Utils: Your hostname, MacBook-Pro-de-Guilherme.local, resolves to a loopback address: 127.0.0.1; using 192.168.15.135 instead (on interface en0)
25/12/05 15:56:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/05 15:56:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Landing Zone

In [3]:
CONFIG_FILE = Path("config/paths.yaml")

if not CONFIG_FILE.exists():
    raise FileNotFoundError(f"Arquivo de configuração não encontrado em: {CONFIG_FILE}")

with open(CONFIG_FILE, "r") as f:
    paths = yaml.safe_load(f)

raw_path = Path(paths["raw"])
RAW = str(raw_path)

RAW, raw_path

('/Volumes/external/Dados/dataset_2/archive/medallionLayers/raw',
 PosixPath('/Volumes/external/Dados/dataset_2/archive/medallionLayers/raw'))

### Consistência e Padrão dos nomes na Landing Zone

In [4]:
files = [
    f for f in os.listdir(RAW)
    if f.endswith(".csv") and not f.startswith(".")
]
files

['user_top_albums.csv',
 'user_top_artists.csv',
 'user_top_tracks.csv',
 'users.csv']

### Análise da integridade dos arquivos

In [5]:
for f in files:
    print(f'\n--------HEAD of {f}--------- \n')
    !head -n 10 "$RAW/$f"


--------HEAD of user_top_albums.csv--------- 

user_id,rank,album_name,artist_name,playcount,mbid
1,1,Visions,Grimes,243,0136a543-33fa-4b80-a8ca-4a6094db3b71
1,2,Only Built 4 Cuban Linx,Raekwon,240,c49af812-cb43-45ca-81e6-b81b7c287624
1,3,Tales From the Pantry,Lederhosen Lucil,204,1f195540-653f-4db4-bb74-e7d215de9bae
1,4,Crystal Castles,Crystal Castles,200,101f993f-d5e1-3f14-b40b-71850f1f435a
1,5,Klanguage,Klanguage,184,e82aa7a2-9d34-431f-9199-c8eba2df0261
1,6,Box Set,Misfits,183,""
1,7,604,Ladytron,179,12b0278a-0dcc-4203-9b89-7d017da66e9c
1,8,Cansei de Ser Sexy,Cansei de Ser Sexy,171,267ae70d-6a82-4fd4-be60-d37bf25cde66
1,9,House of Balloons,The Weeknd,161,0391b278-6408-4909-8c0a-649d6f62dfb8

--------HEAD of user_top_artists.csv--------- 

user_id,rank,artist_name,playcount,mbid
1,1,Crystal Castles,1034,b1570544-93ab-4b2b-8398-131735394202
1,2,Radiohead,972,a74b1b7f-71a5-4011-9441-d0b5e4122711
1,3,Ladytron,831,b45335d1-5219-4262-a44d-936aa36eeaed
1,4,Ghostface Killah,801,3b39abeb-00

In [6]:
for f in files:
    !file -I "$RAW/$f"

/Volumes/external/Dados/dataset_2/archive/medallionLayers/raw/user_top_albums.csv: text/csv; charset=utf-8
/Volumes/external/Dados/dataset_2/archive/medallionLayers/raw/user_top_artists.csv: text/csv; charset=utf-8
/Volumes/external/Dados/dataset_2/archive/medallionLayers/raw/user_top_tracks.csv: text/csv; charset=utf-8
/Volumes/external/Dados/dataset_2/archive/medallionLayers/raw/users.csv: text/csv; charset=us-ascii


In [7]:
for f in files:
    print(f'\n--------HEAD of {f}--------- \n')
    !sed -n l "$RAW/$f" | head -n 10


--------HEAD of user_top_albums.csv--------- 

user_id,rank,album_name,artist_name,playcount,mbid$
1,1,Visions,Grimes,243,0136a543-33fa-4b80-a8ca-4a6094db3b71\
$
1,2,Only Built 4 Cuban Linx,Raekwon,240,c49af812-cb43-45ca-\
81e6-b81b7c287624$
1,3,Tales From the Pantry,Lederhosen Lucil,204,1f195540-653\
f-4db4-bb74-e7d215de9bae$
1,4,Crystal Castles,Crystal Castles,200,101f993f-d5e1-3f14-\
b40b-71850f1f435a$
1,5,Klanguage,Klanguage,184,e82aa7a2-9d34-431f-9199-c8eba2d\
sed: stdout: Broken pipe

--------HEAD of user_top_artists.csv--------- 

user_id,rank,artist_name,playcount,mbid$
1,1,Crystal Castles,1034,b1570544-93ab-4b2b-8398-1317353942\
02$
1,2,Radiohead,972,a74b1b7f-71a5-4011-9441-d0b5e4122711$
1,3,Ladytron,831,b45335d1-5219-4262-a44d-936aa36eeaed$
1,4,Ghostface Killah,801,3b39abeb-0064-4eed-9ddd-ee47a45c54\
cb$
1,5,UNKLE,722,6648391e-7890-4f6c-b939-976f215195d3$
1,6,DJ Shadow,711,efa2c11a-1a35-4b60-bc1b-66d37de88511$
1,7,Buck 65,700,8d18b680-368c-4649-a5e3-85e0c2dd6fc2$
sed: stdout

In [8]:
lista_arquivos = list(files)
lista_arquivos

['user_top_albums.csv',
 'user_top_artists.csv',
 'user_top_tracks.csv',
 'users.csv']

In [9]:
def contar_delimitadores_ignorando_aspas(texto, delimitador=','):
    """
    Conta delimitadores na linha,
    ignorando os que estão dentro de aspas duplas.
    """
    em_aspas = False
    contagem = 0

    for char in texto:
        if char == '"':
            em_aspas = not em_aspas
        elif char == delimitador and not em_aspas:
            contagem += 1

    return contagem

In [10]:
def detectar_truncamento(pasta_base, lista_arquivos, delimitador=',', max_linhas=40000):


    for nome_arquivo in lista_arquivos:

        caminho_completo = os.path.join(pasta_base, nome_arquivo)

        with open(caminho_completo, 'r', encoding='utf-8', errors='replace') as f:
            header = f.readline().rstrip("\n")
            # usa a função nova para contar delimitadores no header
            expected_delims = contar_delimitadores_ignorando_aspas(header, delimitador)
            expected_cols = expected_delims + 1

            truncamentos = []
            linhas_excesso = []
            aspas_bugadas = []

            for num_linha, linha in enumerate(f, start=2):
                if num_linha > max_linhas:
                    break

                texto = linha.rstrip("\n")

                if texto.strip() == '':
                    continue

                # aspas ímpares continuam sendo um alerta
                if texto.count('"') % 2 != 0:
                    aspas_bugadas.append((num_linha, texto[:120]))

                # agora contamos delimitadores ignorando vírgulas entre aspas
                delims_linha = contar_delimitadores_ignorando_aspas(texto, delimitador)
                qtd_cols = delims_linha + 1

                if qtd_cols < expected_cols:
                    truncamentos.append((num_linha, qtd_cols, texto[:120]))

                if qtd_cols > expected_cols:
                    linhas_excesso.append((num_linha, qtd_cols, texto[:120]))

        print("\n===============================")
        print(f"Arquivo: {nome_arquivo}")
        print(f"Header: {header}")
        print(f"Colunas esperadas (header): {expected_cols}")

        if truncamentos:
            print(f"Linhas TRUNCADAS (menos colunas): {len(truncamentos)}")
            for info in truncamentos[:5]:
                print("  ", info)

        if linhas_excesso:
            print(f"⚠ Linhas com colunas A MAIS: {len(linhas_excesso)}")
            for info in linhas_excesso[:5]:
                print("  ", info)

        if aspas_bugadas:
            print(f"⚠ Linhas com ASPAS INCONSISTENTES: {len(aspas_bugadas)}")
            for info in aspas_bugadas[:5]:
                print("  ", info)

        if not truncamentos and not linhas_excesso and not aspas_bugadas:
            print("Nenhum sinal de truncamento detectado.")
        print("===============================")


In [11]:
detectar_truncamento(RAW, lista_arquivos, delimitador=',', max_linhas=40000)


Arquivo: user_top_albums.csv
Header: user_id,rank,album_name,artist_name,playcount,mbid
Colunas esperadas (header): 6
Nenhum sinal de truncamento detectado.

Arquivo: user_top_artists.csv
Header: user_id,rank,artist_name,playcount,mbid
Colunas esperadas (header): 5
Nenhum sinal de truncamento detectado.

Arquivo: user_top_tracks.csv
Header: user_id,rank,track_name,artist_name,playcount,mbid
Colunas esperadas (header): 6
Nenhum sinal de truncamento detectado.

Arquivo: users.csv
Header: user_id,country,total_scrobbles
Colunas esperadas (header): 3
Nenhum sinal de truncamento detectado.


In [12]:
df_users_raw = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "false")  # tudo vira string
    .csv(f"{RAW}/users.csv")
)

df_top_albums_raw = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "false")
    .csv(f"{RAW}/user_top_albums.csv")
)

df_top_artists_raw = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "false")
    .csv(f"{RAW}/user_top_artists.csv")
)

df_top_tracks_raw = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "false")
    .csv(f"{RAW}/user_top_tracks.csv")
)

In [13]:
dfs = [df_top_albums_raw, df_top_artists_raw, df_top_tracks_raw, df_users_raw]

for df in dfs:
    df.printSchema()

root
 |-- user_id: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- album_name: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- playcount: string (nullable = true)
 |-- mbid: string (nullable = true)

root
 |-- user_id: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- playcount: string (nullable = true)
 |-- mbid: string (nullable = true)

root
 |-- user_id: string (nullable = true)
 |-- rank: string (nullable = true)
 |-- track_name: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- playcount: string (nullable = true)
 |-- mbid: string (nullable = true)

root
 |-- user_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- total_scrobbles: string (nullable = true)



In [14]:
def testar_inteiro(df, nome_df="df"):
    """
    Verifica para TODAS as colunas de um DataFrame string se elas se comportam como inteiro.

    Para cada coluna, retorna:
      - total_linhas
      - nao_nulos
      - nulos
      - distintos
      - so_digitos  (quantas linhas são apenas [0-9])

    Mostra um DataFrame resumo, bem legível.
    """
    resultados = []

    for col in df.columns:
        stats = (
            df.select(
                F.count("*").alias("total_linhas"),
                F.count(F.col(col)).alias("nao_nulos"),
                F.count(F.when(F.col(col).isNull(), col)).alias("nulos"),
                F.countDistinct(col).alias("distintos"),
                F.sum(
                    F.when(F.col(col).rlike(r"^[0-9]+$"), 1).otherwise(0)
                ).alias("so_digitos")
            )
            .withColumn("coluna", F.lit(col))
            .first()  # pega a linha única com as métricas
        )

        # Guarda como dicionário pra virar um DF depois
        resultados.append({
            "coluna": stats["coluna"],
            "total_linhas": stats["total_linhas"],
            "nao_nulos": stats["nao_nulos"],
            "nulos": stats["nulos"],
            "distintos": stats["distintos"],
            "so_digitos": stats["so_digitos"],
        })

    # Cria um DF só de resumo
    df_resultado = spark.createDataFrame(resultados) \
                        .select("coluna", "total_linhas", "nao_nulos",
                                "nulos", "distintos", "so_digitos")

    print(f"\n=== Analisando o df: {nome_df} ===")
    df_resultado.show(truncate=False)

    return df_resultado


In [15]:
dfs = [
    ("user_top_albums",  df_top_albums_raw),
    ("user_top_artists", df_top_artists_raw),
    ("user_top_tracks",  df_top_tracks_raw),
    ("users",            df_users_raw),
]

resultados_por_df = {}

for nome, df in dfs:
    resultados_por_df[nome] = testar_inteiro(df, nome_df=nome)


                                                                                


=== Analisando o df: user_top_albums ===
+-----------+------------+---------+-------+---------+----------+
|coluna     |total_linhas|nao_nulos|nulos  |distintos|so_digitos|
+-----------+------------+---------+-------+---------+----------+
|user_id    |23822550    |23822550 |0      |476451   |23822550  |
|rank       |23822550    |23822550 |0      |50       |23822550  |
|album_name |23822550    |23822550 |0      |1423082  |139733    |
|artist_name|23822550    |23822548 |2      |482467   |9822      |
|playcount  |23822550    |23822548 |2      |20975    |23816072  |
|mbid       |23822550    |18092480 |5730070|480325   |6367      |
+-----------+------------+---------+-------+---------+----------+



                                                                                


=== Analisando o df: user_top_artists ===
+-----------+------------+---------+-------+---------+----------+
|coluna     |total_linhas|nao_nulos|nulos  |distintos|so_digitos|
+-----------+------------+---------+-------+---------+----------+
|user_id    |23822550    |23822550 |0      |476451   |23822550  |
|rank       |23822550    |23822550 |0      |50       |23822550  |
|artist_name|23822550    |23822550 |0      |531590   |10956     |
|playcount  |23822550    |23822550 |0      |43693    |23822514  |
|mbid       |23822550    |20327314 |3495236|180180   |20        |
+-----------+------------+---------+-------+---------+----------+



                                                                                


=== Analisando o df: user_top_tracks ===
+-----------+------------+---------+-------+---------+----------+
|coluna     |total_linhas|nao_nulos|nulos  |distintos|so_digitos|
+-----------+------------+---------+-------+---------+----------+
|user_id    |23822550    |23822550 |0      |476451   |23822550  |
|rank       |23822550    |23822550 |0      |50       |23822550  |
|track_name |23822550    |23822550 |0      |2349421  |91863     |
|artist_name|23822550    |23822547 |3      |537484   |12165     |
|playcount  |23822550    |23822542 |8      |14060    |23819953  |
|mbid       |23822550    |18412491 |5410059|1507358  |2222      |
+-----------+------------+---------+-------+---------+----------+


=== Analisando o df: users ===
+---------------+------------+---------+-----+---------+----------+
|coluna         |total_linhas|nao_nulos|nulos|distintos|so_digitos|
+---------------+------------+---------+-----+---------+----------+
|user_id        |476451      |476451   |0    |476451   |47645

In [16]:
schema_users = StructType([
    StructField("user_id",         IntegerType(), nullable=False),  # só dígitos, sem nulos
    StructField("country",         StringType(),  nullable=True),   # 7 nulos
    StructField("total_scrobbles", LongType(),    nullable=False),  # só dígitos, sem nulos
])

schema_user_top_albums = StructType([
    StructField("user_id",     IntegerType(), nullable=False),  # só dígitos, sem nulos
    StructField("rank",        IntegerType(), nullable=False),  # só dígitos, sem nulos, 50 distintos
    StructField("album_name",  StringType(),  nullable=False),  # texto, sem nulos
    StructField("artist_name", StringType(),  nullable=True),   # texto, 2 nulos
    StructField("playcount",   LongType(),    nullable=True),   # majoritariamente dígitos, 2 nulos
    StructField("mbid",        StringType(),  nullable=True),   # texto, muitos nulos
])

schema_user_top_artists = StructType([
    StructField("user_id",     IntegerType(), nullable=False),
    StructField("rank",        IntegerType(), nullable=False),
    StructField("artist_name", StringType(),  nullable=False),  # texto, sem nulos
    StructField("playcount",   LongType(),    nullable=True),   # quase sempre dígitos
    StructField("mbid",        StringType(),  nullable=True),
])

schema_user_top_tracks = StructType([
    StructField("user_id",     IntegerType(), nullable=False),
    StructField("rank",        IntegerType(), nullable=False),
    StructField("track_name",  StringType(),  nullable=False),  # texto, sem nulos
    StructField("artist_name", StringType(),  nullable=True),   # texto, 3 nulos
    StructField("playcount",   LongType(),    nullable=True),   # quase sempre dígitos, 8 nulos
    StructField("mbid",        StringType(),  nullable=True),
])


In [17]:
df_users = (
    spark.read
    .schema(schema_users)
    .option("header", "true")
    .csv(f"{RAW}/users.csv")
)

df_top_albums = (
    spark.read
    .schema(schema_user_top_albums)
    .option("header", "true")
    .csv(f"{RAW}/user_top_albums.csv")
)

df_top_artists = (
    spark.read
    .schema(schema_user_top_artists)
    .option("header", "true")
    .csv(f"{RAW}/user_top_artists.csv")
)

df_top_tracks = (
    spark.read
    .schema(schema_user_top_tracks)
    .option("header", "true")
    .csv(f"{RAW}/user_top_tracks.csv")
)


In [18]:
df_top_albums.limit(10).show()

+-------+----+--------------------+------------------+---------+--------------------+
|user_id|rank|          album_name|       artist_name|playcount|                mbid|
+-------+----+--------------------+------------------+---------+--------------------+
|      1|   1|             Visions|            Grimes|      243|0136a543-33fa-4b8...|
|      1|   2|Only Built 4 Cuba...|           Raekwon|      240|c49af812-cb43-45c...|
|      1|   3|Tales From the Pa...|  Lederhosen Lucil|      204|1f195540-653f-4db...|
|      1|   4|     Crystal Castles|   Crystal Castles|      200|101f993f-d5e1-3f1...|
|      1|   5|           Klanguage|         Klanguage|      184|e82aa7a2-9d34-431...|
|      1|   6|             Box Set|           Misfits|      183|                NULL|
|      1|   7|                 604|          Ladytron|      179|12b0278a-0dcc-420...|
|      1|   8|  Cansei de Ser Sexy|Cansei de Ser Sexy|      171|267ae70d-6a82-4fd...|
|      1|   9|   House of Balloons|        The Weeknd|

In [19]:
dfs_schema = [df_top_albums, df_top_artists, df_top_tracks, df_users]

for df in dfs_schema:
    df.printSchema()
    print('-------- stats ---------')
    print(f'quantidade de linhas: {df.count()}')
    print('\n-------- finish ---------')
   

root
 |-- user_id: integer (nullable = true)
 |-- rank: integer (nullable = true)
 |-- album_name: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- playcount: long (nullable = true)
 |-- mbid: string (nullable = true)

-------- stats ---------


                                                                                

quantidade de linhas: 23822550

-------- finish ---------
root
 |-- user_id: integer (nullable = true)
 |-- rank: integer (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- playcount: long (nullable = true)
 |-- mbid: string (nullable = true)

-------- stats ---------


                                                                                

quantidade de linhas: 23822550

-------- finish ---------
root
 |-- user_id: integer (nullable = true)
 |-- rank: integer (nullable = true)
 |-- track_name: string (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- playcount: long (nullable = true)
 |-- mbid: string (nullable = true)

-------- stats ---------
quantidade de linhas: 23822550

-------- finish ---------
root
 |-- user_id: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- total_scrobbles: long (nullable = true)

-------- stats ---------
quantidade de linhas: 476451

-------- finish ---------


                                                                                