### importando funcoes

In [0]:
from pyspark.sql.functions import col, initcap, when, to_timestamp, regexp_replace
from pyspark.sql.types import DecimalType
from delta.tables import DeltaTable

### tratamento tabela customers

In [0]:
# Bronze (input, parquet) and Silver (output, Delta) locations for customers.
bronze_path = "dbfs:/mnt/olist/Bronze/olist_customers_dataset"
silver_path = "dbfs:/mnt/olist/Silver/olist_customers_dataset"

# Bronze column name -> Silver column name.
customers_renames = {
    "customer_id": "cliente_id",
    "customer_unique_id": "id_unico_cliente",
    "customer_zip_code_prefix": "prefixo_cep",
    "customer_city": "cidade",
    "customer_state": "estado",
}

# Read the Bronze data and apply the renames one by one.
df_customers_bronze = spark.read.parquet(bronze_path)
for source_name, target_name in customers_renames.items():
    df_customers_bronze = df_customers_bronze.withColumnRenamed(source_name, target_name)

if not DeltaTable.isDeltaTable(spark, silver_path):
    # No Delta table at the Silver path yet: write the transformed data as the table.
    df_customers_bronze.write.format("delta").mode("overwrite").save(silver_path)
else:
    # Silver table exists: update rows matching on cliente_id, insert the rest.
    df_customer_silver = DeltaTable.forPath(spark, silver_path)
    (
        df_customer_silver.alias("silver")
        .merge(
            df_customers_bronze.alias("bronze"),
            "silver.cliente_id = bronze.cliente_id",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

#display(spark.read.format("delta").load("dbfs:/mnt/olist/Silver/olist_customers_dataset"))

### tratamento tabela geolocation

In [0]:

# Bronze (input, parquet) and Silver (output, Delta) locations for geolocation.
bronze_path = "dbfs:/mnt/olist/Bronze/olist_geolocation_dataset"
silver_path = "dbfs:/mnt/olist/Silver/olist_geolocation_dataset"

# Bronze column name -> Silver column name.
geolocation_renames = {
    "geolocation_zip_code_prefix": "prefixo_cep",
    "geolocation_lat": "latitude",
    "geolocation_lng": "longitude",
    "geolocation_city": "cidade",
    "geolocation_state": "estado",
}

df_geolocation_bronze = spark.read.parquet(bronze_path)
for source_name, target_name in geolocation_renames.items():
    df_geolocation_bronze = df_geolocation_bronze.withColumnRenamed(source_name, target_name)

# Both coordinates are stored as DECIMAL(18, 15).
for coordinate in ("latitude", "longitude"):
    df_geolocation_bronze = df_geolocation_bronze.withColumn(
        coordinate, col(coordinate).cast(DecimalType(18, 15))
    )

# Capitalise the first letter of each word in the city name.
df_geolocation_bronze = df_geolocation_bronze.withColumn("cidade", initcap(col("cidade")))

# Keep a single row per (zip prefix, latitude, longitude); these are the merge keys below.
df_geolocation_bronze = df_geolocation_bronze.dropDuplicates(["prefixo_cep", "latitude", "longitude"])

# NOTE(review): the condition contains a run of spaces after "silver." in the
# last predicate; it is kept verbatim here - presumably tolerated by the SQL
# parser, confirm before tidying it up.
geolocation_merge_condition = "silver.longitude = bronze.longitude AND silver.latitude = bronze.latitude AND silver.     prefixo_cep = bronze.prefixo_cep"

if not DeltaTable.isDeltaTable(spark, silver_path):
    # No Delta table at the Silver path yet: write the transformed data as the table.
    df_geolocation_bronze.write.format("delta").mode("overwrite").save(silver_path)
else:
    # Silver table exists: update matching rows, insert the rest.
    df_geolocation_silver = DeltaTable.forPath(spark, silver_path)
    (
        df_geolocation_silver.alias("silver")
        .merge(df_geolocation_bronze.alias("bronze"), geolocation_merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

#display(spark.read.format("delta").load("dbfs:/mnt/olist/Silver/olist_geolocation_dataset"))

### tratamento tabela orders_itens

In [0]:
# Bronze (input, parquet) and Silver (output, Delta) locations for order items.
bronze_path = "dbfs:/mnt/olist/Bronze/olist_order_items_dataset"
silver_path = "dbfs:/mnt/olist/Silver/olist_order_items_dataset"

# Bronze column name -> Silver column name.
order_items_renames = {
    "order_id": "pedido_id",
    "order_item_id": "pedido_item_id",
    "product_id": "produto_id",
    "seller_id": "vendedor_id",
    "shipping_limit_date": "data_limite_envio",
    "price": "preco",
    "freight_value": "valor_frete",
}

df_orders_items_bronze = spark.read.parquet(bronze_path)
for source_name, target_name in order_items_renames.items():
    df_orders_items_bronze = df_orders_items_bronze.withColumnRenamed(source_name, target_name)

# Monetary columns are stored as DECIMAL(10, 2).
for money_column in ("preco", "valor_frete"):
    df_orders_items_bronze = df_orders_items_bronze.withColumn(
        money_column, col(money_column).cast(DecimalType(10, 2))
    )

if DeltaTable.isDeltaTable(spark, silver_path):
    # Silver table exists: update rows matching on (pedido_id, pedido_item_id),
    # insert the rest.
    df_orders_items_silver = DeltaTable.forPath(spark, silver_path)
    (
        df_orders_items_silver.alias("silver")
        .merge(
            df_orders_items_bronze.alias("bronze"),
            "silver.pedido_id = bronze.pedido_id AND silver.pedido_item_id = bronze.pedido_item_id",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # No Delta table at the Silver path yet: write the transformed data as the table.
    df_orders_items_bronze.write.format("delta").mode("overwrite").save(silver_path)

#display(spark.read.format("delta").load("dbfs:/mnt/olist/Silver/olist_order_items_dataset"))

### tratamento tabela payment

In [0]:

# Bronze (input, parquet) and Silver (output, Delta) locations for order payments.
bronze_path = "dbfs:/mnt/olist/Bronze/olist_order_payments_dataset"
silver_path = "dbfs:/mnt/olist/Silver/olist_order_payments_dataset"

# Bronze column name -> Silver column name.
payments_renames = {
    "order_id": "pedido_id",
    "payment_sequential": "sequencial_pagamento",
    "payment_type": "tipo_pagamento",
    "payment_installments": "parcelas_pagamento",
    "payment_value": "valor_pagamento",
}

df_payments_bronze = spark.read.parquet(bronze_path)
for source_name, target_name in payments_renames.items():
    df_payments_bronze = df_payments_bronze.withColumnRenamed(source_name, target_name)

# Human-readable label for each raw payment type. Values not listed here fall
# through unchanged. A voucher is not a PIX transfer, so it gets its own label
# instead of being reported as "PIX".
tipo_pagamento_formatado = (
    when(col("tipo_pagamento") == "boleto", "Boleto")
    .when(col("tipo_pagamento") == "not_defined", "Indefinido")
    .when(col("tipo_pagamento") == "credit_card", "Cartao de Credito")
    .when(col("tipo_pagamento") == "voucher", "Voucher")
    .when(col("tipo_pagamento") == "debit_card", "Cartao de Debito")
    .otherwise(col("tipo_pagamento"))
)

df_payments_bronze = (
    df_payments_bronze
    # Payment amount is stored as DECIMAL(10, 2).
    .withColumn("valor_pagamento", col("valor_pagamento").cast(DecimalType(10, 2)))
    .withColumn("tipo_pagamento_formatado", tipo_pagamento_formatado)
)

if DeltaTable.isDeltaTable(spark, silver_path):
    # Silver table exists: update rows matching on
    # (pedido_id, sequencial_pagamento), insert the rest. Because matched rows
    # are fully updated, previously written labels are refreshed as well.
    df_payments_silver = DeltaTable.forPath(spark, silver_path)
    (
        df_payments_silver.alias("silver")
        .merge(
            df_payments_bronze.alias("bronze"),
            "silver.pedido_id = bronze.pedido_id AND silver.sequencial_pagamento = bronze.sequencial_pagamento",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # No Delta table at the Silver path yet: write the transformed data as the table.
    df_payments_bronze.write.format("delta").mode("overwrite").save(silver_path)

#display(spark.read.format("delta").load("dbfs:/mnt/olist/Silver/olist_order_payments_dataset"))

### tratamento tabela reviews

In [0]:

# Bronze (input, parquet) and Silver (output, Delta) locations for order reviews.
bronze_path = "dbfs:/mnt/olist/Bronze/olist_order_reviews_dataset"
silver_path = "dbfs:/mnt/olist/Silver/olist_order_reviews_dataset"

# Cast the score to int and drop every row whose score is null after the cast.
# NOTE(review): presumably this filters out malformed rows - confirm.
df_reviews_bronze = (
    spark.read.parquet(bronze_path)
    .withColumn("review_score", col("review_score").cast("int"))
    .dropna(subset=["review_score"])
)

# Bronze column name -> Silver column name.
reviews_renames = {
    "review_id": "avaliacao_id",
    "order_id": "pedido_id",
    "review_score": "avaliacao_score",
    "review_comment_title": "titulo_comentario_avaliacao",
    "review_comment_message": "mensagem_comentario_avaliacao",
    "review_creation_date": "data_criacao_avaliacao",
    "review_answer_timestamp": "data_resposta_avaliacao",
}
for source_name, target_name in reviews_renames.items():
    df_reviews_bronze = df_reviews_bronze.withColumnRenamed(source_name, target_name)

# The score is already an int (cast above, before the rename), so it is not
# cast a second time. Only the two date columns still need conversion.
for date_column in ("data_criacao_avaliacao", "data_resposta_avaliacao"):
    df_reviews_bronze = df_reviews_bronze.withColumn(
        date_column, to_timestamp(date_column, "yyyy-MM-dd HH:mm:ss")
    )

if DeltaTable.isDeltaTable(spark, silver_path):
    # Silver table exists: update rows matching on (avaliacao_id, pedido_id),
    # insert the rest.
    df_reviews_silver = DeltaTable.forPath(spark, silver_path)
    (
        df_reviews_silver.alias("silver")
        .merge(
            df_reviews_bronze.alias("bronze"),
            "silver.avaliacao_id = bronze.avaliacao_id AND silver.pedido_id = bronze.pedido_id",
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # No Delta table at the Silver path yet: write the transformed data as the table.
    df_reviews_bronze.write.format("delta").mode("overwrite").save(silver_path)

#display(spark.read.format("delta").load("dbfs:/mnt/olist/Silver/olist_order_reviews_dataset"))


### tratamento tabela orders

In [0]:
# Bronze (input, parquet) and Silver (output, Delta) locations for orders.
bronze_path = "dbfs:/mnt/olist/Bronze/olist_orders_dataset"
silver_path = "dbfs:/mnt/olist/Silver/olist_orders_dataset"

# Bronze column name -> Silver column name.
orders_renames = {
    "order_id": "pedido_id",
    "customer_id": "cliente_id",
    "order_status": "status_pedido",
    "order_purchase_timestamp": "data_compra_pedido",
    "order_approved_at": "data_aprovacao_pedido",
    "order_delivered_carrier_date": "data_entrega_operadora_pedido",
    "order_delivered_customer_date": "data_entrega_cliente_pedido",
    "order_estimated_delivery_date": "data_entrega_estimada_pedido",
}

# Raw order status -> display label, in the order the checks are evaluated.
order_status_labels = {
    "shipped": "Enviado",
    "canceled": "Cancelado",
    "approved": "Aprovado",
    "invoiced": "Faturado",
    "delivered": "Entregue",
    "unavailable": "Indisponivel",
    "processing": "Processando",
    "created": "Criado",
}

df_orders_bronze = spark.read.parquet(bronze_path)
for source_name, target_name in orders_renames.items():
    df_orders_bronze = df_orders_bronze.withColumnRenamed(source_name, target_name)

# Build the when(...).when(...) chain from the mapping; statuses that are not
# listed keep their raw value.
status_pedido_formatado = None
for raw_status, status_label in order_status_labels.items():
    status_matches = col("status_pedido") == raw_status
    if status_pedido_formatado is None:
        status_pedido_formatado = when(status_matches, status_label)
    else:
        status_pedido_formatado = status_pedido_formatado.when(status_matches, status_label)
status_pedido_formatado = status_pedido_formatado.otherwise(col("status_pedido"))

df_orders_bronze = df_orders_bronze.withColumn("status_pedido_formatado", status_pedido_formatado)

if not DeltaTable.isDeltaTable(spark, silver_path):
    # No Delta table at the Silver path yet: write the transformed data as the table.
    df_orders_bronze.write.format("delta").mode("overwrite").save(silver_path)
else:
    # Silver table exists: update rows matching on pedido_id, insert the rest.
    df_orders_silver = DeltaTable.forPath(spark, silver_path)
    (
        df_orders_silver.alias("silver")
        .merge(df_orders_bronze.alias("bronze"), "silver.pedido_id = bronze.pedido_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

#display(spark.read.format("delta").load("dbfs:/mnt/olist/Silver/olist_orders_dataset"))

### tratamento tabela products



In [0]:
# Bronze (input, parquet) and Silver (output, Delta) locations for products.
bronze_path = "dbfs:/mnt/olist/Bronze/olist_products_dataset"
silver_path = "dbfs:/mnt/olist/Silver/olist_products_dataset"

# Bronze column name -> Silver column name.
# NOTE: the "lenght" spelling in two of the Bronze names is kept verbatim,
# since these strings must match the column names being renamed.
products_renames = {
    "product_id": "produto_id",
    "product_category_name": "nome_categoria_produto",
    "product_name_lenght": "tamanho_nome_produto",
    "product_description_lenght": "tamanho_descricao_produto",
    "product_photos_qty": "quantidade_fotos_produto",
    "product_weight_g": "peso_produto",
    "product_length_cm": "comprimento_produto",
    "product_height_cm": "altura_produto",
    "product_width_cm": "largura_produto",
}

df_products_bronze = spark.read.parquet(bronze_path)
for source_name, target_name in products_renames.items():
    df_products_bronze = df_products_bronze.withColumnRenamed(source_name, target_name)

# Display version of the category name:
#   1. underscores become spaces;
#   2. every "2" is removed together with any spaces directly before it, so a
#      name such as "casa_conforto_2" becomes "casa conforto" rather than
#      "casa conforto " with a dangling trailing space;
#   3. the first letter of each word is capitalised.
categoria_formatada = regexp_replace(col("nome_categoria_produto"), "_", " ")
categoria_formatada = regexp_replace(categoria_formatada, " *2", "")
df_products_bronze = df_products_bronze.withColumn(
    "nome_categoria_produto_formatado", initcap(categoria_formatada)
)

if DeltaTable.isDeltaTable(spark, silver_path):
    # Silver table exists: update rows matching on produto_id, insert the rest.
    # Because matched rows are fully updated, previously written names are
    # refreshed as well.
    df_products_silver = DeltaTable.forPath(spark, silver_path)
    (
        df_products_silver.alias("silver")
        .merge(df_products_bronze.alias("bronze"), "silver.produto_id = bronze.produto_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
else:
    # No Delta table at the Silver path yet: write the transformed data as the table.
    df_products_bronze.write.format("delta").mode("overwrite").save(silver_path)

#display(spark.read.format("delta").load("dbfs:/mnt/olist/Silver/olist_products_dataset"))

### tratamento tabela sellers


In [0]:
# Bronze (input, parquet) and Silver (output, Delta) locations for sellers.
bronze_path = "dbfs:/mnt/olist/Bronze/olist_sellers_dataset"
silver_path = "dbfs:/mnt/olist/Silver/olist_sellers_dataset"

# Bronze column name -> Silver column name.
sellers_renames = {
    "seller_id": "vendedor_id",
    "seller_zip_code_prefix": "cep_prefixo_vendedor",
    "seller_city": "cidade_vendedor",
    "seller_state": "estado_vendedor",
}

# Read the Bronze data and apply the renames one by one.
df_sellers_bronze = spark.read.parquet(bronze_path)
for source_name, target_name in sellers_renames.items():
    df_sellers_bronze = df_sellers_bronze.withColumnRenamed(source_name, target_name)

if not DeltaTable.isDeltaTable(spark, silver_path):
    # No Delta table at the Silver path yet: write the transformed data as the table.
    df_sellers_bronze.write.format("delta").mode("overwrite").save(silver_path)
else:
    # Silver table exists: update rows matching on vendedor_id, insert the rest.
    df_sellers_silver = DeltaTable.forPath(spark, silver_path)
    (
        df_sellers_silver.alias("silver")
        .merge(df_sellers_bronze.alias("bronze"), "silver.vendedor_id = bronze.vendedor_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

#display(spark.read.format("delta").load("dbfs:/mnt/olist/Silver/olist_sellers_dataset"))