# Wrangle Data in Spark

In [19]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

import numpy as np
import pandas as pd

This exercises uses the `case.csv`, `dept.csv`, and `source.csv` files from the san antonio 311 call dataset.

1. Read the case, department, and source data into their own spark dataframes.

2. Let's see how writing to the local disk works in spark:

    - Write the code necessary to store the source data in both csv and json format, store these as `sources_csv` and `sources_json`
    - Inspect your folder structure. What do you notice?
3. Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [20]:
# read the case, department, and source data into their own spark dataframes.
case = spark.read.csv("case.csv", sep=",", header=True, inferSchema=True)

dept = spark.read.csv("dept.csv", sep=",", header=True, inferSchema=True)

source = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)

In [21]:
# write the code necessary to store the source data in both csv and json format, 
# store these as sources_csv and sources_json

# write to csv
source.write.csv("source_csv", mode="overwrite")

# write to json
source.write.json("source_json", mode="overwrite")

In [22]:
# inspect your folder structure. what do you notice?


In [23]:
# Inspect the data in your dataframes. Are the data types appropriate? 
# Write the code necessary to cast the values to the appropriate types.

source.dtypes

[('source_id', 'string'), ('source_username', 'string')]

In [24]:
source.show(5)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
|   119403|  Betty De Hoyos|
|   119555|  Socorro Quiara|
+---------+----------------+
only showing top 5 rows



In [25]:
dept.dtypes

[('dept_division', 'string'),
 ('dept_name', 'string'),
 ('standardized_dept_name', 'string'),
 ('dept_subject_to_SLA', 'string')]

In [26]:
dept.show(3)

+---------------+--------------------+----------------------+-------------------+
|  dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+---------------+--------------------+----------------------+-------------------+
|311 Call Center|    Customer Service|      Customer Service|                YES|
|          Brush|Solid Waste Manag...|           Solid Waste|                YES|
|Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
+---------------+--------------------+----------------------+-------------------+
only showing top 3 rows



In [30]:
# change the dept_subject_to_SLA to bool
dept = dept.withColumn('dept_subject_to_SLA', expr('dept_subject_to_SLA == "YES"'))

dept.show(3)

+---------------+--------------------+----------------------+-------------------+
|  dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+---------------+--------------------+----------------------+-------------------+
|311 Call Center|    Customer Service|      Customer Service|               true|
|          Brush|Solid Waste Manag...|           Solid Waste|               true|
|Clean and Green|Parks and Recreation|    Parks & Recreation|               true|
+---------------+--------------------+----------------------+-------------------+
only showing top 3 rows



In [31]:
case.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [32]:
# change case closed and case late to bool values
case = case.withColumn("case_closed", expr('case_closed == "YES"')).withColumn(
    "case_late", expr('case_late == "YES"')
)

case.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|     true|
+-----------+---------+
only showing top 5 rows



In [33]:
# convert council_district to a string since it's a unique 
# identifier and we won't be doing any math with it
case = case.withColumn("council_district", col("council_district").cast("string"))

In [34]:
# case dates need to be changed to date types

# rename the SLA due date column
case = case.withColumnRenamed("SLA_due_date", "case_due_date")

# visual of current date observations before changing the type
case.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows



1. How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?
2. How many Stray Animal cases are there?
3. How many service requests that are assigned to the Field Operations department (`dept_division`) are not classified as "Officer Standby" request type (`service_request_type`)?

4. Convert the `council_district` column to a string column.

5. Extract the year from the case_closed_date column.

6. Convert num_days_late from days to hours in new columns num_hours_late.

7. Join the case data with the source and department data.

8. Are there any cases that do not have a request source?

9. What are the top 10 service request types in terms of number of requests?

10. What are the top 10 service request types in terms of average days late?

11. Does number of days late depend on department?

12. How do number of days late depend on department and request type?

In [42]:
case.show(3, False, True)

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 1/1/18 0:42                           
 case_closed_date     | 1/1/18 12:29                          
 case_due_date        | 9/26/20 0:42                          
 case_late            | false                                 
 num_days_late        | -998.5087616000001                    
 case_closed          | true                                  
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  EL PASO ST, San Antonio, 78207  
 council_district     | 5                                     
 num_hours_late       | -23964.2102784                 

### 1. How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

In [46]:
# How old is the latest (in terms of days past SLA) currently open issue? 
# How long has the oldest (in terms of days since opened) currently opened issue been open?

