# San Francisco Fire Department Data Analysis
This is an exploratory data analysis, ETL, and common data operations on the San Francisco Fire Department public data set.

## Data extraction

In [2]:
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

In [3]:
  val fireSchema: StructType = StructType(Array(
    StructField("CallNumber", IntegerType, nullable = true),
    StructField("UnitID", StringType, nullable = true),
    StructField("IncidentNumber", IntegerType, nullable = true),
    StructField("CallType", StringType, nullable = true),
    StructField("Location", StringType, nullable = true),
    StructField("CallDate", StringType, nullable = true),
    StructField("WatchDate", StringType, nullable = true),
    StructField("CallFinalDisposition", StringType, nullable = true),
    StructField("AvailableDtTm", StringType, nullable = true),
    StructField("Address", StringType, nullable = true),
    StructField("City", StringType, nullable = true),
    StructField("Zipcode", StringType, nullable = true),
    StructField("Battalion", StringType, nullable = true),
    StructField("StationArea", StringType, nullable = true),
    StructField("Box", StringType, nullable = true),
    StructField("OriginalPriority", StringType, nullable = true),
    StructField("Priority", StringType, nullable = true),
    StructField("FinalPriority", IntegerType, nullable = true),
    StructField("ALSUnit", BooleanType, nullable = true),
    StructField("CallTypeGroup", StringType, nullable = true),
    StructField("NumAlarms", IntegerType, nullable = true),
    StructField("UnitType", StringType, nullable = true),
    StructField("UnitSequenceInCallDispatch", IntegerType, nullable = true),
    StructField("FirePreventionDistrict", StringType, nullable = true),
    StructField("SupervisorDistrict", StringType, nullable = true),
    StructField("Neighborhood", StringType, nullable = true),
    StructField("RowID", StringType, nullable = true),
    StructField("Delay", FloatType, nullable = true)))

fireSchema = StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(Location,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,StringType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true),...


