### üí∞ fat_vendas

- PK: 
  - `cd_venda`

- Sequence By:
  - `ts_load`

- Descri√ß√£o: 
  - M√©tricas das transa√ß√µes.

- Objetivo: 
| Campo       | Tipo   | Descri√ß√£o   |
| ----------- | ------ | ----------- |
| cd_venda    | INT    | PK          |
| cd_cliente  | INT    | FK cliente  |
| cd_produto  | INT    | FK produto  |
| cd_data     | INT    | FK data     |
| cd_pais     | INT    | FK pa√≠s     |
| qt_produto    | INT    | Quantidade  |
| vl_total_venda | DOUBLE | Valor total |

Autor: David Costa

## 01. Descri√ß√µes/Par√¢metros

In [0]:
# Defini√ß√µes de par√¢metros e Descri√ß√£o.
dicionario = {
    ## Destino.
    "catalog": "lakeflow",
    "schema": "d_gold",
    "prefix": "fat",
    "table": "venda",
    ## Origem.
    "orCatalog": "lakeflow",
    "orSchema": "c_silver",
    "orTable": "slv_orders",
    # Informa√ß√µes.
    "PK": "cd_venda",
    "partition": "dt_partition",
    "sequence_by": "ts_load",
    "Descri√ß√£o": "M√©tricas das transa√ß√µes."
}

# Cria o caminho de origem dos arquivos.
orPath = f'{dicionario["orCatalog"]}.{dicionario["orSchema"]}.{dicionario["orTable"]}'
print(f"Caminho de origem (Bronze): {orPath}")

# Cria o caminho de destino dos arquivos.
path = f'{dicionario["catalog"]}.{dicionario["schema"]}.{dicionario["prefix"]}_{dicionario["table"]}'
print(f"Caminho de destino (Silver): {path}")

## 01.2 Dicion√°rio de Colunas

In [0]:
# Defini√ß√µes de par√¢metros e Descri√ß√£o.
orders_schema_dict = {
    "cd_venda": {
        "datatype": "integer",
        "description": "C√≥digo √∫nico da transa√ß√£o."
    },
    "cd_cliente": {
        "datatype": "integer",
        "description": "C√≥digo √∫nico do Cliente."
    },
    "cd_produto": {
        "datatype": "integer",
        "description": "C√≥digo √∫nico do Produto"
    },
    "cd_tempo": {
        "datatype": "integer",
        "description": "C√≥digo √∫nico que faz Ref a Data."
    },
    "cd_pais": {
        "datatype": "integer",
        "description": "C√≥digo √∫nico do Pais."
    },
    "qt_produto": {
        "datatype": "integer",
        "description": "Quantidade de produtos vendidos."
    },
    "vl_total_venda": {
        "datatype": "double",
        "description": "Valor total da venda."
    },
    "dt_partition": {
        "datatype": "Date",
        "description": "Data de parti√ß√£o."
    },
    "ts_load": {
        "datatype": "Timestamp",
        "description": "Data e hora de ingest√£o do registro no Data Lake."
    }
}

## 01.3 Importa√ß√£o de Bibliotecas

In [0]:
from pyspark.sql.functions import col, max, min, current_date, current_timestamp, monotonically_increasing_id, year, month, dayofmonth
from delta.tables import DeltaTable

## 01.4 Configura√ß√µes de Ambientes

In [0]:
# Verifica se a tabela de origem existe.
if not spark.catalog.tableExists(orPath):
    dbutils.notebook.exit("Execu√ß√£o abortada: Tabela origem n√£o localizada.")

In [0]:
# Define se a carga ser√° completa ou incremental.
boolean_carga_full = True

# Verifica se a tabela de destino j√° existe. Se existir, realiza carga incremental.
if spark.catalog.tableExists(path):
    boolean_carga_full = False

    # Retorna uma lista de objetos de parti√ß√£o
    partitions = spark.sql(f"SHOW PARTITIONS {path}")

    # Obt√©m a data da √∫ltima parti√ß√£o criada.
    max_partition = partitions.select(max(dicionario["partition"])).first()[0]
    print(f"Carga incremental a partir da data: {max_partition}")

## 02. Leitura de Tabelas Delta

## 02. Leitura da Camada Silver

In [0]:
# Leitura das informa√ß√µes do arquivo CSV na camada Bronze.
if boolean_carga_full == False:
    # Carga incremental: l√™ apenas as parti√ß√µes mais recentes.
    df_silver = (
        spark.read \
        .table(orPath) \
        .filter(col("dt_partition") >= max_partition)
    )
else:
    # Carga completa: l√™ todos os pedidos da tabela de origem.
    df_silver = (
        spark.read \
        .table(orPath)
    )

print(f"Quantidade de registros lidos: {df_silver.count()}")
# Exibe o esquema da tabela.
df_silver.printSchema()

## 02.2 Leitura da Camada Gold: `dim_cliente`

In [0]:
# Define o caminho da tabela de clientes.
path_cliente = f'{dicionario["catalog"]}.{dicionario["schema"]}.dim_cliente'

# Obt√©m lista de IDs de clientes presentes nos pedidos lidos.
clientes_ids = [row.customer_id for row in df_silver.select("customer_id").distinct().toLocalIterator()]

# L√™ a tabela de clientes e filtra apenas os clientes presentes nos pedidos.
df_cliente = (
    spark.read \
    .table(path_cliente) \
    .filter(col("id_cliente").isin(clientes_ids)) \
    .select(
        col("cd_cliente"),
        col("id_cliente")
    )
)

