In [1]:
# Data Acquisition
# This exercises uses the case.csv, dept.csv, and source.csv files from the san antonio 311 call dataset.

# Read the case, department, and source data into their own spark dataframes.

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

conf = pyspark.SparkConf().set('spark.driver.host','127.0.0.1')
sc = pyspark.SparkContext(master='local', appName='myAppName',conf=conf)


spark = pyspark.sql.SparkSession.builder.getOrCreate()

source = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)

case = spark.read.csv("case.csv", sep=",", header=True, inferSchema=True)

dept = spark.read.csv("dept.csv", sep=",", header=True, inferSchema=True)

source.show(1)

case.show(1,vertical=True)

dept.show(1)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
+---------+----------------+
only showing top 1 row

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row

+---------------+----------------+----------------------+-------------------+
|  dept_division|       dept_name|stand

In [2]:
# Let's see how writing to the local disk works in spark:

# Write the code necessary to store the source data in both csv and json format, 
# store these as sources_csv and sources_json
# Inspect your folder structure. What do you notice?

source.write.json("sources_json", mode="overwrite")

source.write.csv("sources_csv", mode="overwrite")

In [3]:
# Inspect the data in your dataframes. Are the data types appropriate? 
# Write the code necessary to cast the values to the appropriate types.

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField("source_id", StringType()),
        StructField("source_username", StringType()),
    ]
)

spark.read.csv("sources_csv", header=True, schema=schema).show()

+---------+--------------------+
|source_id|     source_username|
+---------+--------------------+
|   103582|         Carmen Cura|
|   106463|     Richard Sanchez|
|   119403|      Betty De Hoyos|
|   119555|      Socorro Quiara|
|   119868| Michelle San Miguel|
|   120752|      Eva T. Kleiber|
|   124405|           Lori Lara|
|   132408|       Leonard Silva|
|   135723|        Amy Cardenas|
|   136202|    Michelle Urrutia|
|   136979|      Leticia Garcia|
|   137943|    Pamela K. Baccus|
|   138605|        Marisa Ozuna|
|   138650|      Kimberly Green|
|   138650|Kimberly Green-Woods|
|   138793| Guadalupe Rodriguez|
|   138810|       Tawona Martin|
|   139342|     Jessica Mendoza|
|   139344|        Isis Mendoza|
|   139345|      Andrea Alvarez|
+---------+--------------------+
only showing top 20 rows



In [4]:
case.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

In [5]:
# How old is the latest (in terms of days past SLA) currently open issue? 

case.filter(case.case_closed=="NO").select(round(max(case.SLA_days)).alias("longest_SLA_days")).show()



+----------------+
|longest_SLA_days|
+----------------+
|          1419.0|
+----------------+



In [6]:
# How many Stray Animal cases are there?

case.filter(case.service_request_type=="Stray Animal").select(count(case.service_request_type).alias("number of stray animal cases")).show()

+----------------------------+
|number of stray animal cases|
+----------------------------+
|                       26760|
+----------------------------+



In [7]:
# How many service requests that are assigned to the Field Operations department (dept_division) 
# are not classified as "Officer Standby" request type (service_request_type)?

case.filter(case.dept_division=="Field Operations").filter(case.service_request_type!="Officer Standby").select(count(case.dept_division).alias("cases")).show()


+------+
| cases|
+------+
|113902|
+------+



In [8]:
# Convert the council_district column to a string column.

case = case.withColumn("council_district", col("council_district").cast("string"))

case.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

In [9]:
# Extract the year from the case_closed_date column. 1/1/18 0:46 

case = case.withColumn("year", regexp_extract("case_closed_date", r"\d{1,2}/\d{1,2}/(\d{1,2})", 1))

case.select(case.year).show()

+----+
|year|
+----+
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
|  18|
+----+
only showing top 20 rows



In [10]:
case.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
 year                 | 18                   
only showing top 1 row



In [11]:
# Convert num_days_late from days to hours in new columns num_hours_late.

