In [135]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import * 
from pyspark.sql.functions import concat, sum, avg, min, max, count, mean
from pyspark.sql.functions import asc, desc

import env

In [2]:
spark = SparkSession.builder.getOrCreate()


22/10/22 13:09:23 WARN Utils: Your hostname, daniels-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.185 instead (on interface en0)
22/10/22 13:09:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/22 13:09:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Read the case, department, and source data into their own spark dataframes.|

In [5]:
#sql query
url = env.get_url(env.user, env.host, env.password, "311_data")
query1 = 'select * from cases'
query2 = 'select * from dept'
query3 = 'select * from source'

In [6]:
pandas_cases = pd.read_sql(query1, url)
pandas_dept = pd.read_sql(query2, url)
pandas_source = pd.read_sql(query3, url)

In [7]:
cases = spark.createDataFrame(pandas_cases)
dept = spark.createDataFrame(pandas_dept)
source = spark.createDataFrame(pandas_source)


# Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json


In [14]:
(
    source.write.format('csv')
    .mode('overwrite')
    .option('header', 'True')
    .save('data/sources_csv')
)

[Stage 6:>                                                        (0 + 10) / 10]                                                                                

In [15]:
source.write.json('data/sources_json', mode='overwrite')

# Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

### Start with cases dataframe

In [18]:
cases.dtypes

[('case_id', 'bigint'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'bigint')]

In [39]:
# case_opened_date should be datetime
# case closed_date should be datetime
# SLA_due_date should be datetime
# case_closed should be boolean
# case_late should be boolean

### Convert dates

In [25]:
date_format = 'M/d/yy H:m'

In [29]:
cases = cases.withColumn('case_opened_date', to_timestamp('case_opened_date', date_format))
cases = cases.withColumn('case_closed_date', to_timestamp('case_closed_date', date_format))
cases = cases.withColumn('SLA_due_date', to_timestamp('SLA_due_date', date_format))

### Convert bools

In [36]:
cases = cases.withColumn('case_late', expr("case_late == 'YES'"))
cases = cases.withColumn('case_closed', expr("case_closed == 'YES'"))

In [38]:
cases.dtypes

