# Prepare
Do your work in a file named prepare for these exercises.

In [1]:
import pandas as pd
import warnings
warnings.filterwarnings("ignore")

from pyspark.sql import SparkSession, DataFrame, Column, Row, GroupedData, \
    DataFrameNaFunctions, DataFrameStatFunctions, functions, types, Window
from pyspark.sql.functions import col, expr
from pyspark.sql import functions as f
import pyspark.sql.types as T
from pyspark.sql.functions import round
from pyspark.sql.functions import format_string
from pyspark.sql.functions import trim, upper
from pyspark.sql.functions import substring
from pyspark.sql.functions import regexp_extract
from pyspark.sql.functions import sum, sumDistinct
import datetime
from pyspark.sql.functions import year, month, dayofmonth
from pyspark.sql.functions import count, countDistinct, approx_count_distinct
from pyspark.sql.functions import mean, stddev, variance, skewness, kurtosis


## Part 1
1. Read the case.csv file from the 311 call data into a Spark DataFrame.

In [2]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("read") \
                    .enableHiveSupport() \
                    .getOrCreate()

data = "sa311/case.csv"

In [3]:
df = spark.read\
          .csv(data, 
               header=True, 
               inferSchema=True)

In [4]:
df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



2. How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

In [5]:
# Convert the date fields to be of data type timestamp.
df = df.withColumn("case_opened_date", 
                   f.to_timestamp(f.col("case_opened_date"), 
                                  "M/d/yy H:mm")) \
       .withColumn("case_closed_date", 
                   f.to_timestamp(f.col("case_closed_date"),
                                  "M/d/yy H:mm")) \
       .withColumn("SLA_due_date", 
                   f.to_timestamp(f.col("SLA_due_date"),
                                  "M/d/yy H:mm")) \
       .withColumn("case_late", 
                   f.when(df.case_late == "YES", True) \
                    .otherwise(False)) \
       .withColumn("case_closed", f.when(df.case_closed == "YES", True) \
                    .otherwise(False))

In [6]:
caseOpenFilter = df.case_closed == "NO"

In [7]:
df.select("SLA_due_date",
          "case_opened_date",
          "case_closed_date",
          "case_closed") \
  .where(caseOpenFilter) \
  .sort("SLA_due_date", ascending=False) \
  .show(3, vertical=True, truncate=False)

-RECORD 0-------------------------------
 SLA_due_date     | 2022-05-27 15:07:00 
 case_opened_date | 2018-07-27 15:07:00 
 case_closed_date | null                
 case_closed      | false               
-RECORD 1-------------------------------
 SLA_due_date     | 2022-05-25 15:42:00 
 case_opened_date | 2018-07-25 15:42:00 
 case_closed_date | null                
 case_closed      | false               
-RECORD 2-------------------------------
 SLA_due_date     | 2022-05-24 10:01:00 
 case_opened_date | 2018-07-24 10:01:00 
 case_closed_date | null                
 case_closed      | false               
only showing top 3 rows



In [8]:
df.groupBy("case_closed") \
  .count() \
  .show(3, truncate=False)

+-----------+------+
|case_closed|count |
+-----------+------+
|true       |823594|
|false      |18110 |
+-----------+------+



In [9]:
# Compute the number of days past the SLA due date.
# --Spark
# DATEDIFF ( enddate , startdate )

df = df.withColumn("days_past_SLA", 
                   f.datediff(f.current_timestamp(), 
                              "SLA_due_date"))
df.select("SLA_due_date",
          "days_past_SLA",
          "num_days_late",
          "case_opened_date",
          "case_closed") \
  .where(caseOpenFilter) \
  .sort("days_past_SLA", ascending=False) \
  .show(3, vertical=True, truncate=False)

# How old is the latest (in terms of days past 
# SLA) currently open issue?
# The oldest is 851 days past the SLA due date 
# calculated to today's day (2019-05-18), or 349 days
# past the SLA due date per the "num_days_late" 
# column.

-RECORD 0-------------------------------
 SLA_due_date     | 2017-01-17 08:30:00 
 days_past_SLA    | 851                 
 num_days_late    | 348.6458333         
 case_opened_date | 2017-01-01 13:48:00 
 case_closed      | false               
-RECORD 1-------------------------------
 SLA_due_date     | 2017-01-17 11:26:00 
 days_past_SLA    | 851                 
 num_days_late    | 348.52356480000003  
 case_opened_date | 2017-01-02 11:26:00 
 case_closed      | false               
