In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
spark = (SparkSession
         .builder
         .appName("sf-fire-call")
         .config("spark.jars.packages","io.delta:delta-core_2.12:2.1.0,org.apache.hadoop:hadoop-aws:3.3.1,org.apache.hadoop:hadoop-common:3.3.1,com.amazonaws:aws-java-sdk-s3:1.12.350")
         .config("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .config("spark.jars.ivy", "/tmp/ivy")
         .config("spark.jars.repositories", "https://repo1.maven.org/maven2/")
         .getOrCreate())

In [3]:
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", "AWS_ACCESS_KEY")
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "AWS_SECRET_KEY")
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

In [4]:
fire_schema = StructType([StructField("CallNumber", IntegerType(), True),
                   StructField("UnitID", StringType(), True),
                   StructField("IncidentNumber", IntegerType(), True),
                   StructField("CallType", StringType(), True),
                   StructField("CallDate", StringType(), True),
                   StructField("WatchDate", StringType(), True),
                   StructField("ReceivedDtTm", StringType(), True),
                   StructField("EntryDtTm", StringType(), True),
                   StructField("DispatchDtTm", StringType(), True),
                   StructField("ResponseDtTm", StringType(), True),
                   StructField("OnSceneDtTm", StringType(), True),
                   StructField("TransportDtTm", StringType(), True),
                   StructField("HospitalDtTm", StringType(), True),
                   StructField("CallFinalDisposition", StringType(), True),
                   StructField("AvailableDtTm", StringType(), True),
                   StructField("Address", StringType(), True),
                   StructField("City", StringType(), True),
                   StructField("ZipcodeofIncident", IntegerType(), True),
                   StructField("Battalion", StringType(), True),
                   StructField("StationArea", StringType(), True),
                   StructField("Box", IntegerType(), True),
                   StructField("OriginalPriority", IntegerType(), True),
                   StructField("Priority", IntegerType(), True),
                   StructField("FinalPriority", IntegerType(), True),
                   StructField("ALSUnit", StringType(), True),
                   StructField("CallTypeGroup", StringType(), True),
                   StructField("NumberofAlarms", IntegerType(), True),
                   StructField("UnitType", StringType(), True),
                   StructField("Unitsequenceincalldispatch", IntegerType(), True),
                   StructField("FirePreventionDistrict", IntegerType(), True),
                   StructField("SupervisorDistrict", IntegerType(), True),
                   StructField("NeighborhooodsAnalysisBoundaries", StringType(), True),
                   StructField("RowID", StringType(), True),
                   StructField("case_location", StringType(), True),
                   StructField("AnalysisNeighborhoods", IntegerType(), True),
                  ])

In [5]:
df = spark.read.csv("./Fire_Department_Calls_for_Service.csv", header=True, schema=fire_schema)

In [6]:
df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- ReceivedDtTm: string (nullable = true)
 |-- EntryDtTm: string (nullable = true)
 |-- DispatchDtTm: string (nullable = true)
 |-- ResponseDtTm: string (nullable = true)
 |-- OnSceneDtTm: string (nullable = true)
 |-- TransportDtTm: string (nullable = true)
 |-- HospitalDtTm: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipcodeofIncident: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: integer (nullable = true)
 |-- OriginalPriority: integer (nullable = true)
 |-- Priority: integer (nullable = true)
 |-- Fina

