# Desafio Gerando Falções
## ETL

Este notebook o processo de ETL para a extração e processamento dos dados fornecidos.

### Importações

In [0]:
import datetime
import re

import matplotlib.pyplot as plt
import seaborn as sns
import pyspark.sql.functions as F

from pyspark.sql.utils import AnalysisException
from pyspark.sql.types import IntegerType, StringType, FloatType, DoubleType, DateType, TimestampType, StructField, StructType

### Funções Utilitárias

Abaixo, encontam-se as funções com as rotinas necessárias para cada etapa do processo de ETL

#### Extração

In [0]:

def obtem_dados(table_name, csv_path, database="default"):
    try:
        # Check if the table already exists
        df = spark.read.format("delta").table(f"{database}.{table_name}")
    except AnalysisException:
        # Read the CSV file into a DataFrame
        df = spark.read.csv(csv_path, header=True, inferSchema=True)
        
        # Write the DataFrame to a Delta table
        df.write.format("delta").saveAsTable(f"{database}.{table_name}")
        print(f"Table {database}.{table_name} created from {csv_path}.")
    
    return df

#### Transformação

In [0]:
def cria_df_unificado(vendas_df, produtos_df, clientes_df):
    # Cria dataframe unificado a partir 
    df_unificado = (
        vendas_df
        .join(produtos_df, "id_produto", "inner")
        .join(clientes_df, "id_cliente", "inner")
    )

    # Adiciona coluna ano-mes, com o ano e o mês da data de venda
    df_unificado = df_unificado.withColumn("ano_mes_venda", F.date_format(F.col('data_venda'), "yyyy-MM"))

    # Re-ordena e seleciona somente as colunas relevantes
    # Exclusão das colunas marca e cidade, como demonstrado na análise exploratória
    df_unificado = df_unificado.select(
        'id_venda', 'canal_venda', 'valor', 'data_venda', 'ano_mes_venda',
        'id_produto', 'nome_produto', 'descricao', 
        'id_cliente', 'nome', 'email', 'telefone', 'estado', 'data_cadastro'
    )

    return df_unificado

In [0]:
# Definição do schema para a UDF
schema_cat_brand_model = StructType([
    StructField("categoria", StringType(), True),
    StructField("marca", StringType(), True),
    StructField("modelo", StringType(), True),
])

# Função UDF para extrair categoria, marca e modelo do nome do produto, com base em uma lista de BRANDS
@F.udf(returnType=schema_cat_brand_model)
def extract_cat_brand_model(nome_produto: str):
    if not nome_produto:
        return {"categoria": None, "marca": None, "modelo": None}

    toks = nome_produto.strip().split()
    marca_idx = None
    marca_name = None

    # Identifica a marca no nome do produto, com base na lista BRANDS
    for i, tok in enumerate(toks):
        tok_clean = re.sub(r"[\-\+]", " ", tok)
        for brand in BRANDS:
            if tok.lower() == brand.lower() or tok_clean.lower() == brand.lower():
                marca_idx = i
                marca_name = brand
                break
        if marca_idx is not None:
            break

    # Extrai categoria e modelo com base na posição da marca
    if marca_idx is not None:
        categoria = " ".join(toks[:marca_idx]).strip() or None
        modelo = " ".join(toks[marca_idx+1:]).strip() or None
        return {"categoria": categoria, "marca": marca_name, "modelo": modelo}
    else:
        categoria = toks[0]
        modelo = " ".join(toks[1:]).strip() or None
        return {"categoria": categoria, "marca": None, "modelo": modelo}

# Função para substituir valores vazios por nulos
def null_if_empty(col):
    return F.when(F.trim(col) == "", None).otherwise(col)

