In [11]:
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, TimestampType
from pyspark.sql.functions import create_map, lit

In [None]:
pipeline_id = dbutils.widgets.get("pipeline_id")
run_id = dbutils.widgets.get("run_id") 
task_id = dbutils.widgets.get("task_id")
processed_timestamp = dbutils.widgets.get("processed_timestamp")
catalog = dbutils.widgets.get("catalog")

In [12]:
schema = StructType([
    StructField("ride_id", StringType(), True),
    StructField("rideable_type", StringType(), True),
    StructField("started_at", TimestampType(), True),
    StructField("ended_at", TimestampType(), True),
    StructField("start_station_name", StringType(), True), 
    StructField("start_station_id", StringType(), True),   
    StructField("end_station_name", StringType(), True), 
    StructField("end_station_id", StringType(), True), 
    StructField("start_lat", DecimalType(), True), 
    StructField("start_lng", DecimalType(), True), 
    StructField("end_lat", DecimalType(), True), 
    StructField("end_lng", DecimalType(), True), 
    StructField("member_casual", StringType(), True), 
])

In [None]:
df = spark.read.csv(f"/Volumes/{catalog}/00_landing/source_citybike_data/JC-202503-citibike-tripdata.csv", schema=schema, header=True)

In [17]:
df.count()

73293

In [18]:
df.show(5)

+----------------+-------------+--------------------+--------------------+------------------+----------------+--------------------+--------------+---------+---------+-------+-------+-------------+
|         ride_id|rideable_type|          started_at|            ended_at|start_station_name|start_station_id|    end_station_name|end_station_id|start_lat|start_lng|end_lat|end_lng|member_casual|
+----------------+-------------+--------------------+--------------------+------------------+----------------+--------------------+--------------+---------+---------+-------+-------+-------------+
|29DAF43DD84B4B7A|electric_bike|2025-03-20 18:58:...|2025-03-20 19:00:...|   6 St & Grand St|           HB302|Mama Johnson Fiel...|         HB404|       41|      -74|     41|    -74|       member|
|B11B4220F7195025|electric_bike|2025-03-29 11:01:...|2025-03-29 11:11:...|  Heights Elevator|           JC059|        Jersey & 3rd|         JC074|       41|      -74|     41|    -74|       member|
|18D5B30305F602

In [None]:
df = df.withColumn("metadata", create_map(lit("pipeline_id"), lit(pipeline_id), 
                                          lit("run_id"), lit(run_id), 
                                          lit("task_id"), lit(task_id), 
                                          lit("processed_timestamp"), lit(processed_timestamp)))    

In [None]:
df.write.\
    mode("overwrite").\
    option("overwriteSchema", "true").\
    saveAsTable(f"{catalog}.01_bronze.jc_citibike")