In [0]:
from pyspark.sql.functions import *

In [0]:
# Load the SF fire-calls CSV from the Databricks sample datasets, treating the
# first row as the header and asking Spark to infer each column's type.
raw_fire_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")
)

In [0]:
# Source column name -> space-free name used throughout the rest of the notebook.
_COLUMN_RENAMES = {
    "Call Number": "CallNumber",
    "Unit ID": "UnitID",
    "Incident Number": "IncidentNumber",
    "Call Date": "CallDate",
    "Watch Date": "WatchDate",
    "Call Final Disposition": "CallFinalDisposition",
    "Available DtTm": "AvailableDtTm",
    "Zipcode of Incident": "Zipcode",
    "Station Area": "StationArea",
    "Final Priority": "FinalPriority",
    "ALS Unit": "ALSUnit",
    "Call Type Group": "CallTypeGroup",
    "Unit sequence in call dispatch": "UnitSequenceInCallDispatch",
    "Fire Prevention District": "FirePreventionDistrict",
    "Supervisor District": "SupervisorDistrict",
}

# Apply the renames one at a time, in the order listed above; raw_fire_df itself
# is left untouched.
renamed_fire_df = raw_fire_df
for _old_name, _new_name in _COLUMN_RENAMES.items():
    renamed_fire_df = renamed_fire_df.withColumnRenamed(_old_name, _new_name)