# Função para enriquecer o DataFrame de produtos
def enriquece_produtos(df, BRANDS):
    """
    Enriquecer DataFrame de produtos com categoria, marca, modelo, capacidade, dimensões, potência e memória.
    
    Parâmetros:
        df : PySpark DataFrame
            DataFrame de entrada com pelo menos as colunas 'nome_produto' e 'descricao'.
    
    Retorna:
        PySpark DataFrame com colunas adicionadas:
        ['categoria', 'marca', 'modelo', 'capacidade', 'dimensao', 'potencia']
    """
    # Extrai categoria, marca e modelo do nome do produto
    df_enriquecido = df.withColumn(
        "cat_brand_model",
        extract_cat_brand_model(F.col("nome_produto"))
    ).withColumn("categoria", F.col("cat_brand_model.categoria")) \
     .withColumn("marca", F.col("cat_brand_model.marca")) \
     .withColumn("modelo", F.col("cat_brand_model.modelo")) \
     .drop("cat_brand_model")

    # Cria um padrão regex para corresponder a qualquer nome de marca
    brands_pattern = '|'.join(BRANDS)

    # Atualiza o DataFrame para remover nomes de marcas da coluna 'nome_produto'
    df_enriquecido = df_enriquecido.withColumn(
        "nome_produto_cleaned",
        F.regexp_replace(F.col("nome_produto"), f"\\b({brands_pattern})\\b", "")
    )
    
    # Atualiza a coluna 'categoria' com base no nome do produto limpo
    # A Categoria consiste em strings de até 2 palavras (3 se conter stop-words) que descrevem em termos gerais, a categoria do produto
    df_enriquecido = df_enriquecido.withColumn(
        "categoria",
        F.when(
            (F.size(F.split(F.col("nome_produto_cleaned"), " ")) > 3) & 
            (F.array_contains(F.array(F.lit("de"), F.lit("da"), F.lit("do"), F.lit("das"), F.lit("dos"), F.lit("a"), F.lit("o"), F.lit("e")), 
                            F.split(F.col("nome_produto_cleaned"), " ")[1])),
            F.concat_ws(" ", F.slice(F.split(F.col("nome_produto_cleaned"), " "), 1, 3))
        ).when(
            F.size(F.split(F.col("nome_produto_cleaned"), " ")) > 2,
            F.concat_ws(" ", F.slice(F.split(F.col("nome_produto_cleaned"), " "), 1, 2))
        ).otherwise(F.col("nome_produto_cleaned"))
    )

    # Extrai capacidade em litros da descrição ou nome do produto
    df_enriquecido = df_enriquecido.withColumn(
        "capacidade",
        F.coalesce(
            null_if_empty(F.regexp_extract(F.col("descricao"), r"(\d+(?:,\d+)?\s?[lL])", 1)),
            null_if_empty(F.regexp_extract(F.col("nome_produto"), r"(\d+(?:,\d+)?\s?[lL])", 1))
        )
    ).withColumn(
        # Extrai dimensões da descrição ou nome do produto
        "dimensao",
        F.coalesce(
            null_if_empty(F.regexp_extract(F.col("descricao"), r"(\d+(?:,\d+)?\s?cm)", 1)),
            null_if_empty(F.regexp_extract(F.col("nome_produto"), r"(\d+(?:,\d+)?\s?cm)", 1)),
            null_if_empty(F.regexp_extract(F.col("descricao"), r"(\d+(?:\.\d+)?\s*(\"|”|''|pol))", 1)),
            null_if_empty(F.regexp_extract(F.col("nome_produto"), r"(\d+(?:\.\d+)?\s*(\"|”|''|pol))", 1))
        )
    ).withColumn(
        # Extrai potência em watts da descrição ou nome do produto
        "potencia",
        F.coalesce(
            null_if_empty(F.regexp_extract(F.col("descricao"), r"(\d+(?:,\d+)?\s?W)", 1)),
            null_if_empty(F.regexp_extract(F.col("nome_produto"), r"(\d+(?:,\d+)?\s?W)", 1))
        )
    )

    # Seleciona e re-ordena colunas
    df_enriquecido = df_enriquecido.select(
        'id_venda', 'canal_venda', 'valor', 'data_venda', 'ano_mes_venda',
        'id_produto', 'nome_produto', 'descricao', 'categoria', 'marca', 'modelo', 'capacidade', 'dimensao', 'potencia',
        'id_cliente', 'nome', 'email', 'telefone', 'estado', 'data_cadastro'
    )

    return df_enriquecido

#### Carregamento

In [0]:
def cria_tabela_delta(df, table_name, database="default"):
    # Escreve o DataFrame em uma tabela Delta
    df.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable(f"{database}.{table_name}")
    print(f"Tabela {database}.{table_name} criada ou sobrescrita.")

In [0]:
def exibe_df(df, nome_df):    
    # Exibe informações úteis
    print(f"\nNome do DataFrame: {nome_df}")
    print(f"Número de linhas: {df.count()}")
    print(f"Número de colunas: {len(df.columns)}")
    print("Schema:")
    df.printSchema()

    # Exibe o DataFrame
    display(df)

### Execução da ETL

In [0]:
#############################
# CONFIGURAÇÃO DE VARIÁVEIS
#############################

# Define paths
clientes_csv_path = "/Users/bentofreitas/Documents/clientes.csv"
produtos_csv_path = "/Users/bentofreitas/Documents/produtos.csv"
vendas_csv_path = "/Users/bentofreitas/Documents/vendas.csv"

# Lista de marcas conhecidas
BRANDS = ["Mondial", "Dell", "Samsung", "LG", "Nespresso", "Arno", "Electrolux", "Brastemp", 
          "Black+Decker", "HP", "Lorenzetti", "Fischer", "Philips Walita", "Redragon", 
          "Logitech", "Xiaomi", "Apple", "Motorola", "Acer", "ThunderX3", "Fifine", 
          "Lenovo", "JBL", "Kingston", "Seagate", "TP-Link", "Clamper", "Britânia", "WAP"]

#############################
# EXTRAÇÃO DOS DADOS
#############################

# Cria tabelas Delta e obtém DataFrames
# Extração dos dados dos arquivos CSV e criação das tabelas Delta correspondentes
clientes_df = obtem_dados("clientes", clientes_csv_path)
produtos_df = obtem_dados("produtos", produtos_csv_path)
vendas_df = obtem_dados("vendas", vendas_csv_path)

#############################
# TRANSFORMAÇÃO DOS DADOS
#############################

# Unificação dos DataFrames de vendas, produtos e clientes
df_unificado = cria_df_unificado(vendas_df, produtos_df, clientes_df)

# Enriquecimento do DataFrame unificado com informações adicionais dos produtos
df_enriquecido = enriquece_produtos(df_unificado, BRANDS)

#############################
# CARREGAMENTO DOS DADOS
#############################

# Exibição do DataFrame enriquecido para verificação
exibe_df(df_enriquecido, "base_unificada")

# Grava resultados em tabelas Delta
cria_tabela_delta(df_enriquecido, "Base_Unificada_GF")