# 04. Drivers Application Status

## Airflow DAG

Importing libraries

In [None]:
from airflow import DAG
from airflow.providers.standard.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.providers.standard.operators.empty import EmptyOperator

from pendulum import duration
from datetime import datetime, timezone
import requests


Check whether the dataset was refreshed today (UTC date) by looking at the `lastupdate` field of the 20 most recently updated rows in the API response. If any of them was updated today, return `True`; otherwise return `False`

In [None]:
def is_today() -> bool:
    """Report whether the dataset contains rows last updated today (UTC).

    Requests the 20 rows with the most recent ``lastupdate`` value and checks
    whether any of them starts with today's UTC date (``YYYY-MM-DD``).

    Returns:
        True if at least one returned row was updated today. False otherwise,
        including when the request fails or the payload is not a JSON list;
        the check is best-effort, so failures are logged instead of raised.
    """
    import logging

    log = logging.getLogger(__name__)
    url = "https://data.cityofnewyork.us/resource/dpec-ucu7.json?$limit=20&$order=lastupdate DESC"

    try:
        response = requests.get(url, timeout=20)
        # Surface HTTP errors here instead of trying to parse an error body.
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decoding failures on older requests versions.
        log.warning("Could not check dataset freshness: %s", exc)
        return False

    if not isinstance(data, list):
        log.warning("Unexpected freshness payload of type %s", type(data).__name__)
        return False

    today = datetime.now(timezone.utc).strftime('%Y-%m-%d')
    return any(
        isinstance(row, dict)
        and str(row.get('lastupdate') or '').startswith(today)
        for row in data
    )

Return 'download' or 'skip' based on the is_today() function

In [None]:
def decide(**context) -> str:
    """Pick the task ID to follow: 'download' when is_today() is true, else 'skip'."""
    if is_today():
        return 'download'
    return 'skip'

Get the data from the API and save it as a CSV file in the local directory

In [None]:
def get_data(file_name: str) -> None:
    """Download the dataset from the API and write it to ``file_name`` as CSV.

    Args:
        file_name: Path of the CSV file to create (overwritten if it exists).

    Raises:
        RuntimeError: If the request fails, the API returns no rows, or the
            file cannot be written. The original error is chained as the cause.
    """
    import csv

    url = "https://data.cityofnewyork.us/resource/dpec-ucu7.json?$limit=10000"

    try:
        response = requests.get(url, timeout=60)
        response.raise_for_status()
        data = response.json()
        if not data:
            raise RuntimeError("No data received from the API")

        # Build the header from every row, not just the first one: rows may
        # not all carry the same keys (e.g. if the API omits empty fields),
        # and DictWriter raises ValueError on a key missing from fieldnames.
        # dict.fromkeys keeps first-seen order and removes duplicates.
        fieldnames = list(dict.fromkeys(key for row in data for key in row))

        with open(file_name, mode='w', newline='', encoding='utf-8') as file:
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()
            # Keys absent from a row are written as empty cells (restval='').
            writer.writerows(data)

    except Exception as e:
        raise RuntimeError(f"Failed to get data: {str(e)}") from e

Connect to the Databricks environment and upload the CSV file from the local directory to a Databricks volume

In [None]:
def upload_to_volume(**context):
    """Upload the run's CSV file from the local directory to the Databricks volume.

    The file name is derived from the ``ds`` value in the task context.
    Host and token are read from the 'databricks_ingestion' Airflow
    connection; the token comes from the password field, falling back to
    the ``token`` key of the connection extras.
    """
    from airflow.sdk.bases.hook import BaseHook
    from databricks.sdk import WorkspaceClient

    file_name = f"tlc_driver_application_{context['ds']}.csv"
    local_file = f"/airflow/{file_name}"
    volume_file = f"/Volumes/driver_app_status/raw_data/raw_data/{file_name}"

    connection = BaseHook.get_connection('databricks_ingestion')
    workspace_host = connection.host.rstrip('/')
    access_token = connection.password or connection.extra_dejson.get('token')
    client = WorkspaceClient(host=workspace_host, token=access_token)

    # Replace any file already present at the target path.
    with open(local_file, "rb") as source:
        client.files.upload(volume_file, source, overwrite=True)

In [None]:
# Arguments applied to every task in the DAG: no e-mail notifications,
# no dependency on the previous run, two retries spaced two minutes apart.
default_args = dict(
    owner="airflow",
    depends_on_past=False,
    email_on_failure=False,
    email_on_retry=False,
    retries=2,
    retry_delay=duration(minutes=2),
)

Define the DAG with the specified parameters and tasks

