## Transformação camada prata: productcategory

In [0]:
%run ../Config/DeltaFunctions

In [0]:
%run ../Config/LogProcessamento

In [0]:
from pyspark.sql import DataFrame, Window
from pyspark.sql import functions as F
from pyspark.sql.types import (
    IntegerType, StringType, TimestampType, StructType, StructField, BooleanType
)

In [0]:
# Habilitar a evolução automática de esquemas
spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true")
spark.sql('USE CATALOG hive_metastore')

# Informações da Tabela Fonte
source_table = "production_productcosthistory"
source_database = "adventure_works_bronze"
bronze_source_table = spark.read.table(f"{source_database}.{source_table}")

# Informações da Tabela Destino (target)
target_table_name = "production_productcosthistory"
target_database = "adventure_works_silver"
target_table = f"{target_database}.{target_table_name}"

primary_keys = ["ProductID", "StartDate"]

In [0]:
expected_schema = StructType([
    StructField("ProductID", IntegerType(), False),         # int NOT NULL
    StructField("StartDate", TimestampType(), False),       # datetime NOT NULL
    StructField("EndDate", TimestampType(), True),          # datetime NULL
    StructField("StandardCost", DecimalType(19, 4), False), # money NOT NULL
    StructField("ModifiedDate", TimestampType(), False)     # datetime NOT NULL
])


In [0]:
def transform_production_ProductCostHistory(ProductCostHistory: DataFrame) -> DataFrame:
    '''
    Transformação da tabela: ProductCostHistory
    Parâmetros:
        ProductCostHistory (DataFrame): DataFrame contendo os dados da tabela ProductCostHistory

    Retorna:
        DataFrame: O DataFrame resultante após a transformação e deduplicação.
    '''
     
    # Define valores padrão para campos que podem ser nulos
    # Aplicar regras de integridade e preencher campos nulos
    ProductCostHistory = ProductCostHistory.withColumn(
        'EndDate', F.when(F.col('EndDate') < F.col('StartDate'), None).otherwise(F.col('EndDate'))
    )

    # Filtrar linhas com StandardCost >= 0
    ProductCostHistory = ProductCostHistory.filter(F.col('StandardCost') >= 0.00)

    # Gerar valores de ModifiedDate para linhas onde não existe
    ProductCostHistory = ProductCostHistory.withColumn(
        'ModifiedDate', 
        F.when(F.col('ModifiedDate').isNull(), F.current_timestamp()).otherwise(F.col('ModifiedDate'))
    )

    # Deduplicação utilizando uma função de janela
    window_spec = Window.partitionBy('ProductID', 'StartDate').orderBy(F.col('ModifiedDate').desc())
    ProductCostHistory = ProductCostHistory.withColumn('row_num', F.row_number().over(window_spec))

    # Filtrar para manter apenas a linha mais recente
    ProductCostHistory = ProductCostHistory.filter(F.col('row_num') == 1).drop('row_num')

    # Seleção final com CAST explícito dos tipos de dados
    ProductCostHistory = ProductCostHistory.select(
        F.col('ProductID').cast(IntegerType()).alias('ProductID'),
        F.col('StartDate').cast(TimestampType()).alias('StartDate'),
        F.col('EndDate').cast(TimestampType()).alias('EndDate'),
        F.col('StandardCost').cast(DecimalType(19, 4)).alias('StandardCost'),
        F.col('ModifiedDate').cast(TimestampType()).alias('ModifiedDate')
    )

    return ProductCostHistory

## Aplicar Transformação

In [0]:
# Estrutura do log para registrar informações sobre o processo
log_data = {
    "log_tabela": source_table,
    "log_camada": "Silver",
    "log_origem": "adventure_works_bronze",
    "log_destino": "adventure_works_silver",
}

# Registra o início do processo
addlog(**log_data, log_status='Início', atualizacao=0)

try:
    # Realiza a transformação dos dados
    transformed_df = transform_production_ProductCostHistory(ProductCostHistory=bronze_source_table)

    # Verifica rapidamente o número de linhas e o schema do DataFrame
    row_count = transformed_df.count()
    transformed_df.printSchema()

    # Validação do schema
    is_schema_valid = _validate_schema(transformed_df, expected_schema)
    if is_schema_valid:
        addlog(**log_data, log_status='Sucesso', atualizacao=1)
        print("O schema do DataFrame está correto.")
    else:
        raise ValueError("Schema validation failed.")
    
except Exception as e:
    # Registra erro caso ocorra uma exceção
    addlog(**log_data, log_status='Falha', atualizacao=1)
    print(f"Erro ao processar a tabela: {str(e)}")
    raise  

# Se o schema for válido, realiza o upsert
_upsert_silver_table(transformed_df, target_table, primary_keys, not_matched_by_source_action="DELETE")
