In [1]:
import numpy as np
import pandas as pd

import warnings
warnings.filterwarnings('ignore')

import pyspark
from pyspark.sql.functions import *

from env import host, user, password, get_db_url
from sqlalchemy import text, create_engine

spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/05 13:30:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/07/05 13:30:37 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
def read_query(db, query):
    """ Function to read in a query from a db in Codeup's MySQL server """
    # get url
    url = env.get_db_url(db)
    # create this "engine" and use it to make a "connection"
    engine = create_engine(url)
    connection = engine.connect()
    # use text function to turn a string query into the format needed
    query = text(query)
    # Now we can use the connection and read the query into a df
    df = pd.read_sql(query, connection)
    return df

# Exercises
1. Read the case, department, and source data into their own spark dataframes
- Write code to store source data in both csv and json format; store as sources_csv adn sources_json
- Inspect folder structure. What do you notice?
    - Interesting, neither one split it into multiple partitions. it did make the folder and put in a single "partition" file, but not multiple files. Maybe because sources was so small?
- Inspect dataframes. Are data types appropriate? Write code to cast values to appropriate types

In [3]:
# reading in the 3 separate .csv files into spark df's
case_df = spark.read.csv('case.csv', sep=',', header=True, inferSchema=True)
case_df

                                                                                

DataFrame[case_id: int, case_opened_date: string, case_closed_date: string, SLA_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: int]

In [4]:
source_df = spark.read.csv('source.csv', sep=',', header=True, inferSchema=True)
source_df

DataFrame[source_id: string, source_username: string]

In [5]:
dept_df = spark.read.csv('dept.csv', sep=',', header=True, inferSchema=True)
dept_df

DataFrame[dept_division: string, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]

In [6]:
# writing one df to csv and json for now for the exercise instructions
source_df.write.json('sources_json', mode="overwrite")

In [7]:
source_df.write.csv('sources_csv', mode = "overwrite")

In [8]:
# Or...
(
    source_df.write.format("csv")
    .mode("overwrite")
    .option("header", True)
    .save("sources_csv")
)

In [9]:
# inspect df's and cast to appropriate datatypes, starting with case_df
case_df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

In [10]:
# start with case_df; I'll just follow what we did in the exercise/curriculum
case_df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



In [11]:
# rename a column (Note: I can run this more than once and it doesn't error out. Weird)
case_df = case_df.withColumnRenamed('SLA_due_date', 'case_due_date')

In [12]:
# case_late, case_closed are YES/NO, change to Bool
case_df.groupby('case_late', 'case_closed').count().show()



+---------+-----------+------+
|case_late|case_closed| count|
+---------+-----------+------+
|       NO|        YES|735616|
|      YES|        YES| 87978|
|       NO|         NO| 11585|
|      YES|         NO|  6525|
+---------+-----------+------+



                                                                                

In [13]:
case_df = case_df.withColumn('case_late', expr('case_late == "YES"')).withColumn('case_closed', expr('case_closed == "YES"'))

In [14]:
case_df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 case_due_date        | 1/5/18 8:30          
 case_late            | false                
 num_days_late        | -2.0126041

In [15]:
# next, change council district to a string with leading zeroes
case_df.groupby('council_district').count().show()



+----------------+------+
|council_district| count|
+----------------+------+
|               1|119309|
|               6| 74095|
|               3|102706|
|               5|114609|
|               9| 40916|
|               4| 93778|
|               8| 42345|
|               7| 72445|
|              10| 62926|
|               2|114745|
|               0|  3830|
+----------------+------+



                                                                                

In [18]:
case_df = case_df.withColumn('council_district', 
                   format_string('%03d', col('council_district').cast('int'))
                  )
case_df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 005                  
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 case_due_date        | 1/5/18 8:30          
 case_late            | false                
 num_days_late        | -2.0126041

In [19]:
case_df.groupby('council_district').count().show()

[Stage 18:>                                                         (0 + 8) / 8]

+----------------+------+
|council_district| count|
+----------------+------+
|             009| 40916|
|             006| 74095|
|             005|114609|
|             003|102706|
|             008| 42345|
|             001|119309|
|             010| 62926|
|             004| 93778|
|             000|  3830|
|             007| 72445|
|             002|114745|
+----------------+------+



                                                                                