In [None]:
with DAG(
    dag_id="drivers_status_dag",
    schedule="0 20 * * *",  # cron: every day at 20:00
    start_date=datetime(2026, 1, 15),
    # end_date=datetime(2026, 2, 15),
    catchup=False,
    default_args=default_args,
    tags=["databricks", "drivers", "daily"],
) as dag:

    # Branch point: decide() returns the task ID to follow.
    check_today = BranchPythonOperator(
        task_id="check_today",
        python_callable=decide,
    )

    # Fetch the dataset and store it as a dated CSV in the local directory.
    download = PythonOperator(
        task_id="download",
        python_callable=get_data,
        op_kwargs={"file_name": "/airflow/tlc_driver_application_{{ds}}.csv"},
    )

    # Push the downloaded CSV to the Databricks volume.
    ingest_csv = PythonOperator(
        task_id="ingest_csv",
        python_callable=upload_to_volume,
    )

    # No-op target for the branch taken when there is nothing new to load.
    skip = EmptyOperator(task_id="skip")

In [None]:
    # The branch task fans out to the download path and the no-op skip task.
    check_today >> [download, skip]
    # The upload runs only after the CSV has been downloaded.
    download >> ingest_csv

![Airflow](images/air01.jpg)

## Databricks

#### 1. Setting up project environment
Creating catalog, schemas, volume

In [None]:
%sql
-- Project catalog. IF NOT EXISTS keeps this setup cell re-runnable.
CREATE CATALOG IF NOT EXISTS driver_app_status

'raw_data' will be used for uploaded .csv files, 'dlt_schema' for all other data

In [None]:
%sql
-- Schema for the uploaded CSV files. IF NOT EXISTS keeps the cell re-runnable.
CREATE SCHEMA IF NOT EXISTS driver_app_status.raw_data;

-- Schema for all other project tables and views.
CREATE SCHEMA IF NOT EXISTS driver_app_status.dlt_schema;


In [None]:
%sql
-- Make driver_app_status.raw_data the current catalog and schema so that
-- unqualified object names in later statements resolve there.
USE CATALOG driver_app_status;
USE SCHEMA raw_data;

In [None]:
%sql
-- Volume for the uploaded CSV files, created in the current catalog and schema.
-- IF NOT EXISTS keeps this setup cell re-runnable.
CREATE VOLUME IF NOT EXISTS raw_data
  COMMENT 'CSV raw file';

#### 2. Defining input
Saving data to a Delta table and adding a widget for the file date (it will be set dynamically in the job settings later; when empty, today's date is used)

In [None]:
# Text widget that carries the date of the file to load; empty by default.
dbutils.widgets.text(name="file_date", defaultValue="")

file_date = dbutils.widgets.get("file_date")
if not file_date:
    # No date supplied: fall back to today's date as YYYY-MM-DD.
    from datetime import date
    file_date = date.today().isoformat()

# Location of that day's CSV inside the volume.
file_path = f"/Volumes/driver_app_status/raw_data/raw_data/tlc_driver_application_{file_date}.csv"

In [None]:
from pyspark.sql.functions import *

# CSV reader that treats the first line as a header and infers column types.
csv_reader = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
)

# Load the selected day's file.
df = csv_reader.load(file_path)

In [None]:
# Append the loaded rows to the raw Delta table.
# NOTE(review): "overwriteSchema" is normally paired with mode("overwrite");
# confirm whether "mergeSchema" was intended for this append.
(
    df.write.format("delta")
    .mode("append")
    .option("overwriteSchema", "true")
    .saveAsTable("driver_app_status.dlt_schema.raw_drivers")
)

#### 3. Ingestion bronze
Defining expectation (data-quality) rules for the bronze layer in `exception_rules`, plus the list of columns to keep

In [None]:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *


# Rule name -> SQL constraint: each listed column must be non-null.
exception_rules = {
    required: f"{required} IS NOT NULL"
    for required in ("app_no", "lastupdate", "status")
}

# Columns carried over from the raw table into the bronze table.
columns = [
    "app_date",
    "app_no",
    "type",
    "status",
    "lastupdate",
]


Creating bronze table - raw data with basic quality checks

In [None]:
# Bronze table: streams the raw table, keeps only the listed columns, and
# drops every row that violates any rule in exception_rules.
@dlt.table(name="drivers_bronze")
@dlt.expect_all_or_drop(exception_rules)
def drivers_bronze():
    raw_stream = spark.readStream.table("driver_app_status.dlt_schema.raw_drivers")
    return raw_stream.select(columns)

#### 4. Transform drivers
Creating silver view for cleaned and transformed data

In [None]:
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Declares a DLT view named "drivers_silver_view"; the decorated function
# (drivers_silver_view) is defined in the next cell.
@dlt.view(
    name="drivers_silver_view"
)

Basic silver transformations

In [None]:
def drivers_silver_view():
    # Stream the bronze table and convert both date columns with to_date.
    bronze_stream = spark.readStream.table("drivers_bronze")
    with_dates = (
        bronze_stream
        .withColumn("app_date", to_date(col("app_date"), "yyyy-MM-dd"))
        .withColumn("lastupdate", to_date(col("lastupdate"), "yyyy-MM-dd"))
    )
    # Keep only applications dated 2025-01-01 or later.
    return with_dates.filter(col("app_date") >= '2025-01-01')

Creating silver streaming table (SCD Type 1) - keeps only the latest record per 'app_no'

In [None]:
# Target table for the silver CDC flow.
dlt.create_streaming_table(name="drivers_silver")

