In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

In [2]:
source = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)
dept = spark.read.csv("dept.csv", sep=",", header=True, inferSchema=True)
case = spark.read.csv("case.csv", sep=",", header=True, inferSchema=True)

In [3]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField("source_id", StringType()),
        StructField("source_username", StringType()),
    ]
)

spark.read.csv("source.csv", header=True, schema=schema)

DataFrame[source_id: string, source_username: string]

In [4]:
source.write.csv("sources.csv", mode="overwrite")
source.write.json("sources.json", mode="overwrite")

In [5]:
source.show(10)
source.printSchema

+---------+-------------------+
|source_id|    source_username|
+---------+-------------------+
|   100137|   Merlene Blodgett|
|   103582|        Carmen Cura|
|   106463|    Richard Sanchez|
|   119403|     Betty De Hoyos|
|   119555|     Socorro Quiara|
|   119868|Michelle San Miguel|
|   120752|     Eva T. Kleiber|
|   124405|          Lori Lara|
|   132408|      Leonard Silva|
|   135723|       Amy Cardenas|
+---------+-------------------+
only showing top 10 rows



<bound method DataFrame.printSchema of DataFrame[source_id: string, source_username: string]>

In [6]:
dept.show(10)
dept.printSchema

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|                null|  DSD/Code Enforcement|                YES|
|   Dangerous Premise|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Dangerous Premise...|Code Enforcement ...|

<bound method DataFrame.printSchema of DataFrame[dept_division: string, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]>

In [7]:
case.show(10, vertical=True)
case.printSchema

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

<bound method DataFrame.printSchema of DataFrame[case_id: int, case_opened_date: string, case_closed_date: string, SLA_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: int]>

In [8]:
case = case.withColumnRenamed("SLA_due_date", "case_due_date")

In [9]:
case.groupby('case_late', 'case_closed').count().show()

+---------+-----------+------+
|case_late|case_closed| count|
+---------+-----------+------+
|       NO|        YES|735616|
|      YES|        YES| 87978|
|       NO|         NO| 11585|
|      YES|         NO|  6525|
+---------+-----------+------+



In [10]:
case = case.withColumn('case_late', expr("case_late == 'YES'"))

In [11]:
case = case.withColumn('case_closed', expr("case_closed == 'YES'"))

