In [1]:
import pandas as pd
import numpy as np

import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
from pyspark.sql.functions import *

## This exercises uses the case.csv, dept.csv, and source.csv files from the san antonio 311 call dataset.

### Read the case, department, and source data into their own spark dataframes.

In [3]:
#Create the source df
source_df = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)

#Create the department df
dept_df = spark.read.csv("dept.csv", sep=",", header=True, inferSchema=True)

#Create the case df
case_df = spark.read.csv("case.csv", header=True, inferSchema=True)

In [4]:
print("--- case ---")
case_df.show(1, vertical=True)
print("--- dept ---")
dept_df.show()
print("--- source ---")
source_df.show()

--- case ---
-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row

--- dept ---
+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+

### Let's see how writing to the local disk works in spark:

- Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json
- Inspect your folder structure. What do you notice?

In [5]:
# write to json
source_df.write.json("source_json", mode="overwrite")

In [6]:
#Write to csv
source_df.write.csv("source_csv", mode="overwrite")

There are folders for each json and csv. Within each file folder is a success document showing that the write was a success and a document with the data.

###  Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [7]:
#Check the dtypes for source_df
source_df

#takeaways: source_id is a string and can stay that way for now

DataFrame[source_id: string, source_username: string]

In [8]:
#Check the dtypes for dept
dept_df

#takeaways:
# All good except for dept_subject_to_SLA could be converted to abool

DataFrame[dept_division: string, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]

In [9]:
#check the dtypes for case_df
case_df

#takeaways:
#case_id --> string
#case_opened_date, case_closed_date, SLA_due_date --> datetime
#case_late --> abool
#SLA_days --> int
#council_district --> string

DataFrame[case_id: int, case_opened_date: string, case_closed_date: string, SLA_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: int]

In [10]:
case_df.show(1, vertical=True)


-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



In [11]:
#case_id --> string
case_df.select(case_df.case_id.cast("string"))

#case_opened_date, case_closed_date, SLA_due_date --> datetime

fmt = "M/d/yy H:mm"
case_df = (
    case_df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_opened_date", fmt))
)

#case_late --> abool


#SLA_days --> int
#council_district --> string

**1. How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?**

In [37]:
#How old is the latest currently open issue?
case_df.where((case_df.case_closed == 'NO') & (case_df.num_days_late > 0)).select(min(case_df.num_days_late)).show()


+------------------+
|min(num_days_late)|
+------------------+
|       0.060810185|
+------------------+



In [15]:
#How long has the oldest (in terms of days since opened) currently opened issue been open?
case_df.where((case_df.case_closed == 'NO') & (case_df.num_days_late > 0)).select(round(max(case_df.num_days_late),2)).show()

+----------------------------+
|round(max(num_days_late), 2)|
+----------------------------+
|                      348.65|
+----------------------------+



**2. How many Stray Animal cases are there?**

In [16]:
# Need to groupby case type
(
    case_df.groupBy('service_request_type').count()
    .where(case_df.service_request_type == 'Stray Animal').show()
)

+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|        Stray Animal|26760|
+--------------------+-----+



**3. How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?**


In [31]:
(
    case_df.select('service_request_type')
    .where(case_df.dept_division == 'Field Operations')
    .where(case_df.service_request_type != 'Officer Standby').count()
)

113902

**4. Convert the council_district column to a string column.**

In [20]:
case_df = case_df.withColumn("council_district", col("council_district").cast("string"))

In [21]:
case_df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)
 |-- case_due_date: timestamp (nullable = true)



**5. Extract the year from the case_closed_date column.**


In [22]:
case_df = case_df.withColumn('year', year("case_closed_date"))

In [23]:
case_df.select(case_df.year).show(5)

+----+
|year|
+----+
|2018|
|2018|
|2018|
|2018|
|2018|
+----+
only showing top 5 rows



**6. Convert num_days_late from days to hours in new columns num_hours_late.**

