In [1]:
import os
import sys

# Make the repository root (three directory levels above the notebook's
# working directory) importable, so the `src.*` imports in later cells resolve.
current_dir = os.getcwd()
project_root = os.path.abspath(os.path.join(current_dir, "..", "..", ".."))

# Only append once: re-running this cell in the same kernel would otherwise
# add a duplicate entry to sys.path on every execution.
if project_root not in sys.path:
    sys.path.append(project_root)

In [2]:
from pyspark.sql.functions import create_map,lit

In [None]:
def _get_widget(name, default=None):
    """Return the value of Databricks widget ``name``, or ``default`` if it is missing.

    Each widget is read on its own, so one missing widget does not discard
    values that were successfully read from the others.

    NOTE(review): a missing widget is assumed to raise ``KeyError`` (as the
    original cell assumed); confirm this matches the exception your runtime
    raises for undefined widgets.
    """
    try:
        return dbutils.widgets.get(name)
    except KeyError:
        return default


# Lineage identifiers recorded in the silver `metadata` column.
pipeline_id = _get_widget("pipeline_id", "default_pipeline")
run_id = _get_widget("run_id", "default_run")
task_id = _get_widget("task_id", "default_task")
processed_timestamp = _get_widget("processed_timestamp", "")

# `catalog` is required by the bronze read and the silver write below, so it
# must always be bound. When no widget supplies it, use the Spark session's
# current catalog.
catalog = _get_widget("catalog")
if catalog is None:
    catalog = spark.catalog.currentCatalog()

In [None]:
# Load the bronze Jersey City Citi Bike trips from the selected catalog.
df = spark.table(f"{catalog}.01_bronze.jc_citibike")

In [4]:
# Quick text preview of the bronze rows (PySpark defaults: 20 rows, truncated cells).
df.show(n=20, truncate=True)