-RECORD 2-------------------------------
 SLA_due_date     | 2017-01-17 08:30:00 
 days_past_SLA    | 851                 
 num_days_late    | 348.6458333         
 case_opened_date | 2017-01-01 13:57:00 
 case_closed      | false               
only showing top 3 rows



In [10]:
# Michael Moran's solution gave a very different answer!

latest_open_issue = f.max(df.where(~df.case_closed).case_opened_date).alias("latest_open_issue")

latest_date = df.select(f.max("case_opened_date")).first()[0]
print(latest_date)

df.where(df.case_opened_date == latest_date).show(truncate=False, vertical=True)

2018-08-08 10:38:00
-RECORD 0--------------------------------------------------------------
 case_id              | 1014759619                                     
 case_opened_date     | 2018-08-08 10:38:00                            
 case_closed_date     | null                                           
 SLA_due_date         | 2018-08-22 10:38:00                            
 case_late            | false                                          
 num_days_late        | -13.44305556                                   
 case_closed          | false                                          
 dept_division        | Code Enforcement                               
 service_request_type | Bandit Signs                                   
 SLA_days             | 14.0                                           
 case_status          | Open                                           
 source_id            | CRM_Listener                                   
 request_address      | 1935  MILITARY DR SW

In [10]:
# Compute the number of days since opened.
# --Spark
# DATEDIFF ( enddate , startdate )

df = df.withColumn("days_since_opening", 
                   f.datediff(f.current_timestamp(), 
                              "case_opened_date"))
df.select("days_since_opening",
          "case_opened_date",
          "case_closed") \
  .where(caseOpenFilter) \
  .sort("days_since_opening", ascending=False) \
  .show(3, truncate=False)

# How long has the oldest (in terms of days since
# opened) currently opened issue been open?
# The oldest in terms of days since opened has
# been open for 867 days calculated to today's date
# (2019-05-18).

+------------------+-------------------+-----------+
|days_since_opening|case_opened_date   |case_closed|
+------------------+-------------------+-----------+
|867               |2017-01-01 13:48:00|false      |
|867               |2017-01-01 13:57:00|false      |
|866               |2017-01-02 11:26:00|false      |
+------------------+-------------------+-----------+
only showing top 3 rows



In [11]:
# Michael Moran's solution (same answer)
(df.select(f.min(df.where(~df.case_closed).case_opened_date).alias("earliest_open_case"))
   .withColumn("now", f.to_timestamp(f.current_timestamp(), "M/d/y H:mm"))
   .withColumn("date_difference", f.datediff(f.col("now"), f.col("earliest_open_case")))
   .show())

+-------------------+-------------------+---------------+
| earliest_open_case|                now|date_difference|
+-------------------+-------------------+---------------+
|2017-01-01 03:22:00|2019-05-18 15:56:53|            867|
+-------------------+-------------------+---------------+



3. How many Stray Animal cases are there?

In [12]:
df.groupBy("service_request_type") \
  .count() \
  .sort("count", ascending=False) \
  .show(6, truncate=False)
# 26760 Stray Animal cases

+--------------------------+-----+
|service_request_type      |count|
+--------------------------+-----+
|No Pickup                 |86855|
|Overgrown Yard/Trash      |65895|
|Bandit Signs              |32910|
|Damaged Cart              |30338|
|Front Or Side Yard Parking|28794|
|Stray Animal              |26760|
+--------------------------+-----+
only showing top 6 rows



In [13]:
df.where(f.lower(df.service_request_type) == "stray animal").count()

26760

4. How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [14]:
from pyspark.sql.functions import sum, sumDistinct

In [15]:
FO_requests = df.groupBy("service_request_type", "dept_division") \
  .count() \
  .where(df.dept_division == "Field Operations") \
  .where(df.service_request_type != "Officer Standby") \
  .sort("count", ascending=False) 

FO_requests.show(truncate=False)