In [0]:
# Print the column names and types after the renames.
renamed_fire_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: date (nullable = true)
 |-- WatchDate: date (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OrigPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- Supe

In [0]:
# Replace Delay with its value rounded to two decimal places.
# `round` here is pyspark.sql.functions.round (pulled in by the star import at
# the top of the notebook), not the Python builtin.
fire_df = renamed_fire_df.withColumn(
    "Delay",
    round(col("Delay"), 2),
)

In [0]:
# Print the schema of the frame that carries the rounded Delay column.
fire_df.printSchema()

root
 |-- CallNumber: integer (nullable = true)
 |-- UnitID: string (nullable = true)
 |-- IncidentNumber: integer (nullable = true)
 |-- CallType: string (nullable = true)
 |-- CallDate: date (nullable = true)
 |-- WatchDate: date (nullable = true)
 |-- CallFinalDisposition: string (nullable = true)
 |-- AvailableDtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- StationArea: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- OrigPriority: string (nullable = true)
 |-- Priority: string (nullable = true)
 |-- FinalPriority: integer (nullable = true)
 |-- ALSUnit: boolean (nullable = true)
 |-- CallTypeGroup: string (nullable = true)
 |-- NumAlarms: integer (nullable = true)
 |-- UnitType: string (nullable = true)
 |-- UnitSequenceInCallDispatch: integer (nullable = true)
 |-- FirePreventionDistrict: string (nullable = true)
 |-- Supe

In [0]:
# Mark fire_df for caching; every query below reads from this frame.
# NOTE(review): presumably done so the CSV is not re-read per query — confirm.
fire_df.cache()

Out[10]: DataFrame[CallNumber: int, UnitID: string, IncidentNumber: int, CallType: string, CallDate: date, WatchDate: date, CallFinalDisposition: string, AvailableDtTm: string, Address: string, City: string, Zipcode: int, Battalion: string, StationArea: string, Box: string, OrigPriority: string, Priority: string, FinalPriority: int, ALSUnit: boolean, CallTypeGroup: string, NumAlarms: int, UnitType: string, UnitSequenceInCallDispatch: int, FirePreventionDistrict: string, SupervisorDistrict: string, Neighborhood: string, Location: string, RowID: string, Delay: double]

In [0]:
# Preview the first 10 rows.
fire_df.show(10)

+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+------------+--------+-------------+-------+-------------+---------+------------+--------------------------+----------------------+------------------+---------------+--------------------+-------------+-----+
|CallNumber|UnitID|IncidentNumber|        CallType|  CallDate| WatchDate|CallFinalDisposition|       AvailableDtTm|             Address|City|Zipcode|Battalion|StationArea| Box|OrigPriority|Priority|FinalPriority|ALSUnit|CallTypeGroup|NumAlarms|    UnitType|UnitSequenceInCallDispatch|FirePreventionDistrict|SupervisorDistrict|   Neighborhood|            Location|        RowID|Delay|
+----------+------+--------------+----------------+----------+----------+--------------------+--------------------+--------------------+----+-------+---------+-----------+----+------------+--------+-------------+-------+------------

##### Q1. How many distinct types of calls were made to the Fire Department?
```SQL
select count(distinct CallType) as distinct_call_type_count
from fire_service_calls_tbl
where CallType is not null
```

In [0]:
# Register fire_df as a temp view so it can be queried with Spark SQL.
# The view must be registered before spark.sql() references it.
fire_df.createOrReplaceTempView("fire_service_calls_view")
# Q1 (SQL version): count the distinct non-null CallType values.
q1_sql_df = spark.sql("""
        select count(distinct CallType) as distinct_call_type_count
        from fire_service_calls_view
        where CallType is not null
        """)
q1_sql_df.show(10)

+------------------------+
|distinct_call_type_count|
+------------------------+
|                      32|
+------------------------+



In [0]:
# Q1 (DataFrame API version): keep rows with a CallType, reduce to the distinct
# CallType values, then count them.
q1_df = (
    fire_df
    .filter("CallType is not null")
    .select("CallType")
    .distinct()
)
print(q1_df.count())

32


In [0]:
# Q1 again, one transformation per statement so each intermediate frame can be
# inspected on its own.
q1_df1 = fire_df.filter("CallType is not null")  # drop rows without a CallType
q1_df2 = q1_df1.select("CallType")               # keep only the CallType column
q1_df3 = q1_df2.dropDuplicates()                 # one row per distinct CallType
print(q1_df3.count())

32


##### Q2. What were the distinct types of calls made to the Fire Department?
```sql
select distinct CallType as distinct_call_types
from fire_service_calls_tbl
where CallType is not null
```

In [0]:
# Q2: list the distinct non-null call types under the column name
# distinct_call_type.
q2_df = (
    fire_df
    .filter("CallType is not null")
    .select(expr("CallType as distinct_call_type"))
    .distinct()
)
q2_df.show(10)

+--------------------+
|  distinct_call_type|
+--------------------+
|              Alarms|
|Odor (Strange / U...|
|Citizen Assist / ...|
|        Vehicle Fire|
|               Other|
|        Outside Fire|
|   Electrical Hazard|
|      Structure Fire|
|    Medical Incident|
|          Fuel Spill|
+--------------------+
only showing top 10 rows



##### Q3. Which calls had a response delay greater than 5 mins?
``` sql
select CallNumber, Delay
from fire_service_calls_tbl
where Delay > 5
```

In [0]:
# Q3: show the call number and delay for every row whose Delay exceeds 5.
(
    fire_df
    .filter("Delay > 5")
    .select("CallNumber", "Delay")
    .show()
)

+----------+-----+
|CallNumber|Delay|
+----------+-----+
|  20110014| 5.23|
|  20110017| 6.93|
|  20110019| 6.12|
|  20110039| 7.85|
|  20110045|77.33|
|  20110046| 5.42|
|  20110055|  6.5|
|  20110058| 6.85|
|  20110058| 6.85|
|  20110061| 6.33|
|  20110062| 7.02|
|  20110066|  9.1|
|  20110068| 5.98|
|  20110070| 5.42|
|  20110070| 5.08|
|  20110070| 8.18|
|  20110070| 5.98|
|  20110077|18.07|
|  20110103| 5.55|
|  20110103| 5.97|
+----------+-----+
only showing top 20 rows



##### Q4. What were the most common call types?
```sql
select CallType, count(*) as count
from fire_service_calls_tbl
where CallType is not null
group by CallType
order by count desc
```

In [0]:
# Q4: count rows per call type (ignoring null call types) and show the most
# frequent types first.
(
    fire_df
    .select("CallType")
    .filter("CallType is not null")
    .groupBy("CallType")
    .count()
    .sort("count", ascending=False)
    .show()
)

+--------------------+-------+
|            CallType|  count|
+--------------------+-------+
|    Medical Incident|2843475|
|      Structure Fire| 578998|
|              Alarms| 483518|
|   Traffic Collision| 175507|
|Citizen Assist / ...|  65360|
|               Other|  56961|
|        Outside Fire|  51603|
|        Vehicle Fire|  20939|
|        Water Rescue|  20037|
|Gas Leak (Natural...|  17284|
|   Electrical Hazard|  12608|
|Elevator / Escala...|  11851|
|Odor (Strange / U...|  11680|
|Smoke Investigati...|   9796|
|          Fuel Spill|   5198|
|              HazMat|   3437|
|Industrial Accidents|   2401|
|           Explosion|   2307|
|  Aircraft Emergency|   1196|
|Train / Rail Inci...|   1116|
+--------------------+-------+
only showing top 20 rows



##### Q5. What zip codes accounted for most common calls?
```sql
select CallType, ZipCode, count(*) as count
from fire_service_calls_tbl
where CallType is not null
group by CallType, Zipcode
order by count desc
```

In [0]:
# Q5: count calls per (CallType, ZipCode) pair, ignoring null call types, with
# the largest counts first.
# The DataFrame is bound to q5_df *before* show() is called: show() only prints
# and returns None, so ending the assigned chain with it left q5_df as None.
q5_df = (
    fire_df
    .select("ZipCode", "CallType")
    .where("CallType is not null")
    .groupBy("CallType", "ZipCode")
    .count()
    .orderBy("count", ascending=False)
)
q5_df.show()

+----------------+-------+------+
|        CallType|ZipCode| count|
+----------------+-------+------+
|Medical Incident|  94102|401457|
|Medical Incident|  94103|370215|
|Medical Incident|  94110|249279|
|Medical Incident|  94109|238087|
|Medical Incident|  94124|147564|
|Medical Incident|  94112|139565|
|Medical Incident|  94115|120087|
|Medical Incident|  94122|107602|
|Medical Incident|  94107|107439|
|Medical Incident|  94133| 99050|
|Medical Incident|  94117| 92744|
|Medical Incident|  94134| 83569|
|Medical Incident|  94114| 82378|
|Medical Incident|  94118| 77817|
|Medical Incident|  94121| 74943|
|Medical Incident|  94116| 66742|
|Medical Incident|  94132| 64439|
|  Structure Fire|  94110| 57014|
|Medical Incident|  94105| 56909|
|  Structure Fire|  94103| 55529|
+----------------+-------+------+
only showing top 20 rows



##### Q6. What San Francisco neighborhoods are in the zip codes 94102 and 94103?
```sql
select distinct Neighborhood, Zipcode
from fire_service_calls_tbl
where Zipcode== 94102 or Zipcode == 94103
```

In [0]:
# Q6: distinct (Neighborhood, ZipCode) pairs for zip codes 94102 and 94103,
# ordered by zip code.
# q6_df holds the DataFrame itself; show() is called separately because it
# returns None and would otherwise leave q6_df bound to None.
q6_df = (
    fire_df
    .select("Neighborhood", "ZipCode")
    .where("Zipcode == 94102 or Zipcode == 94103")
    .distinct()
    .orderBy("ZipCode")
)
q6_df.show()

+--------------------+-------+
|        Neighborhood|ZipCode|
+--------------------+-------+
|    Western Addition|  94102|
|          Tenderloin|  94102|
|            Nob Hill|  94102|
|             Mission|  94102|
|     South of Market|  94102|
|Financial Distric...|  94102|
|        Hayes Valley|  94102|
|        Potrero Hill|  94103|
| Castro/Upper Market|  94103|
|     South of Market|  94103|
|        Hayes Valley|  94103|
|         Mission Bay|  94103|
|          Tenderloin|  94103|
|Financial Distric...|  94103|
|             Mission|  94103|
+--------------------+-------+



##### Q7. What were the sum of alarms (NumAlarms) across all calls, and the average, min and max of the response times for calls?
```sql
select sum(NumAlarms), avg(Delay), min(Delay), max(Delay)
from fire_service_calls_tbl
```

In [0]:
# Q7: one-row summary with the sum of NumAlarms and the average, minimum and
# maximum Delay (each rounded to two decimals).
# sum/avg/min/max/round are the pyspark.sql.functions versions brought in by the
# star import at the top of the notebook, not the Python builtins.
# The DataFrame is assigned first and shown afterwards: show() returns None, so
# chaining it into the assignment left q7_df bound to None.
q7_df = fire_df.select(
    sum('NumAlarms').alias('sum_Alarms'),
    round(avg('Delay'), 2).alias('avg_Delay'),
    round(min('Delay'), 2).alias('min_Delay'),
    round(max('Delay'), 2).alias('max_Delay'),
)
q7_df.show()

+----------+---------+---------+---------+
|sum_Alarms|avg_Delay|min_Delay|max_Delay|
+----------+---------+---------+---------+
|   4403441|      3.9|     0.02|  1879.62|
+----------+---------+---------+---------+



##### Q8. How many distinct years of data are in the CSV file?
```sql
select distinct year(to_timestamp(CallDate, "MM/dd/yyyy")) as year_num
from fire_service_calls_tbl
order by year_num
```

In [0]:
# Q8: the distinct calendar years present in CallDate, in ascending order.
# q8_df is bound to the DataFrame; show() is a separate statement because it
# returns None and would otherwise leave q8_df bound to None.
q8_df = (
    fire_df
    .select(year('CallDate').alias('num_Year'))
    .distinct()
    .orderBy('num_Year')
)
q8_df.show()

+--------+
|num_Year|
+--------+
|    2000|
|    2001|
|    2002|
|    2003|
|    2004|
|    2005|
|    2006|
|    2007|
|    2008|
|    2009|
|    2010|
|    2011|
|    2012|
|    2013|
|    2014|
|    2015|
|    2016|
|    2017|
|    2018|
+--------+



##### Q9. What week of the year in 2018 had the most fire calls?
```sql
select weekofyear(to_timestamp(CallDate, "MM/dd/yyyy")) week_year, count(*) as count
from fire_service_calls_tbl 
where year(to_timestamp(CallDate, "MM/dd/yyyy")) == 2018
group by week_year
order by count desc
```

In [0]:
# Q9: number of calls per week-of-year for 2018, busiest weeks first.
# The year filter runs before the projection so that CallDate is still a column
# of the frame being filtered (the original filtered on CallDate after it had
# been projected away).
# q9_df is bound to the DataFrame; show() is called separately because it
# returns None and would otherwise leave q9_df bound to None.
q9_df = (
    fire_df
    .where(year('CallDate') == 2018)
    .select(weekofyear('CallDate').alias('week_year'))
    .groupBy('week_year')
    .count()
    .orderBy('count', ascending=False)
)
q9_df.show()

+---------+-----+
|week_year|count|
+---------+-----+
|        1| 6401|
|       25| 6163|
|       13| 6103|
|       22| 6060|
|       44| 6048|
|       27| 6042|
|       16| 6009|
|       40| 6000|
|       43| 5986|
|        5| 5946|
|        2| 5929|
|       18| 5917|
|        9| 5874|
|        8| 5843|
|        6| 5839|
|       21| 5821|
|       38| 5817|
|       10| 5806|
|       23| 5781|
|       32| 5764|
+---------+-----+
only showing top 20 rows



##### Q10. What neighborhoods in San Francisco had the worst response time in 2018?
```sql
select Neighborhood, Delay
from fire_service_calls_tbl 
where year(to_timestamp(CallDate, "MM/dd/yyyy")) == 2018
```

In [0]:
# Q10: Neighborhood and Delay for every 2018 call, longest delays first.
# The year filter runs before the projection so that CallDate is still a column
# of the frame being filtered (the original filtered on CallDate after it had
# been projected away).
# q10_df is bound to the DataFrame; show() is called separately because it
# returns None and would otherwise leave q10_df bound to None.
q10_df = (
    fire_df
    .where(year('CallDate') == 2018)
    .select('Neighborhood', 'Delay')
    .orderBy('Delay', ascending=False)
)
q10_df.show()

+--------------------+------+
|        Neighborhood| Delay|
+--------------------+------+
|  West of Twin Peaks|754.08|
|             Mission|745.93|
|           Chinatown|734.87|
|Bayview Hunters P...|715.77|
|Bayview Hunters P...|714.73|
|Bayview Hunters P...|713.05|
|Bayview Hunters P...| 700.0|
|             Mission|669.42|
|Bayview Hunters P...|609.08|
| Castro/Upper Market|561.17|
|  West of Twin Peaks|539.98|
|      Outer Richmond|508.85|
|           Chinatown|506.95|
|           Chinatown|491.45|
|           Chinatown|491.27|
|           Chinatown|490.03|
|           Chinatown| 485.8|
|     Sunset/Parkside| 457.4|
|Financial Distric...|419.73|
|Financial Distric...|419.63|
+--------------------+------+
only showing top 20 rows

