In [16]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import create_map, lit



In [18]:
from citibike.citibike_utils import get_trip_duration_mins
from utils.datetime_utils import timestamp_to_date_col
from pyspark.sql.functions import create_map, lit

In [19]:
class MockDBUtils:
    """Minimal local stand-in for Databricks ``dbutils``.

    Implements only the ``widgets.text`` / ``widgets.get`` subset that this
    notebook uses, so the notebook can be executed outside a Databricks cluster.
    """

    class Widgets:
        """In-memory replacement for ``dbutils.widgets``."""

        def __init__(self):
            # Widget name -> current value.
            self.values = {}

        def text(self, name, default_value="", label=None):
            """Register a text widget ``name`` whose value is ``default_value``.

            ``label`` is accepted and ignored so that calls passing the optional
            third argument (as the Databricks API allows) also work locally.
            Re-registering a name overwrites its stored value.
            """
            self.values[name] = default_value

        def get(self, name):
            """Return the value of widget ``name``, or ``""`` if it was never registered.

            NOTE(review): the real ``dbutils.widgets.get`` errors on unknown
            widgets; this mock silently returns an empty string instead.
            """
            return self.values.get(name, "")

    def __init__(self):
        self.widgets = self.Widgets()

In [23]:
# getOrCreate() hands back the already-active session when one exists
# (e.g. on a Databricks cluster); otherwise it builds a new one named below.
spark = (
    SparkSession.builder
    .appName("Citibike Data")
    .getOrCreate()
)

In [24]:
# Databricks injects a global `dbutils`; when running elsewhere it does not
# exist, so fall back to the MockDBUtils stub defined earlier in this notebook.
try:
    dbutils  # noqa: F821 - provided by the Databricks runtime
except NameError:
    dbutils = MockDBUtils()

# Declare the job parameters, with defaults suitable for a local run.
dbutils.widgets.text("catalog", "citibike_dev")  # default catalog
dbutils.widgets.text("pipeline_id", "local_pipeline")
dbutils.widgets.text("run_id", "local_run")
dbutils.widgets.text("task_id", "local_task")
dbutils.widgets.text("processed_timestamp", "2025-09-14T00:00:00Z")

In [25]:
# Read the run parameters back, in the same order they were declared.
pipeline_id, run_id, task_id, processed_timestamp = (
    dbutils.widgets.get(widget_name)
    for widget_name in ("pipeline_id", "run_id", "task_id", "processed_timestamp")
)

# An empty catalog widget falls back to the dev catalog.
catalog = dbutils.widgets.get("catalog") or "citibike_dev"

In [26]:
print("Catalog =", catalog)  # echo the resolved target catalog

Catalog = citibike_dev


In [27]:
# Load the source table from the `01_bronze` schema. The schema name starts
# with a digit, so it is backtick-quoted, consistent with how the `02_silver`
# write target is quoted at the end of this notebook.
df = spark.read.table(f"{catalog}.`01_bronze`.jc_citibike")

In [28]:
# Presumably adds `trip_duration_mins` computed from the start/end timestamps;
# argument roles below are inferred from names -- confirm in citibike_utils.
df = get_trip_duration_mins(
    spark,
    df,
    "started_at",          # start timestamp column
    "ended_at",            # end timestamp column
    "trip_duration_mins",  # output column
)

In [29]:
# Adds `trip_start_date` (selected below) derived from `started_at`;
# presumably a timestamp -> date conversion -- confirm in utils.datetime_utils.
df = timestamp_to_date_col(
    spark,
    df,
    "started_at",
    "trip_start_date",
)

In [30]:
# Attach run provenance as a single map column. The (key, value) pairs are
# flattened into create_map's alternating key/value argument list, in order.
df = df.withColumn(
    "metadata",
    create_map(*[
        lit(token)
        for pair in (
            ("pipeline_id", pipeline_id),
            ("run_id", run_id),
            ("task_id", task_id),
            ("processed_timestamp", processed_timestamp),
        )
        for token in pair
    ]),
)

In [31]:
# Project down to the silver-layer columns, in output order.
df = df.select([
    "ride_id",
    "trip_start_date",
    "started_at",
    "ended_at",
    "start_station_name",
    "end_station_name",
    "trip_duration_mins",
    "metadata",
])

In [24]:
# Replace the silver table wholesale; overwriteSchema lets the column set change.
(
    df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{catalog}.`02_silver`.jc_citibike")
)