+-------------------------------+----------------+-----+
|service_request_type           |dept_division   |count|
+-------------------------------+----------------+-----+
|Stray Animal                   |Field Operations|26760|
|Aggressive Animal(Non-Critical)|Field Operations|24882|
|Animal Neglect                 |Field Operations|13441|
|Trapped/Confined Animal        |Field Operations|11354|
|Public Nuisance(Own Animal)    |Field Operations|10715|
|Injured Animal(Critical)       |Field Operations|9633 |
|Aggressive Animal(Critical)    |Field Operations|5266 |
|Animal Bite(Non-Critical)      |Field Operations|4750 |
|Animal Permits Request         |Field Operations|3026 |
|Animal Cruelty(Critical)       |Field Operations|3001 |
|Animal Bite(Critical)          |Field Operations|708  |
|City Council Animal Request    |Field Operations|365  |
|Spay/Neuter Request Response   |Field Operations|1    |
+-------------------------------+----------------+-----+



In [16]:
FO_requests.agg(sum("count")).show()
# 113,902 service requests that are assigned to 
# the Field Operations department (dept_division)
# are not classified as "Officer Standby" request 
# type (service_request_type)

+----------+
|sum(count)|
+----------+
|    113902|
+----------+



In [17]:
df.where(f.lower(df.dept_division) == "field operations") \
  .where(f.lower(f.col("service_request_type")) != "officer standby") \
  .count()

113902

5. Create a new DataFrame without any information related to dates or location.

In [18]:
df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = false)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = false)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)
 |-- days_past_SLA: integer (nullable = true)
 |-- days_since_opening: integer (nullable = true)



In [19]:
no_dates = df.select('case_id', 
                     'case_late',
                     'num_days_late', 
                     'case_closed',
                     'dept_division', 
                     'service_request_type',
                     'SLA_days',
                     'case_status',
                     'source_id',
                     'days_past_SLA',
                     'days_since_opening')

no_dates.columns

['case_id',
 'case_late',
 'num_days_late',
 'case_closed',
 'dept_division',
 'service_request_type',
 'SLA_days',
 'case_status',
 'source_id',
 'days_past_SLA',
 'days_since_opening']

In [20]:
# OR
df_wo_dates_locations = df.drop("case_opened_date", 
                                "case_closed_date", 
                                "SLA_due_date", 
                                "request_address", 
                                "council_district")

df_wo_dates_locations.columns

['case_id',
 'case_late',
 'num_days_late',
 'case_closed',
 'dept_division',
 'service_request_type',
 'SLA_days',
 'case_status',
 'source_id',
 'days_past_SLA',
 'days_since_opening']

6. Read dept.csv into a Spark DataFrame. Inspect the dept_name column. Replace the missing values with "other".

In [21]:
data = "sa311/dept.csv"
dept_df = spark.read\
          .csv(data, 
               header=True, 
               inferSchema=True)
dept_df.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



In [22]:
dept_df.groupBy("dept_name") \
  .count() \
  .show(truncate=False)

+-------------------------+-----+
|dept_name                |count|
+-------------------------+-----+
|Animal Care Services     |1    |
|null                     |1    |
|Solid Waste Management   |4    |
|Development Services     |1    |
|Trans & Cap Improvements |7    |
|Customer Service         |1    |
|Metro Health             |3    |
|Parks and Recreation     |7    |
|Code Enforcement Services|6    |
|City Council             |8    |
+-------------------------+-----+



In [23]:
dept_df = dept_df.na.fill({"dept_name": "other"})

In [24]:
dept_df.groupBy("dept_name") \
  .count() \
  .show(truncate=False)

+-------------------------+-----+
|dept_name                |count|
+-------------------------+-----+
|Animal Care Services     |1    |
|Solid Waste Management   |4    |
|other                    |1    |
|Development Services     |1    |
|Trans & Cap Improvements |7    |
|Customer Service         |1    |
|Metro Health             |3    |
|Parks and Recreation     |7    |
|Code Enforcement Services|6    |
|City Council             |8    |
+-------------------------+-----+



In [25]:
dept_df.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = false)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



## Part 2
1. Convert the council_district column to a string column.

In [26]:
df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = false)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = false)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)
 |-- days_past_SLA: integer (nullable = true)
 |-- days_since_opening: integer (nullable = true)



In [27]:
df = df.withColumn("council_district", 
              df.council_district.cast("string"))

In [28]:
df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = false)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = false)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)
 |-- days_past_SLA: integer (nullable = true)
 |-- days_since_opening: integer (nullable = true)



2. Extract the year from the case_closed_date column.

In [29]:
df.select(f.year("case_closed_date") \
  .alias("case_closed_year")) \
  .show(5)

+----------------+
|case_closed_year|
+----------------+
|            2018|
|            2018|
|            2018|
|            2018|
|            2018|
+----------------+
only showing top 5 rows



