In [0]:
from pyspark.sql import *
from pyspark.sql.functions import *

In [0]:
# Load the SF fire-calls CSV. The first row is treated as the header and
# column types are inferred by Spark rather than declared up front.
raw_fire_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")
)

In [0]:
# Show the column names and the types Spark inferred for the raw CSV load.
raw_fire_df.printSchema()

In [0]:
# Preview the raw, untransformed rows.
display(raw_fire_df)

In [0]:
# Register the raw (pre-rename) frame as a *global* temporary view.
# NOTE(review): a session temp view with this same name is later created from
# fire_df, and the SQL in this notebook references the name unqualified. Global
# temp views are addressed as `global_temp.<name>`, so this view appears to be
# unused here — confirm whether it is still needed.
raw_fire_df.createOrReplaceGlobalTempView("fire_service_calls_view")

In [0]:
# Strip spaces/underscores from the CSV header names so columns can be used
# in SQL expressions without back-tick quoting.
#
# Ordered (current name, new name) pairs. Pairs whose two names are identical
# (Address, City, Battalion, Box, Priority, RowID) are kept exactly as they
# were in the original chain.
#
# NOTE(review): later cells read columns named "Neighborhood" and "NumAlarms",
# while this mapping produces "AnalysisBoundaries" and "NumberofAlarms".
# Presumably the actual CSV header already uses the short names — confirm
# against raw_fire_df.printSchema() and align either this list or the queries.
column_renames = [
    ("Call Number", "CallNumber"),
    ("Unit ID", "UnitID"),
    ("Incident Number", "IncidentNumber"),
    ("Call Type", "CallType"),
    ("Call Date", "CallDate"),
    ("Watch Date", "WatchDate"),
    ("Received DtTm", "ReceivedDtTm"),
    ("Entry DtTm", "EntryDtTm"),
    ("Dispatch DtTm", "DispatchDtTm"),
    ("Response DtTm", "ResponseDtTm"),
    ("On Scene DtTm", "OnSceneDtTm"),
    ("Transport DtTm", "TransportDtTm"),
    ("Hospital DtTm", "HospitalDtTm"),
    ("Call Final Disposition", "CallFinalDisposition"),
    ("Available DtTm", "AvailableDtTm"),
    ("Address", "Address"),
    ("City", "City"),
    ("Zipcode of Incident", "ZipcodeofIncident"),
    ("Battalion", "Battalion"),
    ("Station Area", "StationArea"),
    ("Box", "Box"),
    ("Original Priority", "OriginalPriority"),
    ("Priority", "Priority"),
    ("Final Priority", "FinalPriority"),
    ("ALS Unit", "ALSUnit"),
    ("Call Type Group", "CallTypeGroup"),
    ("Number of Alarms", "NumberofAlarms"),
    ("Unit Type", "UnitType"),
    ("Unit seq in call disp", "Unitseqincalldisp"),
    ("Fire Prevention District", "FirePreventionDistrict"),
    ("Supervisor District", "SupervisorDistrict"),
    ("Neighborhoods - Analysis Boundaries", "AnalysisBoundaries"),
    ("RowID", "RowID"),
    ("case_location", "caselocation"),
    ("data_as_of", "dataasof"),
    ("data_loaded_at", "dataloadedat"),
]

# Always restart from raw_fire_df so re-running this cell is idempotent.
renamed_fire_df = raw_fire_df
for current_name, new_name in column_renames:
    renamed_fire_df = renamed_fire_df.withColumnRenamed(current_name, new_name)

In [0]:
# Verify the column names after the rename step.
renamed_fire_df.printSchema()

In [0]:
# Preview the rows under their renamed columns.
display(renamed_fire_df)

In [0]:
# Convert the string date columns to real date/timestamp types, round Delay
# to two decimals, and derive CallYear from the parsed CallDate.
#
# The month field must be written "MM": in Spark datetime patterns lowercase
# "mm" is minute-of-hour, so "mm/dd/yyyy" never reads the month from the
# input. The time part of AvailableDtTm keeps "mm" because there it really is
# the minute field.
#
# `round` and `year` are the pyspark.sql.functions versions brought in by the
# star import at the top of the notebook, not the Python builtins.
fire_df = (
    renamed_fire_df
    .withColumn("CallDate", to_date("CallDate", "MM/dd/yyyy"))
    .withColumn("WatchDate", to_date("WatchDate", "MM/dd/yyyy"))
    .withColumn("AvailableDtTm", to_timestamp("AvailableDtTm", "MM/dd/yyyy hh:mm:ss a"))
    .withColumn("Delay", round("Delay", 2))
    # Must come after CallDate has been parsed into a date.
    .withColumn("CallYear", year("CallDate"))
)
display(fire_df)

