In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DecimalType, TimestampType

# Shared Spark session for every cell below; getOrCreate() hands back the
# session that is already active when there is one.
spark = (
    SparkSession.builder
    .appName("Transformação e Ingestão Silver")
    .getOrCreate()
)

In [0]:
def ingestao_silver(df, table):
    """Write ``df`` as the Delta table ``silver.delivery_center.<table>``.

    The target schema is created first when it does not exist, then the
    DataFrame is saved in ``overwrite`` mode, so a previous version of the
    table is replaced.

    Args:
        df: Object exposing the ``write`` API used below (a Spark DataFrame
            in this notebook).
        table: Unqualified name of the target table inside
            ``silver.delivery_center``.

    Returns:
        str: Confirmation message (in Portuguese) naming the table.
    """
    # Relies on the notebook-level ``spark`` session.
    spark.sql("CREATE SCHEMA IF NOT EXISTS silver.delivery_center")

    # The write is executed for its side effect only; its result was never
    # used, so it is no longer bound to a local name.
    df.write.format("delta").mode("overwrite").saveAsTable(f"silver.delivery_center.{table}")

    return f"Tabela {table} criada na silver."

In [0]:
def renamed_columns(df):
    """Rename the English bronze column names to their Portuguese silver names.

    Each entry of the mapping below is passed to ``withColumnRenamed``. Per the
    PySpark documentation that call is a no-op when the DataFrame has no column
    with the given name, so one mapping serves every table of the notebook.

    NOTE(review): the original chain listed some source columns twice. The
    second call could never match, because the first one had already renamed
    the column. For ``driver_id``, ``channel_id``, ``store_id`` and ``hub_id``
    both calls used the same target, so nothing is lost. Two of them disagreed:

    * ``delivery_order_id`` -> ``id_pedido_entrega`` (deliveries, applied) vs
      ``id_entrega_pedido`` (orders, never applied)
    * ``payment_order_id`` -> ``id_pagamento_pedido`` (orders, applied) vs
      ``id_pedido_pagamento`` (payments, never applied)

    The names that actually took effect are kept, so the columns produced here
    are unchanged. If per-table names are wanted, this function needs to know
    which table it is renaming - TODO confirm with the data owners.

    Args:
        df: Object exposing ``withColumnRenamed(existing, new)`` (a Spark
            DataFrame in this notebook).

    Returns:
        The object returned by the last ``withColumnRenamed`` call.
    """
    silver_names = {
        # channels
        "channel_id": "id_canal",
        "channel_name": "nome_canal",
        "channel_type": "tipo_canal",
        # deliveries
        "delivery_id": "id_entrega",
        "delivery_order_id": "id_pedido_entrega",
        "driver_id": "id_entregador",
        "delivery_distance_meters": "distancia_entrega_metros",
        "delivery_status": "status_entrega",
        # drivers
        "driver_modal": "modalidade_entregador",
        "driver_type": "tipo_entregador",
        # hubs
        "hub_id": "id_centro",
        "hub_name": "nome_centro",
        "hub_city": "cidade_centro",
        "hub_state": "estado_centro",
        "hub_latitude": "latitude_centro",
        "hub_longitude": "longitude_centro",
        # orders
        "order_id": "id_pedido",
        "store_id": "id_loja",
        "payment_order_id": "id_pagamento_pedido",
        "order_status": "status_pedido",
        "order_amount": "valor_pedido",
        "order_delivery_fee": "taxa_entrega",
        "order_delivery_cost": "custo_entrega",
        "order_created_hour": "hora_criacao_pedido",
        # payments
        "payment_id": "id_pagamento",
        "payment_amount": "valor_pagamento",
        "payment_fee": "taxa_pagamento",
        "payment_method": "metodo_pagamento",
        "payment_status": "status_pagamento",
        # stores
        "store_segment": "segmento_loja",
        "store_name": "nome_loja",
        "store_plan_price": "preco_plano_loja",
        "store_latitude": "latitude_loja",
        "store_longitude": "longitude_loja",
    }

    # Unconditional calls (no membership pre-check) so Spark keeps applying its
    # own column-name matching, exactly as the original chain did.
    for source_name, silver_name in silver_names.items():
        df = df.withColumnRenamed(source_name, silver_name)

    return df