In [30]:
df = df.withColumn("year_closed", 
              year("case_closed_date"))

In [31]:
df.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 SLA_due_date         | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
 days_past_SLA        | -497                                 
 days_si

3. Convert num_days_late from days to hours in new columns num_hours_late.

In [32]:
df = df.withColumn("num_hours_late", 
              df.num_days_late * 24)

In [33]:
df.select("num_hours_late", "num_days_late") \
  .show(3, truncate=False)

+-------------------+-------------------+
|num_hours_late     |num_days_late      |
+-------------------+-------------------+
|-23964.2102784     |-998.5087616000001 |
|-48.302500007999996|-2.0126041669999997|
|-72.536111112      |-3.022337963       |
+-------------------+-------------------+
only showing top 3 rows



4. Convert the case_late column to a boolean column.
##### Done in # 1 of part 1.

In [34]:
df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = false)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = false)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)
 |-- days_past_SLA: integer (nullable = true)
 |-- days_since_opening: integer (nullable = true)
 |-- year_closed: integer (nullable = true)
 |-- num_hours_late: double (nullable = true)



But could have done it here thusly...

In [35]:
# df.select("case_late") \
#   .withColumn("is_case_late", (df["case_late"] == "YES")) \
#   .show(10, truncate=False)

In [36]:
df.show(3, vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 2018-01-01 00:42:00                   
 case_closed_date     | 2018-01-01 12:29:00                   
 SLA_due_date         | 2020-09-26 00:42:00                   
 case_late            | false                                 
 num_days_late        | -998.5087616000001                    
 case_closed          | true                                  
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  EL PASO ST, San Antonio, 78207  
 council_district     | 5                                     
 days_past_SLA        | -497                           

5. Convert the SLA_days columns to a double column.

##### This was done by inferSchema when the csv was read. But to do this manually, I would do: 
```
df = df.withColumn("SLA_days",
                    df.SLA_days.cast("double"))
```

In [37]:
df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = false)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = false)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)
 |-- days_past_SLA: integer (nullable = true)
 |-- days_since_opening: integer (nullable = true)
 |-- year_closed: integer (nullable = true)
 |-- num_hours_late: double (nullable = true)



In [38]:
dept_df.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = false)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



6. Pull it all together -- merge depts_df and df

Used an "inner" join: Use a join expression and the value inner to return only those rows for which the join expression is true. Inner join is the default, so could have left off that parameter.

This gives us a list of cases associated with a department and the corresponding department information.

In [39]:
joined_df = df.join(dept_df, "dept_division", "inner")
joined_df.show(2, vertical=True, truncate=False)

-RECORD 0------------------------------------------------------
 dept_division          | Field Operations                     
 case_id                | 1014127332                           
 case_opened_date       | 2018-01-01 00:42:00                  
 case_closed_date       | 2018-01-01 12:29:00                  
 SLA_due_date           | 2020-09-26 00:42:00                  
 case_late              | false                                
 num_days_late          | -998.5087616000001                   
 case_closed            | true                                 
 service_request_type   | Stray Animal                         
 SLA_days               | 999.0                                
 case_status            | Closed                               
 source_id              | svcCRMLS                             
 request_address        | 2315  EL PASO ST, San Antonio, 78207 
 council_district       | 5                                    
 days_past_SLA          | -497          

Check to see if any rows were not associated with a department and were dropped...

In [40]:
df.join(dept_df, df.dept_division == dept_df.dept_division, "left_anti") \
    .show(vertical=True, truncate=False)

(0 rows)



Or instructions might have meant to display all the work I just did...

In [41]:
df.select(col('council_district').cast('string'), 
           col('case_closed_date'), 
           f.regexp_extract(col('case_closed_date'), '\d+/\d+/(\d+)', 1).alias('year'), 
           col('case_late'), 
           col('num_days_late')) \
 .withColumn('num_hours_late', f.round(df.num_days_late * 24, 2)) \
 .withColumn('case_late_bool', col('case_late') == 'YES') \
 .show(3, vertical=True, truncate=False)

-RECORD 0-------------------------------
 council_district | 5                   
 case_closed_date | 2018-01-01 12:29:00 
 year             |                     
 case_late        | false               
 num_days_late    | -998.5087616000001  
 num_hours_late   | -23964.21           
 case_late_bool   | false               