In [22]:
# let's change the dates from strings to dates
# set format to send in to to_timestamp (slightly different from pd; uses Java's SimpleDateFormat)
fmt = 'M/d/yy H:mm'
case_df = (
    case_df.withColumn('case_opened_date', to_timestamp('case_opened_date', fmt))
    .withColumn('case_closed_date', to_timestamp('case_closed_date', fmt))
    .withColumn('case_due_date', to_timestamp('case_due_date', fmt))
)
case_df.select('case_opened_date', 'case_closed_date', 'case_due_date').show(5)

+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|      case_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|2020-09-26 00:42:00|
|2018-01-01 00:46:00|2018-01-03 08:11:00|2018-01-05 08:30:00|
|2018-01-01 00:48:00|2018-01-02 07:57:00|2018-01-05 08:30:00|
|2018-01-01 01:29:00|2018-01-02 08:13:00|2018-01-17 08:30:00|
|2018-01-01 01:34:00|2018-01-01 13:29:00|2018-01-01 04:34:00|
+-------------------+-------------------+-------------------+
only showing top 5 rows



In [26]:
# let's trim (spark for strip) and lowercase the address column
case_df = case_df.withColumn('request_address', trim(lower('request_address')))
case_df.select('request_address').show(5, 50)

+-------------------------------------+
|                      request_address|
+-------------------------------------+
| 2315  el paso st, san antonio, 78207|
|  2215  goliad rd, san antonio, 78223|
|102  palfrey st w, san antonio, 78223|
| 114  la garde st, san antonio, 78223|
|734  clearview dr, san antonio, 78228|
+-------------------------------------+
only showing top 5 rows



In [27]:
# look at structure and datatypes again
case_df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)



In [28]:
case_df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05 08:30:00  
 case_late            | false                
 num_days_late        | -2.0126041

In [29]:
# Good enough. i could lowercase the other string columns, but I'm moving on to the rest of the exercises

# Exercises (cont.)
    How old is the latest (in terms of days past SLA) currently open issue? 
        - case_id 1013225646 was opened 2017-01-01
        - it is 349 days late (past case_due_date)
        - it has been open 2376 days (cao 5 Jul 2023)
        
    How long has the oldest (in terms of days since opened) currently opened issue been open?
        - There are two cases that have been open 2376 days which is the oldest currently open cases
            - 1013225646 and 1013225651

In [33]:
# I am reading this to mean what currently open item is most late, and I am reading
# that num_days_late is a measure of how late something is. So what is the record 
# with the max value for num_days_late
case_df[case_df.case_closed == 'false'].sort(col('num_days_late').desc()).show(2, vertical=True)

[Stage 28:>                                                         (0 + 8) / 8]

-RECORD 0------------------------------------
 case_id              | 1013225646           
 case_opened_date     | 2017-01-01 13:48:00  
 case_closed_date     | null                 
 case_due_date        | 2017-01-17 08:30:00  
 case_late            | true                 
 num_days_late        | 348.6458333          
 case_closed          | false                
 dept_division        | Code Enforcement     
 service_request_type | No Address Posted    
 SLA_days             | 15.77859954          
 case_status          | Open                 
 source_id            | svcCRMSS             
 request_address      | 7299  shadow ridg... 
 council_district     | 006                  
-RECORD 1------------------------------------
 case_id              | 1013225651           
 case_opened_date     | 2017-01-01 13:57:00  
 case_closed_date     | null                 
 case_due_date        | 2017-01-17 08:30:00  
 case_late            | true                 
 num_days_late        | 348.645833

                                                                                

In [37]:
# this code gets me the number of days since a case was opened
case_df = case_df.withColumn('days_since_opened', datediff(current_timestamp(), 'case_opened_date'))

In [43]:
( 
    case_df[case_df.case_closed == 'false']
    .sort(col('days_since_opened').desc())
    .select('case_id', 'case_opened_date', 'case_due_date', 'case_closed', 'days_since_opened').show(5)
)

[Stage 34:>                                                         (0 + 8) / 8]

