In [1]:
import pyspark.sql
from pyspark.sql.functions import (
    expr,
    col,
    to_timestamp,
    format_string,
    regexp_extract,
    datediff,
    current_timestamp,
    when,
    max,
    lit,
)

In [2]:
def get_311_data(spark: pyspark.sql.SparkSession) -> pyspark.sql.DataFrame:
    print("[wrangle.py] reading case.csv")
    df = spark.read.csv("data/case.csv", header=True, inferSchema=True)
    return df.withColumnRenamed("SLA_due_date", "case_due_date")

In [3]:
def handle_dtypes(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    print("[wrangle.py] handling data types")
    return (
        df.withColumn("case_closed", expr('case_closed == "YES"'))
        .withColumn("case_late", expr('case_late == "YES"'))
        .withColumn("council_district", col("council_district").cast("string"))
    )

In [4]:
def handle_dates(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    print("[wrangle.py] parsing dates")
    fmt = "M/d/yy H:mm"
    return (
        df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
        .withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
        .withColumn("case_due_date", to_timestamp("case_due_date", fmt))
    )

In [5]:
def add_features(df: pyspark.sql.DataFrame) -> pyspark.sql.DataFrame:
    print("[wrangle.py] adding features")
    max_date = df.select(max("case_closed_date")).first()[0]
    return (
        df.withColumn("num_weeks_late", expr("num_days_late / 7 AS num_weeks_late"))
        .withColumn(
            "council_district",
            format_string("%03d", col("council_district").cast("int")),
        )
        .withColumn("zipcode", regexp_extract("request_address", r"\d+$", 0))
        .withColumn("case_age", datediff(lit(max_date), "case_opened_date"))
        .withColumn("days_to_closed", datediff("case_closed_date", "case_opened_date"))
        .withColumn(
            "case_lifetime",
            when(expr("! case_closed"), col("case_age")).otherwise(
                col("days_to_closed")
            ),
        )
    )

In [6]:
def join_departments(
    case_df: pyspark.sql.DataFrame, spark: pyspark.sql.SparkSession
) -> pyspark.sql.DataFrame:
    print("[wrangle.py] joining departments")
    dept = spark.read.csv("data/dept.csv", header=True, inferSchema=True)
    return (
        case_df.join(dept, "dept_division", "left")
        # drop all the columns except for standardized name, as it has much fewer unique values
        .drop(dept.dept_name)
        .withColumnRenamed("standardized_dept_name", "department")
        # convert to a boolean
        .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
    )

In [7]:
def wrangle_311(spark: pyspark.sql.SparkSession) -> pyspark.sql.DataFrame:
    df = add_features(handle_dates(handle_dtypes(get_311_data(spark))))
    return join_departments(df, spark)

In [8]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()
df = wrangle_311(spark)

[wrangle.py] reading case.csv
[wrangle.py] handling data types
[wrangle.py] parsing dates
[wrangle.py] adding features
[wrangle.py] joining departments


In [9]:
df.select('zipcode').show()

+-------+
|zipcode|
+-------+
|  78207|
|  78223|
|  78223|
|  78223|
|  78228|
|       |
|  78251|
|  78251|
|  78251|
|  78251|
|  78251|
|  78251|
|  78251|
|  78251|
|  78251|
|  78251|
|  78251|
|  78251|
|  78251|
|  78251|
+-------+
only showing top 20 rows



In [10]:
row = df.select('service_request_type', 'department').head()
row

Row(service_request_type='Stray Animal', department='Animal Care Services')

In [11]:
row.service_request_type

'Stray Animal'

In [12]:
row = df.select(max('case_closed_date')).head()
row[0]

datetime.datetime(2018, 8, 8, 10, 38)

In [13]:
df.select('case_lifetime').show()

+-------------+
|case_lifetime|
+-------------+
|            0|
|            2|
|            1|
|            1|
|            0|
|            0|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
|            1|
+-------------+
only showing top 20 rows



## Exercises

In [14]:
from pyspark.sql.functions import *

In [15]:
df

DataFrame[dept_division: string, case_id: int, case_opened_date: timestamp, case_closed_date: timestamp, case_due_date: timestamp, case_late: boolean, num_days_late: double, case_closed: boolean, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: string, num_weeks_late: double, zipcode: string, case_age: int, days_to_closed: int, case_lifetime: int, department: string, dept_subject_to_SLA: boolean]

How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

In [16]:
df.filter(expr('! case_closed')).sort(desc('num_days_late')).show(3, vertical=True)

-RECORD 0------------------------------------
 dept_division        | Code Enforcement     
 case_id              | 1013225646           
 case_opened_date     | 2017-01-01 13:48:00  
 case_closed_date     | null                 
 case_due_date        | 2017-01-17 08:30:00  
 case_late            | true                 
 num_days_late        | 348.6458333          
 case_closed          | false                
 service_request_type | No Address Posted    
 SLA_days             | 15.77859954          
 case_status          | Open                 
 source_id            | svcCRMSS             
 request_address      | 7299  SHADOW RIDG... 
 council_district     | 006                  
 num_weeks_late       | 49.80654761428571    
 zipcode              | 78250                
 case_age             | 584                  
 days_to_closed       | null                 
 case_lifetime        | 584                  
 department           | DSD/Code Enforcement 
 dept_subject_to_SLA  | true      

How many Stray Animal cases are there?

In [17]:
df.where(expr('service_request_type = "Stray Animal"')).count()

26760

How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [18]:
df.dept_division

Column<b'dept_division'>

In [19]:
(df.select('department', 'dept_division', 'service_request_type')
 .where(expr('dept_division = "Field Operations"'))
 .where(expr('service_request_type != "Officer Standby"'))
 .count())

113902

Convert the council_district column to a string column.

In [20]:
df = df.withColumn('council_district', df.council_district.cast('string'))

Extract the year from the case_closed_date column.

In [21]:
df = df.selectExpr('*', 'year(case_closed_date) AS year')
df.show(5)

+----------------+----------+-------------------+-------------------+-------------------+---------+-------------------+-----------+--------------------+-----------+-----------+---------+--------------------+----------------+--------------------+-------+--------+--------------+-------------+--------------------+-------------------+----+
|   dept_division|   case_id|   case_opened_date|   case_closed_date|      case_due_date|case_late|      num_days_late|case_closed|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|      num_weeks_late|zipcode|case_age|days_to_closed|case_lifetime|          department|dept_subject_to_SLA|year|
+----------------+----------+-------------------+-------------------+-------------------+---------+-------------------+-----------+--------------------+-----------+-----------+---------+--------------------+----------------+--------------------+-------+--------+--------------+-------------+--------------------+------------

Convert num_days_late from days to hours in new columns num_hours_late.

In [22]:
df = df.withColumn('num_hours_late', expr('num_days_late * 24'))

Join the case data with the source and department data. (already done in the prep functions)

Are there any cases that do not have a request source?

In [27]:
# percent nulls for source_id
df.select(avg(df.source_id.isNull().cast('int'))).show()

+-------------------------------------+
|avg(CAST((source_id IS NULL) AS INT))|
+-------------------------------------+
|                                  0.0|
+-------------------------------------+



What are the top 10 service request types in terms of number of requests?

In [30]:
df.groupBy('department', 'service_request_type').count().orderBy(desc('count')).show(truncate=False)

+------------------------+---------------------------------+-----+
|department              |service_request_type             |count|
+------------------------+---------------------------------+-----+
|Solid Waste             |No Pickup                        |86855|
|DSD/Code Enforcement    |Overgrown Yard/Trash             |65895|
|DSD/Code Enforcement    |Bandit Signs                     |32910|
|Solid Waste             |Damaged Cart                     |30338|
|DSD/Code Enforcement    |Front Or Side Yard Parking       |28794|
|Animal Care Services    |Stray Animal                     |26760|
|Animal Care Services    |Aggressive Animal(Non-Critical)  |24882|
|Solid Waste             |Cart Exchange Request            |22024|
|DSD/Code Enforcement    |Junk Vehicle On Private Property |21473|
|Trans & Cap Improvements|Pot Hole Repair                  |20616|
|DSD/Code Enforcement    |Alley-Way Maintenance            |20214|
|Solid Waste             |Lost/Stolen Cart                 |18

What are the top 10 service request types in terms of average days late?

In [37]:
(df.groupBy('department', 'service_request_type')
 .agg(avg('num_days_late').alias('avg_days_late'), count('*').alias('n'))
 .sort(desc('avg_days_late'))
 .na.drop()
 .show(10, truncate=False))

+------------------------+--------------------------------------+------------------+----+
|department              |service_request_type                  |avg_days_late     |n   |
+------------------------+--------------------------------------+------------------+----+
|DSD/Code Enforcement    |Zoning: Junk Yards                    |175.95636210420943|296 |
|DSD/Code Enforcement    |Labeling for Used Mattress            |162.43032902285717|7   |
|DSD/Code Enforcement    |Record Keeping of Used Mattresses     |153.99724039428568|7   |
|DSD/Code Enforcement    |Signage Requied for Sale of Used Mattr|151.63868055333333|12  |
|DSD/Code Enforcement    |Storage of Used Mattress              |142.11255641500003|8   |
|DSD/Code Enforcement    |Zoning: Recycle Yard                  |135.92851612479797|198 |
|DSD/Code Enforcement    |Donation Container Enforcement        |131.75610506358706|155 |
|DSD/Code Enforcement    |License Requied Used Mattress Sales   |128.79828704142858|7   |
|Trans & C

Does number of days late depend on department?

In [41]:
df.groupBy('department').agg(avg('num_days_late').alias('avg_days_late')).show(truncate=False)

+------------------------+-------------------+
|department              |avg_days_late      |
+------------------------+-------------------+
|Solid Waste             |-2.1938644240225362|
|Animal Care Services    |-226.1654977071745 |
|Trans & Cap Improvements|-20.5097935017854  |
|Parks & Recreation      |-5.283345998745914 |
|Customer Service        |59.49019459221512  |
|Metro Health            |-4.904223205386018 |
|City Council            |null               |
|DSD/Code Enforcement    |-38.32346772537442 |
+------------------------+-------------------+



How do number of days late depend on department and request type?

In [43]:
(df.groupBy('department', 'service_request_type')
 .agg(avg('num_days_late').alias('avg_days_late'))
 .sort('avg_days_late')
 .show(truncate=False)
)

+------------------------+--------------------------------------+-------------------+
|department              |service_request_type                  |avg_days_late      |
+------------------------+--------------------------------------+-------------------+
|City Council            |CCO_Request for Research/Information_1|null               |
|City Council            |Request for Research/Information      |null               |
|Trans & Cap Improvements|Engineering Design                    |-1399.1272335      |
|Trans & Cap Improvements|Signal Timing Modification By Engineer|-1247.0797799732143|
|Animal Care Services    |Stray Animal                          |-998.804572616084  |
|Parks & Recreation      |Major Park Improvement Install        |-280.2546235360405 |
|Trans & Cap Improvements|Sidewalk Cost Sharing Program         |-184.87626063647144|
|DSD/Code Enforcement    |Multi Tenant Exterior                 |-135.7158812804762 |
|DSD/Code Enforcement    |CPS Energy Towers           