### Orders Table

In [0]:
# Make sure the Silver schema exists before any table is written into it.
# IF NOT EXISTS keeps this cell safe to re-run.
spark.sql(
    "CREATE SCHEMA IF NOT EXISTS main.instacart_silver"
)


In [0]:
from pyspark.sql.functions import col, expr

# Source: the orders table as landed in the Bronze layer.
bronze_orders = spark.table("main.instacart_bronze.orders")

# Project the order columns with explicit types.
# try_cast is used (matching the other Silver tables in this notebook) so
# that a malformed value becomes NULL instead of aborting the whole job when
# ANSI mode is enabled. Each expression is aliased back to the source column
# name so the Silver schema keeps the same column names as before.
silver_orders = bronze_orders.select(
    expr("try_cast(order_id as int)").alias("order_id"),
    expr("try_cast(user_id as int)").alias("user_id"),
    col("eval_set"),  # passed through unchanged
    expr("try_cast(order_number as int)").alias("order_number"),
    expr("try_cast(order_dow as int)").alias("order_dow"),
    expr("try_cast(order_hour_of_day as int)").alias("order_hour_of_day"),
    expr("try_cast(days_since_prior_order as float)").alias("days_since_prior_order"),
)


In [0]:
# Data quality filter: keep only rows that still have an order_id.
has_order_id = col("order_id").isNotNull()
silver_orders = silver_orders.where(has_order_id)


In [0]:
# Persist the cleaned orders as a Delta table in the Silver schema.
# "overwrite" replaces whatever the table currently holds on every run.
(
    silver_orders.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("main.instacart_silver.orders")
)


In [0]:
# Validate: show the row count of the saved table, then print its schema.
orders_row_count = spark.sql("SELECT COUNT(*) FROM main.instacart_silver.orders")
orders_row_count.show()

saved_orders = spark.table("main.instacart_silver.orders")
saved_orders.printSchema()


### Products Table

In [0]:
# Load the Bronze products table and print its schema so the column names
# and types can be checked before the Silver projection is written.
bronze_products = spark.read.table("main.instacart_bronze.products")
bronze_products.printSchema()


In [0]:
from pyspark.sql.functions import expr, col

bronze_products = spark.table("main.instacart_bronze.products")

silver_products = bronze_products.select(
    expr("try_cast(product_id as string)").alias("product_id"),
    col("product_name"),
    expr("try_cast(aisle_id as string)").alias("aisle_id"),
    expr("try_cast(department_id as string)").alias("department_id")
)


In [0]:
silver_products = silver_products.filter(
    col("product_id").isNotNull() &
    col("aisle_id").isNotNull() &
    col("department_id").isNotNull()
)


In [0]:
silver_products.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("main.instacart_silver.products")


In [0]:
# Validate: show the row count of the saved products table.
products_row_count = spark.sql("SELECT COUNT(*) FROM main.instacart_silver.products")
products_row_count.show()


### Order Items Table


In [0]:
# Load the two Bronze order-product tables ("prior" and "train"); they are
# cleaned separately and then combined into a single order_items table.
prior = spark.read.table("main.instacart_bronze.order_products__prior")
train = spark.read.table("main.instacart_bronze.order_products__train")


In [0]:
# Clean + cast both sources
from pyspark.sql.functions import expr

# Both Bronze tables get exactly the same projection: every column is
# try_cast to int (malformed values become NULL) and keeps its original name.
order_item_columns = [
    expr(f"try_cast({name} as int)").alias(name)
    for name in ("order_id", "product_id", "add_to_cart_order", "reordered")
]

prior_clean = prior.select(*order_item_columns)
train_clean = train.select(*order_item_columns)


In [0]:
# Stack the cleaned prior and train rows into one DataFrame. Columns are
# matched by name, and both sides must expose the same set of columns.
order_items = prior_clean.unionByName(train_clean, allowMissingColumns=False)


In [0]:
# Remove bad records: a row is kept only if both keys are present.
from pyspark.sql.functions import col

order_items = (
    order_items
    .where(col("order_id").isNotNull())
    .where(col("product_id").isNotNull())
)

In [0]:
# Persist the combined order items as a Delta table in the Silver schema,
# replacing the table's existing contents.
(
    order_items.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("main.instacart_silver.order_items")
)


In [0]:
# Validate: show the row count of the saved order_items table.
order_items_row_count = spark.sql("SELECT COUNT(*) FROM main.instacart_silver.order_items")
order_items_row_count.show()


### Departments Table

In [0]:
from pyspark.sql.functions import expr, col

bronze_departments = spark.table("main.instacart_bronze.departments")

# Cast the department key to int (malformed values become NULL), carry the
# department column through unchanged, then drop rows left without a key.
silver_departments = (
    bronze_departments
    .select(
        expr("try_cast(department_id as int)").alias("department_id"),
        col("department"),
    )
    .where(col("department_id").isNotNull())
)


In [0]:
# Persist the cleaned departments as a Delta table in the Silver schema,
# replacing the table's existing contents.
(
    silver_departments.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("main.instacart_silver.departments")
)


### Aisles Table

In [0]:
# Imported here, as in the other sections, so this cell does not depend on
# an earlier cell having been run in the same session.
from pyspark.sql.functions import expr, col

bronze_aisles = spark.table("main.instacart_bronze.aisles")

# Cast the aisle key to int (malformed values become NULL), carry the aisle
# column through unchanged, then drop rows left without a key.
silver_aisles = bronze_aisles.select(
    expr("try_cast(aisle_id as int)").alias("aisle_id"),
    col("aisle")
).filter(col("aisle_id").isNotNull())


In [0]:
# Persist the cleaned aisles as a Delta table in the Silver schema,
# replacing the table's existing contents.
(
    silver_aisles.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("main.instacart_silver.aisles")
)
