In [0]:
%sql
USE CATALOG medalhao;
USE SCHEMA silver;

In [0]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import DateType, DecimalType
from datetime import datetime, timedelta
from pyspark.sql.window import Window
from pyspark.sql.functions import col, to_date, lead

# Definição do caminho do schema Bronze
catalogo = 'medalhao'
silver_db_name = 'silver'

spark = SparkSession.builder.getOrCreate()

In [0]:
# ft_consumidores em bronze

df_ft_consumidores = spark.table('medalhao.bronze.ft_consumidores')
df_ft_consumidores.limit(5).display()

# ft_consumidores em silver

# contagem de duplicatas em id_consumidor
duplicatas_id_consumidor = df_ft_consumidores.groupBy('customer_id').count().filter(F.col('count') > 1).count()
print(f'duplicatas em id_consumidor: {duplicatas_id_consumidor}')

df_ft_consumidores_final = df_ft_consumidores.select(
    F.col('customer_id').alias('id_consumidor'),
    F.col('customer_zip_code_prefix').alias('prefixo_cep'),
    F.col('customer_city').alias('cidade'),
    F.col('customer_state').alias('estado'),
    F.col('data_ingestao')
).withColumn('cidade', F.upper(F.col('cidade'))).withColumn('estado', F.upper(F.col('estado'))).dropDuplicates(['id_consumidor'])

df_ft_consumidores_final.limit(5).display()

# salvando df_ft_consumidores_final no schema silver
df_ft_consumidores_final.write.mode('overwrite').saveAsTable(f'{silver_db_name}.ft_consumidores')

In [0]:
# ft_pedidos em bronze

df_ft_pedidos = spark.table('medalhao.bronze.ft_pedidos')
df_ft_pedidos.limit(5).display()

# ft_pedidos em silver

df_ft_pedidos = df_ft_pedidos.withColumn(
    'order_status',
    F.when(F.col('order_status') == 'delivered', 'entregue')
     .when(F.col('order_status') == 'invoiced', 'faturado')
     .when(F.col('order_status') == 'shipped', 'enviado')
     .when(F.col('order_status') == 'processing', 'em processamento')
     .when(F.col('order_status') == 'unavailable', 'indisponível')
     .when(F.col('order_status') == 'canceled', 'cancelado')
     .when(F.col('order_status') == 'created', 'criado')
     .when(F.col('order_status') == 'approved', 'aprovado')
     .otherwise(F.col('order_status'))
)

df_ft_pedidos.select('order_status').distinct().display()

df_ft_pedidos_final = df_ft_pedidos.select(
    F.col('order_id').alias('id_pedido'),
    F.col('customer_id').alias('id_consumidor'),
    F.col('order_status').alias('status'),
    F.col('order_purchase_timestamp').alias('pedido_compra_timestamp'),
    F.col('order_approved_at').alias('pedido_aprovado_timestamp'),
    F.col('order_delivered_carrier_date').alias('pedido_carregado_timestamp'),
    F.col('order_delivered_customer_date').alias('pedido_entregue_timestamp'),
    F.col('order_estimated_delivery_date').alias('pedido_estimativa_entrega_timestamp'),
    F.col('data_ingestao')
)

df_ft_pedidos_final = df_ft_pedidos_final \
    .withColumn(
        'tempo_entrega_dias',
        F.datediff('pedido_entregue_timestamp', 'pedido_compra_timestamp')
    ) \
    .withColumn(
        'tempo_entrega_estimado_dias',
        F.datediff('pedido_estimativa_entrega_timestamp', 'pedido_compra_timestamp')
    ) \
    .withColumn(
        'diferenca_entrega_dias',
        F.col('tempo_entrega_dias') - F.col('tempo_entrega_estimado_dias')
    ) \
    .withColumn(
        'entrega_no_prazo',
        F.when(F.col('pedido_entregue_timestamp').isNull(), 'Não Entregue')
         .when(F.col('diferenca_entrega_dias') <= 0, 'Sim')
         .otherwise('Não')
    )

df_ft_pedidos_final.select(
    F.min('tempo_entrega_dias').alias('min_tempo_entrega_dias'),
    F.max('tempo_entrega_dias').alias('max_tempo_entrega_dias'),
    F.min('tempo_entrega_estimado_dias').alias('min_tempo_entrega_estimado_dias'),
    F.max('tempo_entrega_estimado_dias').alias('max_tempo_entrega_estimado_dias'),
    F.min('diferenca_entrega_dias').alias('min_diferenca_entrega_dias'),
    F.max('diferenca_entrega_dias').alias('max_diferenca_entrega_dias')
).display()

