# Transformation notebook


This notebook transforms the Sales tables of the AdventureWorks database from the RAW layer to the STG layer, and then joins the staged tables into a single `one_big_table`.

In [None]:
%pip install pandas SQLAlchemy==1.4.54 pyspark==3.5.0 setuptools
# NOTE(review): pandas and setuptools are unpinned, and neither pandas nor SQLAlchemy
# is referenced by any cell visible in this notebook — confirm they are still needed.
# NOTE(review): pip-installing pyspark may conflict with the Spark runtime the cluster
# already provides — TODO confirm this pin is intentional.
dbutils.library.restartPython()  # Restart the Python process so the newly installed libraries are loaded

In [None]:
# Importações necessárias
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, concat_ws, sha2, coalesce, sum, when
from pyspark.sql.types import StringType

# Build the Spark session for this notebook (an already-active session is reused)
spark = (
    SparkSession.builder
    .appName("TransformRawToStg")
    .getOrCreate()
)

# Source (RAW) and destination (STG) catalogs
catalog_raw, catalog_stg = "mateus_marin_raw", "mateus_marin_stg"

print("Spark configurado com sucesso!")

In [None]:
# Smoke test: obtain a session, print its version and round-trip a tiny DataFrame
spark = SparkSession.builder.appName("Teste Spark").getOrCreate()

# Show the Spark version in use
print(f"Spark versão: {spark.version}")

# Three-row sample frame with a name and an age column
columns = ["Nome", "Idade"]
data = [
    ("Mateus", 29),
    ("João", 35),
    ("Ana", 23),
]
df = spark.createDataFrame(data, columns)

df.show()

In [None]:
# Get the Spark session (reuses the active one when it already exists)
spark = (
    SparkSession.builder
    .appName("ETL_SALES")
    .getOrCreate()
)

# Discover the table names of the RAW `sales` schema at run time
raw_tables = [
    listing.tableName
    for listing in spark.sql("SHOW TABLES IN mateus_marin_raw.sales").collect()
]

print(f"Tabelas descobertas no schema RAW.sales: {raw_tables}")

In [None]:
def get_column_order(table_name):
    """
    Return the column names of a table in the RAW `sales` schema, in table order.

    Args:
        table_name: Unqualified table name inside `mateus_marin_raw.sales`.

    Returns:
        The list of column names, or None when the table could not be read
        (the error is printed, not raised).
    """
    try:
        raw_table = spark.read.table(f"mateus_marin_raw.sales.{table_name}")
        column_names = raw_table.columns
    except Exception as err:
        print(f"Erro ao obter colunas da tabela {table_name}: {err}")
        return None
    else:
        return column_names

In [None]:
# Mapeamento de chaves únicas por tabela
unique_keys = {
    "countryregioncurrency": ["CountryRegionCode", "CurrencyCode", "ModifiedDate"],
    "creditcard": ["CreditCardID", "ModifiedDate"],
    "currency": ["CurrencyCode", "ModifiedDate"],
    "currencyrate": ["CurrencyRateID", "ModifiedDate"],
    "customer": ["CustomerID", "ModifiedDate"],
    "personcreditcard": ["BusinessEntityID", "CreditCardID", "ModifiedDate"],
    "salesorderdetail": ["SalesOrderID", "SalesOrderDetailID", "ModifiedDate"],
    "salesorderheader": ["SalesOrderID", "ModifiedDate"],
    "salesorderheadersalesreason": ["SalesOrderID", "SalesReasonID", "ModifiedDate"],
    "salesperson": ["BusinessEntityID", "ModifiedDate"],
    "salespersonquotahistory": ["BusinessEntityID", "QuotaDate", "ModifiedDate"],
    "salesreason": ["SalesReasonID", "ModifiedDate"],
    "salestaxrate": ["SalesTaxRateID", "ModifiedDate"],
    "salesterritory": ["TerritoryID", "ModifiedDate"],
    "salesterritoryhistory": ["BusinessEntityID", "StartDate", "TerritoryID", "ModifiedDate"],
    "shoppingcartitem": ["ShoppingCartItemID", "ModifiedDate"],
    "specialoffer": ["SpecialOfferID", "ModifiedDate"],
    "specialofferproduct": ["SpecialOfferID", "ProductID", "ModifiedDate"],
    "store": ["BusinessEntityID", "ModifiedDate"]
}

