In [1]:
#establishing environment 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType


#creating spark object
spark = SparkSession.builder.getOrCreate()

These exercises use the `case.csv`, `dept.csv`, and `source.csv` files from the San Antonio 311 call dataset.

### 1. Read the case, department, and source data into their own spark dataframes.

In [2]:
#read case data into spark df
case = spark.read.csv("case.csv", sep=",", header=True, inferSchema=True)
case.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

In [3]:
#read dept data into spark df
dept = spark.read.csv("dept.csv", sep=",", header=True, inferSchema=True)
dept.show(2, vertical = True)

-RECORD 0--------------------------------------
 dept_division          | 311 Call Center      
 dept_name              | Customer Service     
 standardized_dept_name | Customer Service     
 dept_subject_to_SLA    | YES                  
-RECORD 1--------------------------------------
 dept_division          | Brush                
 dept_name              | Solid Waste Manag... 
 standardized_dept_name | Solid Waste          
 dept_subject_to_SLA    | YES                  
only showing top 2 rows



In [4]:
#read source data into spark df
schema = StructType([
    StructField("source_id", StringType()),
    StructField("source_username", StringType()),
])

source = spark.read.csv("source.csv", header=True, schema=schema)

source.show(2)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
+---------+----------------+
only showing top 2 rows



### 2. Let's see how writing to the local disk works in spark:

#### Write the code necessary to store the source data in both csv and json format, store these as `sources_csv` and `sources_json`

In [5]:
#write source data to csv
source.write.csv("sources_csv", mode = "overwrite")

In [6]:
#write source data to json
source.write.json("sources_json", mode = "overwrite")

#### Inspect your folder structure. What do you notice?

- The csv and json were created in their separate folders. 
- There are 2 files in each folder. Main file contains the data written from the DF and a separate success file.

#### 3. Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [7]:
#case dtypes
case.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [8]:
#dept dtypes
dept.dtypes

[('dept_division', 'string'),
 ('dept_name', 'string'),
 ('standardized_dept_name', 'string'),
 ('dept_subject_to_SLA', 'string')]

In [9]:
#source dtypes
source.dtypes

[('source_id', 'string'), ('source_username', 'string')]

In [10]:
#case data
#change case_closed and case_late col dtypes to booleans
case = case.withColumn("case_closed", expr('case_closed=="YES"'))\
            .withColumn("case_late", expr('case_late=="YES"'))

#change council_district dtype to string
case = case.withColumn('council_district', col('council_district').cast('string'))

#change case_opened_date, case_closed_date, & SLA_due_date dtype to datetime
#identify format they are in
fmt = 'M/d/yy H:mm'

#recreate each col and convert to timestamp using fmt it is in
case = (
    case.withColumn('case_opened_date', to_timestamp('case_opened_date', fmt))
    .withColumn('case_closed_date', to_timestamp('case_closed_date', fmt))
    .withColumn('SLA_due_date', to_timestamp('SLA_due_date', fmt))
)

