In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("Atividade2").getOrCreate()

catalogo = 'atividade_2_rocketlab'
bronze_schema = 'bronze'

tables = spark.catalog.listTables(f"{catalogo}.{bronze_schema}")
for table in tables:
    print(table.name)

ft_avaliacoes_pedidos

In [0]:
av = spark.read.table(f"{catalogo}.{bronze_schema}.ft_avaliacoes_pedidos")
av.display()

In [0]:
"""
Analise ft_avaliacoes_pedidos:
    inicio (vizualmente):
        - review_id: algumas inconcitencias (data-hora, titulos de comentarios)
        - order_id: algumas inconstitencias (data-hora, titulos de comentarios e comentarios)
        - review_comment_title: faltam muitos, mas não é critico (remover datas -> não inforamdo)
            numeros (provavelmente notas), não tem como saber, melhor manter e informar
        - review_comment_message: ok, verificar apenas se tem alguam data-hora
    
    soluções:
        - verificar se tem "-", " ", ou ".", ids n tem 
"""
# intervalo de 1-5 no review_score
# removendo aleatoriedade dos review_id e order_id
# colunas para portugues
# padronização
# remocao de nulos e duplicatas

s_av = (av
      .withColumn('review_error', F.when(av['review_score'].isin(['1','2', '3', '4','5']), 'False').otherwise('True'))
      .withColumn('id_error', F.when(F.col("review_id").rlike("[- .]"), 'True').otherwise('False'))
      .withColumn('id_error', F.when(F.col("order_id").rlike("[- .]"), 'True').otherwise(F.col('id_error')))
      #Removendo linhas com erros
      .filter((F.col('review_error') == 'False') & (F.col('id_error') == 'False'))
      .select(
          F.col('review_id').alias('id_avaliacao'),
          F.col('order_id').alias('id_pedido'),
          F.col('review_score').alias('avaliacao').cast('int'),
          F.lower(F.col('review_comment_title')).alias('titulo_comentario'),
          F.lower(F.col('review_comment_message')).alias('comentario'),
          F.col('review_creation_date').alias('data_comentario').try_cast('timestamp'),
          F.col('review_answer_timestamp').alias('data_resposta').try_cast('timestamp'),
          F.col('origem')
      )
      .filter(F.col('id_avaliacao').isNotNull())
      .filter(F.col('id_pedido') .isNotNull())
      .filter(F.col('data_comentario') .isNotNull())
      .filter(F.col('data_resposta') .isNotNull())
      .distinct()
)

# !! ERROS CRITICOS !! ajeitar no sistema ou no processo 
# av_erros = av.filter((av['review_error'] == 'True') | (av['id_error'] == 'True')).display()

s_av.display()


ft_consumidores

In [0]:
cs = spark.read.table(f"{catalogo}.{bronze_schema}.ft_consumidores")
cs.display()

In [0]:
"""
Analise ft_consumidores:
    inicio (vizualmente):
        - Sem erro aparente:
            Tipagens corretas, sem irregularidades viziveis
        - Unico erro possivel é o prefixo do cep n corresponder com estado (não da para verificar cidade so com prefixo)
"""