print("Mapeamento de chaves únicas configurado!")

In [None]:
def transform_table(table_name):
    """
    Copy one table from `mateus_marin_raw.sales` to `mateus_marin_stg.sales`.

    A `GlobalID` column is derived from whichever ID columns the table has, the
    columns are then selected in the order reported by `get_column_order`, and the
    result overwrites the STG table of the same name.

    NOTE(review): the final select keeps only the columns returned by
    `get_column_order` (the RAW table's own columns), so a newly derived `GlobalID`
    does not reach STG unless RAW already has a column with that name — confirm
    whether that is intended before changing it, since later cells join these tables.

    Args:
        table_name: Unqualified table name inside the `sales` schema.

    Returns:
        None. Any exception is caught and printed rather than raised.
    """
    try:
        # Read the source table from RAW
        source_df = spark.read.table(f"mateus_marin_raw.sales.{table_name}")
        available = source_df.columns
        has_order_id = "SalesOrderID" in available

        # Pick the GlobalID expression; the first matching rule wins
        if has_order_id and "SalesOrderDetailID" in available:
            global_id = concat_ws("-", col("SalesOrderID"), col("SalesOrderDetailID"))
        elif has_order_id and "CustomerID" in available:
            global_id = concat_ws("-", col("SalesOrderID"), col("CustomerID"))
        elif "CustomerID" in available:
            global_id = col("CustomerID")
        else:
            global_id = None

        # `is None` on purpose: a Spark Column must not be used in a boolean context
        staged_df = source_df if global_id is None else source_df.withColumn("GlobalID", global_id)

        # Re-select the columns in RAW order (skipped when the lookup failed)
        ordered_columns = get_column_order(table_name)
        if ordered_columns:
            staged_df = staged_df.select(ordered_columns)

        # Persist to the STG layer, replacing any previous contents
        target = f"mateus_marin_stg.sales.{table_name}"
        staged_df.write.mode("overwrite").saveAsTable(target)
        print(f"Tabela `{table_name}` transformada e carregada na STG.sales!")

    except Exception as err:
        print(f"Erro ao transformar a tabela {table_name}: {err}")

In [None]:
# Run the RAW -> STG transformation for every table discovered in the RAW sales schema.
# NOTE: transform_table() catches and prints its own exceptions, so the message below
# is printed even when one or more tables failed; check the per-table output above it.
for table in raw_tables:
    transform_table(table)

print("Transformação RAW → STG concluída com sucesso!")

In [None]:
# List every table currently present in the STG `sales` schema
stg_tables = [
    listing.tableName
    for listing in spark.sql("SHOW TABLES IN mateus_marin_stg.sales").collect()
]

# Report which tables were found
print(f"Tabelas disponíveis na STG: {stg_tables}")

In [None]:
def rename_conflicting_columns(df, table_name):
    """
    Prefix the `ModifiedDate` and `rowguid` columns with the table name.

    These two column names are renamed so they do not clash when tables are
    joined, e.g. `rowguid` of table `customer` becomes `customer_rowguid`.

    Args:
        df: DataFrame whose columns are inspected.
        table_name: Prefix to apply to the renamed columns.

    Returns:
        The DataFrame with the matching columns renamed; the input is returned
        unchanged when neither column is present.
    """
    for shared_name in ("ModifiedDate", "rowguid"):
        # Exact (case-sensitive) name check against the current column list
        if shared_name in df.columns:
            df = df.withColumnRenamed(shared_name, f"{table_name}_{shared_name}")

    return df

