In [None]:
import os
import sys

# Add project root to Python path so that local imports work
# Assuming the project structure is like this:
# your_project/
# ├── src/
# │   ├── citibike/
# │   │   ├── __init__.py
# │   │   └── citibike_utils.py
# │   └── utils/
# │       ├── __init__.py
# │       └── datetime_utils.py
# └── 02_silver_citibike.ipynb

# Resolve the project root relative to the notebook's working directory and
# register it with the import system so the local `src.*` imports resolve.
# NOTE(review): the layout sketched above places this notebook directly in the
# project root, yet the path below climbs three directories — confirm which
# of the two is correct.
current_dir = os.getcwd()
project_root = os.path.abspath(os.path.join(current_dir, '..', '..', '..'))
# Guard the append so re-running this cell does not accumulate duplicate
# sys.path entries.
if project_root not in sys.path:
    sys.path.append(project_root)

In [11]:
from src.citibike.citibike_utils import get_trip_duration_mins
from src.utils.datetime_utils import timestamp_to_date_col
from pyspark.sql.functions import create_map, lit

In [12]:
# Databricks notebook source

def get_widget_or_default(name, default):
    """Return the value of the Databricks widget `name`, else `default`.

    Args:
        name: Widget name passed to `dbutils.widgets.get`.
        default: Value returned when the lookup raises any `Exception`
            (this includes the `NameError` raised when `dbutils` itself is
            not defined in the current environment).

    Returns:
        Whatever `dbutils.widgets.get(name)` returns, or `default` if that
        call failed.
    """
    try:
        value = dbutils.widgets.get(name)
    except Exception:
        # Best-effort on purpose: any lookup failure falls back to the default.
        value = default
    return value

# Runtime parameters: each one is looked up through get_widget_or_default
# with the widget name on the left and its fallback value on the right.
pipeline_id, run_id, task_id, processed_timestamp, catalog = (
    get_widget_or_default(widget_name, fallback)
    for widget_name, fallback in (
        ("pipeline_id", "default_pipeline"),
        ("run_id", "default_run"),
        ("task_id", "default_task"),
        ("processed_timestamp", "2025-07-22T00:00:00Z"),
        ("catalog", "citibike_dev"),
    )
)


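# Strict alternative, kept for reference: read each widget directly through
# dbutils with no fallback value (unlike get_widget_or_default above).
# pipeline_id = dbutils.widgets.get("pipeline_id")
# run_id = dbutils.widgets.get("run_id")
# task_id = dbutils.widgets.get("task_id")
# processed_timestamp = dbutils.widgets.get("processed_timestamp")
# catalog = dbutils.widgets.get("catalog")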

In [13]:
# Load the bronze-schema Citi Bike table from the configured catalog.
df = (
    spark.read
    .table(f"{catalog}.01_bronze.jc_citibike")
)

In [14]:
# Produce "trip_duration_mins" from the "started_at" / "ended_at" columns
# (presumably elapsed minutes, going by the helper's name — confirm in
# citibike_utils).
df = get_trip_duration_mins(
    spark,
    df,
    "started_at",
    "ended_at",
    "trip_duration_mins",
)

In [15]:
# Produce "trip_start_date" from "started_at" (presumably its calendar date,
# going by the helper's name — confirm in datetime_utils).
df = timestamp_to_date_col(
    spark,
    df,
    "started_at",
    "trip_start_date",
)

In [16]:
# Attach a "metadata" map column recording the run parameters for every row.
# create_map takes alternating key/value columns, so the (key, value) pairs
# are flattened in order before being passed through.
df = df.withColumn(
    "metadata",
    create_map(*[
        lit(part)
        for pair in (
            ("pipeline_id", pipeline_id),
            ("run_id", run_id),
            ("task_id", task_id),
            ("processed_timestamp", processed_timestamp),
        )
        for part in pair
    ]),
)

In [17]:
# Keep only the columns written to the silver table, in output order.
df = df.select([
    "ride_id",
    "trip_start_date",
    "started_at",
    "ended_at",
    "start_station_name",
    "end_station_name",
    "trip_duration_mins",
    "metadata",
])

In [18]:
# Save the result as the silver table using overwrite mode, with the
# "overwriteSchema" option enabled.
(
    df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{catalog}.02_silver.jc_citibike")
)

HBox(children=(IntProgress(value=0, bar_style='success'), Label(value='')))