#  calculate the days current day - case due date

# case.select(datediff(current_timestamp(), 'case_due_date').alias( "days_past_due" ))\
# .filter(case.case_status == 'Open').sort(col('days_past_due').desc()).show(5)

# case.select('*', datediff(current_timestamp(), 'case_due_date').alias( "days_past_due" ))\
# .filter(case.case_status == 'Open').orderBy(desc("days_past_due")).show(5, False, True)

### 2. How many Stray Animal cases are there?

In [47]:
# total cases of stray animal
case.filter(case.service_request_type == 'Stray Animal').count()

26760

### 3. How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [48]:
case.filter(case.dept_division == "Field Operations")\
.where(case.service_request_type != 'Officer Standby').count()

113902

In [49]:
# or 
(
    case.filter(expr("dept_division == 'Field Operations'"))
    .filter(expr('service_request_type != "Officer Standby"'))
    .count()
)

113902

### 4. Convert the council_district column to a string column.

In [50]:
# council_district as a string instead of int
case = case.withColumn('council_district', col('council_district').cast('string'))

### 5. Extract the year from the case_closed_date column.

In [51]:
case = case.withColumn("case_closed_year", year("case_closed_date"))

case.show(2, False, True)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 case_due_date        | 9/26/20 0:42                         
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
 num_hours_late       | -23964.2102784                       
 case_cl

### 6. Convert num_days_late from days to hours in new columns num_hours_late.

In [35]:
case.select('num_days_late').show(3)

+-------------------+
|      num_days_late|
+-------------------+
| -998.5087616000001|
|-2.0126041669999997|
|       -3.022337963|
+-------------------+
only showing top 3 rows



In [36]:
# convert num_days_late from days to hours in new column num_hours_late.
# 24 hours in a day so we'll multiply num_days_late by 24

case = case.withColumn('num_hours_late', expr('num_days_late*24'))

case.select("num_days_late", "num_hours_late").show(5)

+-------------------+-------------------+
|      num_days_late|     num_hours_late|
+-------------------+-------------------+
| -998.5087616000001|     -23964.2102784|
|-2.0126041669999997|-48.302500007999996|
|       -3.022337963|      -72.536111112|
|       -15.01148148|      -360.27555552|
|0.37216435200000003|  8.931944448000001|
+-------------------+-------------------+
only showing top 5 rows



### 7. Join the case data with the source and department data.

In [37]:
# join the case data with the source and department data.

# take a look at the tables' columns
case.dtypes, dept.dtypes, source.dtypes

([('case_id', 'int'),
  ('case_opened_date', 'string'),
  ('case_closed_date', 'string'),
  ('case_due_date', 'string'),
  ('case_late', 'boolean'),
  ('num_days_late', 'double'),
  ('case_closed', 'boolean'),
  ('dept_division', 'string'),
  ('service_request_type', 'string'),
  ('SLA_days', 'double'),
  ('case_status', 'string'),
  ('source_id', 'string'),
  ('request_address', 'string'),
  ('council_district', 'string'),
  ('num_hours_late', 'double')],
 [('dept_division', 'string'),
  ('dept_name', 'string'),
  ('standardized_dept_name', 'string'),
  ('dept_subject_to_SLA', 'boolean')],
 [('source_id', 'string'), ('source_username', 'string')])

In [56]:
# join the case data with the source and department data.
df = (
    case
    # left join on dept_division
    .join(dept, "dept_division", "left")
    # drop  the columns : dept_division, dept_name  standardized name, as it has much fewer unique values
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    #rename standardized name,  
    .withColumnRenamed("standardized_dept_name", "department")
)

(
    case.join(source, "source_id", "left")
    .sort(col("source_username"))
    .show(5, vertical=True)
)

df.show(1, False, True)

-RECORD 0------------------------------------
 source_id            | af26445              
 case_id              | 1014219925           
 case_opened_date     | 2/5/18 14:16         
 case_closed_date     | 2/7/18 12:16         
 case_due_date        | 5/1/18 14:16         
 case_late            | false                
 num_days_late        | -83.08341435         
 case_closed          | true                 
 dept_division        | Signals              
 service_request_type | Signal Timing Mod... 
 SLA_days             | 85.0                 
 case_status          | Closed               
 request_address      | EWING HALSELL and... 
 council_district     | 8                    
 num_hours_late       | -1994.0019444        
 case_closed_year     | null                 
 source_username      | Alex Franklin        