Applying auto CDC flow for upserts based on 'lastupdate' (SCD Type 1 - overwrite with latest record)

In [None]:
# Upsert rows from the silver view into drivers_silver: one row per app_no,
# ordered by lastupdate, stored as SCD type 1.
dlt.create_auto_cdc_flow(
    target="drivers_silver",
    source="drivers_silver_view",
    keys=["app_no"],
    sequence_by="lastupdate",
    stored_as_scd_type=1,
    ignore_null_updates=False,
    # No delete/truncate conditions and no column restrictions.
    apply_as_deletes=None,
    apply_as_truncates=None,
    column_list=None,
    except_column_list=None,
    track_history_column_list=None,
    track_history_except_column_list=None,
)

#### 5. Creating gold streaming table

In [None]:
import dlt

# Target table for the gold CDC flow.
dlt.create_streaming_table(name="dim_drivers")

Defining the flow from silver view and storing as SCD type 2

In [None]:
# Feed dim_drivers from the silver view, keyed by app_no and ordered by
# lastupdate, stored as SCD type 2.
dlt.create_auto_cdc_flow(
    target="dim_drivers",
    source="drivers_silver_view",
    keys=["app_no"],
    sequence_by="lastupdate",
    stored_as_scd_type=2,
    ignore_null_updates=False,
    # No delete/truncate conditions and no column restrictions.
    apply_as_deletes=None,
    apply_as_truncates=None,
    column_list=None,
    except_column_list=None,
    track_history_column_list=None,
    track_history_except_column_list=None,
)


#### 6. Final results 
Creating a view for Slowly Changing Dimension Type 2

In [None]:
%sql
-- One row per contiguous period during which an application kept the same
-- status, with the first and last lastupdate seen in that period.
-- Periods are numbered before grouping so that a status which recurs later
-- (A -> B -> A) yields separate rows instead of one range overlapping B.
-- NOTE: IF NOT EXISTS leaves an already existing view untouched; drop the
-- old view first for a changed definition to take effect.
CREATE VIEW IF NOT EXISTS driver_app_status.dlt_schema.drivers_scd2 AS
WITH flagged AS (
  -- status_changed = 1 on the first row of an application and on every row
  -- whose status differs from the previous row (ordered by lastupdate).
  SELECT
    app_date,
    app_no,
    type,
    status,
    lastupdate,
    CASE
      WHEN status = LAG(status) OVER (
        PARTITION BY app_date, app_no, type
        ORDER BY lastupdate
      ) THEN 0
      ELSE 1
    END AS status_changed
  FROM driver_app_status.dlt_schema.dim_drivers
),
periods AS (
  -- The running total of change flags gives each contiguous period its own id.
  SELECT
    app_date,
    app_no,
    type,
    status,
    lastupdate,
    SUM(status_changed) OVER (
      PARTITION BY app_date, app_no, type
      ORDER BY lastupdate
    ) AS period_id
  FROM flagged
)
SELECT
  app_date,
  app_no,
  type,
  status,
  MIN(lastupdate) AS start_date,
  MAX(lastupdate) AS end_date
FROM periods
GROUP BY app_date, app_no, type, status, period_id
ORDER BY app_no, start_date


Based on the created view, we can see the final SCD type 2 table, which tracks status changes across the dimensions (app_date, app_no and type do not change). Each row shows the status of a driver application during a defined period of time, so the table holds much less data

In [None]:
%sql

-- Status periods of a single application, read from the SCD type 2 view.
SELECT * FROM driver_app_status.dlt_schema.drivers_scd2
WHERE app_no = '6131556'

app_date,app_no,type,status,start_date,end_date
2026-01-30,6131556,HDR,Incomplete,2026-02-02,2026-02-04
2026-01-30,6131556,HDR,Under Review,2026-02-05,2026-02-09


Below is the default Databricks SCD type 2 format from the gold layer, with the generated `__START_AT` and `__END_AT` columns. It holds a lot more data and sets each record's end date to the start date of the next record, which makes it less desirable for both reasons


In [None]:
%sql
-- All history rows of the same application in the gold table, oldest first.
SELECT * FROM driver_app_status.dlt_schema.dim_drivers
WHERE app_no = '6131556'
ORDER BY lastupdate

app_date,app_no,type,status,lastupdate,__START_AT,__END_AT
2026-01-30,6131556,HDR,Incomplete,2026-02-02,2026-02-02,2026-02-03
2026-01-30,6131556,HDR,Incomplete,2026-02-03,2026-02-03,2026-02-04
2026-01-30,6131556,HDR,Incomplete,2026-02-04,2026-02-04,2026-02-05
2026-01-30,6131556,HDR,Under Review,2026-02-05,2026-02-05,2026-02-06
2026-01-30,6131556,HDR,Under Review,2026-02-06,2026-02-06,2026-02-07
2026-01-30,6131556,HDR,Under Review,2026-02-07,2026-02-07,2026-02-08
2026-01-30,6131556,HDR,Under Review,2026-02-08,2026-02-08,2026-02-09
2026-01-30,6131556,HDR,Under Review,2026-02-09,2026-02-09,