df_ft_pedidos_final.groupBy('entrega_no_prazo').count().display()

df_ft_pedidos_final.limit(5).display()

# salvando df_ft_pedidos_final no schema silver
df_ft_pedidos_final.write.mode('overwrite').saveAsTable(f'{silver_db_name}.ft_pedidos')

In [0]:
# ft_itens_pedidos em bronze

df_ft_itens_pedidos = spark.table('medalhao.bronze.ft_itens_pedidos')
df_ft_itens_pedidos.limit(5).display()

df_ft_itens_pedidos_final = df_ft_itens_pedidos.select(
    F.col('order_id').alias('id_pedido'),
    F.col('order_item_id').alias('id_item'),
    F.col('product_id').alias('id_produto'),
    F.col('seller_id').alias('id_vendedor'),
    F.col('price').alias('preco_BRL'),
    F.col('freight_value').alias('preco_frete'),
    F.col('data_ingestao')
)

df_ft_itens_pedidos_final.select().display()

# salvando df_ft_itens_pedidos_final no schema silver
df_ft_itens_pedidos_final.write.mode('overwrite').saveAsTable(f'{silver_db_name}.ft_itens_pedidos')

In [0]:
# ft_pagamentos_pedidos em bronze

df_ft_pagamentos_pedidos = spark.table('medalhao.bronze.ft_pagamentos_pedidos')
df_ft_pagamentos_pedidos.limit(5).display()

# ft_pagamentos_pedidos em silver

df_ft_pagamentos_pedidos = df_ft_pagamentos_pedidos.withColumn(
    'forma_pagamento',
    F.when(F.col('payment_type') == 'credit_card', 'Cartão de Crédito')
     .when(F.col('payment_type') == 'boleto', 'Boleto')
     .when(F.col('payment_type') == 'voucher', 'Voucher')
     .when(F.col('payment_type') == 'debit_card', 'Cartão de Débito')
     .otherwise('Outro')
)

df_ft_pagamentos_pedidos.select('forma_pagamento').distinct().display()

df_ft_pagamentos_pedidos_final = df_ft_pagamentos_pedidos.select(
    F.col('order_id').alias('id_pedido'),
    F.col('payment_sequential').alias('codigo_pagamento'),
    F.col('payment_type').alias('forma_pagamento'),
    F.col('payment_installments').alias('parcelas'),
    F.col('payment_value').alias('valor_pagamento'),
    F.col('data_ingestao')
)

df_ft_pagamentos_pedidos_final.limit(5).display()

# salvando df_ft_pagamentos_pedidos_final no schema silver
df_ft_pagamentos_pedidos_final.write.mode('overwrite').saveAsTable(f'{silver_db_name}.ft_pagamentos_pedidos')

In [0]:
# ft_avaliacoes_pedidos em bronze

df_ft_avaliacoes_pedidos = spark.table('medalhao.bronze.ft_avaliacoes_pedidos')
df_ft_avaliacoes_pedidos.limit(5).display()

# ft_avaliacoes_pedidos em silver

# Remover registros com id_pedido inválido ou datas incorretas

# id considerado invalido quando e nulo E/OU nao segue o padrao de 32 caracteres 
df_ft_avaliacoes_pedidos = df_ft_avaliacoes_pedidos \
    .where(F.col('order_id').isNotNull() & (F.length(F.col('order_id')) == 32))

# Remover registros com datas nulas, formato inconsistente ou datas futuras fora do escopo
hoje = datetime.now().date()

date_cols = [
    'review_creation_date',
    'review_answer_timestamp'
]

# a data e considerada errada quando:
# review_creation_date maior que review_answer_timestamp E/OU
#  e nula E/OU
#  o formato e nulo E/OU
#  a data e maior que o dia de hoje

for col in date_cols:
    df_ft_avaliacoes_pedidos = df_ft_avaliacoes_pedidos \
        .where(
            F.col(col).isNotNull() &
            (F.to_date(F.col(col)).isNotNull()) &
            (F.to_date(F.col(col)) <= F.lit(hoje))
        )

df_ft_avaliacoes_pedidos = df_ft_avaliacoes_pedidos.where(
    F.to_date(F.col('review_creation_date')) <= F.to_date(F.col('review_answer_timestamp'))
)

df_ft_avaliacoes_pedidos_final = df_ft_avaliacoes_pedidos.select(
    F.col('review_id').alias('id_avaliacao'),
    F.col('order_id').alias('id_pedido'),
    F.col('review_score').alias('avaliacao'),
    F.col('review_comment_title').alias('titulo_comentario'),
    F.col('review_comment_message').alias('comentario'),
    F.col('review_creation_date').alias('data_comentario'),
    F.col('review_answer_timestamp').alias('data_resposta'),
    F.col('data_ingestao')
)