+----------+-------------------+-------------------+-----------+-----------------+
|   case_id|   case_opened_date|      case_due_date|case_closed|days_since_opened|
+----------+-------------------+-------------------+-----------+-----------------+
|1013225646|2017-01-01 13:48:00|2017-01-17 08:30:00|      false|             2376|
|1013225651|2017-01-01 13:57:00|2017-01-17 08:30:00|      false|             2376|
|1013226813|2017-01-02 11:26:00|2017-01-17 11:26:00|      false|             2375|
|1013229328|2017-01-03 10:01:00|2017-01-18 10:01:00|      false|             2374|
|1013232133|2017-01-04 09:42:00|2017-05-12 09:42:00|      false|             2373|
+----------+-------------------+-------------------+-----------+-----------------+
only showing top 5 rows



                                                                                

    How many Stray Animal cases are there?
    - 26,760

In [59]:
# answer for that question
case_df[case_df.service_request_type == 'Stray Animal'].count()

                                                                                

26760

In [60]:
# just checking to see if there might be other version of "Stray Animal"
case_df[case_df.service_request_type.contains('Animal')].groupby('service_request_type').count().show(50, 50)

+--------------------------------------+-----+
|                  service_request_type|count|
+--------------------------------------+-----+
|                    Dead Animal - Misc|13234|
|         Animal Investigation Referral|   15|
|                Animal Permits Request| 3026|
|        Solid Waste Insp: Dead Animals|  401|
|              Injured Animal(Critical)| 9633|
|        Dead Animal - Private Property|  190|
|             Animal Bite(Non-Critical)| 4750|
|                          Stray Animal|26760|
|               Trapped/Confined Animal|11354|
|           City Council Animal Request|  365|
|       Aggressive Animal(Non-Critical)|24882|
|                 Animal Bite(Critical)|  708|
|           Public Nuisance(Own Animal)|10715|
|                     Dead Animal - Dog|16714|
|Fly Breeding and Animal Waste Disposal|   51|
|                     Dead Animal - Cat|14983|
|           Aggressive Animal(Critical)| 5266|
|              Animal Cruelty(Critical)| 3001|
|            

                                                                                

    How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?
    - 113902

In [62]:
# answer for that question
case_df[(case_df.dept_division == 'Field Operations') & (case_df.service_request_type != 'Officer Standby')].count()

                                                                                

113902

In [64]:
# just checking to see what those two columns look like
case_df.groupby('dept_division').count().show()

[Stage 73:>                                                         (0 + 8) / 8]

+--------------------+------+
|       dept_division| count|
+--------------------+------+
|       Miscellaneous| 45123|
|         Solid Waste|   813|
|    Field Operations|116915|
|             Streets| 38510|
|    Waste Collection|215122|
|Code Enforcement ...|  2189|
|              Vector|   538|
|   Dangerous Premise| 15479|
|     311 Call Center|  2849|
|               Brush| 18212|
|Dangerous Premise...|    36|
|Traffic Engineeri...|  4334|
|Code Enforcement ...|   198|
|          District 2|     3|
|             Signals| 20700|
|Engineering Division|  1375|
|Director's Office...|   515|
|         Storm Water| 13769|
|               Shops|   112|
|Storm Water Engin...|   494|
+--------------------+------+
only showing top 20 rows



                                                                                

In [65]:
# just checking to see what those two columns look like
case_df.groupby('service_request_type').count().show()

[Stage 76:>                                                         (0 + 8) / 8]

+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|Minimum Housing-O...| 8543|
|        Tree Removal|  298|
| Service Information|  160|
|    Sign Maintenance|   82|
|Park Building Mai...|   48|
|Brush Property Da...|  184|
|Graffiti: Private...| 8525|
|Traffic Sign Graf...| 2123|
|License Renewal I...| 1349|
|Guardrail- New Re...|  100|
|Markings Installa...|    8|
|   Sewer Line Broken| 1107|
|Zoning: Multi-Fam...|  735|
|Engineering Inves...|  489|
|    Zoning: Setbacks|  809|
|  Traffic Sign Faded| 2122|
|     Permits, Fences| 1137|
|Certificates of O...| 1571|
|  Dead Animal - Misc|13234|
|Garage Sales No P...| 2424|
+--------------------+-----+
only showing top 20 rows



                                                                                

    Convert the council_district column to a string column.
    - already done
    
    Extract the year from the case_closed_date column.
    - 