prefixos_por_estado = {
    'AC': [(69900, 69999)],
    'AL': [(57000, 57999)],
    'AM': [(69000, 69299), (69400, 69899)],
    'AP': [(68900, 68999)],
    'BA': [(40000, 48999)],
    'CE': [(60000, 63999)],
    'DF': [(70000, 72799), (73000, 73699)],
    'ES': [(29000, 29999)],
    'GO': [(72800, 72999), (73700, 76799)],
    'MA': [(65000, 65999)],
    'MG': [(30000, 39999)],
    'MS': [(79000, 79999)],
    'MT': [(78000, 78899)],
    'PA': [(66000, 68899)],
    'PB': [(58000, 58999)],
    'PE': [(50000, 56999)],
    'PI': [(64000, 64999)],
    'PR': [(80000, 87999)],
    'RJ': [(20000, 28999)],
    'RN': [(59000, 59999)],
    'RO': [(76800, 76999)],
    'RR': [(69300, 69399)],
    'RS': [(90000, 99999)],
    'SC': [(88000, 89999)],
    'SE': [(49000, 49999)],
    'SP': [(1000, 19999)],
    'TO': [(77000, 77999)],
}
s_cs = (cs
      .select(
          F.col('customer_id').alias('id_consumidor'),
          F.col('customer_zip_code_prefix').alias('prefixo_cep'),
          F.upper(F.col('customer_city')).alias('cidade'),
          F.upper(F.col('customer_state')).alias('estado'),
          F.col('data_ingestao'),
          F.col('origem')
      )
      .filter(F.col('id_consumidor').isNotNull())
      .filter(F.col('prefixo_cep').isNotNull())
      .filter(F.col('cidade').isNotNull())
      .distinct()
)
# Verificação de inconsistencias (cep-estado)
# Sem inconcistencias (sem necessidade de tratamento)
"""
for row in s_cs.collect():
    estado = row['estado']
    intervalos = prefixos_por_estado[estado]
    if len(intervalos) > 1:
        if (row['prefixo_cep'] >= intervalos[0][0] and row['prefixo_cep'] <= intervalos[0][1]) or (row['prefixo_cep'] >= intervalos[1][0] and row['prefixo_cep'] <= intervalos[1][1]):
            pass
        else:
            print('incongruencia')
    else:
        if (row['prefixo_cep'] >= intervalos[0][0] and row['prefixo_cep'] <= intervalos[0][1]):
            pass
        else:
            print('incongruencia')
"""

        
s_cs.display()

ft_pedidos

In [0]:
ped = spark.read.table(f"{catalogo}.{bronze_schema}.ft_pedidos")
ped.display()

In [0]:
"""
Analise ft_pedidos:
    inicio (vizualmente):
        - Sem erro aparente:
            Tipagens corretas, sem irregularidades viziveis
    atividade:
        - map (ing -> pt)
        - criar colunas (tempo_entrega_dias, tempo_entrega_estimado_dias, diferenca_entrega_dias, entrega_no_prazo)
"""

dict_ing_to_pt = {
    "delivered": "entregue",
    "invoiced": "faturado",
    "shipped": "enviado",
    "processing": "em processamento",
    "unavailable": "indisponível",
    "canceled": "cancelado",
    "created": "criado",
    "approved": "aprovado"
}
map_trans = F.create_map([F.lit(k) for kv in dict_ing_to_pt.items() for k in kv])
s_ped = (ped
         .select(
              F.col('order_id').alias('id_pedido'),
              F.col('customer_id').alias('id_consumidor'),
              F.col('order_status').alias('status'),
              F.col('order_purchase_timestamp').alias('pedido_compra_timestamp'),
              F.col('order_approved_at').alias('pedido_aprovado_timestamp'),
              F.col('order_delivered_carrier_date').alias('pedido_carregado_timestamp'),
              F.col('order_delivered_customer_date').alias('pedido_entregue_timestamp'),
              F.col('order_estimated_delivery_date').alias('pedido_estimativa_entrega_timestamp'),
              F.col('origem')
             )
        .withColumn('status', map_trans[F.col("status")])
        .withColumn('tempo_entrega_dias', F.datediff(F.col('pedido_entregue_timestamp'), F.col('pedido_compra_timestamp')))
        .withColumn('tempo_entrega_estimado_dias', F.datediff(F.col('pedido_estimativa_entrega_timestamp'), F.col('pedido_compra_timestamp')))
        .withColumn('diferenca_entrega_dias', F.datediff(F.col('pedido_entregue_timestamp'), F.col('pedido_estimativa_entrega_timestamp')))
        .withColumn('entrega_no_prazo', F.when(F.col('diferenca_entrega_dias') <= 0, 'sim').otherwise('nao'))
        .filter(F.col('id_consumidor').isNotNull())

        .filter(F.col('status').isNotNull())
)

