In [71]:
# establishing environment 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

## This exercises uses the case.csv, dept.csv, and source.csv files from the san antonio 311 call dataset.

### Read the case, department, and source data into their own spark dataframes.

In [72]:
source = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)

case = spark.read.csv("case.csv", sep=",", header=True, inferSchema=True)

dept = spark.read.csv("dept.csv", sep=",", header=True, inferSchema=True)

In [73]:
source.show(5)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
|   119403|  Betty De Hoyos|
|   119555|  Socorro Quiara|
+---------+----------------+
only showing top 5 rows



In [74]:
case.show(5)

+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616000001|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO|-2.0126041669999997|        YES|     Storm Water|Removal Of Obstru...|4.322222222| 

In [75]:
dept.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



## Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json

In [76]:
source.write.json("sources_json", mode="overwrite")

In [77]:
source.write.csv("sources_csv", mode="overwrite")

## Inspect your folder structure. What do you notice?

In [78]:
print('There are two files in each folder. The main file containing the data written from the DF and a separate success file.')

There are two files in each folder. The main file containing the data written from the DF and a separate success file.


## Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [79]:
source.dtypes

[('source_id', 'string'), ('source_username', 'string')]

In [80]:
case.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [81]:
dept.dtypes

[('dept_division', 'string'),
 ('dept_name', 'string'),
 ('standardized_dept_name', 'string'),
 ('dept_subject_to_SLA', 'string')]

In [82]:
# converting case_closed and case_late from strings to booleans
# this will allow us to perform boolean operations on these columns should we need to in the future
case = case.withColumn("case_closed", expr('case_closed == "YES"'))\
.withColumn("case_late", expr('case_late == "YES"'))

In [83]:
# converting council_district to string since its an id and thus we won't be performing mathematical operations 
# on it
case = case.withColumn("council_district", col("council_district").cast("string"))

In [84]:
# setting date and time format
fmt = "M/d/yy H:mm"

# converting dates to date time format
case = (
    case.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_opened_date", fmt))
)

## How old is the latest (in terms of days past SLA) currently open issue? 

In [85]:
(case.filter(case.case_closed == 'NO').select(max(case.num_days_late)).show())

+------------------+
|max(num_days_late)|
+------------------+
|       348.6458333|
+------------------+



## How long has the oldest (in terms of days since opened) currently opened issue been open?

In [86]:
case = case.withColumn("case_age", datediff(current_timestamp(), "case_opened_date"))

In [94]:
(case.filter(case.case_closed == 'NO').select(max(case.case_age)).show())

+-------------+
|max(case_age)|
+-------------+
|         1429|
+-------------+



## How many Stray Animal cases are there?

In [109]:
case.filter(case.service_request_type == 'Stray Animal').count()

26760

## How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

841704

## Convert the council_district column to a string column.

In [None]:
# This was done in an earlier question but I'm copying here for reference

# converting council_district to string since its an id and thus we won't be performing mathematical operations 
# on it
case = case.withColumn("council_district", col("council_district").cast("string"))

## Extract the year from the case_closed_date column.

## Convert num_days_late from days to hours in new columns num_hours_late.

## Join the case data with the source and department data.

## Are there any cases that do not have a request source?

## What are the top 10 service request types in terms of number of requests?

## What are the top 10 service request types in terms of average days late?

## Does number of days late depend on department?

## How do number of days late depend on department and request type?