In [1]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

Spark = SparkSession.builder.appName("Sanfranciso").getOrCreate()

In [2]:
Spark

# 1. Read the sanfranciso fire calls.

In [3]:
sf_df=Spark.read.csv("sf-fire-calls.csv",header=True, inferSchema=True)
sf_df.show()

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+----------------+--------+

# 2. show the columns

In [4]:
sf_df.columns

['CallNumber',
 'UnitID',
 'IncidentNumber',
 'CallType',
 'CallDate',
 'WatchDate',
 'CallFinalDisposition',
 'AvailableDtTm',
 'Address',
 'City',
 'Zipcode',
 'Battalion',
 'StationArea',
 'Box',
 'OriginalPriority',
 'Priority',
 'FinalPriority',
 'ALSUnit',
 'CallTypeGroup',
 'NumAlarms',
 'UnitType',
 'UnitSequenceInCallDispatch',
 'FirePreventionDistrict',
 'SupervisorDistrict',
 'Neighborhood',
 'Location',
 'RowID',
 'Delay']

## 3. Print the schema of the dataset

In [5]:
sf_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: string (nullable = true)
 |-- WatchDate: string (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OriginalPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 

# Questions
 1. What are  all the different types of fire in the calls type

In [6]:

sf_1= sf_df.select("CallType")
fi_sf= sf_1.filter((sf_1["CallType"]).like("%Fire%")).groupBy("CallType").count()
fi_sf.show()

+--------------+-----+
|      CallType|count|
+--------------+-----+
|   Marine Fire|   14|
|  Vehicle Fire|  854|
|  Outside Fire| 2094|
|Structure Fire|23319|
+--------------+-----+



# 
2. How many fire calls in 2018 were categorized as "medical incidents"?

In [7]:

sf_2 = sf_df.withColumn("CallYear", year(to_date(sf_df["CallDate"], 'MM/dd/yyyy')))
fi_2018= sf_2.filter((sf_2["CallYear"]==2018))
res_1=fi_2018.filter((sf_2["CallType"]=="Medical Incident"))
res_1.groupBy("CallType","CallYear").count().show()

+----------------+--------+-----+
|        CallType|CallYear|count|
+----------------+--------+-----+
|Medical Incident|    2018| 7004|
+----------------+--------+-----+



  3. What were all the different types of fire calls  in 2018?

In [8]:

final=fi_2018.groupBy("CallType").count().orderBy("count", ascending=False).alias("TotalCalls")
final.show()

+--------------------+-----+
|            CallType|count|
+--------------------+-----+
|    Medical Incident| 7004|
|              Alarms| 1144|
|      Structure Fire|  906|
|   Traffic Collision|  433|
|        Outside Fire|  153|
|               Other|  114|
|Citizen Assist / ...|  113|
|Gas Leak (Natural...|   69|
|        Water Rescue|   43|
|Elevator / Escala...|   36|
|   Electrical Hazard|   30|
|        Vehicle Fire|   28|
|Smoke Investigati...|   28|
|Odor (Strange / U...|   10|
|          Fuel Spill|   10|
|              HazMat|    5|
|Train / Rail Inci...|    5|
|  Suspicious Package|    3|
|       Assist Police|    1|
|           Explosion|    1|
+--------------------+-----+



# 
4. Which neighborhood in San Francisco generated the most fire calls in 2018?

In [9]:

sf_4=fi_2018.groupBy(sf_2["Neighborhood"]).count()
sf_4.orderBy("count",ascending=False).show(5)

+--------------------+-----+
|        Neighborhood|count|
+--------------------+-----+
|          Tenderloin| 1393|
|     South of Market| 1053|
|             Mission|  913|
|Financial Distric...|  772|
|Bayview Hunters P...|  522|
+--------------------+-----+
only showing top 5 rows



#
5. Which neighborhoods had the worst response times to fire calls in 2018?

In [10]:

sf_5=fi_2018.groupBy(sf_2["Neighborhood"]).agg(max(sf_2["Delay"])).alias("Worst")
sf_5.orderBy("max(Delay)",ascending= False).show()

+--------------------+----------+
|        Neighborhood|max(Delay)|
+--------------------+----------+
|           Chinatown| 491.26666|
|Financial Distric...| 406.63333|
|          Tenderloin| 340.48334|
|      Haight Ashbury| 175.86667|
|Bayview Hunters P...|     155.8|
|     Pacific Heights| 129.01666|
|        Potrero Hill|     109.8|
|        Inner Sunset| 106.13333|
|     South of Market|  94.71667|
|      Inner Richmond| 90.433334|
|           Excelsior|  83.76667|
| Castro/Upper Market|  74.13333|
|    Western Addition| 67.916664|
|            Nob Hill|     67.45|
|             Mission| 54.666668|
|    Presidio Heights| 52.883335|
|       Outer Mission| 43.383335|
|  West of Twin Peaks|      43.2|
|         North Beach| 40.933334|
|            Presidio| 38.816666|
+--------------------+----------+
only showing top 20 rows



6. What was the average response time for fire calls in each neighborhood in 2018?

In [11]:
sf_6=fi_2018.groupBy(sf_2["Neighborhood"]).agg(round(avg(sf_2["Delay"]),4)).withColumnRenamed("round(avg(Delay), 4)","Average response")
sf_6.show()

+--------------------+----------------+
|        Neighborhood|Average response|
+--------------------+----------------+
|        Inner Sunset|          4.4381|
|      Haight Ashbury|          4.2664|
|        Lincoln Park|          3.3111|
|           Japantown|          3.3477|
|                None|          3.3633|
|         North Beach|          3.8892|
|   Lone Mountain/USF|          3.2472|
|    Western Addition|          3.2843|
|      Bernal Heights|          3.8465|
|         Mission Bay|          3.8549|
|        Hayes Valley|          3.3704|
|Financial Distric...|          4.3441|
|           Lakeshore|          3.8816|
|Bayview Hunters P...|          4.6206|
|   Visitacion Valley|          3.4838|
|      Inner Richmond|          4.3647|
|            Nob Hill|             3.8|
|Oceanview/Merced/...|          3.9472|
|      Outer Richmond|          3.6481|
|     Treasure Island|          5.4537|
+--------------------+----------------+
only showing top 20 rows



7. How many fire calls were handled by ALS units versus non-ALS units in 2018?/

In [12]:
sf_7=fi_2018.groupBy(sf_2["ALSUnit"]).count().show()

+-------+-----+
|ALSUnit|count|
+-------+-----+
|   true| 6676|
|  false| 3460|
+-------+-----+



8. What were all the different unit types?

In [13]:
sf_8=sf_df.groupBy(sf_2["UnitType"]).count()
unit_sf=sf_8.orderBy("count",ascending=False).show()

+--------------+-----+
|      UnitType|count|
+--------------+-----+
|        ENGINE|68839|
|         MEDIC|55191|
|         TRUCK|18973|
|         CHIEF|13036|
|       PRIVATE|10956|
|RESCUE CAPTAIN| 4221|
|  RESCUE SQUAD| 2952|
|       SUPPORT|  619|
|       AIRPORT|  358|
| INVESTIGATION|  151|
+--------------+-----+



9. Is there a correlation between neighborhood, zip code, and number of fire calls

In [14]:
fire_calls_count = sf_2.groupBy("Neighborhood", "Zipcode").count()
fire_calls_count.show()

+--------------------+-------+-----+
|        Neighborhood|Zipcode|count|
+--------------------+-------+-----+
|        Inner Sunset|  94122| 2161|
|Bayview Hunters P...|  94124| 9150|
|        Inner Sunset|  94114|   20|
|  West of Twin Peaks|  94112|  760|
|      Haight Ashbury|  94114|   21|
|           Glen Park|  94110|   25|
|           Excelsior|  94112| 3237|
|        Russian Hill|  94109| 2261|
|                None|  94124|    7|
|           Chinatown|  94133| 1861|
|     Pacific Heights|  94115| 2100|
|Oceanview/Merced/...|  94127|   12|
|        Potrero Hill|  94103|    5|
|        Inner Sunset|  94117|  224|
|    Golden Gate Park|  94117|  107|
|                None|   NULL|  141|
|          Noe Valley|  94131|  763|
|    Western Addition|  94117|  315|
|        McLaren Park|  94112|   36|
|      Outer Richmond|  94121| 4121|
+--------------------+-------+-----+
only showing top 20 rows



10. Which year had the most fire calls?

In [15]:
years1= sf_2.groupBy(sf_2["CallYear"]).count()
finals=years1.withColumnRenamed("count","Totalcalls").orderBy("Totalcalls",ascendeing=True)
finals.show()

+--------+----------+
|CallYear|Totalcalls|
+--------+----------+
|    2000|      5459|
|    2001|      7713|
|    2002|      8090|
|    2006|      8174|
|    2007|      8255|
|    2005|      8282|
|    2004|      8283|
|    2003|      8499|
|    2009|      8789|
|    2008|      8869|
|    2010|      9341|
|    2012|      9674|
|    2011|      9735|
|    2013|     10020|
|    2018|     10136|
|    2014|     10775|
|    2015|     11458|
|    2016|     11609|
|    2017|     12135|
+--------+----------+



11. how many num alarms is called during fire inciddent?

In [17]:
sf_11=sf_df.groupBy(sf_2["NumAlarms"]).count().show()

+---------+------+
|NumAlarms| count|
+---------+------+
|        1|174718|
|        3|   121|
|        5|    17|
|        4|    62|
|        2|   378|
+---------+------+



12. How many fire calls in 2018 required more than one alarm?

In [18]:
sf_12=fi_2018.filter(sf_2["NumAlarms"]>1).show()

+----------+------+--------------+--------------+----------+----------+--------------------+--------------------+--------------------+-------------+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+--------------+--------------------------+----------------------+------------------+--------------------+--------------------+-------------+---------+--------+
|CallNumber|UnitID|IncidentNumber|      CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|         City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|      UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|        RowID|    Delay|CallYear|
+----------+------+--------------+--------------+----------+----------+--------------------+--------------------+--------------------+-------------+-------+---------+----

13. How many fire calls required five alarm?

In [19]:
sf_13=sf_df.filter(sf_df["NumAlarms"]==5)
sf_13.show()

+----------+------+--------------+--------------+----------+----------+--------------------+--------------------+--------------------+-------------+-------+---------+-----------+----+----------------+--------+-------------+-------+-------------+---------+------------+--------------------------+----------------------+------------------+-------------------+--------------------+-------------+---------+
|CallNumber|UnitID|IncidentNumber|      CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|         City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|    UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|       Neighborhood|            Location|        RowID|    Delay|
+----------+------+--------------+--------------+----------+----------+--------------------+--------------------+--------------------+-------------+-------+---------+-----------+----+-----------

14. What is the trend in the number of non-life-threatening fire calls over the years?

In [20]:
sf_14=sf_2.filter(sf_df["CallTypeGroup"]=="Non Life-threatening")
sf_ca=sf_14.groupBy(sf_2["CallTypeGroup"],sf_2["CallYear"]).count().show()

+--------------------+--------+-----+
|       CallTypeGroup|CallYear|count|
+--------------------+--------+-----+
|Non Life-threatening|    2012| 2149|
|Non Life-threatening|    2013| 2352|
|Non Life-threatening|    2014| 2442|
|Non Life-threatening|    2017| 2970|
|Non Life-threatening|    2018| 2623|
|Non Life-threatening|    2015| 2638|
|Non Life-threatening|    2016| 2846|
+--------------------+--------+-----+



15. How can we use SQL tables to store this data and read it back?


In [21]:
sf_2.createOrReplaceTempView("sf_2")

In [22]:
result = Spark.sql("SELECT * FROM sf_2 WHERE CallYear = 2018")
result.show()

+----------+------+--------------+-----------------+----------+----------+--------------------+--------------------+--------------------+-------------+-------+---------+-----------+----+----------------+--------+-------------+-------+--------------------+---------+--------+--------------------------+----------------------+------------------+--------------------+--------------------+---------------+---------+--------+
|CallNumber|UnitID|IncidentNumber|         CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|         City|Zipcode|Battalion|StationArea| Box|OriginalPriority|Priority|FinalPriority|ALSUnit|       CallTypeGroup|NumAlarms|UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|        Neighborhood|            Location|          RowID|    Delay|CallYear|
+----------+------+--------------+-----------------+----------+----------+--------------------+--------------------+--------------------+-------------+-------