In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import *
from delta import *
from delta.pip_utils import configure_spark_with_delta_pip
import itertools
from pyspark.sql import functions as func
from pyspark.sql import Window
from datetime import date, timedelta
import time

In [None]:
def path_exists(path):
    """Report whether `path` can be listed through `dbutils.fs.ls`.

    Returns True when the listing succeeds and False when it fails with a
    message mentioning 'java.io.FileNotFoundException'. Any other error is
    re-raised unchanged.
    """
    try:
        dbutils.fs.ls(path)
    except Exception as err:
        # Only a "file not found" failure means "absent"; anything else is a real error.
        if 'java.io.FileNotFoundException' not in str(err):
            raise
        return False
    return True

In [None]:
import os

# Service-principal credentials and ADLS Gen2 location used by the mount cell.
# They are read from the environment (or swap in dbutils.secrets.get) so secrets are
# never pasted into the notebook itself; each falls back to '' when the variable is unset.
appId = os.environ.get('AZURE_CLIENT_ID', '')
clientSecret = os.environ.get('AZURE_CLIENT_SECRET', '')
tenantId = os.environ.get('AZURE_TENANT_ID', '')
container_name = os.environ.get('CHICAGO_CRASH_CONTAINER', '')
storage_account_name = os.environ.get('CHICAGO_CRASH_STORAGE_ACCOUNT', '')

In [None]:
# OAuth client-credentials settings for the ABFS driver, passed to the mount below.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": appId,
    "fs.azure.account.oauth2.client.secret": clientSecret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenantId}/oauth2/token",
    "fs.azure.createRemoteFileSystemDuringInitialization": "true",
}

# Mount only when /mnt/chicago-crash is not already a mount point. Checking the mount table
# (rather than whether the path can be listed) avoids silently skipping the mount when a
# plain DBFS directory happens to exist at that path. Otherwise every later write would
# land in DBFS root instead of the storage account.
if not any(mount.mountPoint == '/mnt/chicago-crash' for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source=f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/",
        mount_point="/mnt/chicago-crash",
        extra_configs=configs,
    )

In [None]:
# Session configuration: Delta Lake SQL extension and catalog, plus automatic schema
# evolution so the MERGE statements in the ingestion functions can add new columns.
conf = SparkConf()
conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
conf.set('spark.databricks.delta.schema.autoMerge.enabled', 'true')
spark_builder = SparkSession.builder \
    .appName('CrawlData') \
    .config(conf=conf)

# configure_spark_with_delta_pip() sets spark.jars.packages itself (to delta-core). That
# overwrites any value placed on `conf`, so the Iceberg artifact is passed through
# extra_packages to keep both packages on the classpath.
spark = configure_spark_with_delta_pip(
    spark_builder,
    extra_packages=["io.delta:delta-iceberg_2.12:2.3.0.0"],
).getOrCreate()

In [None]:
from datetime import datetime, timedelta
import requests

def _ingest_socrata_dataset(dataset_id, table_name, target_path, merge_condition,
                            days_ago=1, page_size=10000, row_filter=None, timeout=120):
    """Page through one Chicago open-data (Socrata) dataset and upsert it into a Delta table.

    Fetches rows whose ``crash_date`` is after the date ``days_ago`` days before now,
    ``page_size`` rows per request. For each page:
      * tag rows with year/month/day partition columns derived from ``crash_date``;
      * fill null string columns with 'N/A' and apply ``row_filter`` if given;
      * upsert into ``target_path``, creating the table on the first page if it does not exist;
      * then advance this table's row in the watermark table to the newest ``crash_date`` seen.

    Args:
        dataset_id: Socrata resource id, e.g. '85ca-t3if'.
        table_name: value stored in the watermark table's ``TableName`` column.
        target_path: Delta table location to upsert into.
        merge_condition: MERGE join condition between target alias ``a`` and page alias ``b``.
        days_ago: how many days back from now to fetch.
        page_size: rows requested per API call ($limit).
        row_filter: optional Spark Column predicate applied to each page before writing.
        timeout: seconds to wait for each HTTP response.
    """
    import warnings

    filter_date = (datetime.now() - timedelta(days=days_ago)).date()
    watermark_table = DeltaTable.forPath(spark, '/mnt/chicago-crash/raw/watermark')
    base_url = f"https://data.cityofchicago.org/resource/{dataset_id}.json"
    offset = 0
    while True:
        # $order=:id gives a stable row order, so $limit/$offset pages neither skip nor repeat rows.
        response = requests.get(
            f"{base_url}?$where=crash_date>'{filter_date}'&$order=:id&$limit={page_size}&$offset={offset}",
            timeout=timeout,
        )
        # Check the status before parsing. An error body is either not JSON or a non-empty
        # JSON object, so testing the payload's length first never detects a failure.
        if response.status_code != 200:
            warnings.warn(
                f"Stopping {table_name} ingestion at offset {offset}: HTTP {response.status_code}"
            )
            break
        rows = response.json()
        if not rows:
            break
        offset += page_size

        crash_day = func.to_date(func.col('crash_date'))
        page_df = (
            spark.createDataFrame(rows)
            .withColumn('year', func.year(crash_day))
            .withColumn('month', func.month(crash_day))
            .withColumn('day', func.dayofmonth(crash_day))  # day of month, not month
            .na.fill('N/A')
        )
        if row_filter is not None:
            page_df = page_df.filter(row_filter)

        if path_exists(target_path):
            (
                DeltaTable.forPath(spark, path=target_path).alias('a')
                .merge(page_df.alias('b'), merge_condition)
                .whenMatchedUpdateAll()
                .whenNotMatchedInsertAll()
                .execute()
            )
        else:
            (
                page_df.write.mode('overwrite').format('delta')
                .option('path', target_path)
                .option('mergeSchema', True)
                .partitionBy('year', 'month', 'day')
                .save()
            )

        # Advance the watermark only after the page is persisted, so it never claims data
        # that failed to load. Skip it when the page has no parseable crash_date (for example
        # a fully filtered page): WatermarkValue is non-nullable, and a None value cannot be
        # type-inferred.
        max_timestamp = page_df.select(func.max(func.to_timestamp('crash_date'))).collect()[0][0]
        if max_timestamp is not None:
            insert_watermark = spark.createDataFrame(
                data=[[table_name, max_timestamp]], schema=['TableName', 'WatermarkValue']
            )
            (
                watermark_table.alias('a')
                .merge(insert_watermark.alias('b'), condition='a.TableName = b.TableName')
                .whenMatchedUpdate(condition='b.WatermarkValue > a.WatermarkValue',
                                   set={'WatermarkValue': 'b.WatermarkValue'})
                .whenNotMatchedInsertAll()
                .execute()
            )


