In [0]:
from pyspark.sql.functions import *

In [0]:
raw_fire_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema","true") \
    .load("/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv")

In [0]:
display(raw_fire_df)

In [0]:
renamed_fire_df = raw_fire_df \
    .withColumnRenamed("Call Number", "CallNumber") \
    .withColumnRenamed("Unit ID", "UnitID") \
    .withColumnRenamed("Incident Number", "IncidentNumber") \
    .withColumnRenamed("Call Date", "CallDate") \
    .withColumnRenamed("Watch Date", "WatchDate") \
    .withColumnRenamed("Call Final Disposition", "CallFinalDisposition") \
    .withColumnRenamed("Available DtTm", "AvailableDtTm") \
    .withColumnRenamed("Zipcode of Incident", "Zipcode") \
    .withColumnRenamed("Station Area", "StationArea") \
    .withColumnRenamed("Final Priority", "FinalPriority") \
    .withColumnRenamed("ALS Unit", "ALSUnit") \
    .withColumnRenamed("Call Type Group", "CallTypeGroup") \
    .withColumnRenamed("Unit sequence in call dispatch", "UnitSequenceInCallDispatch") \
    .withColumnRenamed("Fire Prevention District", "FirePreventionDistrict") \
    .withColumnRenamed("Supervisor District", "SupervisorDistrict")

In [0]:
display(renamed_fire_df)

In [0]:
renamed_fire_df.printSchema()

In [0]:
fire_df = renamed_fire_df \
    .withColumn("CallDate", to_date("CallDate", "MM/dd/yyyy")) \
    .withColumn("WatchDate", to_date("WatchDate", "MM/dd/yyyy")) \
    .withColumn("AvailableDtTm", to_timestamp("AvailableDtTm", "MM/dd/yyyy hh:mm:ss a")) \
    .withColumn("Delay", round("Delay", 2))

In [0]:
display(fire_df)

In [0]:
fire_df.printSchema()

In [0]:
fire_df.cache()

##### Q1. How many distinct types of calls were made to the Fire Department?
```SQL
select count(distinct CallType) as distinct_call_type_count
from fire_service_calls_tbl
where CallType is not null
```

In [0]:
fire_df.createOrReplaceTempView("fire_service_calls_view")
q1_sql_df = spark.sql("""
        select count(distinct CallType) as distinct_call_type_count
        from fire_service_calls_view
        where CallType is not null
        """)
display(q1_sql_df)

In [0]:
q1_df = fire_df.where("CallType is not null") \
            .select("CallType") \
            .distinct()
print(q1_df.count())

In [0]:
q1_df1 = fire_df.where("CallType is not null")
q1_df2 = q1_df1.select("CallType")
q1_df3 = q1_df2.distinct()
print(q1_df3.count())

##### Q2. What were distinct types of calls made to the Fire Department?
```sql
select distinct CallType as distinct_call_types
from fire_service_calls_tbl
where CallType is not null
```

In [0]:
q2_df = fire_df.where("CallType is not null") \
            .select(expr("CallType as distinct_call_type")) \
            .distinct()
q2_df.show()

In [0]:
display(q2_df)

##### Q3. Find out all response for delayed times greater than 5 mins?
``` sql
select CallNumber, Delay
from fire_service_calls_tbl
where Delay > 5
```

In [0]:
fire_df.where("Delay > 5") \
    .select("CallNumber", "Delay") \
    .show()

##### Q4. What were the most common call types?
```sql
select CallType, count(*) as count
from fire_service_calls_tbl
where CallType is not null
group by CallType
order by count desc
```

In [0]:
fire_df.select("CallType") \
    .where("CallType is not null") \
    .groupBy("CallType") \
    .count() \
    .orderBy("count", ascending=False) \
    .show()

##### Q5. What zip codes accounted for most common calls?
```sql
select CallType, ZipCode, count(*) as count
from fire_service_calls_tbl
where CallType is not null
group by CallType, Zipcode
order by count desc
```

##### Q6. What San Francisco neighborhoods are in the zip codes 94102 and 94103
```sql
select distinct Neighborhood, Zipcode
from fire_service_calls_tbl
where Zipcode== 94102 or Zipcode == 94103
```

##### Q7. What was the sum of all calls, average, min and max of the response times for calls?
```sql
select sum(NumAlarms), avg(Delay), min(Delay), max(Delay)
from fire_service_calls_tbl
```

##### Q8. How many distinct years of data is in the CSV file?
```sql
select distinct year(to_timestamp(CallDate, "MM/dd/yyyy")) as year_num
from fire_service_calls_tbl
order by year_num
```

##### Q9. What week of the year in 2018 had the most fire calls?
```sql
select weekofyear(to_timestamp(CallDate, "MM/dd/yyyy")) week_year, count(*) as count
from fire_service_calls_tbl 
where year(to_timestamp(CallDate, "MM/dd/yyyy")) == 2018
group by week_year
order by count desc
```

##### Q10. What neighborhoods in San Francisco had the worst response time in 2018?
```sql
select Neighborhood, Delay
from fire_service_calls_tbl 
where year(to_timestamp(CallDate, "MM/dd/yyyy")) == 2018
```