[('case_id', 'bigint'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('SLA_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'bigint')]

### Next, Departments

In [40]:
dept.dtypes

[('dept_division', 'string'),
 ('dept_name', 'string'),
 ('standardized_dept_name', 'string'),
 ('dept_subject_to_SLA', 'string')]

In [41]:
dept.show(1, vertical = True)

-RECORD 0----------------------------------
 dept_division          | 311 Call Center  
 dept_name              | Customer Service 
 standardized_dept_name | Customer Service 
 dept_subject_to_SLA    | YES              
only showing top 1 row



In [42]:
# convert dept_subject_to_SLA to boolean

In [43]:
dept = dept.withColumn('dept_subject_to_SLA', expr("dept_subject_to_SLA == 'YES'"))


In [44]:
dept.show(1, vertical = True)

-RECORD 0----------------------------------
 dept_division          | 311 Call Center  
 dept_name              | Customer Service 
 standardized_dept_name | Customer Service 
 dept_subject_to_SLA    | true             
only showing top 1 row



### Finally, source

In [45]:
source.dtypes

[('index', 'bigint'), ('source_id', 'string'), ('source_username', 'string')]

In [170]:
source.show(3, vertical=True)

-RECORD 0---------------------------
 index           | 0                
 source_id       | 0                
 source_username | Merlene Blodgett 
-RECORD 1---------------------------
 index           | 1                
 source_id       | 0                
 source_username | Carmen Cura      
-RECORD 2---------------------------
 index           | 2                
 source_id       | 0                
 source_username | Richard Sanchez  
only showing top 3 rows



In [51]:
# convert source_id to integer

In [172]:
source = source.withColumn("source_id",source.source_id.cast('integer'))


In [173]:
source.dtypes

[('index', 'bigint'), ('source_id', 'int'), ('source_username', 'string')]

# How old is the latest (in terms of days past SLA) currently open issue? 

In [144]:
cases.select(max(cases.case_opened_date)).show()

22/10/22 15:22:56 WARN TaskSetManager: Stage 80 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.




+---------------------+
|max(case_opened_date)|
+---------------------+
|  2018-08-08 10:38:00|
+---------------------+



                                                                                

In [145]:
today = to_date(lit("2018-08-08"))

In [149]:
(
    cases.filter(cases.case_status == 'Open')
    .select('case_id', 'SLA_due_date', datediff(today, 'SLA_due_date').alias('days_late'))
    .sort(desc('days_late'))
    .show(5) 
)

22/10/22 15:26:22 WARN TaskSetManager: Stage 86 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

+----------+-------------------+---------+
|   case_id|       SLA_due_date|days_late|
+----------+-------------------+---------+
|1013225646|2017-01-17 08:30:00|      568|
|1013225651|2017-01-17 08:30:00|      568|
|1013226813|2017-01-17 11:26:00|      568|
|1013229328|2017-01-18 10:01:00|      567|
|1013236238|2017-01-20 14:39:00|      565|
+----------+-------------------+---------+
only showing top 5 rows



# How long has the oldest (in terms of days since opened) currently opened issue been open?


In [148]:
(
    cases.filter(cases.case_status == 'Open')
    .select('case_id', 'case_opened_date', datediff(today, 'case_opened_date').alias('days_open'))
    .sort(desc('days_open'))
    .show(5) 
)

22/10/22 15:26:06 WARN TaskSetManager: Stage 85 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.




+----------+-------------------+---------+
|   case_id|   case_opened_date|days_open|
+----------+-------------------+---------+
|1013225651|2017-01-01 13:57:00|      584|
|1013225646|2017-01-01 13:48:00|      584|
|1013226813|2017-01-02 11:26:00|      583|
|1013229328|2017-01-03 10:01:00|      582|
|1013232331|2017-01-04 10:16:00|      581|
+----------+-------------------+---------+
only showing top 5 rows





# How many Stray Animal cases are there?


In [160]:
cases.filter(cases.service_request_type == 'Stray Animal').count()

22/10/22 15:35:51 WARN TaskSetManager: Stage 109 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

26760

# How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [167]:
(
cases.filter((cases.dept_division == 'Field Operations') & (cases.service_request_type != 'Officer Standby'))
    .count()
    
)

22/10/22 15:40:45 WARN TaskSetManager: Stage 116 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

113902

# Convert the council_district column to a string column.



In [176]:
cases = cases.withColumn("council_district",cases.council_district.cast('string'))


In [179]:
cases.dtypes

[('case_id', 'bigint'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('SLA_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

# Extract the year from the case_closed_date column.


In [184]:
cases.select('case_closed_date', (regexp_extract('case_closed_date', r'\d+', 0).alias('year'))).show()

22/10/22 15:52:51 WARN TaskSetManager: Stage 123 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


[Stage 123:>                                                        (0 + 1) / 1]

22/10/22 15:52:55 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 123 (TID 404): Attempting to kill Python Worker
+-------------------+----+
|   case_closed_date|year|
+-------------------+----+
|2018-01-01 12:29:00|2018|
|2018-01-03 08:11:00|2018|
|2018-01-02 07:57:00|2018|
|2018-01-02 08:13:00|2018|
|2018-01-01 13:29:00|2018|
|2018-01-01 14:38:00|2018|
|2018-01-02 15:32:00|2018|
|2018-01-02 15:32:00|2018|
|2018-01-02 15:32:00|2018|
|2018-01-02 15:32:00|2018|
|2018-01-02 15:32:00|2018|
|2018-01-02 15:32:00|2018|
|2018-01-02 15:33:00|2018|
|2018-01-02 15:32:00|2018|
|2018-01-02 15:33:00|2018|
|2018-01-02 15:33:00|2018|
|2018-01-02 15:33:00|2018|
|2018-01-02 15:33:00|2018|
|2018-01-02 15:33:00|2018|
|2018-01-02 15:33:00|2018|
+-------------------+----+
only showing top 20 rows



                                                                                

# Convert num_days_late from days to hours in new columns num_hours_late.



In [188]:
(
cases.withColumn('num_hours_late', expr('num_days_late * 24'))
    .select('num_days_late', 'num_hours_late')
    .show(6)                                                            
)                                                                                                                                          

22/10/22 16:01:26 WARN TaskSetManager: Stage 126 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


[Stage 126:>                                                        (0 + 1) / 1]

22/10/22 16:01:30 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 126 (TID 407): Attempting to kill Python Worker
+-------------+-------------------+
|num_days_late|     num_hours_late|
+-------------+-------------------+
| -998.5087616|-23964.210278399998|
| -2.012604167|      -48.302500008|
| -3.022337963|      -72.536111112|
| -15.01148148|      -360.27555552|
|  0.372164352|        8.931944448|
| -29.74398148| -713.8555555199999|
+-------------+-------------------+
only showing top 6 rows



                                                                                

# Join the case data with the source and department data.



In [192]:
#join tables
cases = cases.join(dept,
        'dept_division',
        'left')

In [202]:
cases = cases.join(source, 'source_id', 'left')

In [203]:
cases.columns

['source_id',
 'dept_division',
 'case_id',
 'case_opened_date',
 'case_closed_date',
 'SLA_due_date',
 'case_late',
 'num_days_late',
 'case_closed',
 'service_request_type',
 'SLA_days',
 'case_status',
 'request_address',
 'council_district',
 'dept_name',
 'standardized_dept_name',
 'dept_subject_to_SLA',
 'index',
 'source_username']

# Are there any cases that do not have a request source?



In [208]:
cases.filter("source_id is NULL").count()

22/10/22 16:21:35 WARN TaskSetManager: Stage 174 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

0

In [211]:
cases.select(min(cases.source_id)).show()

22/10/22 16:22:48 WARN TaskSetManager: Stage 182 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

+--------------+
|min(source_id)|
+--------------+
|        100137|
+--------------+



# What are the top 10 service request types in terms of number of requests?



In [217]:
(  
    cases.groupBy('service_request_type')
    .count().alias('count')
    .sort(desc('count'))
    .show(10, truncate=False)
)

22/10/22 16:27:53 WARN TaskSetManager: Stage 230 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

+--------------------------------+-----+
|service_request_type            |count|
+--------------------------------+-----+
|No Pickup                       |86855|
|Overgrown Yard/Trash            |65895|
|Bandit Signs                    |32910|
|Damaged Cart                    |30338|
|Front Or Side Yard Parking      |28794|
|Stray Animal                    |26760|
|Aggressive Animal(Non-Critical) |24882|
|Cart Exchange Request           |22024|
|Junk Vehicle On Private Property|21473|
|Pot Hole Repair                 |20616|
+--------------------------------+-----+
only showing top 10 rows



# What are the top 10 service request types in terms of average days late?


In [233]:
(  
    cases.groupBy('service_request_type')
    .agg(mean('num_days_late').alias('avg_days_late'))
    .sort(desc('avg_days_late'))
    .show(10, truncate=False)
)

22/10/22 16:40:02 WARN TaskSetManager: Stage 353 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.




+--------------------------------------+------------------+
|service_request_type                  |avg_days_late     |
+--------------------------------------+------------------+
|Request for Research/Information      |NaN               |
|CCO_Request for Research/Information_1|NaN               |
|Zoning: Junk Yards                    |175.9563621042095 |
|Labeling for Used Mattress            |162.43032902285717|
|Record Keeping of Used Mattresses     |153.9972403942857 |
|Signage Requied for Sale of Used Mattr|151.63868055333333|
|Storage of Used Mattress              |142.112556415     |
|Zoning: Recycle Yard                  |135.92851612479797|
|Donation Container Enforcement        |131.75610506358709|
|License Requied Used Mattress Sales   |128.79828704142858|
+--------------------------------------+------------------+
only showing top 10 rows



                                                                                

# Does number of days late depend on department?


In [235]:
(  
    cases.groupBy('dept_name')
    .agg(mean('num_days_late').alias('avg_days_late'))
    .sort(desc('avg_days_late'))
    .show(10, truncate=False)
)

22/10/22 16:40:31 WARN TaskSetManager: Stage 365 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

+-------------------------+-------------------+
|dept_name                |avg_days_late      |
+-------------------------+-------------------+
|City Council             |NaN                |
|null                     |135.92851612479797 |
|Customer Service         |59.49019459221514  |
|Development Services     |13.14805401407781  |
|Solid Waste Management   |-2.1938644240225544|
|Metro Health             |-4.904223205386015 |
|Parks and Recreation     |-5.283345998745912 |
|Trans & Cap Improvements |-20.50979350178539 |
|Code Enforcement Services|-38.65265755027796 |
|Animal Care Services     |-226.1654977071752 |
+-------------------------+-------------------+



# How do number of days late depend on department and request type?


In [237]:
(  
    cases.groupBy('dept_name', 'service_request_type')
    .agg(mean('num_days_late').alias('avg_days_late'))
    .sort(desc('avg_days_late'))
    .show(10, truncate=False)
)

22/10/22 16:41:37 WARN TaskSetManager: Stage 377 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

+-------------------------+--------------------------------------+------------------+
|dept_name                |service_request_type                  |avg_days_late     |
+-------------------------+--------------------------------------+------------------+
|City Council             |Request for Research/Information      |NaN               |
|City Council             |CCO_Request for Research/Information_1|NaN               |
|Code Enforcement Services|Zoning: Junk Yards                    |175.9563621042095 |
|Code Enforcement Services|Labeling for Used Mattress            |162.43032902285717|
|Code Enforcement Services|Record Keeping of Used Mattresses     |153.9972403942857 |
|Code Enforcement Services|Signage Requied for Sale of Used Mattr|151.63868055333333|
|Code Enforcement Services|Storage of Used Mattress              |142.112556415     |
|null                     |Zoning: Recycle Yard                  |135.92851612479797|
|Code Enforcement Services|Donation Container Enforcem