In [12]:
case.show(3, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 case_due_date        | 1/5/18 8:30          
 case_late            | false                
 num_days_late        | -2.0126041

In [13]:
case = case.withColumn('council_district', format_string("%03d", col("council_district")))

In [14]:
case.show(3, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 005                  
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 case_due_date        | 1/5/18 8:30          
 case_late            | false                
 num_days_late        | -2.0126041

In [15]:
print("--- Before handling dates")
case.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

fmt = "M/d/yy H:mm"
case = (
    case.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_due_date", fmt))
)

print("--- After")
case.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

--- Before handling dates
+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows

--- After
+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|      case_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|2020-09-26 00:42:00|
|2018-01-01 00:46:00|2018-01-03 08:11:00|2018-01-05 08:30:00|
|2018-01-01 00:48:00|2018-01-02 07:57:00|2018-01-05 08:30:00|
|2018-01-01 01:29:00|2018-01-02 08:13:00|2018-01-17 08:30:00|
|2018-01-01 01:34:00|2018-01-01 13:29:00|2018-01-01 04:

In [16]:
case.printSchema()
case.show(3, vertical=True)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true      

In [17]:
case = case.withColumn("request_address", lower(trim("request_address")))

In [18]:
case = case.withColumn("zipcode", regexp_extract("request_address", r"(\d+)$", 1))

In [19]:
case = case.withColumn("case_age", datediff(current_timestamp(), "case_opened_date"))
case = case.withColumn("days_to_close", datediff("case_closed_date", "case_opened_date"))
case = case.withColumn("case_lifetime", when(col("case_closed"), col("days_to_close")).otherwise(col("case_age")))

In [20]:
case.where(~ col("case_closed")).show(vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014128388           
 case_opened_date     | 2018-01-02 09:39:00  
 case_closed_date     | null                 
 case_due_date        | 2018-01-09 09:39:00  
 case_late            | true                 
 num_days_late        | 211.5974884          
 case_closed          | false                
 dept_division        | 311 Call Center      
 service_request_type | Complaint            
 SLA_days             | 7.0                  
 case_status          | Open                 
 source_id            | mt13131              
 request_address      | 7326  westglade p... 
 council_district     | 006                  
 zipcode              | 78227                
 case_age             | 1064                 
 days_to_close        | null                 
 case_lifetime        | 1064                 
-RECORD 1------------------------------------
 case_id              | 1014128790           
 case_opened_date     | 2018-01-02

In [21]:
case = (
    case
    .join(dept, "dept_division", "left")
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .withColumnRenamed("standardized_dept_name", "department")
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
)

case.show(2, vertical=True)

-RECORD 0------------------------------------
 dept_division        | Field Operations     
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 zipcode              | 78207                
 case_age             | 1065                 
 days_to_close        | 0                    
 case_lifetime        | 0                    
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
-RECORD 1-------------------------

In [22]:
case = case.withColumn("days_past_SLAdue", datediff(current_timestamp(), "case_due_date"))

In [23]:
case.where(~ col("case_closed")).show(vertical=True)

-RECORD 0------------------------------------
 dept_division        | 311 Call Center      
 case_id              | 1014128388           
 case_opened_date     | 2018-01-02 09:39:00  
 case_closed_date     | null                 
 case_due_date        | 2018-01-09 09:39:00  
 case_late            | true                 
 num_days_late        | 211.5974884          
 case_closed          | false                
 service_request_type | Complaint            
 SLA_days             | 7.0                  
 case_status          | Open                 
 source_id            | mt13131              
 request_address      | 7326  westglade p... 
 council_district     | 006                  
 zipcode              | 78227                
 case_age             | 1064                 
 days_to_close        | null                 
 case_lifetime        | 1064                 
 department           | Customer Service     
 dept_subject_to_SLA  | true                 
 days_past_SLAdue     | 1057      

In [24]:
(
    case.filter(col("service_request_type") == lit("Stray Animal"))
    .groupBy("case_closed", "service_request_type")
    .count()
    .sort(col("count").desc())
    .show()
)

+-----------+--------------------+-----+
|case_closed|service_request_type|count|
+-----------+--------------------+-----+
|       true|        Stray Animal|26745|
|      false|        Stray Animal|   15|
+-----------+--------------------+-----+



In [25]:
(
    case.filter(col("service_request_type") != lit("Officer Standby"))
    .count()
)

838691

In [26]:
case.printSchema

<bound method DataFrame.printSchema of DataFrame[dept_division: string, case_id: int, case_opened_date: timestamp, case_closed_date: timestamp, case_due_date: timestamp, case_late: boolean, num_days_late: double, case_closed: boolean, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: string, zipcode: string, case_age: int, days_to_close: int, case_lifetime: int, department: string, dept_subject_to_SLA: boolean, days_past_SLAdue: int]>

In [27]:
case = case.withColumn("year", year("case_closed_date"))

In [28]:
case.show(2, vertical=True)

-RECORD 0------------------------------------
 dept_division        | Field Operations     
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 zipcode              | 78207                
 case_age             | 1065                 
 days_to_close        | 0                    
 case_lifetime        | 0                    
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
 days_past_SLAdue     | 66        

In [29]:
case = case.withColumn("num_hours_late", (col("num_days_late") * 24))

In [30]:
case.show(2, vertical=True)

-RECORD 0------------------------------------
 dept_division        | Field Operations     
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 zipcode              | 78207                
 case_age             | 1065                 
 days_to_close        | 0                    
 case_lifetime        | 0                    
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
 days_past_SLAdue     | 66        

In [31]:
source.show()

+---------+--------------------+
|source_id|     source_username|
+---------+--------------------+
|   100137|    Merlene Blodgett|
|   103582|         Carmen Cura|
|   106463|     Richard Sanchez|
|   119403|      Betty De Hoyos|
|   119555|      Socorro Quiara|
|   119868| Michelle San Miguel|
|   120752|      Eva T. Kleiber|
|   124405|           Lori Lara|
|   132408|       Leonard Silva|
|   135723|        Amy Cardenas|
|   136202|    Michelle Urrutia|
|   136979|      Leticia Garcia|
|   137943|    Pamela K. Baccus|
|   138605|        Marisa Ozuna|
|   138650|      Kimberly Green|
|   138650|Kimberly Green-Woods|
|   138793| Guadalupe Rodriguez|
|   138810|       Tawona Martin|
|   139342|     Jessica Mendoza|
|   139344|        Isis Mendoza|
+---------+--------------------+
only showing top 20 rows



In [35]:
case.show(5, vertical=True)

-RECORD 0------------------------------------
 source_id            | svcCRMLS             
 dept_division        | Field Operations     
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 zipcode              | 78207                
 case_age             | 1065                 
 days_to_close        | 0                    
 case_lifetime        | 0                    
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
 days_past_SLAdue     | 66        

In [33]:
case = case.join(source, "source_id", "left")

In [36]:
case.show(5, vertical=True)

-RECORD 0------------------------------------
 source_id            | svcCRMLS             
 dept_division        | Field Operations     
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 zipcode              | 78207                
 case_age             | 1065                 
 days_to_close        | 0                    
 case_lifetime        | 0                    
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
 days_past_SLAdue     | 66        

In [37]:
case.where(case.source_username == None).show(vertical=True)

(0 rows)



In [44]:
(
    case
    .groupBy("service_request_type")
    .count()
    .sort(col("count").desc())
    .show(10)
)

+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|           No Pickup|89210|
|Overgrown Yard/Trash|66403|
|        Bandit Signs|32968|
|        Damaged Cart|31163|
|Front Or Side Yar...|28920|
|        Stray Animal|27361|
|Aggressive Animal...|25492|
|Cart Exchange Req...|22608|
|Junk Vehicle On P...|21649|
|     Pot Hole Repair|20827|
+--------------------+-----+
only showing top 10 rows



In [39]:
(
    case
    .groupBy("service_request_type")
    .mean("num_days_late")
    .sort(col("avg(num_days_late)").desc())
    .show(10)
)

+--------------------+------------------+
|service_request_type|avg(num_days_late)|
+--------------------+------------------+
|  Zoning: Junk Yards| 175.9563621042095|
|Labeling for Used...|162.43032902285717|
|Record Keeping of...|153.99724039428568|
|Signage Requied f...|151.63868055333333|
|Storage of Used M...|     142.112556415|
|Zoning: Recycle Yard|135.92851612479797|
|Donation Containe...|131.75610506358706|
|License Requied U...|128.79828704142858|
|Traffic Signal Gr...|101.79846062200002|
|           Complaint| 72.87050230311695|
+--------------------+------------------+
only showing top 10 rows



In [40]:
(
    case
    .groupBy("department")
    .mean("num_days_late")
    .sort(col("avg(num_days_late)").desc())
    .show()
)

+--------------------+-------------------+
|          department| avg(num_days_late)|
+--------------------+-------------------+
|    Customer Service| 59.737091496300785|
|         Solid Waste|-2.2000575136721747|
|        Metro Health| -4.911766979607002|
|  Parks & Recreation| -5.251521960055133|
|Trans & Cap Impro...|-20.612837354052626|
|DSD/Code Enforcement| -38.36938892614453|
|Animal Care Services|-226.51783940550382|
|        City Council|               null|
+--------------------+-------------------+



In [42]:
(
    case
    .groupBy("department", "service_request_type")
    .mean("num_days_late")
    .sort(col("avg(num_days_late)").desc())
    .show()
)

+--------------------+--------------------+------------------+
|          department|service_request_type|avg(num_days_late)|
+--------------------+--------------------+------------------+
|DSD/Code Enforcement|  Zoning: Junk Yards| 175.9563621042095|
|DSD/Code Enforcement|Labeling for Used...|162.43032902285717|
|DSD/Code Enforcement|Record Keeping of...|153.99724039428568|
|DSD/Code Enforcement|Signage Requied f...|151.63868055333333|
|DSD/Code Enforcement|Storage of Used M...|     142.112556415|
|DSD/Code Enforcement|Zoning: Recycle Yard|135.92851612479797|
|DSD/Code Enforcement|Donation Containe...|131.75610506358706|
|DSD/Code Enforcement|License Requied U...|128.79828704142858|
|Trans & Cap Impro...|Traffic Signal Gr...|101.79846062200002|
|    Customer Service|           Complaint| 72.87050230311695|
|DSD/Code Enforcement|             Vendors|   66.548098985078|
|  Parks & Recreation|Reservation Assis...|       66.03116319|
|DSD/Code Enforcement|   No Address Posted|59.875640733