In [9]:
import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from env import get_db_url
import os

## This exercises use the cases, dept, and source tables from the 311_data on the Codeup MySQL server.

In [4]:
#create session
spark = SparkSession.builder.getOrCreate()

### Read the case, department, and source data into their own spark dataframes.

In [5]:
url = get_db_url('311_data')

In [6]:
query = 'select * from source'
source = pd.read_sql(query, url)
source = spark.createDataFrame(source)
source

DataFrame[index: bigint, source_id: string, source_username: string]

In [7]:
query = 'select * from cases'
df = pd.read_sql(query, url)
df = spark.createDataFrame(df)
df

DataFrame[case_id: bigint, case_opened_date: string, case_closed_date: string, SLA_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: bigint]

In [8]:
query = 'select * from dept'
dept = pd.read_sql(query, url)
dept = spark.createDataFrame(dept)
dept

DataFrame[dept_division: string, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]

### Let's see how writing to the local disk works in spark:

### Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json

In [15]:
source.write.json('data/sources_json', mode='overwrite')

                                                                                

In [16]:
source.write.csv('data/sources_csv', mode='overwrite')

                                                                                

### Inspect your folder structure. What do you notice?

In [17]:
os.listdir('data/sources_csv')

['.part-00004-d09d2390-d556-46d9-90ee-ceebc2af6078-c000.csv.crc',
 'part-00008-d09d2390-d556-46d9-90ee-ceebc2af6078-c000.csv',
 '.part-00005-d09d2390-d556-46d9-90ee-ceebc2af6078-c000.csv.crc',
 'part-00004-d09d2390-d556-46d9-90ee-ceebc2af6078-c000.csv',
 '.part-00007-d09d2390-d556-46d9-90ee-ceebc2af6078-c000.csv.crc',
 'part-00009-d09d2390-d556-46d9-90ee-ceebc2af6078-c000.csv',
 'part-00005-d09d2390-d556-46d9-90ee-ceebc2af6078-c000.csv',
 '.part-00006-d09d2390-d556-46d9-90ee-ceebc2af6078-c000.csv.crc',
 '._SUCCESS.crc',
 'part-00006-d09d2390-d556-46d9-90ee-ceebc2af6078-c000.csv',
 '.part-00009-d09d2390-d556-46d9-90ee-ceebc2af6078-c000.csv.crc',
 'part-00007-d09d2390-d556-46d9-90ee-ceebc2af6078-c000.csv',
 '.part-00008-d09d2390-d556-46d9-90ee-ceebc2af6078-c000.csv.crc',
 'part-00000-d09d2390-d556-46d9-90ee-ceebc2af6078-c000.csv',
 'part-00001-d09d2390-d556-46d9-90ee-ceebc2af6078-c000.csv',
 'part-00002-d09d2390-d556-46d9-90ee-ceebc2af6078-c000.csv',
 '.part-00001-d09d2390-d556-46d9-90ee

### Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [20]:
df.dtypes

[('case_id', 'bigint'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('case_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

In [21]:
# Rename column
df = df.withColumnRenamed('SLA_due_date', 'case_due_date')

In [23]:
# Convert to better data types
df = (
    df.withColumn('case_late', col('case_late') == 'YES')
    .withColumn('case_closed', col('case_closed') == 'YES')
)

df = df.withColumn('council_district', format_string('%03d', col('council_district')))

fmt = 'M/d/yy H:mm'
df = (
    df.withColumn('case_opened_date', to_timestamp(col('case_opened_date'), fmt))
    .withColumn('case_closed_date', to_timestamp(col('case_closed_date'), fmt))
    .withColumn('case_due_date', to_timestamp(col('case_due_date'), fmt))
)

In [24]:
# Cleanup text data
df = df.withColumn('request_address', lower(trim(col('request_address'))))

In [25]:
# Extract zipcode
df = df.withColumn('zipcode', regexp_extract(col('request_address'), r'\d+$', 0))

# Create a `case_lifetime` feature
df = (
    df.withColumn('case_age', datediff(current_timestamp(), 'case_opened_date'))
    .withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
    .withColumn('case_lifetime', when(
        col('case_closed'), col('days_to_closed')).otherwise(col('case_age')))
    .drop('case_age', 'days_to_closed')
)

### How old is the latest (in terms of days past SLA) currently open issue? 

In [26]:
df.columns

['case_id',
 'case_opened_date',
 'case_closed_date',
 'case_due_date',
 'case_late',
 'num_days_late',
 'case_closed',
 'dept_division',
 'service_request_type',
 'SLA_days',
 'case_status',
 'source_id',
 'request_address',
 'council_district',
 'zipcode',
 'case_lifetime']

In [40]:
(
    df.select('case_closed', 'case_lifetime', 'num_days_late')   
    .filter('! case_closed')
    .sort(desc('num_days_late'))
    .where('num_days_late != "nan"')
).show(5)

22/10/24 09:29:59 WARN TaskSetManager: Stage 13 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


[Stage 13:>                                                       (0 + 10) / 10]

+-----------+-------------+------------------+
|case_closed|case_lifetime|     num_days_late|
+-----------+-------------+------------------+
|      false|         2122|       348.6458333|
|      false|         2122|       348.6458333|
|      false|         2121|348.52356480000003|
|      false|         2120|347.58256939999995|
|      false|         2118|       345.3894213|
+-----------+-------------+------------------+
only showing top 5 rows



                                                                                

### How long has the oldest (in terms of days since opened) currently opened issue been open?

In [42]:
(
    df.select('case_closed', 'case_lifetime', 'num_days_late')   
    .filter('! case_closed')
    .sort(desc('num_days_late'))
    .where('num_days_late != "nan"')
).show(5)

22/10/24 09:31:25 WARN TaskSetManager: Stage 15 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.
+-----------+-------------+------------------+
|case_closed|case_lifetime|     num_days_late|
+-----------+-------------+------------------+
|      false|         2122|       348.6458333|
|      false|         2122|       348.6458333|
|      false|         2121|348.52356480000003|
|      false|         2120|347.58256939999995|
|      false|         2118|       345.3894213|
+-----------+-------------+------------------+
only showing top 5 rows



[Stage 15:>                                                       (0 + 10) / 10]                                                                                

### How many Stray Animal cases are there?

In [44]:
df.columns

['case_id',
 'case_opened_date',
 'case_closed_date',
 'case_due_date',
 'case_late',
 'num_days_late',
 'case_closed',
 'dept_division',
 'service_request_type',
 'SLA_days',
 'case_status',
 'source_id',
 'request_address',
 'council_district',
 'zipcode',
 'case_lifetime']

In [47]:
df.select('service_request_type').distinct().show()

22/10/24 09:32:20 WARN TaskSetManager: Stage 20 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


[Stage 20:>                                                       (0 + 10) / 10]

+--------------------+
|service_request_type|
+--------------------+
|Minimum Housing-O...|
|        Tree Removal|
| Service Information|
|    Sign Maintenance|
|Park Building Mai...|
|Brush Property Da...|
|Vacant Lot-Privat...|
|Graffiti: Private...|
|Traffic Sign Graf...|
|License Renewal I...|
|Guardrail- New Re...|
|   Sewer Line Broken|
|Zoning: Multi-Fam...|
|Engineering Inves...|
|    Zoning: Setbacks|
|  Traffic Sign Faded|
|     Permits, Fences|
|Certificates of O...|
|  Dead Animal - Misc|
|Garage Sales No P...|
+--------------------+
only showing top 20 rows



                                                                                

In [52]:
df.filter(expr('service_request_type == "Stray Animal"')).count()

22/10/24 09:33:47 WARN TaskSetManager: Stage 26 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

26760

### How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [53]:
df.columns

['case_id',
 'case_opened_date',
 'case_closed_date',
 'case_due_date',
 'case_late',
 'num_days_late',
 'case_closed',
 'dept_division',
 'service_request_type',
 'SLA_days',
 'case_status',
 'source_id',
 'request_address',
 'council_district',
 'zipcode',
 'case_lifetime']

In [54]:
(
    df.filter(df.dept_division == 'Field Operations')
    .filter(df.service_request_type != 'Officer Standby')
).count()

22/10/24 09:36:13 WARN TaskSetManager: Stage 29 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

113902

### Convert the council_district column to a string column.

In [55]:
df.dtypes

[('case_id', 'bigint'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('case_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string'),
 ('zipcode', 'string'),
 ('case_lifetime', 'int')]

### Extract the year from the case_closed_date column.

In [57]:
df.select('case_closed_date').show(5)

22/10/24 09:37:01 WARN TaskSetManager: Stage 33 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


[Stage 33:>                                                         (0 + 1) / 1]

22/10/24 09:37:05 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 33 (TID 182): Attempting to kill Python Worker
+-------------------+
|   case_closed_date|
+-------------------+
|2018-01-01 12:29:00|
|2018-01-03 08:11:00|
|2018-01-02 07:57:00|
|2018-01-02 08:13:00|
|2018-01-01 13:29:00|
+-------------------+
only showing top 5 rows



                                                                                

In [61]:
df = df.withColumn('case_closed_year',
             year('case_closed_date'))
# .select('case_closed_date','case_closed_year').show(5)

In [62]:
df.select('case_closed_date','case_closed_year').show(5)

22/10/24 09:38:29 WARN TaskSetManager: Stage 35 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


[Stage 35:>                                                         (0 + 1) / 1]

22/10/24 09:38:33 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 35 (TID 184): Attempting to kill Python Worker
+-------------------+----------------+
|   case_closed_date|case_closed_year|
+-------------------+----------------+
|2018-01-01 12:29:00|            2018|
|2018-01-03 08:11:00|            2018|
|2018-01-02 07:57:00|            2018|
|2018-01-02 08:13:00|            2018|
|2018-01-01 13:29:00|            2018|
+-------------------+----------------+
only showing top 5 rows



                                                                                

### Convert num_days_late from days to hours in new columns num_hours_late.

In [64]:
df = df.withColumn('num_hours_late',
             expr('num_days_late * 24'))

In [67]:
df.select('num_days_late', 'num_hours_late').show(5)

22/10/24 09:39:35 WARN TaskSetManager: Stage 36 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


[Stage 36:>                                                         (0 + 1) / 1]

22/10/24 09:39:39 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 36 (TID 185): Attempting to kill Python Worker
+-------------+-------------------+
|num_days_late|     num_hours_late|
+-------------+-------------------+
| -998.5087616|-23964.210278399998|
| -2.012604167|      -48.302500008|
| -3.022337963|      -72.536111112|
| -15.01148148|      -360.27555552|
|  0.372164352|        8.931944448|
+-------------+-------------------+
only showing top 5 rows



                                                                                

### Join the case data with the source and department data.

In [71]:
df.columns

['case_id',
 'case_opened_date',
 'case_closed_date',
 'case_due_date',
 'case_late',
 'num_days_late',
 'case_closed',
 'dept_division',
 'service_request_type',
 'SLA_days',
 'case_status',
 'source_id',
 'request_address',
 'council_district',
 'zipcode',
 'case_lifetime',
 'case_closed_year',
 'num_hours_late']

In [72]:
source.columns

['index', 'source_id', 'source_username']

In [73]:
dept.columns

['dept_division', 'dept_name', 'standardized_dept_name', 'dept_subject_to_SLA']

In [76]:
df = df.join(dept,
        'dept_division',
       'left').join(source,
                   'source_id',
                   'left')

In [77]:
df.columns

['source_id',
 'dept_division',
 'case_id',
 'case_opened_date',
 'case_closed_date',
 'case_due_date',
 'case_late',
 'num_days_late',
 'case_closed',
 'service_request_type',
 'SLA_days',
 'case_status',
 'request_address',
 'council_district',
 'zipcode',
 'case_lifetime',
 'case_closed_year',
 'num_hours_late',
 'dept_name',
 'standardized_dept_name',
 'dept_subject_to_SLA',
 'index',
 'source_username']

### Are there any cases that do not have a request source?

In [81]:
df.filter(expr('index is null')).count()

22/10/24 09:49:34 WARN TaskSetManager: Stage 61 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

0

### What are the top 10 service request types in terms of number of requests?

In [87]:
(
    df.groupby('service_request_type').count()
    .sort(desc('count'))
).show(10)

22/10/24 09:51:12 WARN TaskSetManager: Stage 100 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|           No Pickup|89210|
|Overgrown Yard/Trash|66403|
|        Bandit Signs|32968|
|        Damaged Cart|31163|
|Front Or Side Yar...|28920|
|        Stray Animal|27361|
|Aggressive Animal...|25492|
|Cart Exchange Req...|22608|
|Junk Vehicle On P...|21649|
|     Pot Hole Repair|20827|
+--------------------+-----+
only showing top 10 rows



### What are the top 10 service request types in terms of average days late?

In [107]:
(
    df.groupby('service_request_type')
    .agg(round(mean('num_days_late'), 2).alias('mean_days_late'))
    .sort(desc('mean_days_late'))
    .na.drop()
).show(10, truncate=False)

22/10/24 09:59:18 WARN TaskSetManager: Stage 261 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

+--------------------------------------+--------------+
|service_request_type                  |mean_days_late|
+--------------------------------------+--------------+
|Zoning: Junk Yards                    |175.96        |
|Labeling for Used Mattress            |162.43        |
|Record Keeping of Used Mattresses     |154.0         |
|Signage Requied for Sale of Used Mattr|151.64        |
|Storage of Used Mattress              |142.11        |
|Zoning: Recycle Yard                  |135.93        |
|Donation Container Enforcement        |131.76        |
|License Requied Used Mattress Sales   |128.8         |
|Traffic Signal Graffiti               |101.8         |
|Complaint                             |72.87         |
+--------------------------------------+--------------+
only showing top 10 rows





In [111]:
(
    df.groupby('service_request_type')
    .avg()
    .sort(desc('avg(num_days_late)'))
    .na.drop()
).select('service_request_type', col('avg(num_days_late)').alias('test'))\
.show(10, truncate=False)

22/10/24 10:00:09 WARN TaskSetManager: Stage 300 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.




22/10/24 10:00:11 WARN PythonRunner: Detected deadlock while completing task 4.0 in stage 297 (TID 1050): Attempting to kill Python Worker
22/10/24 10:00:11 WARN TaskSetManager: Lost task 4.0 in stage 297.0 (TID 1050) (mistygarciasmbp executor driver): TaskKilled (Stage cancelled)


                                                                                

+--------------------------------------+------------------+
|service_request_type                  |test              |
+--------------------------------------+------------------+
|Zoning: Junk Yards                    |175.95636210420943|
|Labeling for Used Mattress            |162.43032902285717|
|Record Keeping of Used Mattresses     |153.9972403942857 |
|Signage Requied for Sale of Used Mattr|151.63868055333333|
|Storage of Used Mattress              |142.11255641500003|
|Zoning: Recycle Yard                  |135.92851612479797|
|Donation Container Enforcement        |131.75610506358709|
|License Requied Used Mattress Sales   |128.79828704142858|
|Complaint                             |72.87050230311687 |
|Vendors                               |66.54809898507798 |
+--------------------------------------+------------------+
only showing top 10 rows



### Does number of days late depend on department?

In [112]:
(
    df.groupby('dept_name')
    .mean('num_days_late')
).show()

22/10/24 10:01:06 WARN TaskSetManager: Stage 312 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

+--------------------+-------------------+
|           dept_name| avg(num_days_late)|
+--------------------+-------------------+
|Animal Care Services|-226.51783940550436|
|                null| 135.92851612479797|
|Solid Waste Manag...|-2.2000575136721845|
|Development Services| 13.433724555869714|
|Trans & Cap Impro...|  -20.6128373540527|
|    Customer Service| 59.737091496300785|
|        Metro Health| -4.911766979607001|
|Parks and Recreation| -5.251521960055142|
|Code Enforcement ...| -38.70133068329558|
|        City Council|                NaN|
+--------------------+-------------------+



In [113]:
df.select('dept_name').distinct().count()

22/10/24 10:01:25 WARN TaskSetManager: Stage 324 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

10

### How do number of days late depend on department and request type?

In [117]:
(
    df.groupby('dept_name', 'service_request_type')
    .mean('num_days_late')
    .sort('service_request_type')
).show()

22/10/24 10:04:10 WARN TaskSetManager: Stage 368 contains a task of very large size (15174 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

+--------------------+--------------------+--------------------+
|           dept_name|service_request_type|  avg(num_days_late)|
+--------------------+--------------------+--------------------+
|Solid Waste Manag...|1st Call Bagged L...| -3.8689919898581904|
|Solid Waste Manag...|2nd Call Bagged L...|   -4.21210716234353|
|Solid Waste Manag...|3rd Call Bagged L...|        -3.152896412|
|Trans & Cap Impro...|ADA Infrastructur...|       -11.841508137|
|Code Enforcement ...|    ADA Obstructions|        -13.22349537|
|Code Enforcement ...|Absentee Property...| -14.231748829519532|
|Trans & Cap Impro...|Accident Problem ...|  -22.97103045993122|
|Solid Waste Manag...|Additional Cart R...|  -5.063509766847671|
|Solid Waste Manag...|Additional Garbag...|  -2.920198210360159|
|Solid Waste Manag...|Additional Organi...|  -5.997874052909091|
|Animal Care Services|Aggressive Animal...|  16.696368811892242|
|Animal Care Services|Aggressive Animal...|   2.645033883163259|
|Trans & Cap Impro...|  A