In [0]:
# Confirm the date/timestamp conversions and the added CallYear column.
fire_df.printSchema()

In [0]:
# Mark fire_df for caching, since every query below reads from it.
# cache() is lazy: the data is materialized by the next action, not here.
fire_df.cache()

#### Q1. How many distinct types of calls were made to the Fire Department?
select count(distinct CallType) as distinct_call_type_count
from fire_service_calls_tbl
where CallType is not null

In [0]:
# Expose the cleaned frame to Spark SQL under a session-scoped view name.
fire_df.createOrReplaceTempView("fire_service_calls_view")

# Q1 (SQL form): number of distinct, non-null call types.
q1_query = """
        select count(distinct CallType) as distinct_call_type_count
        from fire_service_calls_view
        where CallType is not null
        """
q1_sql_df = spark.sql(q1_query)
display(q1_sql_df)

In [0]:
# Switch Spark's datetime parser to the legacy policy.
# NOTE(review): this cell comes after the cells that build and display fire_df
# with to_date/to_timestamp, so on a top-to-bottom run those displays execute
# before this setting is applied. Consider moving it next to the imports, or
# dropping it if the date patterns parse correctly without it — confirm.
spark.conf.set("spark.sql.legacy.timeParserPolicy","LEGACY")

In [0]:
# Q1 (DataFrame form): keep rows with a call type, project that single column
# under an alias, and de-duplicate.
q1_fire_df = (
    fire_df
    .where("CallType is not null")
    .select(expr("CallType as dist_call_type"))
    .distinct()
)

# The row count of the de-duplicated frame is the answer; as the last
# expression in the cell it is shown as the cell output.
q1_fire_df.count()

#### Q2. What were the distinct types of calls made to the Fire Department?
select distinct CallType as distinct_call_types
from fire_service_calls_tbl
where CallType is not null

In [0]:
# Q2: list the distinct, non-null call types.
q2_fire_df = (
    fire_df
    .where("CallType is not null")
    .select("CallType")
    .distinct()
)

# Pass the DataFrame itself to display(). DataFrame.show() prints a text table
# and returns None, so display(df.show()) hands display() nothing to render.
display(q2_fire_df)

#### Q3. Which calls had a response delay greater than 5 minutes?
select CallNumber, Delay
from fire_service_calls_tbl
where Delay > 5

In [0]:
# Q3: calls whose Delay exceeds 5, longest delay first. Rows without a call
# type are excluded as well (an extra filter on top of the reference SQL).
q3_fire_df = (
    fire_df
    .filter(fire_df.Delay > 5)
    .where("CallType is not null")
    .select("CallNumber", "Delay")
    .orderBy("Delay", ascending=False)
)

# Pass the DataFrame itself to display(). DataFrame.show() prints a text table
# and returns None, so display(df.show()) hands display() nothing to render.
display(q3_fire_df)

#### Q4. What were the most common call types?
select CallType, count(*) as count
from fire_service_calls_tbl
where CallType is not null
group by CallType
order by count desc

In [0]:
# Q4: number of calls per call type, most frequent first.
q4_fire_df = (
    fire_df
    .where("CallType is not null")
    .select("CallType")
    .groupBy("CallType")
    .count()
    .orderBy("count", ascending=False)
)

# Pass the DataFrame itself to display(). DataFrame.show() prints a text table
# and returns None, so display(df.show()) hands display() nothing to render.
display(q4_fire_df)

#### Q5. What zip codes accounted for most common calls?
select CallType, ZipCode, count(*) as count
from fire_service_calls_tbl
where CallType is not null
group by CallType, Zipcode
order by count desc

In [0]:
# Q5: number of calls per (call type, zip code) pair, most frequent first.
q5_fire_df = (
    fire_df
    .where("CallType is not null")
    .select("CallType", "ZipcodeofIncident")
    .groupBy("CallType", "ZipcodeofIncident")
    .count()
    .orderBy("count", ascending=False)
)

