In [0]:
import dlt
from pyspark.sql.functions import col, to_date, count, sum, avg, round, current_timestamp

In [0]:
# Pipeline-level settings.
# NOTE(review): `target` is not referenced by any of the visible cells —
# presumably the catalog.schema the pipeline publishes to; confirm.
target = "nyctaxi_test.default"
# Path handed to the Auto Loader reader in the bronze table definition.
raw_path = "/Volumes/nyctaxi_test/default/simple_volume/nyctaxi_data.csv"

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# Explicit schema for the raw taxi data. Each entry is (column name, Spark type
# class); the comprehension instantiates the type and marks every column nullable.
nyc_taxi_schema = StructType([
    StructField(field_name, field_type(), True)
    for field_name, field_type in (
        ("vendor_id", StringType),
        ("pickup_datetime", TimestampType),
        ("dropoff_datetime", TimestampType),
        ("passenger_count", IntegerType),
        ("trip_distance", DoubleType),
        ("pickup_longitude", DoubleType),
        ("pickup_latitude", DoubleType),
        ("rate_code_id", IntegerType),
        ("store_and_fwd_flag", StringType),
        ("dropoff_longitude", DoubleType),
        ("dropoff_latitude", DoubleType),
        ("payment_type", StringType),
        ("fare_amount", DoubleType),
        ("extra", DoubleType),
        ("mta_tax", DoubleType),
        ("tip_amount", DoubleType),
        ("tolls_amount", DoubleType),
        ("total_amount", DoubleType),
    )
])


In [0]:
@dlt.table(
  comment="This table is used to store the raw data ingested from the source CSV file.",
  table_properties={"quality":"bronze"}
)
def nyc_taxi_bronze():
    """Bronze table: stream the raw taxi CSV and stamp each row.

    Reads `raw_path` as a stream through the `cloudFiles` (Auto Loader) source
    in CSV format with a header row, applying the explicit `nyc_taxi_schema`
    instead of inferring one.

    Returns:
        A streaming DataFrame holding every schema column plus a
        `processing_time` column set from `current_timestamp()`.
    """
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(nyc_taxi_schema)
        .load(raw_path)
        .select(
            "*",
            # Record when the row was processed by this pipeline.
            current_timestamp().alias("processing_time")
        )
        # NOTE(review): on a streaming read this appears to cap the table at
        # 10,000 rows in total (not per micro-batch) and may force a
        # single-partition stateful stage. Confirm this is an intentional
        # dev/test limit and remove it before loading full data.
        .limit(10000)
    )

In [0]:
@dlt.table(
  comment="This table is used to store the cleaned data from the bronze table.",
  table_properties={"quality":"silver"}
)
def nyc_taxi_silver():
    """Silver table: validated, explicitly typed trips from `nyc_taxi_bronze`.

    Keeps only rows with a positive fare, a positive trip distance and
    non-null pickup and dropoff timestamps, then projects the eighteen trip
    columns, each cast to its declared type. Columns not listed in the
    projection (such as `processing_time`) are not carried over.

    Returns:
        A streaming DataFrame of the filtered, cast trip columns.
    """
    # Row-level validity rules; a row must satisfy all of them.
    has_positive_fare = col("fare_amount") > 0
    has_positive_distance = col("trip_distance") > 0
    has_pickup_time = col("pickup_datetime").isNotNull()
    has_dropoff_time = col("dropoff_datetime").isNotNull()
    is_valid_trip = (
        has_positive_fare & has_positive_distance & has_pickup_time & has_dropoff_time
    )

    # (column, target type) pairs, in output column order.
    typed_columns = [
        col(column_name).cast(type_name)
        for column_name, type_name in (
            ("vendor_id", "string"),
            ("pickup_datetime", "timestamp"),
            ("dropoff_datetime", "timestamp"),
            ("passenger_count", "int"),
            ("trip_distance", "double"),
            ("pickup_longitude", "double"),
            ("pickup_latitude", "double"),
            ("rate_code_id", "int"),
            ("store_and_fwd_flag", "string"),
            ("dropoff_longitude", "double"),
            ("dropoff_latitude", "double"),
            ("payment_type", "string"),
            ("fare_amount", "double"),
            ("extra", "double"),
            ("mta_tax", "double"),
            ("tip_amount", "double"),
            ("tolls_amount", "double"),
            ("total_amount", "double"),
        )
    ]

    bronze_stream = dlt.read_stream("nyc_taxi_bronze")
    return bronze_stream.filter(is_valid_trip).select(*typed_columns)



In [0]:
# @dlt.view(
#     comment="Intermediate view for daily revenue summary calculations."
# )
# def nyc_taxi_gold_daily_summary_view():
#     return (
#         dlt.read_stream("nyc_taxi_silver")
#         .withColumn("trip_date", to_date(col("pickup_datetime")))
#         .groupBy("trip_date")
#         .agg(
#             count("*").alias("total_trips"),
#             sum("passenger_count").alias("total_passengers"),
#             round(sum("total_amount"), 2).alias("total_revenue"),
#             round(avg("total_amount"), 2).alias("avg_revenue"),
#             round(avg("trip_distance"), 2).alias("avg_distance")
#         )
#     )

In [0]:
# @dlt.view(
#     comment="Daily revenue and tip summary grouped by payment type"
# )
# def nyctaxi_gold_payment_type_summary():
#     df = dlt.read("nyc_taxi_silver")
#     return (
#         df.withColumn("trip_date", to_date(col("pickup_datetime")))
#           .groupBy("trip_date", "payment_type")
#           .agg(
#               count("*").alias("total_trips"),
#               round(sum(col("total_amount")), 2).alias("total_revenue"),
#               round(sum(col("tip_amount")), 2).alias("total_tips")
#           )
#     )