-RECORD 1------------------------------------
 source_id            | af26445              
 case_id              | 1014220627           
 case_opened_date     | 2/5/18 16:

#### 8. Are there any cases that do not have a request source?

In [57]:
# look for missing values
df.filter("source_id is null").show(5,False, True)

(0 rows)



In [58]:
# another way
df.where(df.source_id.isNull()).show(3, False, True)

(0 rows)



### 9. What are the top 10 service request types in terms of number of requests?

In [59]:
df.groupBy("service_request_type").count().show(truncate= False)

+--------------------------------------+-----+
|service_request_type                  |count|
+--------------------------------------+-----+
|Minimum Housing-Owner Occupied        |8543 |
|Tree Removal                          |298  |
|Service Information                   |160  |
|Sign Maintenance                      |82   |
|Park Building Maint Invest            |48   |
|Brush Property Damage                 |184  |
|Graffiti: Private Property (Corridors)|8525 |
|Traffic Sign Graffiti                 |2123 |
|License Renewal Invoice               |1349 |
|Used/Scrap Tire Facility Registration |19   |
|Guardrail- New Request                |100  |
|Markings Installation SMO (NEW)       |8    |
|CCO_Request for Research/Information_1|2    |
|Sewer Line Broken                     |1107 |
|Zoning: Multi-Family In Single        |735  |
|Engineering Investigation             |489  |
|Zoning: Setbacks                      |809  |
|Traffic Sign Faded                    |2122 |
|Permits, Fen

In [60]:
df.groupBy("service_request_type").count().orderBy(desc("count")).show(10, truncate= False)

+--------------------------------+-----+
|service_request_type            |count|
+--------------------------------+-----+
|No Pickup                       |86855|
|Overgrown Yard/Trash            |65895|
|Bandit Signs                    |32910|
|Damaged Cart                    |30338|
|Front Or Side Yard Parking      |28794|
|Stray Animal                    |26760|
|Aggressive Animal(Non-Critical) |24882|
|Cart Exchange Request           |22024|
|Junk Vehicle On Private Property|21473|
|Pot Hole Repair                 |20616|
+--------------------------------+-----+
only showing top 10 rows



### 10. What are the top 10 service request types in terms of average days late

In [61]:
(
df.where('case_late')# just the rows where case_late == true
.groupBy("service_request_type").mean("num_days_late").orderBy(desc('avg(num_days_late)')).show(10, truncate= False)
)

+--------------------------------------+------------------+
|service_request_type                  |avg(num_days_late)|
+--------------------------------------+------------------+
|Zoning: Recycle Yard                  |210.89201994318182|
|Zoning: Junk Yards                    |200.20517608494276|
|Structure/Housing Maintenance         |190.20707698509807|
|Donation Container Enforcement        |171.09115313942615|
|Storage of Used Mattress              |163.96812829714287|
|Labeling for Used Mattress            |162.43032902285717|
|Record Keeping of Used Mattresses     |153.99724039428568|
|Signage Requied for Sale of Used Mattr|151.63868055333333|
|Traffic Signal Graffiti               |137.64583330000002|
|License Requied Used Mattress Sales   |128.79828704142858|
+--------------------------------------+------------------+
only showing top 10 rows



In [62]:
#if you want to know the number of cases 
(
    df.where('case_late') # just the rows where case_late == true
    .groupBy('service_request_type')
    .agg(mean('num_days_late').alias('n_days_late'), count('*').alias('n_cases'))
    .sort(desc('n_days_late'))
    .show(10, truncate=False)
)

+--------------------------------------+------------------+-------+
|service_request_type                  |n_days_late       |n_cases|
+--------------------------------------+------------------+-------+
|Zoning: Recycle Yard                  |210.89201994318182|132    |
|Zoning: Junk Yards                    |200.20517608494276|262    |
|Structure/Housing Maintenance         |190.20707698509807|51     |
|Donation Container Enforcement        |171.09115313942615|122    |
|Storage of Used Mattress              |163.96812829714287|7      |
|Labeling for Used Mattress            |162.43032902285717|7      |
|Record Keeping of Used Mattresses     |153.99724039428568|7      |
|Signage Requied for Sale of Used Mattr|151.63868055333333|12     |
|Traffic Signal Graffiti               |137.64583330000002|2      |
|License Requied Used Mattress Sales   |128.79828704142858|7      |
+--------------------------------------+------------------+-------+
only showing top 10 rows



