In [1]:
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, TimestampType
from pyspark.sql.functions import create_map, lit

### Bronze Layer

In [None]:
def widget(name, default=None):
    """Return the value of the Databricks widget ``name``, or ``default`` on failure.

    Args:
        name: Name of the widget, passed to ``dbutils.widgets.get``.
        default: Value returned when the lookup raises an ordinary exception
            (for example when the widget is not defined or ``dbutils`` is
            not available in the current environment).

    Returns:
        Whatever ``dbutils.widgets.get(name)`` returns, otherwise ``default``.
    """
    try:
        return dbutils.widgets.get(name)
    except Exception:
        # Deliberately not a bare ``except``: KeyboardInterrupt and SystemExit
        # must keep propagating so the cell can still be interrupted.
        return default



# Declare every notebook parameter as a text widget with an empty-string
# default before any of them is read.
for _param in ("pipeline_id", "run_id", "task_id", "processed_timestamp", "catalog"):
    dbutils.widgets.text(_param, "")

# Read the widget values back in the same order they were declared.
# NOTE: the name `pipleline_id` (sic) is kept as-is because a later cell
# references it under that spelling.
pipleline_id, run_id, task_id, processed_timestamp, catalog = (
    widget("pipeline_id"),
    widget("run_id"),
    widget("task_id"),
    widget("processed_timestamp"),
    widget("catalog"),
)

To use databricks widgets interactively in your notebook, please install databricks sdk using:
	pip install 'databricks-sdk[notebook]'
Falling back to default_value_only implementation for databricks widgets.


In [23]:
# Explicit schema applied when reading the ride CSV files.
#
# The coordinate columns need an explicit precision/scale: DecimalType() with
# no arguments is decimal(10, 0), which has no fractional digits and therefore
# cannot hold the decimal part of a latitude/longitude.
# decimal(18, 15) leaves 3 integer digits (enough for +/-180) and 15 fractional
# digits.
_COORD_PRECISION = 18
_COORD_SCALE = 15

schema = StructType([
    StructField("ride_id", StringType(), True),
    StructField("rideable_type", StringType(), True),
    StructField("started_at", TimestampType(), True),
    StructField("ended_at", TimestampType(), True),
    StructField("start_station_name", StringType(), True),
    StructField("start_station_id", StringType(), True),
    StructField("end_station_name", StringType(), True),
    StructField("end_station_id", StringType(), True),
    StructField("start_lat", DecimalType(_COORD_PRECISION, _COORD_SCALE), True),
    StructField("start_lng", DecimalType(_COORD_PRECISION, _COORD_SCALE), True),
    StructField("end_lat", DecimalType(_COORD_PRECISION, _COORD_SCALE), True),
    StructField("end_lng", DecimalType(_COORD_PRECISION, _COORD_SCALE), True),
    StructField("member_casual", StringType(), True),
])

In [None]:
# Read the CSV files under the catalog's `00_landing` volume path using the
# explicit schema; the first line of each file is treated as a header row.
df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv(f"/Volumes/{catalog}/00_landing/src_citibank_data")
)

In [None]:
# Pipeline/run/task identifiers and the processed timestamp, stored together
# in a single map column named "metadata".
metadata_entries = [
    ("pipeline_id", pipleline_id),
    ("run_id", run_id),
    ("task_id", task_id),
    ("processed_timestamp", processed_timestamp),
]

# create_map takes alternating key and value columns, so flatten the pairs
# into key1, value1, key2, value2, ...
df = df.withColumn(
    "metadata",
    create_map(*[lit(item) for entry in metadata_entries for item in entry]),
)

In [None]:
# Save the DataFrame as the bronze table, replacing any existing contents.
# The "overWriteSchema" option is passed through to the table writer
# (presumably Delta's schema-overwrite switch -- TODO confirm).
bronze_writer = df.write.mode("overwrite").option("overWriteSchema", "true")
bronze_writer.saveAsTable(f"{catalog}.01_bronze.jc_citibike")
