In [None]:
# Bibliotecas
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, to_date
from pyspark.sql.types import IntegerType, FloatType


# Caminhos (ajuste conforme o seu ambiente)
caminho_clientes = r"C:/Users/gusta/Downloads/clientes.csv"
caminho_vendas = r"C:/Users/gusta/Downloads/vendas.txt"
output_dir = r"C:/Users/gusta/Downloads/output"
caminho_resumo_clientes = os.path.join(output_dir, "resumo_clientes.csv")
caminho_balanco_produtos = os.path.join(output_dir, "balanco_produtos.csv")

# Garante diretório de saída
os.makedirs(output_dir, exist_ok=True)


In [11]:

#Criando a sessão do Spark para processar dados em 
spark = SparkSession.builder.appName("Desafio ETL PySpark").getOrCreate()


In [None]:
# Comentário: Usamos a opção 'encoding' para garantir que caracteres acentuados sejam lidos corretamente.
def ler_clientes(caminho):
    clientes = (
        spark.read.format("csv")
        .option("delimiter", ";")
        .option("header", "true")
        .option("encoding", "ISO-8859-1") 
        .load(caminho)
    )
    # Conversões de tipos e datas
    clientes = clientes.withColumn("cliente_id", col("cliente_id").cast(IntegerType()))
    # A coluna data_nascimento está dd/MM/yyyy no CSV.
    clientes = clientes.withColumn("data_nascimento", to_date(col("data_nascimento"), "dd/MM/yyyy"))
    clientes.show(5, truncate=False)
    return clientes


In [None]:
# Comentário: Para arquivos de texto, usamos o format("text") e também definimos 'encoding'.
def ler_vendas(caminho):
    vendas = (
        spark.read.format("text")
        .option("encoding", "ISO-8859-1") 
        .load(caminho)
    )
    vendas.show(5, truncate=False)

    # Extrai colunas por faixa fixa (posições 1-based)
    vendas = vendas.select(
        col("value").substr(1, 5).alias("venda_id"),
        col("value").substr(6, 5).alias("cliente_id"),
        col("value").substr(11, 5).alias("produto_id"),
        col("value").substr(16, 8).alias("valor"),
        col("value").substr(24, 8).alias("data_venda"),
    )

    # Tipos
    vendas = vendas.withColumn("venda_id", col("venda_id").cast(IntegerType()))
    vendas = vendas.withColumn("cliente_id", col("cliente_id").cast(IntegerType()))
    vendas = vendas.withColumn("produto_id", col("produto_id").cast(IntegerType()))
    vendas = vendas.withColumn("valor", col("valor").cast(FloatType()))
    vendas = vendas.fillna({"valor": 0.0})

    # Datas no formato yyyyMMdd
    vendas = vendas.withColumn("data_venda", to_date(col("data_venda"), "yyyyMMdd"))
    vendas.show(5, truncate=False)
    return vendas


In [None]:

# Comentário: Join clientes x vendas e agregações (total, quantidade, ticket médio).
def resumo_clientes(clientes, vendas):
    clientes_vendas = clientes.join(vendas, "cliente_id", "inner")
    clientes_vendas.show(5, truncate=False)

    resumo = clientes_vendas.groupBy("cliente_id", "nome").agg(
        sum("valor").alias("total_vendas"),
        count("venda_id").alias("quantidade_vendas"),
        (sum("valor") / count("venda_id")).alias("ticket_medio"),
    )
    # Spark usa Unicode internamente; aqui apenas convertemos para Pandas para salvar em CSV
    return resumo.toPandas()


In [15]:

# %% [título] Transformações: balanço por produto
def balanco_produtos(vendas):
    balanco = vendas.groupBy("produto_id").agg(
        sum("valor").alias("total_vendas_produto"),
        count("venda_id").alias("quantidade_vendas_produto"),
        (sum("valor") / count("venda_id")).alias("ticket_medio_produto"),
    )
    return balanco.toPandas()


In [16]:

# %% [título] Escrita dos resultados (UTF-8 com BOM para Excel)
# Comentário: Usamos 'utf-8-sig' para facilitar a visualização correta no Excel do Windows.
def salvar_resultados(resumo_cli_pd, balanco_prod_pd, caminho_resumo, caminho_balanco):
    resumo_cli_pd.to_csv(caminho_resumo, index=False, sep=";", encoding="utf-8-sig")
    balanco_prod_pd.to_csv(caminho_balanco, index=False, sep=";", encoding="utf-8-sig")


In [None]:

# %% [título] Executar ETL completo
def run_etl():
    clientes = ler_clientes(caminho_clientes)
    vendas = ler_vendas(caminho_vendas)
    resumo_pd = resumo_clientes(clientes, vendas)
    balanco_pd = balanco_produtos(vendas)
    salvar_resultados(resumo_pd, balanco_pd, caminho_resumo_clientes, caminho_balanco_produtos)
    print("Pipeline ETL concluído com sucesso. Arquivos gerados em:", output_dir)

# Descomente a linha abaixo para executar imediatamente no notebook
run_etl()

##Utilizar smallfile se necessário


+----------+-----------+---------------+
|cliente_id|nome       |data_nascimento|
+----------+-----------+---------------+
|1         |João Silva |1980-05-12     |
|2         |Maria Souza|1995-07-30     |
+----------+-----------+---------------+

+-------------------------------+
|value                          |
+-------------------------------+
|0000500001100011234500020251020|
|0000300002100021200002020230403|
+-------------------------------+

+--------+----------+----------+----------+----------+
|venda_id|cliente_id|produto_id|valor     |data_venda|
+--------+----------+----------+----------+----------+
|5       |1         |10001     |1.2345E7  |2025-10-20|
|3       |2         |10002     |1.200002E7|2023-04-03|
+--------+----------+----------+----------+----------+

+----------+-----------+---------------+--------+----------+----------+----------+
|cliente_id|nome       |data_nascimento|venda_id|produto_id|valor     |data_venda|
+----------+-----------+---------------+--------+--