In [None]:
from pyspark.sql.functions import to_date, col, expr, avg, min, max, count, year, countDistinct, weekofyear

In [ ]:
# Load the raw fire-department calls CSV. The first row supplies the column names,
# and Spark infers each column's type from the data instead of reading everything as strings.
csv_reader = (spark.read
              .format("csv")
              .option("header", "true")
              .option("inferSchema", "true"))
fire_df = csv_reader.load("/FileStore/dataset/fire_department_calls_data.csv")

In [ ]:
# Shorten the zip-code column so later queries can refer to it simply as `zipcode`.
renamed_fire_df = fire_df.withColumnRenamed(
    "zipcode_of_incident",
    "zipcode",
)
display(renamed_fire_df)

In [ ]:
# Inspect the column names and the types inferred by inferSchema, after the zipcode rename.
renamed_fire_df.printSchema()

In [ ]:
# Derive two analysis columns:
#   call_date -> parsed into a date
#   delay     -> seconds between receiving the call and dispatching a unit
# NOTE(review): to_date() without a format pattern only parses default (yyyy-MM-dd)
# strings; if call_date is stored in another layout (e.g. MM/dd/yyyy), pass the pattern
# explicitly -- confirm against printSchema() / the raw CSV.
# NOTE(review): assumes inferSchema typed dispatch_dttm / received_dttm as timestamps.
transformed_fire_df = (
    renamed_fire_df
    .withColumn("call_date", to_date("call_date"))
    # Casting a timestamp to long gives epoch seconds, so the difference is the delay
    # in seconds (later queries compare it against 5*60). Subtracting the timestamps
    # directly yields an interval type rather than a number, and interval -> integer
    # casts are rejected or restricted depending on the Spark version.
    .withColumn(
        "delay",
        (col("dispatch_dttm").cast("long") - col("received_dttm").cast("long")).cast("integer"),
    )
    )
display(transformed_fire_df)
displayHTML("------------------------------------------------------------------------------------------------------")
transformed_fire_df.printSchema()

In [ ]:
# Mark the transformed DataFrame for caching, since every question below queries it.
# NOTE: Spark caches lazily -- the data is stored when a later action computes it.
transformed_fire_df.cache()

In [ ]:
%md
#### 1) How many distinct types of calls were made to the fire department? 
*İtfaiye departmanına kaç farklı türde çağrı yapılmıştır?*

In [ ]:
# Solution-1: register the DataFrame as a temporary view and answer with Spark SQL.
transformed_fire_df.createOrReplaceTempView("transformed_fire_view")
distinct_call_type_query = """
                   select count(distinct call_type) 
                   from transformed_fire_view 
                   where call_type is not null
                   """
sql_df = spark.sql(distinct_call_type_query)
display(sql_df)

In [ ]:
# Solution-2: the same answer with the DataFrame API.
# DataFrame.count() is an action that returns a plain Python int. display() is meant
# for DataFrame-like objects, so the scalar result is printed instead.
api_df = (transformed_fire_df
          .where("call_type is not null")
          .select("call_type")
          .distinct())
print(api_df.count())

In [ ]:
%md
#### 2) What are the distinct types of calls made to the fire department?
*İtfaiye departmanına yapılan farklı türdeki çağrılar nelerdir?*

```sql
select distinct call_type 
from demo_db.fire_department_calls 
where call_type is not null;
```

In [ ]:
# DataFrame-API equivalent of the SQL above: every distinct, non-null call type,
# shown once as plain text and once as an interactive table.
api_df = (transformed_fire_df
          .where("call_type is not null")
          .select(expr("call_type as distinct_call_type"))
          .distinct())
api_df.show(truncate=False)
displayHTML("-------------------------------------------------------------------------")
display(api_df)

In [ ]:
%md
#### 3) Find all responses with a delay greater than 5 minutes.
*Gecikme süresi 5 dakikadan uzun olan tüm yanıtları (çağrıları) bulun.*

```sql
SELECT call_number, delay AS delayed_time
FROM demo_db.fire_department_calls
WHERE delay > 5*60
ORDER BY delayed_time DESC;
```

In [ ]:
# Calls whose dispatch delay exceeds 5 minutes (delay is in seconds), longest first.
# This mirrors the `delay AS delayed_time` alias and ORDER BY of the SQL version.
api_df = (transformed_fire_df
          .where("delay > 5*60")
          .select("call_number", col("delay").alias("delayed_time"))
          .orderBy("delayed_time", ascending=False)
          )
api_df.show()

In [ ]:
%md
#### 4) What were the most common call types?
*En yaygın çağrı türleri nelerdi?*

```sql
select call_type, count(*) as call_count 
from demo_db.fire_department_calls
where call_type is not null
group by call_type
order by call_count desc
```