s_ped.display()

ft_itens_pedidos

In [0]:

i_ped = spark.read.table(f"{catalogo}.{bronze_schema}.ft_itens_pedidos")
i_ped.display()

In [0]:
"""
Analise ft_itens_pedidos:
    inicio (vizualmente):
        - Sem erro aparente:
            Tipagens corretas, sem irregularidades viziveis
"""

s_i_ped = (i_ped
           .select(
                F.col('order_id').alias('id_pedido'),
                F.col('order_item_id').alias('id_item'),
                F.col('product_id').alias('id_produto'),
                F.col('seller_id').alias('id_vendedor'),
                F.col('price').alias('preco_BRL'),
                F.col('freight_value').alias('preco_frete'),
                F.col('origem')
                )
           .filter(F.col('id_pedido').isNotNull())
           .filter(F.col('id_item').isNotNull())
           .filter(F.col('id_produto').isNotNull())
           .filter(F.col('id_vendedor').isNotNull())
           )
s_i_ped.display()

ft_pagamentos_pedidos

In [0]:
pg_ped = spark.read.table(f"{catalogo}.{bronze_schema}.ft_pagamentos_pedidos")
pg_ped.display()

In [0]:
"""
Analise ft_pagamentos_pedido:
    inicio (vizualmente):
        - Sem erro aparente:
            Tipagens corretas, sem irregularidades viziveis
    atividade:
        - map (ing -> pt)
"""

dict_ing_to_pt = {
    "credit_card": "Cartão de Crédito",
    "boleto": "Boleto",
    "voucher": "Voucher"
}
ks = list(dict_ing_to_pt.keys())
map_trans = F.create_map([F.lit(k) for kv in dict_ing_to_pt.items() for k in kv])

s_pg_ped = (pg_ped
            .select(
                F.col('order_id').alias('id_pedido'),
                F.col('payment_sequential').alias('codigo_pagamento'),
                F.col('payment_type').alias('forma_pagamento'),
                F.col('payment_installments').alias('parcelas'),
                F.col('payment_value').alias('valor_pagamento'),
                F.col('origem')
                )
            .withColumn('forma_pagamento', F.when(F.col('forma_pagamento').isin(ks), map_trans[F.col('forma_pagamento')]).otherwise('Outro'))
            .filter(F.col('id_pedido').isNotNull())
            )

ft_produtos

In [0]:
prod = spark.read.table(f"{catalogo}.{bronze_schema}.ft_produtos")
prod.display()

In [0]:
"""
Analise ft_pagamentos_pedido:
    inicio (vizualmente):
        - Sem erro aparente:
            Tipagens corretas, sem irregularidades viziveis
"""
s_prod = (prod
    .select(
        F.col("product_id").alias("id_produto"),
        F.col("product_category_name").alias("categoria_produto"),
        F.col("product_weight_g").alias("peso_produto_gramas"),
        F.col("product_length_cm").alias("comprimento_centimetros"),
        F.col("product_height_cm").alias("altura_centimetros"),
        F.col("product_width_cm").alias("largura_centimetros"),
        F.col("origem")
    )
    .filter(F.col('id_produto').isNotNull())
)

ft_vendedores

In [0]:
vend = spark.read.table(f"{catalogo}.{bronze_schema}.ft_vendedores")
vend.display()

In [0]:
cols = ["cep_vendedor", "cidade", "estado"]
s_vend = (vend
    .select(
        F.col("seller_id").alias("id_vendedor"),
        F.col("seller_zip_code_prefix").alias("cep_vendedor"),
        F.col("seller_city").alias("cidade"),
        F.col("seller_state").alias("estado"),
        F.col("origem")
    )
    .filter(F.col('id_vendedor').isNotNull())
    .na.drop(how="all", subset=cols) # remove apenas se os 3 forem nulos
)