In [67]:
# extracting year from case_closed_date column
case_df.withColumn('case_closed_year', year('case_closed_date')).select('case_closed_date', 'case_closed_year').show(5)

+-------------------+----------------+
|   case_closed_date|case_closed_year|
+-------------------+----------------+
|2018-01-01 12:29:00|            2018|
|2018-01-03 08:11:00|            2018|
|2018-01-02 07:57:00|            2018|
|2018-01-02 08:13:00|            2018|
|2018-01-01 13:29:00|            2018|
+-------------------+----------------+
only showing top 5 rows



    Convert num_days_late from days to hours in new columns num_hours_late.

In [68]:
case_df.withColumn('num_hours_late', col('num_days_late') * 24).show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 days_since_opened    | 2011                 
 num_hours_late       | -23964.2102784       
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05

    Join the case data with the source and department data.

In [71]:
case_df.select('source_id').show(10)

+---------+
|source_id|
+---------+
| svcCRMLS|
| svcCRMSS|
| svcCRMSS|
| svcCRMSS|
| svcCRMSS|
| svcCRMSS|
| svcCRMSS|
| svcCRMSS|
| svcCRMSS|
| svcCRMSS|
+---------+
only showing top 10 rows



In [77]:
source_df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



In [78]:
source_df.show(5)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
|   119403|  Betty De Hoyos|
|   119555|  Socorro Quiara|
+---------+----------------+
only showing top 5 rows



In [80]:
source_df.groupby('source_id').count().show()

+---------+-----+
|source_id|count|
+---------+-----+
|   136202|    1|
|   141239|    1|
|  MW16328|    1|
|  df03076|    1|
|   141549|    1|
|   142989|    1|
|  mp21218|    1|
|  sp26368|    1|
|  sg26196|    1|
|  ec25702|    1|
|  bo26471|    1|
|  js26451|    1|
|  js12254|    1|
|  sg22264|    1|
|  ss26317|    1|
|  bn26322|    1|
|  BA10591|    1|
|  jg06389|    1|
|   140509|    1|
|  MR25792|    1|
+---------+-----+
only showing top 20 rows



In [70]:
dept_df.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



In [74]:
join_df = (
    
    case_df.join(dept_df, 'dept_division', 'left')
    # drop all the 'dept' columns except for standardized_dept_name since it has fewere characters
    .drop(case_df.dept_division)
    .drop(dept_df.dept_division)
    .drop(dept_df.dept_name)
    # rename standardized_dept_name column to department
    .withColumnRenamed('standardized_dept_name', 'department')
    # convert last column to boolean
    .withColumn('dept_subject_to_SLA', col('dept_subject_to_SLA') == 'YES')
)
join_df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 days_since_opened    | 2011                 
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05

In [83]:
# I'm just going to do a left join with source_id. there may be some nulls for 
# source_username, but at least I won't generate nulls for all those other columns IF
# there happened to be sources with no cases
join_df = (
    join_df.join(source_df, 'source_id', 'left')
    # drop the extra source_id
    .drop(source_df.source_id)
)
join_df.show(2, vertical=True)

-RECORD 0------------------------------------
 source_id            | svcCRMLS             
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 days_since_opened    | 2011                 
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
 source_username      | svcCRMLS             
-RECORD 1------------------------------------
 source_id            | svcCRMSS             
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01

    Are there any cases that do not have a request source?
    - every case has some kind of a source listed; however sometimes the source_id is one of these three values:
        - svcCFlag
        - svcCRMLS
        - svcCRMSS
        
    - in those cases, the source_username is not a person's name

In [84]:
# investigating source_df a little bit
source_df[source_df.source_id.contains('svc')].show(10, 50)

+---------+---------------+
|source_id|source_username|
+---------+---------------+
| svcCFlag|       CityFlag|
| svcCRMLS|       svcCRMLS|
| svcCRMSS|       svcCRMSS|
+---------+---------------+



In [86]:
join_df[join_df.source_username == 'null'].show(2, vertical=True)

(0 rows)



In [92]:
join_df.groupby('source_username').count().agg(sum('count')).show()