In [25]:
case_df.withColumn('num_hours_late', case_df.num_days_late / 24).show(5, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 00:42:00  
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
 case_due_date        | 2018-01-01 00:42:00  
 year                 | 2018                 
 num_hours_late       | -41.60453173333334   
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-01

**7. Join the case data with the source and department data.**

In [26]:
#join the case and source dataframes
case_source = case_df.join(source_df, on='source_id', how='left')

#Then join the dept df to the case_source df
case_source_dept = case_source.join(dept_df, on='dept_division', how='left')

In [27]:
#Check the join
case_source_dept.show(1,  vertical=True)


-RECORD 0--------------------------------------
 dept_division          | Field Operations     
 source_id              | svcCRMLS             
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 00:42:00  
 SLA_due_date           | 9/26/20 0:42         
 case_late              | NO                   
 num_days_late          | -998.5087616000001   
 case_closed            | YES                  
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 request_address        | 2315  EL PASO ST,... 
 council_district       | 5                    
 case_due_date          | 2018-01-01 00:42:00  
 year                   | 2018                 
 source_username        | svcCRMLS             
 dept_name              | Animal Care Services 
 standardized_dept_name | Animal Care Services 
 dept_subject_to_SLA    | YES           

**8. Are there any cases that do not have a request source?**

In [36]:
case_source_dept.filter(col('source_id').isNull()).show(vertical=True)

(0 rows)



**9. What are the top 10 service request types in terms of number of requests?**

In [29]:
(
    case_source_dept.select('service_request_type')
    .groupBy('service_request_type').count()
    .sort(desc('count')).show(5,truncate=False)
)

+--------------------------+-----+
|service_request_type      |count|
+--------------------------+-----+
|No Pickup                 |89210|
|Overgrown Yard/Trash      |66403|
|Bandit Signs              |32968|
|Damaged Cart              |31163|
|Front Or Side Yard Parking|28920|
+--------------------------+-----+
only showing top 5 rows



**10. What are the top 10 service request types in terms of average days late?**

In [32]:
(
    case_source_dept.groupBy('service_request_type')
    .agg(round(mean(case_source_dept.num_days_late),2).alias('avg_days_late'))
    .sort(desc('avg_days_late')).show(10,truncate=False)
)


+--------------------------------------+-------------+
|service_request_type                  |avg_days_late|
+--------------------------------------+-------------+
|Zoning: Junk Yards                    |175.96       |
|Labeling for Used Mattress            |162.43       |
|Record Keeping of Used Mattresses     |154.0        |
|Signage Requied for Sale of Used Mattr|151.64       |
|Storage of Used Mattress              |142.11       |
|Zoning: Recycle Yard                  |135.93       |
|Donation Container Enforcement        |131.76       |
|License Requied Used Mattress Sales   |128.8        |
|Traffic Signal Graffiti               |101.8        |
|Complaint                             |72.87        |
+--------------------------------------+-------------+
only showing top 10 rows



**11. Does number of days late depend on department?**

In [33]:
(
    case_source_dept.groupBy('dept_name').agg(round(mean(case_source_dept.num_days_late),2).alias('avg_days_late'))
    .sort(desc('avg_days_late')).show(truncate=False)
)

+-------------------------+-------------+
|dept_name                |avg_days_late|
+-------------------------+-------------+
|null                     |135.93       |
|Customer Service         |59.74        |
|Development Services     |13.43        |
|Solid Waste Management   |-2.2         |
|Metro Health             |-4.91        |
|Parks and Recreation     |-5.25        |
|Trans & Cap Improvements |-20.61       |
|Code Enforcement Services|-38.7        |
|Animal Care Services     |-226.52      |
|City Council             |null         |
+-------------------------+-------------+



In [46]:
case_source_dept.num_days_late

Column<'num_days_late'>

**12. How do number of days late depend on department and request type?**

In [49]:
(
    case_source_dept
    .groupBy("dept_name", "service_request_type")
    .mean("num_days_late")
    .sort(col("avg(num_days_late)").desc())
    .show()
)

+--------------------+--------------------+------------------+
|           dept_name|service_request_type|avg(num_days_late)|
+--------------------+--------------------+------------------+
|Code Enforcement ...|  Zoning: Junk Yards| 175.9563621042095|
|Code Enforcement ...|Labeling for Used...|162.43032902285717|
|Code Enforcement ...|Record Keeping of...|153.99724039428568|
|Code Enforcement ...|Signage Requied f...|151.63868055333333|
|Code Enforcement ...|Storage of Used M...|     142.112556415|
|                null|Zoning: Recycle Yard|135.92851612479797|
|Code Enforcement ...|Donation Containe...|131.75610506358706|
|Code Enforcement ...|License Requied U...|128.79828704142858|
|Trans & Cap Impro...|Traffic Signal Gr...|101.79846062200002|
|    Customer Service|           Complaint| 72.87050230311695|
|Code Enforcement ...|             Vendors|   66.548098985078|
|Parks and Recreation|Reservation Assis...|       66.03116319|
|Code Enforcement ...|   No Address Posted|59.875640733