-RECORD 1-------------------------------
 council_district | 3                   
 case_closed_date | 2018-01-03 08:11:00 
 year             |                     
 case_late        | false               
 num_days_late    | -2.0126041669999997 
 num_hours_late   | -48.3               
 case_late_bool   | false               
-RECORD 2-------------------------------
 council_district | 3                   
 case_closed_date | 2018-01-02 07:57:00 
 year             |                     
 case_late        | false               
 num_days_late    | -3.022337963        
 num_hours_late   | -72.54              
 case_late_bool   | false               
only showing top

## Part 3
1. Create a DataFrame with all combinations of council_district and service_request_type (regardless of whether the combination is observed in the data).

In [42]:
all_dist_serv = df.join(dept_df, df.dept_division == dept_df.dept_division, "full_outer")
all_dist_serv.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------------
 case_id                | 1014127378                               
 case_opened_date       | 2018-01-01 08:24:00                      
 case_closed_date       | 2018-01-02 07:37:00                      
 SLA_due_date           | 2018-01-03 08:24:00                      
 case_late              | false                                    
 num_days_late          | -1.03287037                              
 case_closed            | true                                     
 dept_division          | Miscellaneous                            
 service_request_type   | Dead Animal - Cat                        
 SLA_days               | 2.0                                      
 case_status            | Closed                                   
 source_id              | 139344                                   
 request_address        | 7059  TIMBERCREEK DR, San Antonio, 78227 
 council_district       | 6                     

2. Join the case data with the source and department data.

In [43]:
source_df = spark.read.csv('./sa311/source.csv', 
                           sep=",", 
                           header=True, 
                           inferSchema=True)
source_df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



In [44]:
joined_df.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = false)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = false)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)
 |-- days_past_SLA: integer (nullable = true)
 |-- days_since_opening: integer (nullable = true)
 |-- year_closed: integer (nullable = true)
 |-- num_hours_late: double (nullable = true)
 |-- dept_name: string (nullable = false)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



In [45]:
all_df = joined_df.join(source_df, joined_df.source_id == source_df.source_id, 'left_outer')
all_df.printSchema()
all_df.show(2, vertical=True, truncate=False)

root
 |-- dept_division: string (nullable = true)
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = false)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = false)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)
 |-- days_past_SLA: integer (nullable = true)
 |-- days_since_opening: integer (nullable = true)
 |-- year_closed: integer (nullable = true)
 |-- num_hours_late: double (nullable = true)
 |-- dept_name: string (nullable = false)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)
 |-- source_id: string 

3. Are there any cases that do not have a request source? NO

In [46]:
joined_df.join(source_df, joined_df.source_id == source_df.source_id, "left_anti") \
         .show(vertical=True, truncate=False)

(0 rows)



# Part 4
1. Who are the top 10 service request types in terms of number of requests?

In [47]:
all_df.groupBy('service_request_type') \
      .agg(count('service_request_type').alias('count')) \
      .sort('count',ascending=False) \
      .show(10, truncate=False)

+--------------------------------+-----+
|service_request_type            |count|
+--------------------------------+-----+
|No Pickup                       |89210|
|Overgrown Yard/Trash            |66403|
|Bandit Signs                    |32968|
|Damaged Cart                    |31163|
|Front Or Side Yard Parking      |28920|
|Stray Animal                    |27361|
|Aggressive Animal(Non-Critical) |25492|
|Cart Exchange Request           |22608|
|Junk Vehicle On Private Property|21649|
|Pot Hole Repair                 |20827|
+--------------------------------+-----+
only showing top 10 rows



2. Who are the top 10 service request types in terms of average days late?

In [48]:
all_df.groupBy('service_request_type') \
      .agg(mean('num_days_late').alias('avg_days_late')) \
      .sort('avg_days_late',ascending=False) \
      .show(10, truncate=False)

+--------------------------------------+------------------+
|service_request_type                  |avg_days_late     |
+--------------------------------------+------------------+
|Zoning: Junk Yards                    |175.95636210420932|
|Labeling for Used Mattress            |162.43032902285717|
|Record Keeping of Used Mattresses     |153.99724039428568|
|Signage Requied for Sale of Used Mattr|151.63868055333333|
|Storage of Used Mattress              |142.112556415     |
|Zoning: Recycle Yard                  |135.9285161247979 |
|Donation Container Enforcement        |131.75610506358706|
|License Requied Used Mattress Sales   |128.79828704142858|
|Traffic Signal Graffiti               |101.79846062200002|
|Complaint                             |72.87050230311685 |
+--------------------------------------+------------------+
only showing top 10 rows



