# City Safety Data - ETL - Sample

## About this Notebook

- This notebook performs a sample ETL operation using Microsoft Open datasets (city safety data).
- It **doesn't require** a default lakehouse attached and instead uses absolute paths to create/load managed tables.
- Can be run from any Fabric workspace, as long as the executing identity has write access to the targets (workspace and lakehouse).
- Data can be loaded:
    - into a new table 
    - into an existing table in append mode (call `etl_steps` with `cleanup=False`)
    - into an existing table in overwrite mode (call `etl_steps` with `cleanup=True`, which deletes the existing table before loading)

## Libraries

In [1]:
import json
from pyspark.sql.functions import lit, to_utc_timestamp, unix_timestamp, avg, max, min, sum, count
from delta.tables import DeltaTable
from typing import Optional

StatementMeta(, , -1, Cancelled, , Cancelled)

## Configuration

### Set External parameters

In [None]:
# External parameters only: keep every other setting out of this cell so these
# values can be supplied from outside the notebook.
onelake_name: str = "onelake"    # OneLake endpoint name used in the abfss:// URI
workspace_name: str = "ws-mdw"   # target Fabric workspace
lakehouse_name: str = "LH_Taxi"  # target lakehouse inside that workspace

StatementMeta(, , , Waiting, , Waiting)

In [None]:
# Echo the external parameters so the run log shows which values were used.
print("onelake_name =", repr(onelake_name))
print("workspace_name =", repr(workspace_name))
print("lakehouse_name =", repr(lakehouse_name))

StatementMeta(, , , Waiting, , Waiting)

### Set local parameters

In [None]:
# All locations below are fully qualified URIs (abfss:// for OneLake, wasbs:// for the
# source blob store). That means no default lakehouse has to be attached: any workspace
# and lakehouse can be targeted, given the proper access.
lakehouse_table_name = "tbl_city_safety_data"
cities = ("Boston", "Chicago", "NewYorkCity", "Seattle", "SanFrancisco")  # narrow to e.g. ("Boston", ) for a quick run

print("workspace_name =", repr(workspace_name))
print("cities =", repr(cities))
print("lakehouse_table_name =", repr(lakehouse_table_name))

# Source: Microsoft Open Datasets, city safety data.
# Ref: https://learn.microsoft.com/en-us/azure/open-datasets/dataset-new-york-city-safety?tabs=pyspark
blob_account_name = "azureopendatastorage"
blob_container_name = "citydatacontainer"
blob_relative_path = "Safety/Release/"
blob_sas_token = ""  # empty token; presumably the open-data container allows anonymous reads - confirm

# Target: the lakehouse root in OneLake, plus its Files/ and Tables/ areas.
onelake_path = "abfss://{}@{}.dfs.fabric.microsoft.com/{}.lakehouse".format(workspace_name, onelake_name, lakehouse_name)
onelake_file_path = onelake_path + "/Files"
onelake_table_path = onelake_path + "/Tables"

# Source root URI, then register the SAS token with Spark so it can read the container remotely.
wasbs_path = "wasbs://{}@{}.blob.core.windows.net/{}".format(blob_container_name, blob_account_name, blob_relative_path)
spark.conf.set("fs.azure.sas.{}.{}.blob.core.windows.net".format(blob_container_name, blob_account_name), blob_sas_token)

StatementMeta(, , , Waiting, , Waiting)

In [None]:
# Check that the target lakehouse Tables path exists; otherwise abort notebook execution.
# A failure of the existence check itself (e.g. no permission) is treated as "missing".
error_message = (
    f"Specified lakehouse table path {onelake_table_path} doesn't exist. "
    f"Ensure onelake={onelake_name}, workspace={workspace_name} and lakehouse={lakehouse_name} exist."
)
try:
    table_path_exists = mssparkutils.fs.exists(onelake_table_path)
except Exception as e:
    print(f"Error message: {e}")
    table_path_exists = False

if not table_path_exists:
    # no further execution but Session is still active
    mssparkutils.notebook.exit(error_message)
else:
    print(f"Target table path: {onelake_table_path} is valid and exists.")
    print("Listing source data contents to check connectivity")
    # Raises if the source blob store is unreachable, surfacing connectivity problems early.
    print(mssparkutils.fs.ls(wasbs_path))

StatementMeta(, , , Waiting, , Waiting)

## Create data extraction and load functions

In [None]:
def identify_table_load_mode(table_name: str) -> str:
    """Choose the Spark write mode for ``table_name`` under ``onelake_table_path``.

    Args:
        table_name: Table folder name below the lakehouse ``Tables`` path.

    Returns:
        ``"append"`` if a Delta table already exists at
        ``{onelake_table_path}/{table_name}``, otherwise ``"overwrite"``,
        which creates the table on the first write.
    """
    # Check via the Delta path rather than spark.catalog.tableExists(table_name).
    # The catalog lookup only works when a default lakehouse is attached, and this
    # notebook does not require one.
    delta_table_path = f"{onelake_table_path}/{table_name}"
    return "append" if DeltaTable.isDeltaTable(spark, delta_table_path) else "overwrite"

