# Spark Wrangle Exercise

## Data Acquisition

### 1. Read the cases, department, and source data into their own Spark dataframes.


In [18]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import env as e

from pyspark.sql import functions as F

In [2]:
# read the cases data from sql
spark = SparkSession.builder.getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/26 13:39:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Cases 311_data

In [3]:
url = e.get_db_url('311_data')
query = 'select * from cases limit 100000'

In [4]:
df = pd.read_sql(query, url)

In [5]:
df = df.to_csv('cases.csv')

In [6]:
df = pd.read_csv('cases.csv', index_col=0)

In [7]:
cases_df = spark.createDataFrame(df)

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


In [8]:
cases_df.show(5, vertical=True, truncate=False)

                                                                                

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 1/1/18 0:42                           
 case_closed_date     | 1/1/18 12:29                          
 SLA_due_date         | 9/26/20 0:42                          
 case_late            | NO                                    
 num_days_late        | -998.5087616                          
 case_closed          | YES                                   
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  EL PASO ST, San Antonio, 78207  
 council_district     | 5                                     
-RECORD 1----------------------------------------------

## Sources 311_data

In [9]:
source_df = pd.read_sql('select * from source', url)

In [10]:
source_df = spark.createDataFrame(source_df)
source_df.show(5, vertical=True, truncate=False)

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


-RECORD 0---------------------------
 index           | 0                
 source_id       | 100137           
 source_username | Merlene Blodgett 
-RECORD 1---------------------------
 index           | 1                
 source_id       | 103582           
 source_username | Carmen Cura      
-RECORD 2---------------------------
 index           | 2                
 source_id       | 106463           
 source_username | Richard Sanchez  
-RECORD 3---------------------------
 index           | 3                
 source_id       | 119403           
 source_username | Betty De Hoyos   
-RECORD 4---------------------------
 index           | 4                
 source_id       | 119555           
 source_username | Socorro Quiara   
only showing top 5 rows



## Dept from 311_data

In [11]:
dept_df = pd.read_sql('select * from dept', url)
dept_df = spark.createDataFrame(dept_df)
dept_df.show(5, vertical=True, truncate=False)

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


-RECORD 0-----------------------------------------------
 dept_division          | 311 Call Center               
 dept_name              | Customer Service              
 standardized_dept_name | Customer Service              
 dept_subject_to_SLA    | YES                           
-RECORD 1-----------------------------------------------
 dept_division          | Brush                         
 dept_name              | Solid Waste Management        
 standardized_dept_name | Solid Waste                   
 dept_subject_to_SLA    | YES                           
-RECORD 2-----------------------------------------------
 dept_division          | Clean and Green               
 dept_name              | Parks and Recreation          
 standardized_dept_name | Parks & Recreation            
 dept_subject_to_SLA    | YES                           
-RECORD 3-----------------------------------------------
 dept_division          | Clean and Green Natural Areas 
 dept_name              | Parks

### 2. Let's see how writing to the local disk works in Spark:


* Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json


In [12]:
# write the code necessary to store the source_df in both csv and json format,
# store these as sources_csv and sources_json
source_df.write.csv('sources_csv', mode='overwrite')
source_df.write.json('sources_json', mode='overwrite')

                                                                                

* Inspect your folder structure. What do you notice?


### 3. Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [13]:
# inspect the data in your dataframes
# write the code necessary to cast the values to the appropriate types
cases_df.printSchema()