3. Does number of days late depend on department?
##### Code enforcement wins this prize!

In [49]:
all_df.groupBy('service_request_type', 'dept_division') \
      .agg(mean('num_days_late').alias('avg_days_late')) \
      .sort('avg_days_late',ascending=False) \
      .show(10, truncate=False)

+--------------------------------------+---------------------------+------------------+
|service_request_type                  |dept_division              |avg_days_late     |
+--------------------------------------+---------------------------+------------------+
|Zoning: Junk Yards                    |Code Enforcement           |175.95636210420932|
|Labeling for Used Mattress            |Code Enforcement (IntExp)  |162.43032902285717|
|Record Keeping of Used Mattresses     |Code Enforcement (IntExp)  |153.99724039428568|
|Signage Requied for Sale of Used Mattr|Code Enforcement (IntExp)  |151.63868055333333|
|Storage of Used Mattress              |Code Enforcement (IntExp)  |142.112556415     |
|Zoning: Recycle Yard                  |Code Enforcement (Internal)|135.9285161247979 |
|Donation Container Enforcement        |Code Enforcement           |131.75610506358706|
|License Requied Used Mattress Sales   |Code Enforcement (IntExp)  |128.79828704142858|
|Traffic Signal Graffiti        

Calculate average days late by department.

In [50]:
all_df.groupBy('dept_division') \
      .agg(mean('num_days_late').alias('avg_days_late')) \
      .sort('avg_days_late',ascending=False) \
      .show(10, truncate=False)

+-----------------------------+------------------+
|dept_division                |avg_days_late     |
+-----------------------------+------------------+
|Code Enforcement (Internal)  |135.9285161247979 |
|Reservations                 |66.03116319       |
|311 Call Center              |59.737091496300735|
|Director's Office Horizontal |37.57064670295004 |
|Engineering Division         |13.433724555869683|
|Shops                        |9.641261768722693 |
|Tree Crew                    |4.723282812065398 |
|Solid Waste                  |3.519023919876228 |
|Trades                       |3.2319771412769382|
|Clean and Green Natural Areas|1.6914689194878048|
+-----------------------------+------------------+
only showing top 10 rows



4. How do number of days late depend on department division and request type?

In [51]:
all_df.groupBy('dept_division', 'service_request_type') \
      .agg(mean('num_days_late').alias('avg_days_late')) \
      .sort('avg_days_late',ascending=False) \
      .show(truncate=False)

+----------------------------+--------------------------------------+------------------+
|dept_division               |service_request_type                  |avg_days_late     |
+----------------------------+--------------------------------------+------------------+
|Code Enforcement            |Zoning: Junk Yards                    |175.95636210420932|
|Code Enforcement (IntExp)   |Labeling for Used Mattress            |162.43032902285717|
|Code Enforcement (IntExp)   |Record Keeping of Used Mattresses     |153.99724039428568|
|Code Enforcement (IntExp)   |Signage Requied for Sale of Used Mattr|151.63868055333333|
|Code Enforcement (IntExp)   |Storage of Used Mattress              |142.112556415     |
|Code Enforcement (Internal) |Zoning: Recycle Yard                  |135.9285161247979 |
|Code Enforcement            |Donation Container Enforcement        |131.75610506358706|
|Code Enforcement (IntExp)   |License Requied Used Mattress Sales   |128.79828704142858|
|Signals             

For the most part, the request type does not significantly vary the days late in each department. 

Consider the number of requests that each department receives.

In [52]:
all_df.groupBy('dept_division') \
      .agg(count('service_request_type').alias('count_requests')) \
      .sort('count_requests',ascending=False) \
      .show(10, truncate=False)

+------------------+--------------+
|dept_division     |count_requests|
+------------------+--------------+
|Code Enforcement  |274298        |
|Waste Collection  |220696        |
|Field Operations  |119362        |
|Miscellaneous     |46165         |
|Streets           |39193         |
|Signals           |21095         |
|Brush             |18610         |
|Signs and Markings|18127         |
|Dangerous Premise |15648         |
|Graffiti (IntExp) |15154         |
+------------------+--------------+
only showing top 10 rows



Code Enforcement does have the most requests, but Waste Collection is second and it 
was much lower on the number of days late.

In [53]:
spark.stop()