In [ ]:
# Most common call types: number of calls per non-null call_type, most frequent first.
# Note:
#   1) DataFrame.count()   -> action (runs a job and returns a Python int)
#   2) GroupedData.count() -> transformation (returns a new DataFrame with a `count` column)
api_df = (transformed_fire_df
          .where("call_type is not null")
          .groupBy("call_type").count()
          .orderBy("count", ascending=False)
          )
api_df.show(truncate=False)

In [ ]:
%md
#### 5) Which zip codes accounted for the most common calls (by call type)?
*En yaygın çağrılar (çağrı türüne göre) hangi posta kodlarından yapılmıştır?*

```sql
select call_type, zipcode, count(*) as call_count 
from demo_db.fire_department_calls
where call_type is not null
group by call_type, zipcode
order by call_count desc
```

In [ ]:
# Number of calls per (call_type, zipcode) pair for non-null call types, busiest pair first.
non_null_calls = transformed_fire_df.where("call_type is not null")
api_df = (non_null_calls
          .select("call_type", "zipcode")
          .groupBy("call_type", "zipcode")
          .count()
          .orderBy("count", ascending=False))
api_df.show(truncate=False)

In [ ]:
%md
#### 6) What San Francisco neighborhoods are in the zip codes 94102 and 94103?
*94102 ve 94103 posta kodlarında hangi San Francisco mahalleleri bulunmaktadır?*

```sql
select distinct neighborhoods_analysis_boundaries 
from demo_db.fire_department_calls
where zipcode in (94102, 94103)
```


In [ ]:
# Distinct neighborhoods that appear in calls from zip codes 94102 and 94103.
target_zip_filter = "zipcode in (94102, 94103)"
api_df = (transformed_fire_df
          .where(target_zip_filter)
          .select("neighborhoods_analysis_boundaries")
          .distinct())
api_df.show(truncate=False)

In [ ]:
%md
#### 7) What was the total number of calls, and what were the average, minimum, and maximum call response times?
*Toplam çağrı sayısı nedir; ortalama, minimum ve maksimum yanıt süreleri nelerdir?*

```sql
select count(call_number), avg(delay), min(delay), max(delay) 
from demo_db.fire_department_calls
```

In [ ]:
# Total number of calls plus average / minimum / maximum dispatch delay, as one row.
# NOTE: min and max here are the pyspark.sql.functions versions imported at the top of
# the notebook (they shadow Python's builtins of the same name).
delay_stats = [
    count("call_number").alias("count_call_number"),
    avg("delay").alias("average_delay"),
    min("delay").alias("minimum_delay"),
    max("delay").alias("maximum_delay"),
]
api_df = transformed_fire_df.agg(*delay_stats)
api_df.show()

In [ ]:
%md
#### 8) How many distinct years of data are in the CSV file?
*CSV dosyasında kaç farklı yıla ait veri vardır?*

```sql
select count(distinct year(call_date)) from demo_db.fire_department_calls
```

In [ ]:
# Count how many distinct calendar years occur in call_date, as a one-row DataFrame.
call_years = transformed_fire_df.select(year("call_date").alias("year"))
api_df = call_years.agg(countDistinct("year"))
api_df.show()

In [ ]:
%md
#### 9) Which week of the year in 2006 had the most fire calls?
*2006 yılının hangi haftasında en fazla itfaiye çağrısı yapılmıştır?*

```sql
select weekofyear(call_date) as weekofyear, count(*) as fire_call_count from demo_db.fire_department_calls
where year(call_date) == 2006
group by weekofyear(call_date)
order by fire_call_count desc limit 1
```

In [ ]:
# Restrict to calls made in 2006, count them per week-of-year, and keep the busiest week.
# If several weeks tie for the top count, which one limit(1) keeps is not specified.
calls_2006 = transformed_fire_df.where("year(call_date) == 2006")
weekly_counts = (calls_2006
                 .select(weekofyear("call_date").alias("weekofyear"))
                 .groupBy("weekofyear")
                 .count())
api_df = weekly_counts.orderBy("count", ascending=False).limit(1)
display(api_df)

In [ ]:
%md
#### 10) Which San Francisco neighborhood had the worst (highest average) response time in 2006?
*2006 yılında San Francisco'da en kötü (en yüksek ortalama) yanıt süresine sahip mahalle hangisidir?*

```sql
select neighborhoods_analysis_boundaries, avg(delay) as avg_delayed_time
from demo_db.fire_department_calls
where year(call_date) == 2006
group by neighborhoods_analysis_boundaries
order by avg_delayed_time desc limit 1
```

In [ ]:
# Neighborhood with the highest average dispatch delay among 2006 calls.
# The average is requested explicitly and named `avg_delayed_time`, matching the SQL.
# GroupedData.avg() with no arguments averages every numeric column and relies on the
# auto-generated "avg(delay)" name.
api_df = (transformed_fire_df
          .where("year(call_date) == 2006")
          .groupBy("neighborhoods_analysis_boundaries")
          .agg(avg("delay").alias("avg_delayed_time"))
          .orderBy("avg_delayed_time", ascending=False)
          .limit(1)
          )
display(api_df)