root
 |-- case_id: long (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: long (nullable = true)



In [17]:
cases_df.show(1, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 case_due_date        | 9/26/20 0:42                         
 case_late            | NO                                   
 num_days_late        | -998.5087616                         
 case_closed          | YES                                  
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 case_days            | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
only showing top 1 row



In [15]:
cases_df = cases_df.withColumnRenamed('SLA_due_date', 'case_due_date')
cases_df = cases_df.withColumnRenamed('SLA_days', 'case_days')

In [19]:
fmt = "M/d/yy H:mm"

In [20]:
cases_df = cases_df.withColumns({
    'case_opened_date': F.to_timestamp(F.col('case_opened_date'),
                  fmt), 
    'case_closed_date': F.to_timestamp(F.col('case_closed_date'),
                  fmt), 
    'case_due_date': F.to_timestamp(F.col('case_due_date'),
                  fmt)})

In [21]:
# change close_closed and case_late columns into boolean values
cases_df = cases_df.withColumns(
    {
        'case_closed': F.when(F.col('case_closed') == 'YES', True).cast('int'),
        'case_late': F.when(F.col('case_late') == 'YES', True).cast('int')
    }
)

In [23]:
#use .cast()
# pad with zeros using a concatenation
cases_df = cases_df.withColumn(
    'council_district',
    F.format_string(
        '%03d',
        cases_df.council_district.cast('int')
))

In [26]:
# normalize address
cases_df = cases_df.withColumn(
    'request_address',
    F.trim(F.lower(F.col('request_address')))
)

In [27]:
cases_df.printSchema()

root
 |-- case_id: long (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: integer (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: integer (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- case_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)



In [28]:
# lets divide that numerical column by seven to get num_weeks_late
cases_df = cases_df.withColumn(
    'num_weeks_late',
    F.round(F.col('num_days_late') / 7, 1))

## Data Exploratory Questions



### 1. How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?


In [30]:
cases_df.show(1, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2020-09-26 00:42:00                  
 case_late            | NULL                                 
 num_days_late        | -998.5087616                         
 case_closed          | 1                                    
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 case_days            | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  el paso st, san antonio, 78207 
 council_district     | 005                                  
 num_weeks_late       | -142.6                               
only sho

In [32]:
#use datediff() to find the difference between two dates
cases_df = cases_df.withColumn(
    'case_age',
    F.datediff(F.current_timestamp(), F.col('case_opened_date')
    ))

In [33]:
# how old is the latest open issue in days?
cases_df.select(F.max(F.col('case_age'))).show()

23/10/26 14:56:55 WARN TaskSetManager: Stage 12 contains a task of very large size (1005 KiB). The maximum recommended task size is 1000 KiB.
[Stage 12:====>                                                   (1 + 11) / 12]

+-------------+
|max(case_age)|
+-------------+
|         2124|
+-------------+



                                                                                

In [29]:
# how old is the latest open issue in days?

cases_df.select(F.max(F.col('case_opened_date'))).show()

23/10/26 14:52:59 WARN TaskSetManager: Stage 8 contains a task of very large size (1005 KiB). The maximum recommended task size is 1000 KiB.
[Stage 8:>                                                        (0 + 12) / 12]

+---------------------+
|max(case_opened_date)|
+---------------------+
|  2018-03-13 16:55:00|
+---------------------+



                                                                                

### 2. How many Stray Animal cases are there?


In [34]:
# how many Stray Animal cases are there?
cases_df.filter(F.col('service_request_type') == 'Stray Animal').count()

23/10/26 14:57:23 WARN TaskSetManager: Stage 15 contains a task of very large size (1005 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

3497

### 3. How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?


In [35]:
dept_df.show(1, vertical=True, truncate=False)

-RECORD 0----------------------------------
 dept_division          | 311 Call Center  
 dept_name              | Customer Service 
 standardized_dept_name | Customer Service 
 dept_subject_to_SLA    | YES              
only showing top 1 row



In [36]:
df = cases_df.join(dept_df, 'dept_division', 'left').join(source_df, 'source_id', 'left')


In [37]:
df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- case_id: long (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: integer (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: integer (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- case_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)
 |-- num_weeks_late: double (nullable = true)
 |-- case_age: integer (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)
 |-- index: long (nullable = true)
 |-- source_username: string (nullable = true)



In [38]:
# see the shape of the data
df.count(), len(df.columns)

23/10/26 15:00:47 WARN TaskSetManager: Stage 19 contains a task of very large size (1005 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

(101722, 21)

In [39]:
# how many service requests are assigned to the Field Operations department that are not classified as 'officer stand by' request type?
df.filter((F.col('dept_name') == 'Field Operations') & (F.col('service_request_type') != 'Officer Standby')).count()

23/10/26 15:02:38 WARN TaskSetManager: Stage 31 contains a task of very large size (1005 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

0

### 4. Convert the council_district column to a string column.


In [40]:
# convert the council_district column to a string column.
df = df.withColumn('council_district', F.col('council_district').cast('string'))

In [41]:
df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- case_id: long (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: integer (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: integer (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- case_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)
 |-- num_weeks_late: double (nullable = true)
 |-- case_age: integer (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)
 |-- index: long (nullable = true)
 |-- source_username: string (nullable = true)



### 5. Extract the year from the case_closed_date column.


In [44]:
# extract the year from the case_closed_date column.
df = df.withColumn('year', F.year('case_closed_date'))

### 6. Convert num_days_late from days to hours in a new column num_hours_late.


In [45]:
# convert num_days_late from days to hours in new columns num_hours_late.
df = df.withColumn('num_hours_late', F.round(F.col('num_days_late') * 24, 2))

### 7. Join the cases data with the source and department data.


In [46]:
df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- case_id: long (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: integer (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: integer (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- case_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)
 |-- num_weeks_late: double (nullable = true)
 |-- case_age: integer (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)
 |-- index: long (nullable = true)
 |-- source_username: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- nu

In [47]:
df.show(1, vertical=True, truncate=False)

23/10/26 15:05:42 WARN TaskSetManager: Stage 45 contains a task of very large size (1005 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

-RECORD 0------------------------------------------------------
 source_id              | svcCRMLS                             
 dept_division          | Field Operations                     
 case_id                | 1014127332                           
 case_opened_date       | 2018-01-01 00:42:00                  
 case_closed_date       | 2018-01-01 12:29:00                  
 case_due_date          | 2020-09-26 00:42:00                  
 case_late              | NULL                                 
 num_days_late          | -998.5087616                         
 case_closed            | 1                                    
 service_request_type   | Stray Animal                         
 case_days              | 999.0                                
 case_status            | Closed                               
 request_address        | 2315  el paso st, san antonio, 78207 
 council_district       | 005                                  
 num_weeks_late         | -142.6        

### 8. Are there any cases that do not have a request source?


In [48]:
# are there any cases that do not have a request source?
df.filter(F.col('source_id').isNull()).count()

23/10/26 15:06:31 WARN TaskSetManager: Stage 54 contains a task of very large size (1005 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

0

### 9. What are the top 10 service request types in terms of number of requests?


In [49]:
# what are the top 10 service request types in terms of number of requests?
df.groupBy('service_request_type').count().sort(F.desc('count')).show(10, truncate=False)

23/10/26 15:09:06 WARN TaskSetManager: Stage 59 contains a task of very large size (1005 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+---------------------------------+-----+
|service_request_type             |count|
+---------------------------------+-----+
|No Pickup                        |9219 |
|Overgrown Yard/Trash             |6901 |
|Front Or Side Yard Parking       |5354 |
|Bandit Signs                     |4788 |
|Aggressive Animal(Non-Critical)  |3637 |
|Stray Animal                     |3618 |
|Damaged Cart                     |3552 |
|Cart Exchange Request            |3365 |
|Right Of Way/Sidewalk Obstruction|3136 |
|Junk Vehicle On Private Property |2915 |
+---------------------------------+-----+
only showing top 10 rows



### 10. What are the top 10 service request types in terms of average days late?


In [50]:
# what are the top 10 service request types in terms of average days late?
df.groupBy('service_request_type').agg(F.mean('num_days_late')).sort(F.desc('avg(num_days_late)')).show(10, truncate=False)

23/10/26 15:09:42 WARN TaskSetManager: Stage 71 contains a task of very large size (1005 KiB). The maximum recommended task size is 1000 KiB.
[Stage 79:>                                                         (0 + 2) / 2]

+----------------------------------------+------------------+
|service_request_type                    |avg(num_days_late)|
+----------------------------------------+------------------+
|Request for Research/Information        |NaN               |
|Zoning: Junk Yards                      |178.31639873914892|
|Emergency: Main and Accessory Structures|173.50848380000002|
|Status: On-Going CIMS Projects          |162.1255041796491 |
|Zoning: Recycle Yard                    |152.18173756625004|
|Complaint                               |136.77933500099587|
|Markings Installation SMO (NEW)         |111.3769444       |
|Vendors                                 |73.73983846695651 |
|Donation Container Enforcement          |61.648206019999996|
|No Address Posted                       |61.13694979150944 |
+----------------------------------------+------------------+
only showing top 10 rows



                                                                                

### 11. Does number of days late depend on department? (Answer without the use of a stats test)


In [51]:
# does number of days late depend on department?
df.groupBy('dept_name').agg(F.mean('num_days_late')).sort(F.desc('avg(num_days_late)')).show(10, truncate=False)

23/10/26 15:10:25 WARN TaskSetManager: Stage 83 contains a task of very large size (1005 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+-------------------------+-------------------+
|dept_name                |avg(num_days_late) |
+-------------------------+-------------------+
|City Council             |NaN                |
|NULL                     |152.18173756624998 |
|Customer Service         |111.95715140314537 |
|Development Services     |30.78518419039263  |
|Solid Waste Management   |-1.9424973241389711|
|Parks and Recreation     |-4.490874417843001 |
|Metro Health             |-5.685264872257886 |
|Trans & Cap Improvements |-19.13824761338138 |
|Code Enforcement Services|-34.12980263328008 |
|Animal Care Services     |-224.73718906677817|
+-------------------------+-------------------+



### 12. How do number of days late depend on department and request type?

In [53]:
# how do number of days late depend on department and request type?
df.groupBy('dept_name', 'service_request_type').agg(F.mean('num_days_late')).sort(F.desc('avg(num_days_late)')).show( truncate=False)

23/10/26 15:11:53 WARN TaskSetManager: Stage 107 contains a task of very large size (1005 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+-------------------------+----------------------------------------+------------------+
|dept_name                |service_request_type                    |avg(num_days_late)|
+-------------------------+----------------------------------------+------------------+
|City Council             |Request for Research/Information        |NaN               |
|Code Enforcement Services|Zoning: Junk Yards                      |178.31639873914892|
|Code Enforcement Services|Emergency: Main and Accessory Structures|173.50848380000002|
|Trans & Cap Improvements |Status: On-Going CIMS Projects          |162.1255041796491 |
|NULL                     |Zoning: Recycle Yard                    |152.18173756625004|
|Customer Service         |Complaint                               |136.77933500099587|
|Trans & Cap Improvements |Markings Installation SMO (NEW)         |111.3769444       |
|Code Enforcement Services|Vendors                                 |73.73983846695651 |
|Code Enforcement Services|Donat