dm_categoria_produtos_traducao

In [0]:
ct_pro_trad = spark.read.table(f"{catalogo}.{bronze_schema}.dm_categorias_produtos_traducao")
ct_pro_trad.display()

In [0]:
s_ct_pro_trad = (ct_pro_trad
    .select(
        F.col("product_category_name").alias("nome_produto_pt"),
        F.col("product_category_name_english").alias("nome_produto_en"),
        F.col("origem")
        )
)

dm_cotacao_dolar

In [0]:
cot_dol = spark.read.table(f"{catalogo}.{bronze_schema}.dm_cotacao_dolar")
cot_dol.display()

In [0]:
from pyspark.sql.window import Window
import pyspark.pandas as ps
import calendar

"""
Analise dm_cotacao_dolar:
    inicio (vizualmente):
        - Sem erro:
            cotacao -> double
            data -> string
        - ativiades:
            - cast de data para date (a hora não é relevante, visto que so temos a cotação de um momento do dia)
            - cast de cotacao para float
            - cotacao finais de semana
"""
# Posso criar um df contendo toda as datas entre o intervalo que tem no cot_dol
#   Pegar todos os valores de cot que existem
#   E para os que faltam pegar o ultimo disponivel 
s_cot_dol = (cot_dol
    .select(
        F.col("cotacaoCompra").alias("cotacao_dolar").try_cast('float'),
        F.col("dataHoraCotacao").alias("data").try_cast('date'),
        F.col("origem")
        )
    .orderBy(F.col("data").asc())
    )
begin = s_cot_dol.head()['data']
end = s_cot_dol.tail(1)[0]['data']

date_series = ps.date_range(start=begin, end=end, freq='D')
date_series = date_series.to_frame(name="data").to_spark()

test = s_cot_dol.join(date_series, on='data', how='left')

s_cot_dol = (
    date_series
    .join(s_cot_dol, on="data", how="left") 
    .withColumn('cotacao_dolar', F.last('cotacao_dolar', ignorenulls=True).over(w))
    .withColumn('origem', F.when(F.col('origem').isNull(), F.lit('BRONZE_TO_SILVER')).otherwise(F.col('origem')))
    .withColumn('data', F.col('data').cast('date'))
)

w = Window.orderBy("data").rowsBetween(Window.unboundedPreceding,0)
s_cot_dol.display()


ft_pedido_total

In [0]:
print(s_cs.columns)
print(s_ped.columns)
print(s_pg_ped.columns)
print(s_cot_dol.columns)

In [0]:
from pyspark.sql.types import DecimalType

ped_orf = (s_ped.join(s_cs, on='id_consumidor', how='left_anti'))
itens_of = (s_i_ped.join(s_ped, on='id_pedido', how='left_anti'))

ped_orf = ped_orf.count()
itens_of = itens_of.count()

if ped_orf:
    print(f'{ped_orf} pedidos orfaos')
    s_ped = (s_ped.join(s_cs, on='id_consumidor', how='left_semi'))

if pedi:
    print(f'{pedi} itens orfaos')
    s_i_ped = (s_i_ped.join(s_ped, on='id_pedido', how='left_semi'))


s_ped_total = (s_ped
               .select(
                    F.col('id_pedido'), 
                    F.col('id_consumidor'),
                    F.col('status'), 
                    F.col('pedido_compra_timestamp').alias('data').cast('date')
                    )
               .join(s_pg_ped.select(F.col('valor_pagamento'), F.col('parcelas'), F.col('id_pedido')) , on='id_pedido', how='left')
               .join(s_cot_dol.select(F.col('data'), F.col('cotacao_dolar')), on='data', how='left')
               .withColumn('valor_total_pago_brl', (F.col('valor_pagamento') * (F.col('parcelas'))).cast(DecimalType(10,2)))
               .withColumn('valor_total_pago_usd', (F.col('valor_pagamento') * (F.col('parcelas')/ F.col('cotacao_dolar'))).cast(DecimalType(10,2)))
               .withColumnRenamed('data', 'data_pedido')
               .withColumn('origem', F.array(F.lit('kaggle'), F.lit('API_BANCO_CENTRAL')))
               .select('id_pedido', 'id_consumidor', 'status', 'data_pedido', 'valor_total_pago_brl', 'valor_total_pago_usd', 'origem', 'cotacao_dolar')
               .orderBy('data_pedido')
               )