## 02.3 Leitura da Camada Gold: `dim_produto`

In [0]:
# Define o caminho da tabela de produtos.
path_produto = f'{dicionario["catalog"]}.{dicionario["schema"]}.dim_produto'

# Obt√©m lista de IDs de produtos presentes nos pedidos lidos.
produtos_ids = [row.product_id for row in df_silver.select("product_id").distinct().toLocalIterator()]

# L√™ a tabela de produtos e filtra apenas os clientes presentes nos pedidos.
df_produto = (
    spark.read \
    .table(path_produto) \
    .filter(col("id_produto").isin(produtos_ids)) \
    .select(
        col("cd_produto"),
        col("id_produto")
    )
)

## 02.4 Leitura da Camada Gold: `dim_tempo`

In [0]:
# Define o caminho da dimens√£o de tempo.
path_tempo = f'{dicionario["catalog"]}.{dicionario["schema"]}.dim_tempo'

# Obt√©m a menor data de transa√ß√£o presente nos pedidos.
tempo_id = df_silver.select(min(col("order_date"))).first()[0]

# L√™ a tabela de tempo e filtra apenas os registros com datas maiores ou iguais a menor data de transa√ß√£o.
df_tempo = (
    spark.read \
    .table(path_tempo) \
    .filter(col("dt_venda") >= tempo_id) \
    .select(
        col("cd_tempo"),
        col("dt_venda")
    )
)

## 02.5 Leitura da Camada Gold: `dim_pais`

In [0]:
# Define o caminho da tabela de pais.
path_pais = f'{dicionario["catalog"]}.{dicionario["schema"]}.dim_pais'

# Obt√©m lista de IDs de pais presentes nos pedidos lidos.
pais_ids = [row.country for row in df_silver.select("country").distinct().toLocalIterator()]

# L√™ a tabela de pais e filtra apenas os paises presentes nos pedidos.
df_pais = (
    spark.read \
    .table(path_pais) \
    .filter(col("nm_pais").isin(pais_ids)) \
    .select(
        col("cd_pais"),
        col("nm_pais")
    )
)

## 03. Joins

In [0]:
# Realiza o join dos pedidos com as dimens√µes de cliente, produto, tempo e pa√≠s.
df_join = df_silver.alias("S") \
    .join(df_cliente.alias("C"), 
        col("S.customer_id") == col("C.id_cliente"), "left"  # Join com dimens√£o cliente.
    ) \
    .join(df_produto.alias("P"), 
        col("S.product_id") == col("P.id_produto"), "left"  # Join com dimens√£o produto.
    ) \
    .join(df_tempo.alias("T"), 
        col("S.order_date") == col("T.dt_venda"), "left"  # Join com dimens√£o tempo.
    ) \
    .join(df_pais.alias("PA"), 
        col("S.country") == col("PA.nm_pais"), "left"  # Join com dimens√£o pa√≠s.
    ) \
    .select(
        col("C.cd_cliente"),  # C√≥digo do cliente.
        col("P.cd_produto"),  # C√≥digo do produto.
        col("T.cd_tempo"),    # C√≥digo do tempo.
        col("PA.cd_pais"),    # C√≥digo do pa√≠s.
        col("S.quantity").alias("qt_produto"),  # Quantidade de produtos vendidos.
        col("S.total_price").alias("vl_total_venda")  # Valor total da venda.
    )

## 05. Adicionando data e hora da carga

In [0]:
# Adiciona uma coluna com a data de ingest√£o.
df_ts = (
    df_join \
        .withColumn('dt_partition', current_date())
        .withColumn('ts_load', current_timestamp())
)

## 06. Cria/Atualiza Tabela

In [0]:
if boolean_carga_full:
    # Carga completa: sobrescreve a tabela Silver com todos os dados tratados.
    (
        df_ts \
            .withColumn("cd_venda", monotonically_increasing_id()) \
            .write \
            .format("delta") \
            .mode("overwrite") \
            .partitionBy(dicionario['partition']) \
            .saveAsTable(path)
    )

else:
    # Carga incremental: realiza merge (upsert) dos dados tratados na tabela Silver existente.
    delta_table = DeltaTable.forName(spark, path)

    (
        delta_table.alias("t")
        .merge(
            source=df_ts.alias("s"),
            condition=f"t.{dicionario['PK']} = s.{dicionario['PK']}"
        )
        # Realiza atualiza√ß√£o das linhas existentes e insere novas linhas.
        .whenMatchedUpdate(set={
            col: f"s.{col}" 
            for col in df_ts.columns if col not in [dicionario['PK'], 'cd_venda']
        })
        .whenNotMatchedInsertAll()
        .execute()
    )

## 07. Adiciona Comentarios

## 07.1 Adiciona Comentarios: Tabela

In [0]:
# Adiciona as descri√ß√£o na Tabela.    
spark.sql(f"COMMENT ON TABLE {path} IS '{dicionario['Descri√ß√£o']}'")

## 07.2 Adiciona Comentarios: Colunas

In [0]:
# Adiciona as descri√ß√£o na Tabela.
if len(orders_schema_dict):
    for field in orders_schema_dict:
        # Guarda a descri√ß√£o da coluna.
        description = orders_schema_dict[field]['description']
        # Cria o comando SQL para adicionar a descri√ß√£o.
        sql_cd = f"ALTER TABLE {path} ALTER COLUMN {field} COMMENT '{description}'"
        # Executa o comando SQL.
        spark.sql(sql_cd)