+----------------+-------------+--------------------+--------------------+------------------+----------------+--------------------+--------------+---------+---------+-------+-------+-------------+--------------------+
|         ride_id|rideable_type|          started_at|            ended_at|start_station_name|start_station_id|    end_station_name|end_station_id|start_lat|start_lng|end_lat|end_lng|member_casual|            metadata|
+----------------+-------------+--------------------+--------------------+------------------+----------------+--------------------+--------------+---------+---------+-------+-------+-------------+--------------------+
|29DAF43DD84B4B7A|electric_bike|2025-03-20 18:58:...|2025-03-20 19:00:...|   6 St & Grand St|           HB302|Mama Johnson Fiel...|         HB404|       41|      -74|     41|    -74|       member|{pipeline_id -> d...|
|B11B4220F7195025|electric_bike|2025-03-29 11:01:...|2025-03-29 11:11:...|  Heights Elevator|           JC059|        Jersey & 3

In [14]:
from src.citibike.citibike_utils import get_trip_duration_mins

In [15]:
# Add the trip length in minutes, computed from the start/end timestamps.
df = get_trip_duration_mins(
    spark,
    df,
    "started_at",          # trip start timestamp column
    "ended_at",            # trip end timestamp column
    "trip_duration_mins",  # name of the column to add
)

In [16]:
from src.utils.datetime_utils import timestamp_to_date_col

In [17]:
# Add the calendar date on which each trip started.
df = timestamp_to_date_col(
    spark,
    df,
    "started_at",       # source timestamp column
    "trip_start_date",  # name of the date column to add
)

In [20]:
# Replace the `metadata` map column with this run's lineage identifiers.
# Keys are emitted in a fixed order: pipeline_id, run_id, task_id,
# processed_timestamp.
df = df.withColumn(
    "metadata",
    create_map(*[
        lit(part)
        for key_value in (
            ("pipeline_id", pipeline_id),
            ("run_id", run_id),
            ("task_id", task_id),
            ("processed_timestamp", processed_timestamp),
        )
        for part in key_value
    ]),
)

In [21]:
# Silver-layer projection: trip identifiers, timing, stations and lineage metadata.
df = df.select([
    "ride_id",
    "trip_start_date",
    "started_at",
    "ended_at",
    "start_station_name",
    "end_station_name",
    "trip_duration_mins",
    "metadata",
])

In [22]:
# Preview the projected silver rows before they are written out.
display(df)

Unnamed: 0,ride_id,trip_start_date,started_at,ended_at,start_station_name,end_station_name,trip_duration_mins,metadata
0,29DAF43DD84B4B7A,2025-03-20,2025-03-20 18:58:31.217,2025-03-20 19:00:46.466,6 St & Grand St,Mama Johnson Field - 4 St & Jackson St,2.25,"{'pipeline_id': 'default_pipeline', 'run_id': 'default_run', 'task_id': 'default_task', 'processed_timestamp': ''}"
1,B11B4220F7195025,2025-03-29,2025-03-29 11:01:25.124,2025-03-29 11:11:09.383,Heights Elevator,Jersey & 3rd,9.733333,"{'pipeline_id': 'default_pipeline', 'run_id': 'default_run', 'task_id': 'default_task', 'processed_timestamp': ''}"
2,18D5B30305F602B9,2025-03-01,2025-03-01 16:05:32.346,2025-03-01 16:07:43.156,Jersey & 3rd,Hamilton Park,2.183333,"{'pipeline_id': 'default_pipeline', 'run_id': 'default_run', 'task_id': 'default_task', 'processed_timestamp': ''}"
3,532EB2D9DB68567D,2025-03-21,2025-03-21 18:44:15.137,2025-03-21 18:51:00.763,Jersey & 3rd,Jersey & 6th St,6.75,"{'pipeline_id': 'default_pipeline', 'run_id': 'default_run', 'task_id': 'default_task', 'processed_timestamp': ''}"
4,EA7C9C945D7D57AA,2025-03-20,2025-03-20 11:08:27.226,2025-03-20 11:12:28.545,6 St & Grand St,Madison St & 1 St,4.016667,"{'pipeline_id': 'default_pipeline', 'run_id': 'default_run', 'task_id': 'default_task', 'processed_timestamp': ''}"
5,DA232FF47222E86C,2025-03-13,2025-03-13 11:11:25.452,2025-03-13 11:15:29.146,6 St & Grand St,Madison St & 1 St,4.066667,"{'pipeline_id': 'default_pipeline', 'run_id': 'default_run', 'task_id': 'default_task', 'processed_timestamp': ''}"
6,416547516DE5132F,2025-03-28,2025-03-28 21:51:52.621,2025-03-28 21:57:01.336,Hilltop,Leonard Gordon Park,5.15,"{'pipeline_id': 'default_pipeline', 'run_id': 'default_run', 'task_id': 'default_task', 'processed_timestamp': ''}"
7,E25EDA33910F90F0,2025-03-13,2025-03-13 18:21:57.969,2025-03-13 18:26:48.536,Hilltop,Leonard Gordon Park,4.85,"{'pipeline_id': 'default_pipeline', 'run_id': 'default_run', 'task_id': 'default_task', 'processed_timestamp': ''}"
8,D209FF2521E26D16,2025-03-01,2025-03-01 14:59:20.947,2025-03-01 15:06:34.299,Jackson Square,Bergen Ave,7.233333,"{'pipeline_id': 'default_pipeline', 'run_id': 'default_run', 'task_id': 'default_task', 'processed_timestamp': ''}"
9,BC9F0D06A5AFF751,2025-03-04,2025-03-04 09:55:18.140,2025-03-04 09:59:59.031,6 St & Grand St,Southwest Park - Jackson St & Observer Hwy,4.683333,"{'pipeline_id': 'default_pipeline', 'run_id': 'default_run', 'task_id': 'default_task', 'processed_timestamp': ''}"


In [None]:
# Persist the silver table, replacing both its existing data and its schema.
(
    df.write
    .mode('overwrite')
    .option("overwriteSchema", "true")
    .saveAsTable(f"{catalog}.02_silver.jc_citibike")
)