s_ped_total.display()


Ingetar todas as tabelas na camada Silver + data_ingestao

In [0]:
catalogo = 'atividade_2_rocketlab'
silver_schema = 'silver'

spark.sql(f'CREATE SCHEMA IF NOT EXISTS {catalogo}.{silver_schema}')

geo = spark.read.table(f"{catalogo}.{bronze_schema}.ft_geolocalizacao")

(geo
 .withColumn('data_ingestao',F.current_timestamp())
 .write.mode("overwrite").option("overwriteSchema", "true").format('delta').saveAsTable(f"{catalogo}.{silver_schema}.ft_geolocalizacao")
)
(s_av
 .withColumn('data_ingestao',F.current_timestamp())
 .write.mode("overwrite").option("overwriteSchema", "true").format('delta').saveAsTable(f"{catalogo}.{silver_schema}.ft_avaliacoes_pedidos")
)
(s_cs
 .withColumn('data_ingestao',F.current_timestamp())
 .write.mode("overwrite").option("overwriteSchema", "true").format('delta').saveAsTable(f"{catalogo}.{silver_schema}.ft_consumidores")
)
(s_i_ped
.withColumn('data_ingestao',F.current_timestamp())
.write.mode("overwrite").option("overwriteSchema", "true").format('delta').saveAsTable(f"{catalogo}.{silver_schema}.ft_itens_pedidos")
)
(s_ct_pro_trad
.withColumn('data_ingestao',F.current_timestamp())
.write.mode("overwrite").option("overwriteSchema", "true").format('delta').saveAsTable(f"{catalogo}.{silver_schema}.dm_categorias_produtos_traducao")
)
(s_cot_dol
.withColumn('data_ingestao',F.current_timestamp())
.write.mode("overwrite").option("overwriteSchema", "true").format('delta').saveAsTable(f"{catalogo}.{silver_schema}.dm_cotacao_dolar")
)
(s_pg_ped
.withColumn('data_ingestao',F.current_timestamp())
.write.mode("overwrite").option("overwriteSchema", "true").format('delta').saveAsTable(f"{catalogo}.{silver_schema}.ft_pagamentos_pedidos")
)
(s_ped_total
.withColumn('data_ingestao',F.current_timestamp())
.write.mode("overwrite").option("overwriteSchema", "true").format('delta').saveAsTable(f"{catalogo}.{silver_schema}.ft_pedido_total")
)
(s_ped
.withColumn('data_ingestao',F.current_timestamp())
.write.mode("overwrite").option("overwriteSchema", "true").format('delta').saveAsTable(f"{catalogo}.{silver_schema}.ft_pedidos")
)
(s_prod
.withColumn('data_ingestao',F.current_timestamp())
.write.mode("overwrite").option("overwriteSchema", "true").format('delta').saveAsTable(f"{catalogo}.{silver_schema}.ft_produtos")
)
(s_vend
.withColumn('data_ingestao',F.current_timestamp())
.write.mode("overwrite").option("overwriteSchema", "true").format('delta').saveAsTable(f"{catalogo}.{silver_schema}.ft_vendedores")
)
(s_prod
.withColumn('data_ingestao',F.current_timestamp())
.write.mode("overwrite").option("overwriteSchema", "true").format('delta').saveAsTable(f"{catalogo}.{silver_schema}.ft_produtos")
)