### 11. Does number of days late depend on department?

In [63]:
df.where('case_late').groupBy("department").agg(mean("num_days_late")).sort('avg(num_days_late)').show(truncate=False)

+------------------------+------------------+
|department              |avg(num_days_late)|
+------------------------+------------------+
|Metro Health            |6.5438133155476494|
|Solid Waste             |7.186821906120899 |
|Trans & Cap Improvements|10.603064680316946|
|Parks & Recreation      |22.348910457867518|
|Animal Care Services    |23.458633245820124|
|DSD/Code Enforcement    |49.38428705358908 |
|Customer Service        |87.68385942150394 |
+------------------------+------------------+



In [64]:
df.where('case_late').groupBy("department").count().show(20)

+--------------------+-----+
|          department|count|
+--------------------+-----+
|         Solid Waste|32945|
|Animal Care Services|23276|
|Trans & Cap Impro...| 5411|
|  Parks & Recreation| 3797|
|    Customer Service| 2010|
|        Metro Health|  829|
|DSD/Code Enforcement|26235|
+--------------------+-----+



In [65]:
(
    df.filter('case_late')
    .groupby('department')
    .agg(mean('num_days_late').alias('days_late'), count('num_days_late').alias('n_cases_late'))
    .sort('days_late')
    .withColumn('days_late', round(col('days_late'), 1))
    .show(truncate=False)
)

+------------------------+---------+------------+
|department              |days_late|n_cases_late|
+------------------------+---------+------------+
|Metro Health            |6.5      |829         |
|Solid Waste             |7.2      |32945       |
|Trans & Cap Improvements|10.6     |5411        |
|Parks & Recreation      |22.3     |3797        |
|Animal Care Services    |23.5     |23276       |
|DSD/Code Enforcement    |49.4     |26235       |
|Customer Service        |87.7     |2010        |
+------------------------+---------+------------+



### 12. How do number of days late depend on department and request type?

In [66]:
df.filter("case_closed").groupBy("department", "service_request_type").mean("num_days_late")\
.orderBy(desc('avg(num_days_late)')).show(50, truncate= False)

+------------------------+----------------------------------------+------------------+
|department              |service_request_type                    |avg(num_days_late)|
+------------------------+----------------------------------------+------------------+
|DSD/Code Enforcement    |Zoning: Junk Yards                      |209.39675114281033|
|DSD/Code Enforcement    |Labeling for Used Mattress              |162.43032902285717|
|DSD/Code Enforcement    |Record Keeping of Used Mattresses       |153.99724039428568|
|DSD/Code Enforcement    |Signage Requied for Sale of Used Mattr  |151.63868055333333|
|DSD/Code Enforcement    |Storage of Used Mattress                |142.11255641500003|
|DSD/Code Enforcement    |Donation Container Enforcement          |141.27462658777188|
|DSD/Code Enforcement    |Zoning: Recycle Yard                    |138.9798982976596 |
|DSD/Code Enforcement    |License Requied Used Mattress Sales     |128.79828704142858|
|DSD/Code Enforcement    |Vendors          

In [67]:
# other way
(
    df.filter("case_closed")
#     .filter("case_late")
    .groupby("department", "service_request_type")
    .agg(avg("num_days_late").alias("days_late"), count("*").alias("n_cases"))
    .withColumn("days_late", round(col("days_late"), 1))
    .where(col('days_late') > 0)
    .sort(desc("days_late"))
    .show(40, truncate=False)
)

+------------------------+----------------------------------------+---------+-------+
|department              |service_request_type                    |days_late|n_cases|
+------------------------+----------------------------------------+---------+-------+
|DSD/Code Enforcement    |Zoning: Junk Yards                      |209.4    |174    |
|DSD/Code Enforcement    |Labeling for Used Mattress              |162.4    |7      |
|DSD/Code Enforcement    |Record Keeping of Used Mattresses       |154.0    |7      |
|DSD/Code Enforcement    |Signage Requied for Sale of Used Mattr  |151.6    |12     |
|DSD/Code Enforcement    |Storage of Used Mattress                |142.1    |8      |
|DSD/Code Enforcement    |Donation Container Enforcement          |141.3    |114    |
|DSD/Code Enforcement    |Zoning: Recycle Yard                    |139.0    |141    |
|DSD/Code Enforcement    |License Requied Used Mattress Sales     |128.8    |7      |
|DSD/Code Enforcement    |Vendors                     