case.withColumn("num_hours_late", case.num_days_late * 24).show(1,vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
 year                 | 18                   
 num_hours_late       | -23964.2102784       
only showing top 1 row



In [12]:
# Join the case data with the source and department data.

source.show(1,vertical=True)

source.join(case, on=source.source_id == case.source_id, how="left").show(1,vertical=True)

df = source.join(case, on=source.source_id == case.source_id, how="left").join(dept, on=case.dept_division == dept.dept_division, how="left").show(1,vertical=True)



-RECORD 0---------------------------
 source_id       | 100137           
 source_username | Merlene Blodgett 
only showing top 1 row

-RECORD 0------------------------------------
 source_id            | 136202               
 source_username      | Michelle Urrutia     
 case_id              | 1014128056           
 case_opened_date     | 1/2/18 8:21          
 case_closed_date     | 1/10/18 8:39         
 SLA_due_date         | 1/5/18 8:30          
 case_late            | YES                  
 num_days_late        | 5.00681713           
 case_closed          | YES                  
 dept_division        | Waste Collection     
 service_request_type | Solid Waste Fees ... 
 SLA_days             | 3.00619213           
 case_status          | Closed               
 source_id            | 136202               
 request_address      | 3214  STONEY FORK... 
 council_district     | 10                   
 year                 | 18                   
only showing top 1 row

-RECORD 0----

In [13]:
# Are there any cases that do not have a request source?

df = source.join(case, on=source.source_id == case.source_id, how="left").join(dept, on=case.dept_division == dept.dept_division, how="left").drop(dept.dept_division).drop(dept.dept_name)


df.show(1,vertical=True)

-RECORD 0--------------------------------------
 source_id              | 136202               
 source_username        | Michelle Urrutia     
 case_id                | 1014128056           
 case_opened_date       | 1/2/18 8:21          
 case_closed_date       | 1/10/18 8:39         
 SLA_due_date           | 1/5/18 8:30          
 case_late              | YES                  
 num_days_late          | 5.00681713           
 case_closed            | YES                  
 dept_division          | Waste Collection     
 service_request_type   | Solid Waste Fees ... 
 SLA_days               | 3.00619213           
 case_status            | Closed               
 source_id              | 136202               
 request_address        | 3214  STONEY FORK... 
 council_district       | 10                   
 year                   | 18                   
 standardized_dept_name | Solid Waste          
 dept_subject_to_SLA    | YES                  
only showing top 1 row



In [14]:
# What are the top 10 service request types in terms of number of requests?

from pyspark.sql.functions import asc, desc

df.groupBy("service_request_type").agg(count("service_request_type")).sort(col("count(service_request_type)").desc()).show(10)

+--------------------+---------------------------+
|service_request_type|count(service_request_type)|
+--------------------+---------------------------+
|           No Pickup|                      89210|
|Overgrown Yard/Trash|                      66403|
|        Bandit Signs|                      32968|
|        Damaged Cart|                      31163|
|Front Or Side Yar...|                      28920|
|        Stray Animal|                      27361|
|Aggressive Animal...|                      25492|
|Cart Exchange Req...|                      22608|
|Junk Vehicle On P...|                      21649|
|     Pot Hole Repair|                      20827|
+--------------------+---------------------------+
only showing top 10 rows



In [15]:
# What are the top 10 service request types in terms of average days late?

df.groupBy("service_request_type").agg(avg("num_days_late")).sort(col("avg(num_days_late)").asc()).na.drop().show(10)

+--------------------+-------------------+
|service_request_type| avg(num_days_late)|
+--------------------+-------------------+
|  Engineering Design|      -1399.1272335|
|Signal Timing Mod...| -1247.079779973214|
|        Stray Animal| -998.8064665118961|
|Major Park Improv...| -280.2546235360404|
|Sidewalk Cost Sha...|-186.18202610536574|
|Multi Tenant Exte...|-135.71588128047625|
|   CPS Energy Towers|-129.84778717829744|
|CPS Energy Wood P...| -129.3090520272121|
|CPS Energy Metal ...|-129.17919786427768|
|Multi Tenant Inte...| -125.1431856354651|
+--------------------+-------------------+
only showing top 10 rows



In [16]:
# Does number of days late depend on department?

df.groupBy("dept_division").agg(avg("num_days_late")).sort(col("avg(num_days_late)").desc()).na.drop().show(20)


+--------------------+-------------------+
|       dept_division| avg(num_days_late)|
+--------------------+-------------------+
|Code Enforcement ...|  135.9285161247979|
|        Reservations|        66.03116319|
|     311 Call Center|  59.73709149630082|
|Director's Office...|  37.57064670295009|
|Engineering Division| 13.433724555869722|
|               Shops|  9.641261768722691|
|           Tree Crew| 4.7232828120653965|
|         Solid Waste| 3.5190239198762248|
|              Trades|  3.231977141276936|
|Clean and Green N...|  1.691468919487805|
|              Vector| -1.120653299322344|
|    Facility License|-1.4126937702216642|
|       Miscellaneous|-1.7218576838926671|
|    Waste Collection| -2.170652238479893|
|     Clean and Green| -2.557154979254144|
|               Brush|-3.9857905714570987|
| Food Establishments| -6.971552370451529|
|  Signs and Markings| -7.448628001357727|
|    Shops (Internal)|  -8.18626711987648|
|         Storm Water|-14.055678397031905|
+----------

In [17]:
# How do number of days late depend on department and request type?

df.groupBy("dept_division","service_request_type").mean("num_days_late").sort(col("avg(num_days_late)").asc()).na.drop().show()

+--------------------+--------------------+-------------------+
|       dept_division|service_request_type| avg(num_days_late)|
+--------------------+--------------------+-------------------+
|Storm Water Engin...|  Engineering Design|      -1399.1272335|
|             Signals|Signal Timing Mod...| -1247.079779973214|
|    Field Operations|        Stray Animal| -998.8064665118961|
|    Shops (Internal)|Major Park Improv...| -280.2546235360404|
|             Streets|Sidewalk Cost Sha...|-186.18202610536574|
|    Code Enforcement|Multi Tenant Exte...|-135.71588128047625|
|   Graffiti (IntExp)|   CPS Energy Towers|-129.84778717829744|
|   Graffiti (IntExp)|CPS Energy Wood P...| -129.3090520272121|
|   Graffiti (IntExp)|CPS Energy Metal ...|-129.17919786427768|
|    Code Enforcement|Multi Tenant Inte...| -125.1431856354651|
|    Code Enforcement|Temporary Obstruc...|-119.75801638405598|
|             Streets|Manhole Sunken/Ra...|-119.26001888622663|
|Code Enforcement ...|Minimum Housing-O.