In [0]:
import os
import sys

# Make the project's local packages (e.g. `heromotocomp`) importable from ../../src.
# The relative path is resolved against the current working directory, which is
# assumed to be this notebook's folder. NOTE(review): confirm for the job's execution context.
SRC_DIR = os.path.abspath("../../src/")

# Guard the append so re-running this cell does not keep adding duplicate entries.
if SRC_DIR not in sys.path:
    sys.path.append(SRC_DIR)

In [0]:
from heromotocomp.hero_utils.bike_utils import get_trip_duration_mins
from heromotocomp.global_utils.datetime_utils import timestamp_to_date_col
from pyspark.sql.functions import create_map, lit

In [0]:
# Declare each job parameter as a text widget with an empty default, so it can be
# passed in by the job run or typed in when running the notebook interactively.
for widget_name in ("pipeline_id", "run_id", "task_id", "processed_timestamp", "catalog"):
    dbutils.widgets.text(widget_name, "")

In [0]:
# Read the job parameters from the widgets declared above.
# The first four are only recorded as row-level lineage metadata further down.
pipeline_id = dbutils.widgets.get("pipeline_id")
run_id = dbutils.widgets.get("run_id")
task_id = dbutils.widgets.get("task_id")
processed_timestamp = dbutils.widgets.get("processed_timestamp")

# `catalog` prefixes every table name this notebook reads and writes, so it is required.
catalog = dbutils.widgets.get("catalog").strip()

# Fail fast: the widget default is "", which would otherwise only fail later,
# and less clearly, when reading ".bronze.bike_data".
if not catalog:
    raise ValueError(
        "The 'catalog' widget/job parameter must be set to a non-empty catalog name."
    )

In [0]:
# Load the raw bronze ride data from the configured catalog.
bronze_table = f"{catalog}.bronze.bike_data"
bike_bronze_df = spark.read.table(bronze_table)

# Apply the project helpers in order: first add `trip_duration_mins` (presumably
# derived from started_at/ended_at), then add `trip_start_date` from started_at.
df = timestamp_to_date_col(
    get_trip_duration_mins(bike_bronze_df, "started_at", "ended_at", "trip_duration_mins"),
    "started_at",
    "trip_start_date",
)

In [0]:
# Stamp every row with run lineage as a single map column. Keys and values are
# the literal job parameters read from the widgets.
run_metadata = {
    "pipeline_id": pipeline_id,
    "run_id": run_id,
    "task_id": task_id,
    "processed_timestamp": processed_timestamp,
}
# create_map expects a flat key1, value1, key2, value2, ... argument list.
metadata_col = create_map(
    *[lit(part) for key_value in run_metadata.items() for part in key_value]
)
df = df.withColumn("metadata", metadata_col)

In [0]:
# Columns published to the silver table, in output order. Any other bronze columns are dropped.
silver_columns = [
    "ride_id",
    "trip_start_date",
    "started_at",
    "ended_at",
    "start_station_name",
    "end_station_name",
    "trip_duration_mins",
    "metadata",
]
df = df.select(*silver_columns)

In [0]:
# Persist the silver table in overwrite mode with overwriteSchema enabled. Each
# run replaces the table's contents and, for Delta tables, its schema as well.
(
    df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{catalog}.silver.bike_data_cleaned")
)