StructType(StructField(CallNumber,IntegerType,true), StructField(UnitID,StringType,true), StructField(IncidentNumber,IntegerType,true), StructField(CallType,StringType,true), StructField(Location,StringType,true), StructField(CallDate,StringType,true), StructField(WatchDate,StringType,true), StructField(CallFinalDisposition,StringType,true), StructField(AvailableDtTm,StringType,true), StructField(Address,StringType,true), StructField(City,StringType,true), StructField(Zipcode,StringType,true), StructField(Battalion,StringType,true), StructField(StationArea,StringType,true), StructField(Box,StringType,true), StructField(OriginalPriority,StringType,true), StructField(Priority,StringType,true), StructField(FinalPriority,IntegerType,true),...

In [4]:
val fireDF: DataFrame = spark.read.schema(fireSchema).option("header", "true").csv("/data/sf-fire/sf-fire-calls.csv")

fireDF = [CallNumber: int, UnitID: string ... 26 more fields]


[CallNumber: int, UnitID: string ... 26 more fields]

## Data Transformation

### What were all the different types of fire calls in 2018?

In [5]:
val fireDF2018 = fireDF
.filter(
  col("CallType").isNotNull &&
    year(to_timestamp(col("CallDate"), "MM/dd/yyyy")) === "2018"
).cache()

val callTypesIn2018 = 
fireDF2018
.select("CallType")
.distinct()

callTypesIn2018.show(truncate = false)

+-------------------------------+
|CallType                       |
+-------------------------------+
|Elevator / Escalator Rescue    |
|Alarms                         |
|Odor (Strange / Unknown)       |
|Citizen Assist / Service Call  |
|HazMat                         |
|Vehicle Fire                   |
|Other                          |
|Outside Fire                   |
|Traffic Collision              |
|Assist Police                  |
|Gas Leak (Natural and LP Gases)|
|Water Rescue                   |
|Electrical Hazard              |
|Structure Fire                 |
|Medical Incident               |
|Fuel Spill                     |
|Smoke Investigation (Outside)  |
|Train / Rail Incident          |
|Explosion                      |
|Suspicious Package             |
+-------------------------------+



fireDF2018 = [CallNumber: int, UnitID: string ... 26 more fields]
callTypesIn2018 = [CallType: string]


[CallType: string]

### What months within the year 2018 saw the highest number of fire calls?

In [9]:
val monthsWithTheHighestCalls =
fireDF2018
  .withColumn("Month", month(to_timestamp(col("CallDate"), "MM/dd/yyyy")))
  .groupBy("Month")
  .agg(count(col("CallType")).as("NumberOfCalls"))
  .orderBy(desc("NumberOfCalls"))

monthsWithTheHighestCalls.show(truncate = false)

+-----+-------------+
|Month|NumberOfCalls|
+-----+-------------+
|10   |1068         |
|5    |1049         |
|3    |1029         |
|8    |1021         |
|1    |986          |
|6    |976          |
|7    |972          |
|9    |949          |
|4    |948          |
|2    |920          |
|11   |192          |
+-----+-------------+



monthsWithTheHighestCalls = [Month: int, NumberOfCalls: bigint]


[Month: int, NumberOfCalls: bigint]

### Which neighborhood in San Francisco generated the most fire calls in 2018?

In [13]:
val neighborhoodsWithTheMostFireCalls =
fireDF2018
  .filter(col("Neighborhood").isNotNull)
  .groupBy("Neighborhood")
  .agg(count(col("CallType")).as("NumberOfCalls"))
  .orderBy(desc("NumberOfCalls"))

neighborhoodsWithTheMostFireCalls.take(1).foreach(println)

[(37.77762423892872, -122.39998111124002),58]


neighborhoodsWithTheMostFireCalls = [Neighborhood: string, NumberOfCalls: bigint]


[Neighborhood: string, NumberOfCalls: bigint]

### Which neighborhoods had the worst response times to fire calls in 2018?

In [14]:
val worstResponseTimesByNeighborhoods =
fireDF2018
  .filter(col("Neighborhood").isNotNull)
  .groupBy("Neighborhood")
  .agg(max("Delay").alias("MaxResponseDelayInMins"))
  .orderBy(desc("MaxResponseDelayInMins"))

worstResponseTimesByNeighborhoods.show(false)

+-----------------------------------------+----------------------+
|Neighborhood                             |MaxResponseDelayInMins|
+-----------------------------------------+----------------------+
|(37.80022303736301, -122.41050959530114) |491.26666             |
|(37.79267911710725, -122.39685349996866) |406.63333             |
|(37.783834437414136, -122.4129305220591) |340.48334             |
|(37.76558354129728, -122.45209163136114) |175.86667             |
|(37.729934693604356, -122.39774442710336)|155.8                 |
|(37.7980449492818, -122.3963670843851)   |135.51666             |
|(37.789793139575416, -122.4264272862002) |129.01666             |
|(37.75762574276467, -122.39262295923919) |109.8                 |
|(37.75946872471556, -122.46391578590715) |106.13333             |
|(37.77918239332259, -122.40979393906078) |94.71667              |
|(37.725881170284005, -122.39245744802503)|92.816666             |
|(37.78005179422856, -122.46428116686292) |90.433334          

worstResponseTimesByNeighborhoods = [Neighborhood: string, MaxResponseDelayInMins: float]


[Neighborhood: string, MaxResponseDelayInMins: float]

### Which week in the year in 2018 had the most fire calls?

In [15]:
val weekWithTheMostFireCalls =
fireDF2018
  .withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy"))
  .groupBy(weekofyear(col("IncidentDate")).as("WeekOfTheYear"))
  .count()
  .orderBy(desc("count"))

weekWithTheMostFireCalls.show(10,truncate = false)

+-------------+-----+
|WeekOfTheYear|count|
+-------------+-----+
|22           |258  |
|43           |258  |
|40           |252  |
|25           |250  |
|44           |242  |
|32           |241  |
|13           |241  |
|31           |239  |
|11           |239  |
|18           |239  |
+-------------+-----+
only showing top 10 rows



weekWithTheMostFireCalls = [WeekOfTheYear: int, count: bigint]


[WeekOfTheYear: int, count: bigint]

### Is there a correlation between neighborhood, zip code, and number of fire calls?

In [22]:
val correlationBetweenNeighbourhoodsAndFireCalls = 
fireDF2018
  .groupBy("Neighborhood", "Zipcode")
  .count()
  .select(corr("Neighborhood", "count"))

correlationBetweenNeighbourhoodsAndFireCalls.show()

correlationBetweenNeighbourhoodsAndFireCalls = [corr(Neighborhood, count): double]


+-------------------------+
|corr(Neighborhood, count)|
+-------------------------+
|                     null|
+-------------------------+



[corr(Neighborhood, count): double]

## Load Data

### How can we use Parquet files or SQL tables to store this data and read it back?

In [8]:
val parquetData = "/data/fire-calls-data"

fireDF2018.write.format("parquet").save(parquetData)

lastException = null
parquetData = /data/fire-calls-data


/data/fire-calls-data

In [10]:
val readFromParquet = spark.read.parquet(parquetData)

readFromParquet.show(10)

+----------+------+--------------+-----------------+----------+----------+----------------+--------------------+--------------------+-------------+-----+-------+---------+-----------+---+----------------+--------+-------------+-------+-------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------+----------+
|CallNumber|UnitID|IncidentNumber|         CallType|  Location|  CallDate|       WatchDate|CallFinalDisposition|       AvailableDtTm|      Address| City|Zipcode|Battalion|StationArea|Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|         RowID|     Delay|
+----------+------+--------------+-----------------+----------+----------+----------------+--------------------+--------------------+-------------+-----+-------+---------+-----------+---+----------------+--------+-------------+---

readFromParquet = [CallNumber: int, UnitID: string ... 26 more fields]


[CallNumber: int, UnitID: string ... 26 more fields]