In [0]:
from pyspark.sql.functions import *

In [0]:
# Load the SF fire-calls sample CSV shipped under /databricks-datasets.
# The first row holds the column headers, and inferSchema asks Spark to guess
# column types; the date/time columns are converted explicitly further down.
raw_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")
)

In [0]:
# Preview the first 10 rows of the raw CSV to inspect the headers and value
# formats before cleaning.
display(raw_df.limit(10))

In [0]:
# The raw column names contain spaces (e.g. "Call Number"), which are awkward to
# reference in expressions and SQL, so normalize them to CamelCase first.
# withColumnRenamed is a silent no-op when the old name is not found, so each key
# must match the raw header spelling; exact spelling also keeps the renames
# working when spark.sql.caseSensitive is enabled.
COLUMN_RENAMES = {
    "Call Number": "CallNumber",
    "Unit ID": "UnitID",
    "Incident Number": "IncidentNumber",
    "Call Date": "CallDate",
    "Watch Date": "WatchDate",
    "Call Final Disposition": "CallFinalDisposition",
    "Available DtTm": "AvailableDtTm",
    "Zipcode of Incident": "ZipcodeofIncident",
    "Station Area": "StationArea",
    "Final Priority": "FinalPriority",
    "ALS Unit": "ALSUnit",
    "Call Type Group": "CallTypeGroup",
    "Unit sequence in call dispatch": "UnitSequenceInCallDispatch",
    "Fire Prevention District": "FirePreventionDistrict",
    "Supervisor District": "SupervisorDistrict",
}

renamed_raw_df = raw_df
for old_name, new_name in COLUMN_RENAMES.items():
    renamed_raw_df = renamed_raw_df.withColumnRenamed(old_name, new_name)

display(renamed_raw_df.limit(10))

In [0]:
# Verify the renamed columns and check which types inferSchema assigned
# (the date/time columns are converted explicitly in the next cell).
renamed_raw_df.printSchema()

In [0]:
# Give the date/time columns proper types and round Delay to 2 decimals.
# In the AvailableDtTm pattern, "hh" is the 12-hour clock (1-12) and "a" parses
# the AM/PM marker present in the data; without "a" the hour would be ambiguous.
fire_df = (
    renamed_raw_df
    .withColumn("CallDate", to_date("CallDate", "MM/dd/yyyy"))
    .withColumn("WatchDate", to_date("WatchDate", "MM/dd/yyyy"))
    .withColumn("AvailableDtTm", to_timestamp("AvailableDtTm", "MM/dd/yyyy hh:mm:ss a"))
    .withColumn("Delay", round("Delay", 2))
)

In [0]:
# Preview the typed DataFrame to confirm the date/timestamp conversions and the rounded Delay.
display(fire_df.limit(10))

In [0]:
# CallDate/WatchDate should now be date columns and AvailableDtTm a timestamp.
fire_df.printSchema()

In [0]:
# Q1 How many distinct types of calls were made to the department?
# SQL approach: register fire_df as a temporary view so it can be queried by name.
fire_df.createOrReplaceTempView("fire_view")
q1_sql_df = spark.sql("""
    SELECT COUNT(DISTINCT CallType) as dist_call_type
    FROM fire_view """)
display(q1_sql_df)

In [0]:
# Q1 How many distinct types of calls were made to the department?
# DataFrame API version: drop null call types explicitly, matching
# COUNT(DISTINCT ...), which ignores NULLs.
q1_df = (
    fire_df
    .filter(col("CallType").isNotNull())
    .select("CallType")
    .distinct()
)
print(q1_df.count())

In [0]:
# Q2 What were the distinct types of calls made to the department?
q2_df = (
    fire_df
    .filter(col("CallType").isNotNull())
    .select(col("CallType").alias("distinct_calltype"))
    .distinct()
)
display(q2_df)

In [0]:
# Q3 Find all responses whose delay was greater than 5 minutes.
q3_df = (
    fire_df
    .filter(col("Delay") > 5)
    .select("CallType", "Delay")
)
display(q3_df)


In [0]:
# Q4 What were the most common call types?
# Note on count(): DataFrame.count() is an action that returns a Python int,
# whereas GroupedData.count() (used here after groupBy) is a transformation that
# returns a new DataFrame with one row per group plus a "count" column.
q4_df = (
    fire_df
    .select("CallType")
    .where("CallType is not null")
    .groupBy("CallType")
    .count()
    .orderBy("count", ascending=False)
)
# Keep q4_df as a DataFrame (.show() returns None) and render it like the other cells.
display(q4_df)

In [0]:
# Q5 What zip code accounted for the most calls?
# Rows without a zip code are excluded so a null group cannot top the ranking.
q5_df = (
    fire_df
    .where("ZipcodeofIncident is not null")
    .groupBy("ZipcodeofIncident")
    .count()
    .orderBy("count", ascending=False)
)

display(q5_df)


In [0]:
# Q6 What SF neighborhoods are in zip codes 94102 and 94103?
# distinct() collapses the per-call rows to one row per (neighborhood, zip) pair;
# without it every matching call would be listed.
q6_df = (
    fire_df
    .select("Neighborhood", "ZipcodeofIncident")
    .where("ZipcodeofIncident IN (94102, 94103)")
    .distinct()
    .orderBy("ZipcodeofIncident", "Neighborhood")
)
display(q6_df)


In [0]:
# Q7 What was the sum of all call alarms, and the average, min and max of the
# call response time (Delay)?
# sum/avg/min/max/round here are the pyspark.sql.functions versions (star import).
q7_df = fire_df.agg(
    sum(col("NumAlarms")).alias("sumofCallAlarms"),
    round(avg(col("Delay")), 2).alias("AvgDelay"),
    round(min(col("Delay")), 2).alias("Mindelay"),
    round(max(col("Delay")), 2).alias("MaxDelay"),
)
display(q7_df)


In [0]:
# Q8 How many distinct years of data are present in the dataset?
# df_with_year (fire_df plus a CallYear column) is reused by Q9 and Q10 below.
df_with_year = fire_df.withColumn("CallYear", year("CallDate"))
q8_df = df_with_year.select("CallYear").distinct().orderBy("CallYear")
display(q8_df)
print(q8_df.count())  # number of distinct years



In [0]:
# Q9 What week of the year 2018 had the most fire calls?
# NOTE(review): weekofyear() returns the ISO-8601 week number, so 2018-12-31
# (a Monday) is reported as week 1 and counted together with early January.
q9_df = (
    df_with_year
    .filter(col("CallYear") == 2018)
    .withColumn("WeekOfYear", weekofyear("CallDate"))
    .groupBy("WeekOfYear")
    .agg(count("CallType").alias("FireCallCount"))
    .orderBy("FireCallCount", ascending=False)
)
display(q9_df)

In [0]:
# Q10 What neighborhoods in SF had the worst response time in 2018?
# Aggregate per neighborhood so each one appears once, ranked by its average
# delay; the single worst delay is shown alongside for context.
q10_df = (
    df_with_year
    .filter(col("CallYear") == 2018)
    .where("Neighborhood is not null")
    .groupBy("Neighborhood")
    .agg(
        round(avg("Delay"), 2).alias("AvgDelay"),
        max("Delay").alias("MaxDelay"),
    )
    .orderBy("AvgDelay", ascending=False)
)

display(q10_df)