### Spark Data Wrangling Exercises

In [4]:
import pandas as pd
import numpy as np
import pyspark
import pyspark.sql.functions as f
from pyspark.sql.functions import when
from pyspark.sql.functions import lit
from pydataset import data
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder.getOrCreate()

In [7]:
source_df = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)
case_df = spark.read.csv("case.csv", sep=",", header=True, inferSchema=True)
dept_df = spark.read.csv("dept.csv", sep=",", header=True, inferSchema=True)

                                                                                

In [9]:
source_df.show(5)
case_df.show(5)
dept_df.show(5)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
|   119403|  Betty De Hoyos|
|   119555|  Socorro Quiara|
+---------+----------------+
only showing top 5 rows

+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.508

In [10]:
source_df.write.json('source_json', mode='overwrite')
dept_df.write.json('dept_json', mode='overwrite')
case_df.write.json('case_json', mode='overwrite')

                                                                                

---

In [19]:
case_df.groupBy("case_closed", "case_late").count().show()

+-----------+---------+------+
|case_closed|case_late| count|
+-----------+---------+------+
|         NO|      YES|  6525|
|        YES|      YES| 87978|
|         NO|       NO| 11585|
|        YES|       NO|735616|
+-----------+---------+------+



In [23]:
case_df = case_df.withColumnRenamed("SLA_due_date", "case_due_date")
case_df = case_df.withColumn("case_closed", f.expr('case_closed == "YES"')).withColumn("case_late", f.expr('case_late == "YES"'))
case_df = case_df.withColumn("council_district", f.col("council_district").cast("string"))

fmt = "M/d/yy H:mm"
case_df = (
    case_df.withColumn("case_opened_date", f.to_timestamp("case_opened_date", fmt))
     .withColumn("case_closed_date", f.to_timestamp("case_closed_date", fmt))
    .withColumn("case_due_date", f.to_timestamp("case_due_date", fmt))
)


In [25]:
case_df.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
-RECORD 1----------------------------------------------------
 case_id

---

In [26]:
#How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest 
#(in terms of days since opened) currently opened issue been open?

In [29]:
case_df.groupby(case_df.service_request_type == 'Stray Animal').count().show()

+-------------------------------------+------+
|(service_request_type = Stray Animal)| count|
+-------------------------------------+------+
|                                 true| 26760|
|                                false|814944|
+-------------------------------------+------+



In [40]:
case_df.groupby(case_df.dept_division).count().show(truncate=False)

+----------------------------+------+
|dept_division               |count |
+----------------------------+------+
|Miscellaneous               |45123 |
|Solid Waste                 |813   |
|Field Operations            |116915|
|Streets                     |38510 |
|Waste Collection            |215122|
|Code Enforcement (IntExp)   |2189  |
|Vector                      |538   |
|Dangerous Premise           |15479 |
|311 Call Center             |2849  |
|Brush                       |18212 |
|Dangerous Premise (IntExp)  |36    |
|Traffic Engineering Design  |4334  |
|Code Enforcement (Internal) |198   |
|District 2                  |3     |
|Signals                     |20700 |
|Engineering Division        |1375  |
|Director's Office Horizontal|515   |
|Storm Water                 |13769 |
|Shops                       |112   |
|Storm Water Engineering     |494   |
+----------------------------+------+
only showing top 20 rows



In [39]:
(
    case_df.filter(case_df.dept_division == 'Field Operations')
    .groupby(case_df.service_request_type == 'Officer Standby')
    .count()
    .show()
)

+----------------------------------------+------+
|(service_request_type = Officer Standby)| count|
+----------------------------------------+------+
|                                    true|  3013|
|                                   false|113902|
+----------------------------------------+------+



In [42]:
case_df.withColumn('Year_closed', f.year('case_closed_date')).show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
 Year_closed          | 2018                 
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05 08:30:00  
 case_late            | false     

In [43]:
case_df.withColumn('num_hours_late', (case_df.num_days_late * 24)).show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
 num_hours_late       | -23964.2102784       
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05 08:30:00  
 case_late            | false     