def load_table(table_name):
    """
    Load a table from the STG `sales` schema and replace NULLs with per-type defaults.

    After loading, `ModifiedDate`/`rowguid` are renamed via
    `rename_conflicting_columns`, then NULL values are filled by column type:

    * string                      -> "N/A"
    * int / bigint / smallint     -> 0
    * decimal / double / float    -> 0.0
    * timestamp* / date           -> 1970-01-01 00:00:00

    Each default is cast to the column's own data type before `coalesce`, so the
    column keeps its declared type. Without the cast, Spark resolves `coalesce` to
    the wider of the two argument types (e.g. decimal + double literal -> double,
    date + timestamp literal -> timestamp, smallint + int literal -> int).
    Columns of any other type are passed through untouched.

    NOTE(review): because column types are now preserved, tables derived from this
    output may get a different schema than in earlier runs — verify any existing
    target table accepts it.

    Args:
        table_name: Unqualified table name inside `{catalog_stg}.sales`.

    Returns:
        The pre-processed DataFrame, or None when loading/processing failed
        (the error is printed, not raised).
    """
    try:
        df = spark.read.table(f"{catalog_stg}.sales.{table_name}")
        df = rename_conflicting_columns(df, table_name)

        # Build every output column first and project once: calling withColumn in a
        # loop adds one projection per call and can produce very large plans.
        projected = []
        for field in df.schema.fields:
            dtype = field.dataType.simpleString()

            if dtype.startswith("string"):
                default = lit("N/A")
            # Exact match: a bare "int" prefix test would also match "interval ..." types
            elif dtype in ("int", "bigint", "smallint"):
                default = lit(0)
            elif dtype.startswith("decimal") or dtype in ("double", "float"):
                default = lit(0.0)
            elif dtype.startswith("timestamp") or dtype.startswith("date"):
                default = lit("1970-01-01 00:00:00").cast("timestamp")
            else:
                default = None

            if default is None:
                projected.append(col(field.name))
            else:
                # Cast the default to the column's type so coalesce does not widen it
                projected.append(
                    coalesce(col(field.name), default.cast(field.dataType)).alias(field.name)
                )

        df = df.select(projected)

        print(f"`{table_name}` carregada e pré-processada com sucesso!")
        return df

    except Exception as e:
        print(f"Erro ao carregar `{table_name}`: {e}")
        return None

In [None]:
# Load the two tables that form the base of `one_big_table`
df_salesorderheader = load_table("salesorderheader")
df_salesorderdetail = load_table("salesorderdetail")

# Join only when both tables loaded (load_table returns None on failure) and both
# expose the SalesOrderID key; otherwise fall back to the detail table alone,
# which may itself be None.
if (
    df_salesorderheader is None
    or df_salesorderdetail is None
    or "SalesOrderID" not in df_salesorderheader.columns
    or "SalesOrderID" not in df_salesorderdetail.columns
):
    print("Coluna 'SalesOrderID' não encontrada ou tabelas vazias, pulando JOIN inicial.")
    df_final = df_salesorderdetail
else:
    df_final = df_salesorderdetail.join(df_salesorderheader, on=["SalesOrderID"], how="left")
    print("JOIN inicial entre SalesOrderDetail e SalesOrderHeader realizado com sucesso!")

In [None]:
# Additional tables to left-join onto the base of `one_big_table`
tables_to_join = ["customer", "salesperson", "creditcard", "salesterritory"]

# load_table returns None on failure, so test for None explicitly
if df_final is not None:
    for table in tables_to_join:
        df_temp = load_table(table)

        if df_temp is None:
            print(f"Tabela `{table}` não carregada, pulando...")
            continue

        # Columns present on both sides, listed in df_final's column order.
        # A plain set intersection has no stable order, which made the join keys
        # (and therefore the output column order) vary from run to run.
        temp_columns = set(df_temp.columns)
        common_keys = [name for name in df_final.columns if name in temp_columns]

        # NOTE(review): every shared column name is used as a join key, including
        # any non-key attribute that happens to exist on both sides — verify that
        # the keys printed below are the intended ones for each table.
        if common_keys:
            print(f"Fazendo JOIN com `{table}` usando as chaves {common_keys}")
            df_final = df_final.join(df_temp, common_keys, "left")
        else:
            print(f"Nenhuma chave comum encontrada para `{table}`, pulando...")
else:
    print("`one_big_table` não pode ser criada pois o DataFrame base está vazio!")