def get_crash_data(days_ago=1):
    """Upsert crash records from the last `days_ago` days into /mnt/chicago-crash/raw/crashes."""
    _ingest_socrata_dataset(
        '85ca-t3if', 'Crashes', '/mnt/chicago-crash/raw/crashes',
        'a.crash_record_id = b.crash_record_id',
        days_ago=days_ago,
    )


def get_people_data(days_ago=1):
    """Upsert people records from the last `days_ago` days into /mnt/chicago-crash/raw/people."""
    _ingest_socrata_dataset(
        'u6pd-qa9d', 'People', '/mnt/chicago-crash/raw/people',
        'a.person_id = b.person_id AND a.crash_record_id = b.crash_record_id',
        days_ago=days_ago,
    )


def get_vehicle_data(days_ago=1):
    """Upsert non-pedestrian vehicle records from the last `days_ago` days into /mnt/chicago-crash/raw/vehicles."""
    _ingest_socrata_dataset(
        '68nd-jvt3', 'Vehicles', '/mnt/chicago-crash/raw/vehicles',
        'a.vehicle_id = b.vehicle_id AND a.crash_record_id = b.crash_record_id',
        days_ago=days_ago,
        row_filter=func.col('unit_type') != 'PEDESTRIAN',
    )

In [None]:
# Ensure the watermark Delta table exists: one row per ingested source table
# (TableName) with the newest crash_date timestamp merged so far (WatermarkValue).
watermark_builder = DeltaTable.createIfNotExists(spark).location('/mnt/chicago-crash/raw/watermark')
for column_name, column_type in (('TableName', 'STRING'), ('WatermarkValue', 'TIMESTAMP')):
    watermark_builder = watermark_builder.addColumn(column_name, dataType=column_type, nullable=False)
watermark_builder.execute()

<delta.tables.DeltaTable at 0x7feba005db40>

In [None]:
# One-year backfill of each dataset, in order: crashes, people, vehicles.
for ingest in (get_crash_data, get_people_data, get_vehicle_data):
    ingest(days_ago=365)

In [None]:
# Unmount only when /mnt/chicago-crash is actually a mount point. path_exists() is also True
# for a plain directory at that path, which cannot be unmounted and makes this cell raise.
if any(mount.mountPoint == '/mnt/chicago-crash' for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount('/mnt/chicago-crash')

[0;31m---------------------------------------------------------------------------[0m
[0;31mExecutionError[0m                            Traceback (most recent call last)
File [0;32m<command-4383724205036957>, line 2[0m
[1;32m      1[0m [38;5;28;01mif[39;00m path_exists([38;5;124m'[39m[38;5;124m/mnt/chicago-crash[39m[38;5;124m'[39m):
[0;32m----> 2[0m     [43mdbutils[49m[38;5;241;43m.[39;49m[43mfs[49m[38;5;241;43m.[39;49m[43munmount[49m[43m([49m[38;5;124;43m'[39;49m[38;5;124;43m/mnt/chicago-crash[39;49m[38;5;124;43m'[39;49m[43m)[49m

File [0;32m/databricks/python_shell/dbruntime/dbutils.py:362[0m, in [0;36mDBUtils.FSHandler.prettify_exception_message.<locals>.f_with_exception_handling[0;34m(*args, **kwargs)[0m
[1;32m    360[0m exc[38;5;241m.[39m__context__ [38;5;241m=[39m [38;5;28;01mNone[39;00m
[1;32m    361[0m exc[38;5;241m.[39m__cause__ [38;5;241m=[39m [38;5;28;01mNone[39;00m
[0;32m--> 362[0m [38;5;28;01mraise[39;00m exc