In [7]:
df_dates = (df.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")).drop("CallDate")
    .withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")).drop("WatchDate")
    .withColumn("ReceivedDtTS", to_timestamp(col("ReceivedDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("ReceivedDtTm")
    .withColumn("DispatchDtTS", to_timestamp(col("DispatchDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("DispatchDtTm")
    .withColumn("ResponseDtTS", to_timestamp(col("ResponseDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("ResponseDtTm")
    .withColumn("OnSceneDtTS", to_timestamp(col("OnSceneDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("OnSceneDtTm")
)

In [8]:
df_dates.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- EntryDtTm: string (nullable = true)
 |-- TransportDtTm: string (nullable = true)
 |-- HospitalDtTm: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipcodeofIncident: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: integer (nullable = true)
 |-- OriginalPriority: integer (nullable = true)
 |-- Priority: integer (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: string (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumberofAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- Unitsequenceincalldispatch: integer (nullab

In [9]:
df1 = df_dates.select("CallNumber", "UnitID", "IncidentNumber", "CallType", "IncidentDate", "ReceivedDtTS", "OnSceneDtTS", "Address", "City", "ZipcodeofIncident", "StationArea")
df1_less = df1.select("*").where((col("IncidentDate") >= "2019-01-01") & (col("IncidentDate") <= "2020-12-31"))

In [10]:
df1_less.select(min("IncidentDate")).show()
df1_less.select(max("IncidentDate")).show()

+-------------------+
|  min(IncidentDate)|
+-------------------+
|2019-01-01 00:00:00|
+-------------------+

+-------------------+
|  max(IncidentDate)|
+-------------------+
|2020-12-31 00:00:00|
+-------------------+



In [11]:
df1.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- IncidentDate: timestamp (nullable = true)
 |-- ReceivedDtTS: timestamp (nullable = true)
 |-- OnSceneDtTS: timestamp (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipcodeofIncident: integer (nullable = true)
 |-- StationArea: string (nullable = true)



In [12]:
df2 = df_dates.select("CallNumber", "UnitID", "IncidentNumber", "CallType", "IncidentDate", "OnWatchDate", "ReceivedDtTS", "DispatchDtTS", "ResponseDtTS", "OnSceneDtTS", "Address", "City", "ZipcodeofIncident", "StationArea")
df2_less = df2.select("*").where((col("IncidentDate") >= "2021-01-01"))

In [13]:
df2_less.select(min("IncidentDate")).show()
df2_less.select(max("IncidentDate")).show()

+-------------------+
|  min(IncidentDate)|
+-------------------+
|2021-01-01 00:00:00|
+-------------------+

+-------------------+
|  max(IncidentDate)|
+-------------------+
|2023-02-05 00:00:00|
+-------------------+



In [24]:
df2_less.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- IncidentDate: timestamp (nullable = true)
 |-- OnWatchDate: timestamp (nullable = true)
 |-- ReceivedDtTS: timestamp (nullable = true)
 |-- DispatchDtTS: timestamp (nullable = true)
 |-- ResponseDtTS: timestamp (nullable = true)
 |-- OnSceneDtTS: timestamp (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- ZipcodeofIncident: integer (nullable = true)
 |-- StationArea: string (nullable = true)



In [15]:
df1_less.write.format("delta").save("./sf-fire-delta")

In [16]:
spark.read.format("delta").load("./sf-fire-delta").createOrReplaceTempView("sf_fire_delta")

In [17]:
spark.sql("select count(*) from sf_fire_delta").show()

+--------+
|count(1)|
+--------+
|  605598|
+--------+



In [18]:
spark.sql("select * from sf_fire_delta limit 1").show()

+----------+------+--------------+----------------+-------------------+-------------------+-------------------+--------------------+-------------+-----------------+-----------+
|CallNumber|UnitID|IncidentNumber|        CallType|       IncidentDate|       ReceivedDtTS|        OnSceneDtTS|             Address|         City|ZipcodeofIncident|StationArea|
+----------+------+--------------+----------------+-------------------+-------------------+-------------------+--------------------+-------------+-----------------+-----------+
| 192750025|  QRV1|      19117988|Medical Incident|2019-10-02 00:00:00|2019-10-02 00:12:01|2019-10-02 00:15:53|1000 Block of MIS...|San Francisco|            94103|         01|
+----------+------+--------------+----------------+-------------------+-------------------+-------------------+--------------------+-------------+-----------------+-----------+



In [19]:
df2_less.write.format("delta").mode("append").option("mergeSchema", "true").save("./sf-fire-delta")

In [29]:
spark.read.format("delta").load("./sf-fire-delta").createOrReplaceTempView("sf_fire_delta")

In [None]:
spark.sql("select count(*) from sf_fire_delta").show()

In [30]:
spark.sql("select * from sf_fire_delta order by IncidentDate desc").show()

+----------+------+--------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------+-----------------+-----------+-------------------+-------------------+-------------------+
|CallNumber|UnitID|IncidentNumber|            CallType|       IncidentDate|       ReceivedDtTS|        OnSceneDtTS|             Address|         City|ZipcodeofIncident|StationArea|        OnWatchDate|       DispatchDtTS|       ResponseDtTS|
+----------+------+--------------+--------------------+-------------------+-------------------+-------------------+--------------------+-------------+-----------------+-----------+-------------------+-------------------+-------------------+
| 230360173|   E01|      23017334|   Traffic Collision|2023-02-05 00:00:00|2023-02-05 01:21:20|2023-02-05 01:26:52|    TURK ST/MASON ST|San Francisco|            94103|         01|2023-02-04 00:00:00|2023-02-05 01:21:48|2023-02-05 01:24:03|
| 230360341|   E33|      23017363|Tr

In [31]:
spark.sql("select * from sf_fire_delta order by IncidentDate asc").show()

+----------+------+--------------+----------------+-------------------+-------------------+-------------------+--------------------+-------------+-----------------+-----------+-----------+------------+------------+
|CallNumber|UnitID|IncidentNumber|        CallType|       IncidentDate|       ReceivedDtTS|        OnSceneDtTS|             Address|         City|ZipcodeofIncident|StationArea|OnWatchDate|DispatchDtTS|ResponseDtTS|
+----------+------+--------------+----------------+-------------------+-------------------+-------------------+--------------------+-------------+-----------------+-----------+-----------+------------+------------+
| 190011241|  KM13|      19000164|Medical Incident|2019-01-01 00:00:00|2019-01-01 08:13:02|2019-01-01 08:20:32|800 Block of LEAV...|San Francisco|            94109|         41|       null|        null|        null|
| 190010474|   E15|      19000060|Medical Incident|2019-01-01 00:00:00|2019-01-01 01:58:35|2019-01-01 02:05:00|0 Block of NIAGAR...|San Fran

In [32]:
from delta.tables import *

In [34]:
deltaTable = DeltaTable.forPath(spark, "./sf-fire-delta")

In [37]:
deltaTable.update("IncidentNumber = 19000164", {"IncidentNumber": "19000165"})

In [40]:
spark.sql("select * from sf_fire_delta order by IncidentDate asc").show(1)

+----------+------+--------------+----------------+-------------------+-------------------+-------------------+--------------------+-------------+-----------------+-----------+-----------+------------+------------+
|CallNumber|UnitID|IncidentNumber|        CallType|       IncidentDate|       ReceivedDtTS|        OnSceneDtTS|             Address|         City|ZipcodeofIncident|StationArea|OnWatchDate|DispatchDtTS|ResponseDtTS|
+----------+------+--------------+----------------+-------------------+-------------------+-------------------+--------------------+-------------+-----------------+-----------+-----------+------------+------------+
| 190011736|  QRV2|      19000216|Medical Incident|2019-01-01 00:00:00|2019-01-01 11:43:29|2019-01-01 11:48:35|400 Block of ELLI...|San Francisco|            94102|         03|       null|        null|        null|
+----------+------+--------------+----------------+-------------------+-------------------+-------------------+--------------------+--------

In [39]:
deltaTable.delete("IncidentNumber = 19000165")

In [42]:
spark.sql("select * from sf_fire_delta order by IncidentDate asc").show(1)

+----------+------+--------------+----------------+-------------------+-------------------+-------------------+--------------------+-------------+-----------------+-----------+-----------+------------+------------+
|CallNumber|UnitID|IncidentNumber|        CallType|       IncidentDate|       ReceivedDtTS|        OnSceneDtTS|             Address|         City|ZipcodeofIncident|StationArea|OnWatchDate|DispatchDtTS|ResponseDtTS|
+----------+------+--------------+----------------+-------------------+-------------------+-------------------+--------------------+-------------+-----------------+-----------+-----------+------------+------------+
| 190011736|  QRV2|      19000216|Medical Incident|2019-01-01 00:00:00|2019-01-01 11:43:29|2019-01-01 11:48:35|400 Block of ELLI...|San Francisco|            94102|         03|       null|        null|        null|
+----------+------+--------------+----------------+-------------------+-------------------+-------------------+--------------------+--------

In [None]:
#upserting