df_ft_avaliacoes_pedidos_final.limit(5).display()

# salvando df_ft_avaliacoes_pedidos_final no schema silver
df_ft_avaliacoes_pedidos_final.write.mode('overwrite').saveAsTable(f'{silver_db_name}.ft_avaliacoes_pedidos')

In [0]:
# ft_produtos em bronze

df_ft_produtos = spark.table('medalhao.bronze.ft_produtos')
df_ft_produtos.limit(5).display()

# ft_produtos em silver

df_ft_produtos_final = df_ft_produtos.select(
    F.col('product_id').alias('id_produto'),
    F.col('product_category_name').alias('categoria_produto'),
    F.col('product_weight_g').alias('peso_produto_gramas'),
    F.col('product_length_cm').alias('comprimento_centimetros'),
    F.col('product_height_cm').alias('altura_centimetros'),
    F.col('product_width_cm').alias('largura_centimetros'),
    F.col('data_ingestao')
)

df_ft_produtos_final.limit(5).display()

# salvando df_ft_produtos_final no schema silver
df_ft_produtos_final.write.mode('overwrite').saveAsTable(f'{silver_db_name}.ft_produtos')

In [0]:
# ft_vendedores em bronze

df_ft_vendedores = spark.table('medalhao.bronze.ft_vendedores')
df_ft_vendedores.limit(5).display()

# ft_vendedores em silver

df_ft_vendedores_final = df_ft_vendedores.select(
    F.col('seller_id').alias('id_vendedor'),
    F.col('seller_zip_code_prefix').alias('prefixo_cep'),
    F.upper(F.col('seller_city')).alias('cidade'),
    F.upper(F.col('seller_state')).alias('estado'),
    F.col('data_ingestao')
)

df_ft_vendedores_final.limit(5).display()

# salvando df_ft_vendedores_final no schema silver
df_ft_vendedores_final.write.mode('overwrite').saveAsTable(f'{silver_db_name}.ft_vendedores')

In [0]:
# dm_categoria_produtos_traducao em bronze

df_dm_categoria_produtos_traducao = spark.table('medalhao.bronze.dm_categoria_produtos_traducao')
df_dm_categoria_produtos_traducao.limit(5).display()

# dm_categoria_produtos_traducao em silver

df_dm_categoria_produtos_traducao_final = df_dm_categoria_produtos_traducao.select(
    F.col('product_category_name').alias('nome_produto_pt'),
    F.col('product_category_name_english').alias('nome_produto_en'),
    F.col('data_ingestao')
)

df_dm_categoria_produtos_traducao_final.limit(5).display()

# salvando df_dm_categoria_produtos_traducao_final no schema silver
df_dm_categoria_produtos_traducao_final.write.mode('overwrite').saveAsTable(f'{silver_db_name}.dm_categoria_produtos_traducao')

In [0]:
# dm_cotacao_dolar em bronze

df_dm_cotacao_dolar = spark.table('medalhao.bronze.dm_cotacao_dolar')
df_dm_cotacao_dolar.limit(5).display()

# dm_cotacao_dolar em silver

# Adiciona coluna com a última cotação anterior (forward fill)
window_spec = Window.orderBy('data_hora_cotacao').rowsBetween(Window.unboundedPreceding, 0)

df_dm_cotacao_dolar_ffill = df_dm_cotacao_dolar.withColumn(
    'cotacao_ffill',
    F.last('cotacaoCompra', ignorenulls=True).over(window_spec)
)

# gera todos os dias do intervalo
min_data_str = df_dm_cotacao_dolar.select(F.min('dataHoraCotacao')).first()[0]
max_data_str = df_dm_cotacao_dolar.select(F.max('dataHoraCotacao')).first()[0]
# print(min_data_str, max_data_str)

# converte para datetime.date
min_data = datetime.strptime(min_data_str, "%Y-%m-%d %H:%M:%S.%f").date()
max_data = datetime.strptime(max_data_str, "%Y-%m-%d %H:%M:%S.%f").date()

dias = spark.range(0, (max_data - min_data).days + 1).withColumn(
    'data',
    F.expr(f"date_add('{min_data}', CAST(id AS INT))")
)
dias.display()

# junta com as cotações existentes
df_dm_cotacao_dolar_com_data = df_dm_cotacao_dolar.withColumn('data_cotacao', F.to_date('dataHoraCotacao'))

