In [0]:
from pyspark.sql import functions as F

# Table-name prefixes for the source (bronze) and target (silver) layers.
bronze_path = "bikestore.bronze"
silver_path = "bikestore.silver"

# order_items subquery: keep the line-item columns and add total_sale,
# computed as list_price * quantity * (1 - discount) and cast to double.
order_items = spark.table(f"{bronze_path}.order_items").select(
    F.col("order_id"),
    F.col("item_id"),
    F.col("product_id"),
    F.col("quantity"),
    F.col("list_price"),
    (
        F.col("list_price")
        * F.col("quantity")
        * (F.lit(1) - F.col("discount"))
    ).cast("double").alias("total_sale"),
    F.col("discount"),
)

# Main tables, all read from the bronze layer.
orders, stores, staffs = (
    spark.table(f"{bronze_path}.{table_name}")
    for table_name in ("orders", "stores", "staffs")
)

# Join and final selection.
# Every order is kept (left joins) and enriched with its store, its staff
# member and its line items, so the result has one row per matching
# order_items row; orders with no match carry nulls in the joined columns.
df_orders_silver = (
    orders
    .join(stores, on=orders["store_id"] == stores["store_id"], how="left")
    .join(staffs, on=orders["staff_id"] == staffs["staff_id"], how="left")
    .join(
        order_items,
        on=orders["order_id"] == order_items["order_id"],
        how="left",
    )
    .select(
        orders["order_id"],
        orders["customer_id"],
        # Human-readable label for the numeric order_status code; any code
        # outside 1-4 (or a null) falls through to "Unknown".
        # NOTE(review): confirm these labels against the source system's
        # definition of the order_status codes.
        (
            F.when(orders["order_status"] == 1, "Pending")
            .when(orders["order_status"] == 2, "Processing")
            .when(orders["order_status"] == 3, "Shipped")
            .when(orders["order_status"] == 4, "Delivered")
            .otherwise("Unknown")
        ).alias("status"),
        orders["order_status"],
        orders["order_date"],
        orders["required_date"],
        orders["shipped_date"],
        stores["store_name"],
        stores["state"],
        stores["city"],
        # Staff columns are renamed so their origin is clear in the output.
        staffs["first_name"].alias("first_name_staff"),
        staffs["active"].alias("active_staff"),
        staffs["email"],
        order_items["product_id"],
        order_items["quantity"],
        order_items["total_sale"],
        order_items["list_price"],
        order_items["discount"],
    )
)

# Persist the joined result as the silver "orders" table, overwriting any
# existing data; the mergeSchema writer option is enabled for the write.
(
    df_orders_silver.write
    .mode("overwrite")
    .option("mergeSchema", "true")
    .saveAsTable(f"{silver_path}.orders")
)