In [0]:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
import datetime

# Calendar month number (1-12, local clock), captured once when this pipeline
# source is evaluated; the Auto Loader paths in this notebook are built as
# "month-<month>".
# NOTE(review): this module-level name shadows ``pyspark.sql.functions.month``
# brought in by the star import; rename it if that SQL function is needed here.
month = datetime.date.today().month

In [0]:
@dlt.table
def df_orders():
    """Stream the monthly ``orders`` CSV drop and give its columns explicit types.

    Source: ``s3://yadi-pipeline/month-<month>/orders`` read incrementally with
    Auto Loader (``cloudFiles``), header row expected, comma separated. The
    listed columns are cast one after another; each ``withColumn`` call
    replaces the existing column of the same name. All other columns are
    returned as read.
    """
    source_path = "s3://yadi-pipeline/month-{}/orders".format(month)

    # NOTE(review): Auto Loader's documented switch for typed CSV inference is
    # ``cloudFiles.inferColumnTypes``; ``inferSchema`` may have no effect here,
    # which is presumably why the explicit casts below exist — confirm.
    reader = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .option("sep", ",")
    )
    raw = reader.load(source_path)

    typed_columns = [
        ("order_id", IntegerType()),
        ("user_id", IntegerType()),
        ("order_number", IntegerType()),
        ("order_dow", IntegerType()),
        ("order_hour_of_day", IntegerType()),
        ("days_since_prior_order", DoubleType()),
    ]
    result = raw
    for column_name, spark_type in typed_columns:
        result = result.withColumn(column_name, col(column_name).cast(spark_type))
    return result

# CDC target that upserts rows from the ``df_orders`` streaming table, one row
# per ``order_id``. ``create_streaming_table`` is the supported replacement for
# the deprecated ``create_target_table``.
dlt.create_streaming_table("silver_order")

dlt.apply_changes(
    target="silver_order",
    source="df_orders",
    keys=["order_id"],
    # NOTE(review): sequencing by the key column means every record for a
    # given order_id carries the same sequence value, so there is no ordering
    # among duplicates of that key. Prefer an ingestion timestamp or another
    # monotonically increasing column once the source exposes one.
    sequence_by=col("order_id"),
)

@dlt.table
def df_products():
    """Stream the monthly ``products`` CSV drop and type its id columns.

    Source: ``s3://yadi-pipeline/month-<month>/products`` via Auto Loader
    (``cloudFiles``), header row expected, comma separated. ``product_id``,
    ``aisle_id`` and ``department_id`` are cast to integers; any other columns
    are returned as read.
    """
    source_path = "s3://yadi-pipeline/month-{}/products".format(month)

    # NOTE(review): ``inferSchema`` may be ignored by Auto Loader (its option is
    # ``cloudFiles.inferColumnTypes``); the casts below do the typing — confirm.
    reader = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .option("sep", ",")
    )
    result = reader.load(source_path)

    for id_column in ("product_id", "aisle_id", "department_id"):
        result = result.withColumn(id_column, col(id_column).cast(IntegerType()))
    return result

# CDC target that upserts rows from the ``df_products`` streaming table, one
# row per ``product_id``. ``create_streaming_table`` is the supported
# replacement for the deprecated ``create_target_table``.
dlt.create_streaming_table("silver_products")

dlt.apply_changes(
    target="silver_products",
    source="df_products",
    keys=["product_id"],
    # NOTE(review): sequencing by the key column gives every record of a
    # product_id the same sequence value, so duplicates of that key are not
    # ordered. Prefer an ingestion timestamp or other monotonically increasing
    # column once the source exposes one.
    sequence_by=col("product_id"),
)

@dlt.table
def df_departments():
    """Stream the monthly ``departments`` CSV drop with an integer ``department_id``.

    Source: ``s3://yadi-pipeline/month-<month>/departments`` via Auto Loader
    (``cloudFiles``), header row expected, comma separated. Only
    ``department_id`` is cast; other columns are returned as read.
    """
    source_path = "s3://yadi-pipeline/month-{}/departments".format(month)

    # NOTE(review): ``inferSchema`` may be ignored by Auto Loader (its option is
    # ``cloudFiles.inferColumnTypes``) — confirm whether it is needed.
    reader = spark.readStream.format("cloudFiles")
    reader = reader.option("cloudFiles.format", "csv")
    reader = reader.option("inferSchema", "true")
    reader = reader.option("header", "true")
    reader = reader.option("sep", ",")
    raw = reader.load(source_path)

    department_id = col("department_id").cast(IntegerType())
    return raw.withColumn("department_id", department_id)

# CDC target that upserts rows from the ``df_departments`` streaming table,
# one row per ``department_id``. ``create_streaming_table`` is the supported
# replacement for the deprecated ``create_target_table``.
dlt.create_streaming_table("silver_departments")

dlt.apply_changes(
    target="silver_departments",
    source="df_departments",
    keys=["department_id"],
    # NOTE(review): sequencing by the key column gives every record of a
    # department_id the same sequence value, so duplicates of that key are not
    # ordered. Prefer an ingestion timestamp or other monotonically increasing
    # column once the source exposes one.
    sequence_by=col("department_id"),
)

@dlt.table
def df_aisles():
    """Stream the monthly ``aisles`` CSV drop with an integer ``aisle_id``.

    Source: ``s3://yadi-pipeline/month-<month>/aisles`` via Auto Loader
    (``cloudFiles``), header row expected, comma separated. Only ``aisle_id``
    is cast; other columns are returned as read.
    """
    source_path = "s3://yadi-pipeline/month-{}/aisles".format(month)

    # NOTE(review): ``inferSchema`` may be ignored by Auto Loader (its option is
    # ``cloudFiles.inferColumnTypes``) — confirm whether it is needed.
    reader = spark.readStream.format("cloudFiles")
    reader = reader.option("cloudFiles.format", "csv")
    reader = reader.option("inferSchema", "true")
    reader = reader.option("header", "true")
    reader = reader.option("sep", ",")
    raw = reader.load(source_path)

    aisle_id = col("aisle_id").cast(IntegerType())
    return raw.withColumn("aisle_id", aisle_id)

# CDC target that upserts rows from the ``df_aisles`` streaming table, one row
# per ``aisle_id``. ``create_streaming_table`` is the supported replacement
# for the deprecated ``create_target_table``.
dlt.create_streaming_table("silver_aisles")

dlt.apply_changes(
    target="silver_aisles",
    source="df_aisles",
    keys=["aisle_id"],
    # NOTE(review): sequencing by the key column gives every record of an
    # aisle_id the same sequence value, so duplicates of that key are not
    # ordered. Prefer an ingestion timestamp or other monotonically increasing
    # column once the source exposes one.
    sequence_by=col("aisle_id"),
)

@dlt.table
def df_prior():
    """Stream the monthly ``order_products__prior`` CSV drop with integer columns.

    Source: ``s3://yadi-pipeline/month-<month>/order_products__prior`` via
    Auto Loader (``cloudFiles``), header row expected, comma separated.
    ``order_id``, ``product_id``, ``add_to_cart_order`` and ``reordered`` are
    cast to integers, in that order; other columns are returned as read.
    """
    source_path = "s3://yadi-pipeline/month-{}/order_products__prior".format(month)

    # NOTE(review): ``inferSchema`` may be ignored by Auto Loader (its option is
    # ``cloudFiles.inferColumnTypes``); the casts below do the typing — confirm.
    reader = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .option("sep", ",")
    )
    result = reader.load(source_path)

    integer_columns = ("order_id", "product_id", "add_to_cart_order", "reordered")
    for name in integer_columns:
        result = result.withColumn(name, col(name).cast(IntegerType()))
    return result

# CDC target that upserts rows from the ``df_prior`` streaming table, one row
# per (order_id, product_id) pair. ``create_streaming_table`` is the supported
# replacement for the deprecated ``create_target_table``.
dlt.create_streaming_table("silver_prior")

dlt.apply_changes(
    target="silver_prior",
    source="df_prior",
    keys=["order_id", "product_id"],
    # NOTE(review): order_id is part of the key, so it is constant within each
    # key and cannot order duplicate records of the same (order_id,
    # product_id). Prefer an ingestion timestamp or other monotonically
    # increasing column once the source exposes one.
    sequence_by=col("order_id"),
)

@dlt.table
def df_train():
    """Stream the monthly ``order_products__train`` CSV drop with integer columns.

    Source: ``s3://yadi-pipeline/month-<month>/order_products__train`` via
    Auto Loader (``cloudFiles``), header row expected, comma separated.
    ``order_id``, ``product_id``, ``add_to_cart_order`` and ``reordered`` are
    cast to integers, in that order; other columns are returned as read.
    """
    source_path = "s3://yadi-pipeline/month-{}/order_products__train".format(month)

    # NOTE(review): ``inferSchema`` may be ignored by Auto Loader (its option is
    # ``cloudFiles.inferColumnTypes``); the casts below do the typing — confirm.
    reader = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .option("sep", ",")
    )
    result = reader.load(source_path)

    integer_columns = ("order_id", "product_id", "add_to_cart_order", "reordered")
    for name in integer_columns:
        result = result.withColumn(name, col(name).cast(IntegerType()))
    return result

# CDC target that upserts rows from the ``df_train`` streaming table, one row
# per (order_id, product_id) pair. ``create_streaming_table`` is the supported
# replacement for the deprecated ``create_target_table``.
dlt.create_streaming_table("silver_train")

dlt.apply_changes(
    target="silver_train",
    source="df_train",
    keys=["order_id", "product_id"],
    # NOTE(review): order_id is part of the key, so it is constant within each
    # key and cannot order duplicate records of the same (order_id,
    # product_id). Prefer an ingestion timestamp or other monotonically
    # increasing column once the source exposes one.
    sequence_by=col("order_id"),
)

@dlt.table
def df_pad():
    """Products joined with their aisle and department names.

    Inner-joins ``silver_products`` to ``silver_aisles`` on ``aisle_id`` and to
    ``silver_departments`` on ``department_id`` (comma joins filtered in the
    WHERE clause), ordered by ``product_id``. The ``aisle`` and ``department``
    columns are then renamed to ``aisle_name`` and ``department_name``.
    """
    # Adjacent literals are concatenated into the same single SQL string.
    query = (
        "select p.product_id, p.product_name, p.aisle_id, a.aisle, p.department_id, d.department "
        "from LIVE.silver_products p, LIVE.silver_aisles a, LIVE.silver_departments d "
        "where p.aisle_id == a.aisle_id and p.department_id == d.department_id "
        "order by p.product_id"
    )
    result = spark.sql(query)
    for old_name, new_name in (("aisle", "aisle_name"), ("department", "department_name")):
        result = result.withColumnRenamed(old_name, new_name)
    return result

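# NOTE: disabled CDC template. Before re-enabling, adapt it to df_pad:
# - df_pad selects no ModifiedDate or operation columns, so sequence_by,
#   apply_as_deletes and except_column_list must change.
# - create_streaming_live_table is deprecated; use dlt.create_streaming_table.
# dlt.create_streaming_live_table("silver_pad")

# dlt.apply_changes(
#   target = "silver_pad",
#   source = "df_pad",
#   keys = ["product_id"],
#   sequence_by = col("ModifiedDate"),
#   apply_as_deletes = expr("operation = 'DELETE'"),
#   except_column_list = ["operation", "ModifiedDate"],
#   stored_as_scd_type = "2"
# )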

@dlt.table
def df_op():
    """Prior-set orders joined with the products placed in them.

    Inner-joins ``silver_order`` to ``silver_prior`` on ``order_id`` and keeps
    only rows whose ``eval_set`` is ``'prior'``. The result has one row per
    matching (order, product) pair, carrying the order attributes plus
    ``product_id``, ``add_to_cart_order`` and ``reordered``.
    """
    # Adjacent literals are concatenated into the same single SQL string.
    prior_orders_sql = (
        "SELECT a.order_id, a.user_id, a.eval_set, a.order_number, a.order_dow, "
        "a.order_hour_of_day, a.days_since_prior_order, "
        "b.product_id, b.add_to_cart_order, b.reordered "
        "FROM LIVE.silver_order a "
        "JOIN LIVE.silver_prior b ON a.order_id = b.order_id "
        "WHERE a.eval_set = 'prior'"
    )
    return spark.sql(prior_orders_sql)

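# NOTE: disabled CDC template. Before re-enabling, adapt it to df_op:
# - df_op selects no ModifiedDate or operation columns.
# - df_op has one row per (order_id, product_id), so keying on order_id alone
#   would keep only one product per order.
# - create_streaming_live_table is deprecated; use dlt.create_streaming_table.
# dlt.create_streaming_live_table("silver_op")

# dlt.apply_changes(
#     target = "silver_op",
#     source = "df_op",
#     keys = ["order_id"],
#     sequence_by = col("ModifiedDate"),
#     apply_as_deletes = expr("operation = 'DELETE'"),
#     except_column_list = ["operation", "ModifiedDate"],
#     stored_as_scd_type = "2"
# )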

@dlt.table
def df_opp():
    """Prior order lines enriched with user and product catalogue details.

    Joins ``silver_prior`` to ``silver_order`` on ``order_id`` (for
    ``user_id``) and to the ``df_pad`` product table on ``product_id`` (for
    product, aisle and department names), ordered by ``order_id``.
    """
    # Adjacent literals are concatenated into the same single SQL string.
    enriched_prior_sql = (
        "select p.order_id, o.user_id, p.product_id, a.product_name, "
        "a.aisle_id, a.aisle_name, a.department_id, a.department_name, "
        "p.add_to_cart_order, p.reordered "
        "from LIVE.silver_prior p, LIVE.silver_order o, LIVE.df_pad a "
        "where p.order_id == o.order_id and p.product_id == a.product_id "
        "order by p.order_id"
    )
    return spark.sql(enriched_prior_sql)

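# NOTE: disabled CDC template. Before re-enabling, adapt it to df_opp:
# - df_opp selects no ModifiedDate or operation columns.
# - df_opp has one row per (order_id, product_id), so keying on order_id alone
#   would keep only one product per order.
# - create_streaming_live_table is deprecated; use dlt.create_streaming_table.
# dlt.create_streaming_live_table("silver_opp")

# dlt.apply_changes(
#     target = "silver_opp",
#     source = "df_opp",
#     keys = ["order_id"],
#     sequence_by = col("ModifiedDate"),
#     apply_as_deletes = expr("operation = 'DELETE'"),
#     except_column_list = ["operation", "ModifiedDate"],
#     stored_as_scd_type = "2"
# )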

@dlt.table
def df_otp():
    """Train-set order lines enriched with user and product catalogue details.

    Joins ``silver_train`` to ``silver_order`` on ``order_id`` (for
    ``user_id``) and to the ``df_pad`` product table on ``product_id`` (for
    product, aisle and department names), ordered by ``order_id``.
    """
    # Adjacent literals are concatenated into the same single SQL string.
    enriched_train_sql = (
        "select t.order_id, o.user_id, t.product_id, a.product_name, "
        "a.aisle_id, a.aisle_name, a.department_id, a.department_name, "
        "t.add_to_cart_order, t.reordered "
        "from LIVE.silver_train t, LIVE.silver_order o, LIVE.df_pad a "
        "where t.order_id == o.order_id and t.product_id == a.product_id "
        "order by t.order_id"
    )
    return spark.sql(enriched_train_sql)

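# NOTE: disabled CDC template. Before re-enabling, adapt it to df_otp:
# - df_otp selects no ModifiedDate or operation columns.
# - df_otp has one row per (order_id, product_id), so keying on order_id alone
#   would keep only one product per order.
# - create_streaming_live_table is deprecated; use dlt.create_streaming_table.
# dlt.create_streaming_live_table("silver_otp")

# dlt.apply_changes(
#     target = "silver_otp",
#     source = "df_otp",
#     keys = ["order_id"],
#     sequence_by = col("ModifiedDate"),
#     apply_as_deletes = expr("operation = 'DELETE'"),
#     except_column_list = ["operation", "ModifiedDate"],
#     stored_as_scd_type = "2"
# )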