In [55]:
join_df = (   
    case_df.join(dept_df, "dept_division", "left")
    .drop(dept_df.dept_division)
    .drop(dept_df.dept_name)
    .drop(case_df.dept_division)
    .withColumnRenamed("standardized_dept_name", "department")
    .withColumn("dept_subject_to_SLA", f.col("dept_subject_to_SLA") == "YES")
    .join(source_df, "source_id", "left")
    .drop(source_df.source_id)
    #.show(2, vertical=True, truncate=False)
)

In [56]:
join_df.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 source_id            | svcCRMLS                             
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
 department           | Animal Care Services                 
 dept_subject_to_SLA  | true                                 
 source_

In [67]:
join_df.groupby(join_df.source_username == 'NaN').count().show()

+-----------------------+------+
|(source_username = NaN)| count|
+-----------------------+------+
|                  false|855269|
+-----------------------+------+



In [70]:
join_df.groupby(join_df.service_request_type).count().sort('count', ascending=False).show(10, truncate=False)

+--------------------------------+-----+
|service_request_type            |count|
+--------------------------------+-----+
|No Pickup                       |89210|
|Overgrown Yard/Trash            |66403|
|Bandit Signs                    |32968|
|Damaged Cart                    |31163|
|Front Or Side Yard Parking      |28920|
|Stray Animal                    |27361|
|Aggressive Animal(Non-Critical) |25492|
|Cart Exchange Request           |22608|
|Junk Vehicle On Private Property|21649|
|Pot Hole Repair                 |20827|
+--------------------------------+-----+
only showing top 10 rows



In [75]:
(
    join_df.select('service_request_type', 'num_days_late')
    .groupby('service_request_type')
    .agg(f.avg('num_days_late').alias("avg_days_late"))
    .sort(f.col('avg_days_late'), ascending=False)
    .show(5, truncate=False)
)

+--------------------------------------+------------------+
|service_request_type                  |avg_days_late     |
+--------------------------------------+------------------+
|Zoning: Junk Yards                    |175.9563621042095 |
|Labeling for Used Mattress            |162.43032902285717|
|Record Keeping of Used Mattresses     |153.99724039428568|
|Signage Requied for Sale of Used Mattr|151.63868055333333|
|Storage of Used Mattress              |142.11255641500003|
+--------------------------------------+------------------+
only showing top 5 rows





In [71]:
join_df.select('department', 'num_days_late').groupby('department').agg(f.avg('num_days_late')).show(5)

[Stage 149:===>                                                   (1 + 15) / 16]

+--------------------+-------------------+
|          department| avg(num_days_late)|
+--------------------+-------------------+
|         Solid Waste| -2.200057513672161|
|Animal Care Services|-226.51783940550325|
|Trans & Cap Impro...| -20.61283735405272|
|  Parks & Recreation| -5.251521960055153|
|    Customer Service| 59.737091496300785|
+--------------------+-------------------+
only showing top 5 rows



                                                                                

In [80]:
(
    join_df.select('department', 'service_request_type', 'num_days_late')
    .groupby('department', 'service_request_type')
    .agg(f.avg('num_days_late').alias("avg_days_late"))
    .sort('department')
    #.sort(f.col('avg_days_late'), ascending=False)
    .show(20, truncate=False)
)

+--------------------+--------------------------------------+--------------------+
|department          |service_request_type                  |avg_days_late       |
+--------------------+--------------------------------------+--------------------+
|Animal Care Services|Injured Animal(Critical)              |-0.06529248474639535|
|Animal Care Services|Stray Animal                          |-998.8064665118966  |
|Animal Care Services|Animal Bite(Critical)                 |0.01909683481311018 |
|Animal Care Services|Aggressive Animal(Critical)           |16.696368811892228  |
|Animal Care Services|Animal Permits Request                |22.19792271447681   |
|Animal Care Services|Aggressive Animal(Non-Critical)       |2.645033883163267   |
|Animal Care Services|Animal Bite(Non-Critical)             |-2.505667547973447  |
|Animal Care Services|Spay/Neuter Request Response          |-6.976550926        |
|Animal Care Services|Animal Cruelty(Critical)              |-0.06219277353522766|
|Ani

[Stage 184:===>                                                   (1 + 15) / 16]                                                                                