[Stage 127:>                                                        (0 + 8) / 8]

+----------+
|sum(count)|
+----------+
|    855269|
+----------+



                                                                                

In [93]:
join_df.count()

                                                                                

855269

    What are the top 10 service request types in terms of number of requests?
    - No Pickup
    - Overgrown Yard/Trash
    - Bandit Signs

In [94]:
join_df.groupby('service_request_type').count().sort(desc('count')).show()

[Stage 140:>                                                        (0 + 8) / 8]

+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|           No Pickup|89210|
|Overgrown Yard/Trash|66403|
|        Bandit Signs|32968|
|        Damaged Cart|31163|
|Front Or Side Yar...|28920|
|        Stray Animal|27361|
|Aggressive Animal...|25492|
|Cart Exchange Req...|22608|
|Junk Vehicle On P...|21649|
|     Pot Hole Repair|20827|
|Alley-Way Mainten...|20293|
|    Lost/Stolen Cart|19299|
|Right Of Way/Side...|17836|
|   Dead Animal - Dog|17092|
|       Cart Delivery|15761|
|   Dead Animal - Cat|15345|
|      Animal Neglect|13851|
|  Dead Animal - Misc|13535|
|Trapped/Confined ...|11605|
|Public Nuisance(O...|10969|
+--------------------+-----+
only showing top 20 rows



                                                                                

    What are the top 10 service request types in terms of average days late?
    

In [104]:
# here are the top 10 service_request_types sorted by num_requests with the avg_days late
(
    join_df.groupby('service_request_type')
    .agg(count('service_request_type').alias('num_requests'),
         mean('num_days_late').alias('avg_days_late'))
    .sort(desc('num_requests')).show(10)
)



+--------------------+------------+-------------------+
|service_request_type|num_requests|      avg_days_late|
+--------------------+------------+-------------------+
|           No Pickup|       89210| -2.469090182038157|
|Overgrown Yard/Trash|       66403| -46.35809395583575|
|        Bandit Signs|       32968| -4.479319586874467|
|        Damaged Cart|       31163|-3.5030070988018145|
|Front Or Side Yar...|       28920|  -9.14277794990003|
|        Stray Animal|       27361| -998.8064665118969|
|Aggressive Animal...|       25492|  2.645033883163268|
|Cart Exchange Req...|       22608| -3.527041146943587|
|Junk Vehicle On P...|       21649| -42.66611918822536|
|     Pot Hole Repair|       20827|-1.9402268156181486|
+--------------------+------------+-------------------+
only showing top 10 rows



                                                                                

In [106]:
# here are the top 10 service_request_types sorted by avg_days late
(
    join_df.groupby('service_request_type')
    .agg(count('service_request_type').alias('num_requests'),
         mean('num_days_late').alias('avg_days_late'))
    .sort(desc('avg_days_late')).show(10, 50)
)

[Stage 165:>                                                        (0 + 8) / 8]

+--------------------------------------+------------+------------------+
|                  service_request_type|num_requests|     avg_days_late|
+--------------------------------------+------------+------------------+
|                    Zoning: Junk Yards|         296|175.95636210420943|
|            Labeling for Used Mattress|           7|162.43032902285717|
|     Record Keeping of Used Mattresses|           7|153.99724039428568|
|Signage Requied for Sale of Used Mattr|          12|151.63868055333333|
|              Storage of Used Mattress|           8|142.11255641500003|
|                  Zoning: Recycle Yard|         198|135.92851612479797|
|        Donation Container Enforcement|         155|131.75610506358706|
|   License Requied Used Mattress Sales|           7|128.79828704142858|
|               Traffic Signal Graffiti|           5|101.79846062200002|
|                             Complaint|        2420|  72.8705023031169|
+--------------------------------------+-----------

                                                                                

    Does number of days late depend on department?


In [108]:
# here are the number of requests per department sorted by avg_days late
(
    join_df.groupby('department')
    .agg(count('department').alias('num_requests'),
         mean('num_days_late').alias('avg_days_late'))
    .sort(desc('avg_days_late')).show(10, 50)
)

[Stage 175:>                                                        (0 + 8) / 8]

+------------------------+------------+-------------------+
|              department|num_requests|      avg_days_late|
+------------------------+------------+-------------------+
|        Customer Service|        2889|  59.73709149630077|
|             Solid Waste|      286287| -2.200057513672164|
|            Metro Health|        5313| -4.911766979607004|
|      Parks & Recreation|       19964| -5.251521960055145|
|Trans & Cap Improvements|       97841|-20.612837354052708|
|    DSD/Code Enforcement|      323579| -38.36938892614506|
|    Animal Care Services|      119362|-226.51783940550334|
|            City Council|          34|               null|
+------------------------+------------+-------------------+



                                                                                

In [111]:
# interesting that City Council had a null value for avg_days_late; let's check on those 34 requests
## Looks like none of those cases had a case_due_date which caused null values for num_days_late
(
    join_df[join_df.department == 'City Council']
    .select('department', 'case_opened_date', 'case_due_date', 'case_closed', 'num_days_late')
    .show(34)
)

                                                                                

+------------+-------------------+-------------+-----------+-------------+
|  department|   case_opened_date|case_due_date|case_closed|num_days_late|
+------------+-------------------+-------------+-----------+-------------+
|City Council|2018-01-30 16:54:00|         null|      false|         null|
|City Council|2018-02-21 14:21:00|         null|      false|         null|
|City Council|2018-03-15 16:10:00|         null|      false|         null|
|City Council|2018-03-22 12:06:00|         null|       true|         null|
|City Council|2018-03-27 10:53:00|         null|      false|         null|
|City Council|2018-04-04 16:31:00|         null|      false|         null|
|City Council|2018-04-05 14:59:00|         null|      false|         null|
|City Council|2018-04-17 15:20:00|         null|      false|         null|
|City Council|2018-04-18 14:37:00|         null|      false|         null|
|City Council|2018-04-20 11:29:00|         null|      false|         null|
|City Council|2018-05-08 

In [112]:
# may want to further investigate where the null values are in this df ... later

    How do number of days late depend on department and request type?
    - kind of interesting; DSD/Code Enforcement had a low value for average days late (-38), but when we grouped by department and service_request_type, we see they have a number of request_types that are > 100 days late

In [113]:
# 
# here are the number of requests per department per request type sorted by avg_days late
(
    join_df.groupby('department', 'service_request_type')
    .agg(count('service_request_type').alias('num_requests'),
         mean('num_days_late').alias('avg_days_late'))
    .sort(desc('avg_days_late')).show(10, 50)
)

[Stage 194:>                                                        (0 + 8) / 8]

+------------------------+--------------------------------------+------------+------------------+
|              department|                  service_request_type|num_requests|     avg_days_late|
+------------------------+--------------------------------------+------------+------------------+
|    DSD/Code Enforcement|                    Zoning: Junk Yards|         296|175.95636210420943|
|    DSD/Code Enforcement|            Labeling for Used Mattress|           7|162.43032902285717|
|    DSD/Code Enforcement|     Record Keeping of Used Mattresses|           7|153.99724039428568|
|    DSD/Code Enforcement|Signage Requied for Sale of Used Mattr|          12|151.63868055333333|
|    DSD/Code Enforcement|              Storage of Used Mattress|           8|142.11255641500003|
|    DSD/Code Enforcement|                  Zoning: Recycle Yard|         198|135.92851612479797|
|    DSD/Code Enforcement|        Donation Container Enforcement|         155|131.75610506358706|
|    DSD/Code Enforc

                                                                                

# Bonus
You might have noticed that the latest date in the dataset is fairly far off from the present day. To account for this, replace any occurances of the current time with the maximum date from the dataset.

In [114]:
join_df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)
 |-- days_since_opened: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- dept_subject_to_SLA: boolean (nullable = true)
 |-- source_username: string (nullable = true)