# Pass the DataFrame itself to display(). DataFrame.show() prints a text table
# and returns None, so display(df.show()) hands display() nothing to render.
display(q5_fire_df)

#### Q6. What San Francisco neighborhoods are in the zip codes 94102 and 94103?
select distinct Neighborhood, Zipcode
from fire_service_calls_tbl
where Zipcode== 94102 or Zipcode == 94103

In [0]:
# Q6: distinct (neighborhood, zip code) pairs for zip codes 94102 and 94103.
# NOTE(review): the rename cell maps "Neighborhoods - Analysis Boundaries" to
# "AnalysisBoundaries", not "Neighborhood". This select only resolves if the
# CSV header already contains a "Neighborhood" column — confirm against
# fire_df.printSchema().
q6_fire_df = (
    fire_df
    .where("ZipcodeofIncident == '94102' or ZipcodeofIncident == '94103'")
    .select("Neighborhood", "ZipcodeofIncident")
    .distinct()
)

# Pass the DataFrame itself to display(). DataFrame.show() prints a text table
# and returns None, so display(df.show()) hands display() nothing to render.
display(q6_fire_df)

#### Q7. What were the total number of alarms and the average, min, and max response delay across all calls?
select sum(NumAlarms), avg(Delay), min(Delay), max(Delay)
from fire_service_calls_tbl

In [0]:
# Q7: total alarms plus average / min / max Delay over calls with a call type.
# `sum`, `avg`, `min` and `max` are the pyspark.sql.functions aggregates from
# the star import, not the Python builtins.
# NOTE(review): the rename cell maps "Number of Alarms" to "NumberofAlarms",
# not "NumAlarms". This select only resolves if the CSV header already
# contains a "NumAlarms" column — confirm against fire_df.printSchema().
q7_fire_df = (
    fire_df
    .where("CallType is not null")
    .select(sum("NumAlarms"), avg("Delay"), min("Delay"), max("Delay"))
)

# Pass the DataFrame itself to display(). DataFrame.show() prints a text table
# and returns None, so display(df.show()) hands display() nothing to render.
display(q7_fire_df)

#### Q8. How many distinct years of data are in the CSV file?
select distinct year(to_timestamp(CallDate, "MM/dd/yyyy")) as year_num
from fire_service_calls_tbl
order by year_num

In [0]:
# Q8: the distinct call years present in the data, newest first.
q8_fire_df = (
    fire_df
    .where("CallDate is not null")
    .select(year("CallDate").alias("CallYear"))
    .distinct()
    .orderBy("CallYear", ascending=False)
)

# Pass the DataFrame itself to display(). DataFrame.show() prints a text table
# and returns None, so display(df.show()) hands display() nothing to render.
display(q8_fire_df)

#### Q9. What week of the year in 2018 had the most fire calls?
select weekofyear(to_timestamp(CallDate, "MM/dd/yyyy")) week_year, count(*) as count
from fire_service_calls_tbl 
where year(to_timestamp(CallDate, "MM/dd/yyyy")) == 2018
group by week_year
order by count desc

In [0]:
# Q9: calls per week-of-year in 2018, busiest week first.
q9_fire_df = (
    fire_df
    .filter(fire_df["CallYear"] == 2018)
    .where("CallType is not null")
    # Week number is derived from the parsed CallDate.
    .select("CallYear", weekofyear("CallDate").alias("CallWeek"))
    .groupBy("CallYear", "CallWeek")
    .count()
    .orderBy("count", ascending=False)
)
display(q9_fire_df)

#### Q10. What neighborhoods in San Francisco had the worst response time in 2018?
select Neighborhood, Delay
from fire_service_calls_tbl 
where year(to_timestamp(CallDate, "MM/dd/yyyy")) == 2018
group by Neighborhood, Delay
order by Delay desc

In [0]:
# Q10: 2018 calls listed by neighborhood, longest Delay first.
# NOTE(review): the rename cell produces "AnalysisBoundaries", not
# "Neighborhood"; this select presumably relies on the CSV header already
# having a "Neighborhood" column — confirm against fire_df.printSchema().
q10_fire_df = (
    fire_df
    .filter(fire_df["CallYear"] == 2018)
    .where("CallType is not null")
    .select("Neighborhood", "Delay")
    .orderBy("Delay", ascending=False)
)
display(q10_fire_df)