def delete_delta_table(table_name: str) -> None:
    """Recursively delete the Delta table folder ``{onelake_table_path}/{table_name}``.

    When the folder does not exist, only a log line is printed.

    Args:
        table_name: Table folder name below the lakehouse ``Tables`` path.

    Raises:
        Exception: whatever ``mssparkutils.fs.rm`` raises, re-raised after logging.
    """
    delta_table_path = f"{onelake_table_path}/{table_name}"

    if not mssparkutils.fs.exists(delta_table_path):
        print("The specified delta table doesn't exist. No need for deletion.")
        return

    print(f"Attempting to delete existing delta table with {delta_table_path = }....")
    try:
        mssparkutils.fs.rm(dir=delta_table_path, recurse=True)
    except Exception as e:
        print(f"Deletion failed with the error:\n===={e}\n=====")
        raise
    print(f"Deleted existing delta table: {table_name}.")

def transform_data(city: str, data_frame: object) -> object:
    """Add a UTC timestamp column and a constant city column to one city's records.

    Args:
        city: City name as used in the source partition paths; must be one of
            the keys of ``city_timezones`` below.
        data_frame: Spark DataFrame for ``city`` with a ``dateTime`` column
            (presumably local time in that city - TODO confirm against the dataset).

    Returns:
        ``data_frame`` with ``dateTimeUTC`` (``dateTime`` interpreted in the city's
        time zone and converted to UTC via ``to_utc_timestamp``) and ``City``
        (the literal ``city`` value) appended.

    Raises:
        ValueError: if ``city`` has no known time zone. Previously any such city
            silently fell back to America/Chicago, producing wrong UTC times.
    """
    # Time zone of each supported city, needed to convert local timestamps to UTC.
    city_timezones = {
        "Boston": "America/New_York",
        "NewYorkCity": "America/New_York",
        "Chicago": "America/Chicago",
        "Seattle": "America/Los_Angeles",
        "SanFrancisco": "America/Los_Angeles",
    }
    timezone = city_timezones.get(city)
    if timezone is None:
        raise ValueError(
            f"No time zone configured for city {city!r}; "
            f"known cities: {sorted(city_timezones)}"
        )

    return (
        data_frame
        .withColumn("dateTimeUTC", to_utc_timestamp(data_frame.dateTime, timezone))
        .withColumn("City", lit(city))
    )

def etl_steps(table_name: str, cleanup: bool = True) -> None:
    """Extract each city's safety data from the open-data blob store and load it into one Delta table.

    Args:
        table_name: Target table folder name below ``onelake_table_path``.
        cleanup: When truthy, delete any existing table first so the load starts
            from scratch. Otherwise rows are appended to an existing table.

    The write mode is re-evaluated per city with ``identify_table_load_mode``: the
    first write into a missing table uses "overwrite" (creating it), and every
    later write finds the table and uses "append".
    """
    delta_table_path = f"{onelake_table_path}/{table_name}"
    if cleanup:
        delete_delta_table(table_name)
        print(f"A new delta table '{table_name}' will be created with {delta_table_path = }")
    else:
        print("No request for cleanup. Proceeding to ETL steps.")

    # wasbs_path is built from a relative path ending in "/" ("Safety/Release/").
    # Strip it so partition paths don't contain a double slash ("Release//city=...").
    source_root = wasbs_path.rstrip("/")

    for city in cities:
        print(f"ETL started for {city = }.")

        print("\t Data Extraction in progress.")
        city_calls_data_path = f"{source_root}/city={city}"
        city_calls_df = spark.read.parquet(city_calls_data_path)

        # count() scans the city's files; kept only for the run log.
        print(f"\t Read {city_calls_df.count()} records for {city = }.")
        print("\t Data transformation in progress.")
        city_calls_df = transform_data(city, city_calls_df)

        delta_mode = identify_table_load_mode(table_name)
        print(f"\t Data loading in progress using {delta_mode} mode.")
        city_calls_df.write.format("delta").mode(delta_mode).save(delta_table_path)

    print(f"\n=====\nCity safety data is loaded into {table_name =} for {cities =}\n=====")

def gather_city_level_metrics(table_name: str) -> None:
    """Display the number of loaded records per city for ``table_name``.

    Reads the Delta table at ``{onelake_table_path}/{table_name}`` and shows a
    per-``City`` row count with the notebook's ``display``.

    Args:
        table_name: Table folder name below the lakehouse ``Tables`` path.
    """
    delta_table = spark.read.format("delta").load(f"{onelake_table_path}/{table_name}")
    # Group by "City", the exact column name written by transform_data. The former
    # lowercase "city" only resolved because Spark matches column names
    # case-insensitively by default (spark.sql.caseSensitive=false).
    city_metrics = delta_table.groupBy("City").agg(count("*").alias("count"))

    display(city_metrics)




StatementMeta(, , , Waiting, , Waiting)

## Read Microsoft Open data sets and load the target table

In [None]:
# Entry point: run the ETL, then show per-city counts if it succeeded.
# ETL errors are logged and re-raised; the completion line is printed either way.
try:
    try:
        etl_steps(table_name=lakehouse_table_name, cleanup=True)
    except Exception as etl_error:
        print(f"ETL step failed with error {etl_error}")
        raise
    gather_city_level_metrics(table_name=lakehouse_table_name)
finally:
    print("City safety processing is complete.")


StatementMeta(, , , Waiting, , Waiting)

## Visualize data

In [None]:
# WIP: placeholder for the "Visualize data" section - no visualization is implemented yet.

StatementMeta(, , , Waiting, , Waiting)