In [None]:
# Derive the identifier columns of `one_big_table` (skipped when no base frame exists)
if df_final is not None:
    has_order_id = "SalesOrderID" in df_final.columns
    has_customer_id = "CustomerID" in df_final.columns

    # UniqueID: order + detail line when available, otherwise the customer id
    if has_order_id and "SalesOrderDetailID" in df_final.columns:
        unique_id = concat_ws("-", col("SalesOrderID"), col("SalesOrderDetailID"))
        df_final = df_final.withColumn("UniqueID", unique_id)
    elif has_customer_id:
        df_final = df_final.withColumn("UniqueID", col("CustomerID"))

    # GlobalID: order + customer, only when both columns exist
    if has_order_id and has_customer_id:
        global_id = concat_ws("-", col("SalesOrderID"), col("CustomerID"))
        df_final = df_final.withColumn("GlobalID", global_id)

    # Printed regardless of which of the columns above were actually created
    print("UniqueID e GlobalID criados com sucesso!")

In [None]:
# Persist `one_big_table` (skipped when no base frame exists)
if df_final is not None:
    # Drop fully identical rows before saving
    df_final = df_final.dropDuplicates()

    # Only write when there is data. isEmpty() needs a single row to answer,
    # whereas count() > 0 counted every row before the write ran the plan again.
    if not df_final.isEmpty():
        df_final.write.mode("overwrite").saveAsTable(f"{catalog_stg}.sales.one_big_table")
        print("`one_big_table` corrigida e salva com sucesso!")
    else:
        print("`one_big_table` não pôde ser criada pois não há dados disponíveis.")

In [None]:
# Diagnostics for `one_big_table` (skipped when no base frame exists)
if df_final is not None:
    # Final structure
    print("Estrutura da `one_big_table`:")
    df_final.printSchema()

    print("Exemplo de dados na `one_big_table`:")
    df_final.show(10, False)

    # NULL count per column. `sum` here is pyspark.sql.functions.sum from the
    # imports cell (it shadows the Python builtin of the same name).
    print("Contagem de valores nulos por coluna:")
    null_counts = df_final.select([sum(col(c).isNull().cast("int")).alias(c) for c in df_final.columns])
    null_counts.show(truncate=False)

    # UniqueID / GlobalID are only created when their source columns exist, so
    # check for them first instead of failing on an unresolved column.
    has_uniqueid = "UniqueID" in df_final.columns
    has_globalid = "GlobalID" in df_final.columns

    # Rows missing each identifier (None when the column itself does not exist)
    missing_uniqueid_count = df_final.filter(col("UniqueID").isNull()).count() if has_uniqueid else None
    missing_globalid_count = df_final.filter(col("GlobalID").isNull()).count() if has_globalid else None

    if has_uniqueid:
        print(f"Registros sem UniqueID: {missing_uniqueid_count}")
    else:
        print("Coluna `UniqueID` não existe na `one_big_table`; verificação ignorada.")

    if has_globalid:
        print(f"Registros sem GlobalID: {missing_globalid_count}")
    else:
        print("Coluna `GlobalID` não existe na `one_big_table`; verificação ignorada.")

    # Sample of rows without a UniqueID
    if has_uniqueid:
        print("Exemplo de registros sem UniqueID:")
        df_final.filter(col("UniqueID").isNull()).show(10, False)

    # Column names that occur more than once (each occurrence is listed)
    duplicate_cols = [c for c in df_final.columns if df_final.columns.count(c) > 1]
    print(f"Colunas duplicadas (se houver): {duplicate_cols}")

    # First rows for a general sanity check
    df_final.show(5, False)

In [None]:
# Disabled cleanup helper (kept commented out): lists the tables in the STG `sales` schema
# and drops each one. Destructive if uncommented.
#tables_query = "SHOW TABLES IN mateus_marin_stg.sales"
#df_tables = spark.sql(tables_query)
#
## Criar lista de tabelas
#tables_list = [row.tableName for row in df_tables.collect()]
#
## Loop para dropar cada tabela individualmente
#for table in tables_list:
#    drop_query = f"DROP TABLE IF EXISTS mateus_marin_stg.sales.{table}"
#    spark.sql(drop_query)
#    print(f"Tabela {table} removida com sucesso! ✅")