In [121]:
join_df.select('case_opened_date', 'case_closed_date', 'case_due_date', 'days_since_opened').sort(desc('case_due_date')).show()

                                                                                

+-------------------+-------------------+-------------------+-----------------+
|   case_opened_date|   case_closed_date|      case_due_date|days_since_opened|
+-------------------+-------------------+-------------------+-----------------+
|2018-07-27 15:07:00|               null|2022-05-27 15:07:00|             1804|
|2018-07-25 15:42:00|               null|2022-05-25 15:42:00|             1806|
|2018-07-24 10:01:00|               null|2022-05-24 10:01:00|             1807|
|2018-06-19 10:43:00|               null|2022-04-20 10:43:00|             1842|
|2018-04-22 20:27:00|               null|2022-02-24 08:30:00|             1900|
|2018-04-12 11:07:00|               null|2022-02-15 11:07:00|             1910|
|2018-04-01 12:28:00|               null|2022-02-03 08:30:00|             1921|
|2018-02-16 16:59:00|               null|2021-12-22 16:59:00|             1965|
|2018-02-16 16:42:00|               null|2021-12-22 16:42:00|             1965|
|2018-01-19 17:07:00|               null

In [126]:
# 2018-08-08 10:38:00 is the latest time in either the open or closed date columns
# the only column that this matters for is the one we created: days_since_opened
# so, this code uses to_timestamp(lit(date)) inside the datediff function to fix that
# pretty cool
join_df = join_df.withColumn('days_since_opened', datediff(to_timestamp(lit('2018-08-08 10:38:00')), 'case_opened_date'))