In [11]:
#double check dtypes for case
case.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('SLA_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

In [12]:
case.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 SLA_due_date         | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



In [13]:
#dept data
#change dept_subject_to_SLA dtype to boolean
dept = dept.withColumn("dept_subject_to_SLA", expr('dept_subject_to_SLA=="YES"'))

In [14]:
#double check dtypes for dept
dept.dtypes

[('dept_division', 'string'),
 ('dept_name', 'string'),
 ('standardized_dept_name', 'string'),
 ('dept_subject_to_SLA', 'boolean')]

In [15]:
dept.show(1, vertical=True)

-RECORD 0----------------------------------
 dept_division          | 311 Call Center  
 dept_name              | Customer Service 
 standardized_dept_name | Customer Service 
 dept_subject_to_SLA    | true             
only showing top 1 row



***
#### 1. How old is the latest (in terms of days past SLA) currently open issue? 

In [16]:
# filtering by open cases (ie. case_closed == NO)
# displaying max number of days late (ie. past SLA)
(case.filter(case.case_closed == 'NO').select(max(case.num_days_late)).show())

+------------------+
|max(num_days_late)|
+------------------+
|       348.6458333|
+------------------+



#### 1a. How long has the oldest (in terms of days since opened) currently opened issue been open?

In [17]:
case.createOrReplaceTempView('case')

spark.sql('''
SELECT DATEDIFF(current_timestamp, SLA_due_date) AS days_past_due
FROM case
WHERE NOT case_closed
ORDER BY days_past_due DESC
LIMIT 15
''').show()

+-------------+
|days_past_due|
+-------------+
|         1581|
|         1581|
|         1581|
|         1580|
|         1578|
|         1574|
|         1574|
|         1573|
|         1572|
|         1572|
|         1568|
|         1567|
|         1567|
|         1567|
|         1564|
+-------------+



#### 2. How many Stray Animal cases are there?

In [20]:
case.filter(case.service_request_type == 'Stray Animal').count()

26760

#### 3. How many service requests that are assigned to the Field Operations department (`dept_division`) are not classified as "Officer Standby" request type (`service_request_type`)?

In [22]:
case.filter(case.dept_division == 'Field Operations').where(case.service_request_type != "Officer Standby").count()

113902

In [23]:
#alternate sol:
(
    case.filter(expr("dept_division == 'Field Operations'"))
    .filter(expr('service_request_type != "Officer Standby"'))
    .count()
)

113902

#### 4. Convert the `council_district` column to a string column.

In [24]:
#already done in prepping/changing dtypes
#case = case.withColumn('council_district', col('council_district').cast('string'))

#### 5. Extract the year from the `case_closed_date` column.

In [25]:
case.select('case_closed_date', year('case_closed_date')).show(5)

+-------------------+----------------------+
|   case_closed_date|year(case_closed_date)|
+-------------------+----------------------+
|2018-01-01 12:29:00|                  2018|
|2018-01-03 08:11:00|                  2018|
|2018-01-02 07:57:00|                  2018|
|2018-01-02 08:13:00|                  2018|
|2018-01-01 13:29:00|                  2018|
+-------------------+----------------------+
only showing top 5 rows



#### 6. Convert `num_days_late` from days to hours in new columns num_hours_late.

In [27]:
(
    case.withColumn('num_hours_late', case.num_days_late * 24)
    .select('num_days_late', 'num_hours_late')
    .show()
)

+-------------------+-------------------+
|      num_days_late|     num_hours_late|
+-------------------+-------------------+
| -998.5087616000001|     -23964.2102784|
|-2.0126041669999997|-48.302500007999996|
|       -3.022337963|      -72.536111112|
|       -15.01148148|      -360.27555552|
|0.37216435200000003|  8.931944448000001|
|       -29.74398148| -713.8555555199999|
|       -14.70673611|      -352.96166664|
|       -14.70662037|      -352.95888888|
|       -14.70662037|      -352.95888888|
|       -14.70649306|      -352.95583344|
|       -14.70649306|      -352.95583344|
|       -14.70636574|      -352.95277776|
|          -14.70625|-352.95000000000005|
|       -14.70636574|      -352.95277776|
|       -14.70623843|-352.94972232000003|
|-14.705891199999998|-352.94138879999997|
|       -14.70600694|      -352.94416656|
|       -14.70576389|      -352.93833336|
|       -14.70576389|      -352.93833336|
|       -14.70564815|       -352.9355556|
+-------------------+-------------

#### 7. Join the case data with the source and department data.

In [28]:
case = (
    case
    # left join on dept_division
    .join(dept, "dept_division", "left")
    # drop all the columns except for standardized name, as it has much fewer unique values
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .drop(case.dept_division)
    .withColumnRenamed("standardized_dept_name", "department")
)

# previewing merged DF vertically 
case.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 SLA_due_date         | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
only showing top 1 row



In [29]:
# left join on source_id 
case = (case.join(source, "source_id", "left"))

# previewing merged DF vertically 
case.show(1, vertical=True)

-RECORD 0------------------------------------
 source_id            | svcCRMLS             
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 SLA_due_date         | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
 source_username      | svcCRMLS             
only showing top 1 row



#### 8. Are there any cases that do not have a request source?

In [31]:
(
    case.select(case.source_id.isNull().cast('int').alias('is_null'))
    .agg(sum('is_null'))
    .show()
)

+------------+
|sum(is_null)|
+------------+
|           0|
+------------+



In [32]:
#alternate
case.filter(case.request_address == None).count()

0

In [33]:
#alternate
case.filter(col('source_id').isNull()).show(vertical=True)

(0 rows)



In [34]:
#alternate
case.where(col("source_username").isNull()).count()

0

#### 9. What are the top 10 service request types in terms of number of requests?

In [35]:
# grouping by service request type and counting number of rows per service type request
# displaying top 10 with highest number of requests
case.groupby('service_request_type').agg((count(case.service_request_type)).alias('number_of_requests'))\
.sort(desc('number_of_requests')).show(10)

+--------------------+------------------+
|service_request_type|number_of_requests|
+--------------------+------------------+
|           No Pickup|             89210|
|Overgrown Yard/Trash|             66403|
|        Bandit Signs|             32968|
|        Damaged Cart|             31163|
|Front Or Side Yar...|             28920|
|        Stray Animal|             27361|
|Aggressive Animal...|             25492|
|Cart Exchange Req...|             22608|
|Junk Vehicle On P...|             21649|
|     Pot Hole Repair|             20827|
+--------------------+------------------+
only showing top 10 rows



In [36]:
#alternate
(
    case.groupby('service_request_type')
    .count()
    .sort(col('count').desc())
    .show(10, truncate=False)
)

+--------------------------------+-----+
|service_request_type            |count|
+--------------------------------+-----+
|No Pickup                       |89210|
|Overgrown Yard/Trash            |66403|
|Bandit Signs                    |32968|
|Damaged Cart                    |31163|
|Front Or Side Yard Parking      |28920|
|Stray Animal                    |27361|
|Aggressive Animal(Non-Critical) |25492|
|Cart Exchange Request           |22608|
|Junk Vehicle On Private Property|21649|
|Pot Hole Repair                 |20827|
+--------------------------------+-----+
only showing top 10 rows



#### 10. What are the top 10 service request types in terms of average days late?

In [37]:
# grouping by service request type and calculating average days late 
# displaying top 10 with highest average days late
case.groupby('service_request_type').agg((mean(case.num_days_late)).alias('avg_days_late'))\
.sort(desc('avg_days_late')).show(10)

+--------------------+------------------+
|service_request_type|     avg_days_late|
+--------------------+------------------+
|  Zoning: Junk Yards| 175.9563621042095|
|Labeling for Used...|162.43032902285717|
|Record Keeping of...|153.99724039428568|
|Signage Requied f...|151.63868055333333|
|Storage of Used M...|     142.112556415|
|Zoning: Recycle Yard|135.92851612479797|
|Donation Containe...|131.75610506358706|
|License Requied U...|128.79828704142858|
|Traffic Signal Gr...|101.79846062200002|
|           Complaint| 72.87050230311695|
+--------------------+------------------+
only showing top 10 rows



In [38]:
#alternate
case.groupBy('service_request_type').agg(round(mean(case.num_days_late),2).alias('avg_days_late')).sort(desc('avg_days_late')).show(10,truncate=False)

+--------------------------------------+-------------+
|service_request_type                  |avg_days_late|
+--------------------------------------+-------------+
|Zoning: Junk Yards                    |175.96       |
|Labeling for Used Mattress            |162.43       |
|Record Keeping of Used Mattresses     |154.0        |
|Signage Requied for Sale of Used Mattr|151.64       |
|Storage of Used Mattress              |142.11       |
|Zoning: Recycle Yard                  |135.93       |
|Donation Container Enforcement        |131.76       |
|License Requied Used Mattress Sales   |128.8        |
|Traffic Signal Graffiti               |101.8        |
|Complaint                             |72.87        |
+--------------------------------------+-------------+
only showing top 10 rows



#### 11. Does number of days late depend on department?

In [39]:
(
    case.filter('case_late')
    .groupby('department')
    .agg(mean('num_days_late').alias('days_late'), count('num_days_late').alias('n_cases_late'))
    .sort('days_late')
    .withColumn('days_late', round(col('days_late'), 1))
    .show(truncate=False)
)

+------------------------+---------+------------+
|department              |days_late|n_cases_late|
+------------------------+---------+------------+
|Metro Health            |6.5      |854         |
|Solid Waste             |7.1      |33729       |
|Trans & Cap Improvements|10.7     |5529        |
|Parks & Recreation      |22.4     |3810        |
|Animal Care Services    |23.4     |23751       |
|DSD/Code Enforcement    |49.5     |26439       |
|Customer Service        |88.2     |2035        |
+------------------------+---------+------------+



#### 12. How do number of days late depend on department and request type?

In [40]:
(
    case.filter("case_closed")
    .filter("case_late")
    .groupby("department", "service_request_type")
    .agg(avg("num_days_late").alias("days_late"), count("*").alias("n_cases"))
    .withColumn("days_late", round(col("days_late"), 1))
    .sort(desc("days_late"))
    .show(40, truncate=False)
)

+--------------------+--------------------------------------------+---------+-------+
|department          |service_request_type                        |days_late|n_cases|
+--------------------+--------------------------------------------+---------+-------+
|DSD/Code Enforcement|Zoning: Recycle Yard                        |273.6    |75     |
|DSD/Code Enforcement|Zoning: Junk Yards                          |251.9    |146    |
|DSD/Code Enforcement|Donation Container Enforcement              |201.7    |82     |
|DSD/Code Enforcement|Structure/Housing Maintenance               |182.4    |30     |
|DSD/Code Enforcement|Graffiti: Private Property (Corridors)      |175.1    |3      |
|DSD/Code Enforcement|Storage of Used Mattress                    |164.0    |7      |
|DSD/Code Enforcement|Labeling for Used Mattress                  |162.4    |7      |
|DSD/Code Enforcement|Record Keeping of Used Mattresses           |154.0    |7      |
|DSD/Code Enforcement|Signage Requied for Sale of Used