df_dias = dias.select(F.col('data').alias('data_cotacao'))

window_spec = Window.orderBy('data_cotacao').rowsBetween(Window.unboundedPreceding, 0)

df_dm_cotacao_dolar_completo = df_dias.join(
    df_dm_cotacao_dolar_com_data,
    on='data_cotacao',
    how='left'
).withColumn(
    'cotacao_ffill',
    F.last('cotacaoCompra', ignorenulls=True).over(window_spec)
).dropDuplicates(['data_cotacao'])

df_dm_cotacao_dolar_completo.limit(5).display()

# salvando df_dm_cotacao_dolar_completo no schema silver
df_dm_cotacao_dolar_completo.write.mode('overwrite').saveAsTable(f'{silver_db_name}.dm_cotacao_dolar')

In [0]:
# verificacao de linhas orfans
df_pedidos = spark.table(f'{catalogo}.{silver_db_name}.ft_pedidos')
df_consumidores = spark.table(f'{catalogo}.{silver_db_name}.ft_consumidores')
df_itens_pedidos = spark.table(f'{catalogo}.{silver_db_name}.ft_itens_pedidos')

# verifica pedidos sem consumidor correspondente
pedidos_sem_consumidor = df_pedidos.join(
    df_consumidores.select('id_consumidor'),
    on='id_consumidor',
    how='left_anti'
)

pedidos_orfaos = pedidos_sem_consumidor.count()
print(f'pedidos sem id_consumidor: {pedidos_orfaos}')

df_pedidos_final = df_pedidos.exceptAll(pedidos_sem_consumidor)
df_pedidos_final.write.mode('overwrite').saveAsTable(f'{catalogo}.{silver_db_name}.ft_pedidos')
df_pedidos = df_pedidos_final
print(f'total de ft_pedidos novo: {df_pedidos.count()}')

# verifica itens de pedidos sem pedido correspondente
itens_sem_pedido = df_itens_pedidos.join(
    df_pedidos.select('id_pedido'),
    on='id_pedido',
    how='left_anti'
)
count_itens_sem_pedido = itens_sem_pedido.count()
print(f'itens sem id_pedido: {count_itens_sem_pedido}')

df_itens_pedidos_final = df_itens_pedidos.exceptAll(itens_sem_pedido)
df_itens_pedidos_final.write.mode('overwrite').saveAsTable(f'{catalogo}.{silver_db_name}.ft_itens_pedidos')
df_itens_pedidos = df_itens_pedidos_final
print(f'total de ft_itens_pedidos novo: {df_itens_pedidos.count()}')

In [0]:
# criacao da tabela silver.ft_pedido_total
df_pedidos = spark.table(f'{catalogo}.{silver_db_name}.ft_pedidos')
df_consumidores = spark.table(f'{catalogo}.{silver_db_name}.ft_consumidores')
df_pagamentos = spark.table(f'{catalogo}.{silver_db_name}.ft_pagamentos_pedidos')
df_cotacao = spark.table(f'{catalogo}.{silver_db_name}.dm_cotacao_dolar')

df_pagamentos_agregados = (
    df_pagamentos.groupBy('id_pedido')
    .agg(F.sum('valor_pagamento').alias('valor_total_pago_brl'))
)

df_total = (
    df_pedidos
    .join(df_consumidores.select('id_consumidor', 'cidade', 'estado'), on='id_consumidor', how='inner')
    .join(df_pagamentos_agregados, on='id_pedido', how='inner')
)

df_total = df_total.withColumn(
    'data_pedido',
    F.to_date(F.col('pedido_compra_timestamp')).cast(DateType())
)

df_total_cotacao = df_total.join(
    df_cotacao.select(F.col('data_cotacao').alias('data'), F.col('cotacao_ffill').alias('cotacao_dolar')),
    df_total['data_pedido'] == F.col('data'),
    'left'
)

df_pedido_total = df_total_cotacao.select(
    F.col('data_pedido'),
    F.col('id_pedido'),
    F.col('id_consumidor'),
    F.col('status'),
    F.col('valor_total_pago_brl'),
    (F.col('valor_total_pago_brl') / F.coalesce(F.col('cotacao_dolar'), F.lit(1))).alias('valor_total_pago_usd')
)


df_pedido_total = df_pedido_total.withColumn(
    'valor_total_pago_usd',
    F.col('valor_total_pago_usd').cast(DecimalType(12, 2))
)

display(df_pedido_total.limit(10))

# salvando df_pedido_total no schema silver
df_pedido_total.write.mode('overwrite').saveAsTable(f'{catalogo}.{silver_db_name}.ft_pedido_total')