In [128]:
# for comparison, printing out the same call from above after fixing days_since_opened
join_df.select('case_opened_date', 'case_closed_date', 'case_due_date', 'days_since_opened').sort(desc('case_due_date')).show()



+-------------------+-------------------+-------------------+-----------------+
|   case_opened_date|   case_closed_date|      case_due_date|days_since_opened|
+-------------------+-------------------+-------------------+-----------------+
|2018-07-27 15:07:00|               null|2022-05-27 15:07:00|               12|
|2018-07-25 15:42:00|               null|2022-05-25 15:42:00|               14|
|2018-07-24 10:01:00|               null|2022-05-24 10:01:00|               15|
|2018-06-19 10:43:00|               null|2022-04-20 10:43:00|               50|
|2018-04-22 20:27:00|               null|2022-02-24 08:30:00|              108|
|2018-04-12 11:07:00|               null|2022-02-15 11:07:00|              118|
|2018-04-01 12:28:00|               null|2022-02-03 08:30:00|              129|
|2018-02-16 16:59:00|               null|2021-12-22 16:59:00|              173|
|2018-02-16 16:42:00|               null|2021-12-22 16:42:00|              173|
|2018-01-19 17:07:00|               null

                                                                                

In [132]:
# now how about those nulls
# Check for null values in all columns
for column in join_df.columns:
    print(f'{column} nulls = {join_df.select(sum(col(column).isNull().cast("int"))).first()[0]}')


                                                                                

source_id nulls = 0


                                                                                

case_id nulls = 0


                                                                                

case_opened_date nulls = 0


                                                                                

case_closed_date nulls = 18333


                                                                                

case_due_date nulls = 34


                                                                                

case_late nulls = 0


                                                                                

num_days_late nulls = 34


                                                                                

case_closed nulls = 0


                                                                                

service_request_type nulls = 0


                                                                                

SLA_days nulls = 34


                                                                                

case_status nulls = 0


                                                                                

request_address nulls = 0


                                                                                

council_district nulls = 0


                                                                                

days_since_opened nulls = 0


                                                                                

department nulls = 0


                                                                                

dept_subject_to_SLA nulls = 0




source_username nulls = 0


                                                                                

In [137]:
# here is another way to check for nulls, but you have to work a little; 
# notice some of the values in record 0 (count) are less than the max value for count
# and it didn't summarize any of the date columns
join_df.describe().show(vertical=True)



-RECORD 0------------------------------------
 summary              | count                
 source_id            | 855269               
 case_id              | 855269               
 num_days_late        | 855235               
 service_request_type | 855269               
 SLA_days             | 855235               
 case_status          | 855269               
 request_address      | 855269               
 council_district     | 855269               
 days_since_opened    | 855269               
 department           | 855269               
 source_username      | 855269               
-RECORD 1------------------------------------
 summary              | mean                 
 source_id            | 136713.3785816733    
 case_id              | 1.01396776661975E9   
 num_days_late        | -49.17728633237318   
 service_request_type | null                 
 SLA_days             | 59.30801898738834    
 case_status          | null                 
 request_address      | null      

                                                                                