In [0]:
# Declared layout of each bronze table promoted to silver, keyed by bronze
# table name. The field names (and their order) drive the column selection in
# the loop below.
#
# The declared data types are NOT enforced: a user-specified schema cannot be
# applied to a catalog table read (see the loop).
# TODO: enforce them with explicit casts. That changes the types of the silver
# tables, so it presumably has to be rolled out together with the writer and
# the existing tables - confirm before enabling.
schema_dict = {
    "channels": StructType([
        StructField("channel_id", IntegerType(), False),
        StructField("channel_name", StringType(), True),
        StructField("channel_type", StringType(), True)
    ]),
    "deliveries": StructType([
        StructField("delivery_id", IntegerType(), False),
        StructField("delivery_order_id", IntegerType(), False),
        StructField("driver_id", IntegerType(), True),
        StructField("delivery_distance_meters", FloatType(), True),
        StructField("delivery_status", StringType(), True)
    ]),
    "drivers": StructType([
        StructField("driver_id", IntegerType(), False),
        StructField("driver_modal", StringType(), True),
        StructField("driver_type", StringType(), True)
    ]),
    "hubs": StructType([
        StructField("hub_id", IntegerType(), False),
        # hub_name was missing here although it is selected and renamed.
        StructField("hub_name", StringType(), True),
        StructField("hub_city", StringType(), True),
        StructField("hub_state", StringType(), True),
        # NOTE(review): scale 0 keeps no fractional digits; if these are
        # coordinates in decimal degrees, casting to this type would round
        # them to whole degrees - confirm the intended precision/scale.
        StructField("hub_latitude", DecimalType(15,0), True),
        StructField("hub_longitude", DecimalType(15,0), True)
    ]),
    "orders": StructType([
        StructField("order_id", IntegerType(), False),
        StructField("store_id", IntegerType(), False),
        StructField("channel_id", IntegerType(), False),
        StructField("payment_order_id", IntegerType(), False),
        StructField("delivery_order_id", IntegerType(), False),
        StructField("order_status", StringType(), True),
        StructField("order_amount", FloatType(), True),
        StructField("order_delivery_fee", FloatType(), True),
        StructField("order_delivery_cost", FloatType(), True),
        # NOTE(review): the name suggests an hour value, not a full
        # timestamp - verify against the bronze data.
        StructField("order_created_hour", TimestampType(), True)
    ]),
    "payments": StructType([
        StructField("payment_id", IntegerType(), False),
        StructField("payment_order_id", IntegerType(), False),
        # Was misspelled "paymet_amount", which matches no selected column.
        StructField("payment_amount", FloatType(), True),
        StructField("payment_fee", FloatType(), True),
        StructField("payment_method", StringType(), True),
        StructField("payment_status", StringType(), True)
    ]),
    "stores": StructType([
        StructField("store_id", IntegerType(), False),
        StructField("hub_id", IntegerType(), False),
        # store_segment is listed before store_name to keep the column order
        # that the notebook has always written for this table.
        StructField("store_segment", StringType(), True),
        StructField("store_name", StringType(), True),
        StructField("store_plan_price", FloatType(), True),
        # NOTE(review): same scale-0 concern as the hub coordinates.
        StructField("store_latitude", DecimalType(15,0), True),
        StructField("store_longitude", DecimalType(15,0), True)
    ])
}

# Bronze table name -> silver table name.
silver_table_names = {
    "channels": "canais",
    "deliveries": "entregas",
    "drivers": "entregadores",
    "hubs": "centros",
    "orders": "pedidos",
    "payments": "pagamentos",
    "stores": "lojas",
}

tables = ["channels", "deliveries", "drivers", "hubs", "orders", "payments", "stores"]

for table in tables:
    # A table read takes its schema from the catalog. Combining
    # DataFrameReader.schema() with table() is not supported (rejected or
    # ignored depending on the runtime), so the table is read as stored.
    df_bronze = spark.read.table(f"bronze.delivery_center.{table}")

    # Keep only the declared columns, in the declared order.
    df_bronze = df_bronze.select(*schema_dict[table].fieldNames())

    df_silver = renamed_columns(df_bronze)

    print(ingestao_silver(df_silver, silver_table_names[table]))