In [1]:
import pyspark
from pyspark.sql.functions import *

spark = pyspark.sql.SparkSession.builder.getOrCreate()

1. Read the case, department, and source data into their own spark dataframes.

In [2]:
df = spark.read.csv('data/case.csv', sep=",", header=True, inferSchema=True)
print('nrows:', df.count())

nrows: 841704


In [3]:
stray_cases = df.filter(df.service_request_type == 'Stray Animal').count()
print('stray animal cases:', stray_cases)

stray animal cases: 26760


In [4]:
# rename columns
df = df.withColumnRenamed('SLA_due_date', 'case_due_date')

In [5]:
# improve data types
df = df.withColumn("case_late", col("case_late") == "Yes").withColumn(
    "case_closed", col("case_closed") == "Yes"
)

df = df.withColumn(
    "council_district", format_string("%04d", col("council_district"))
)

df = (
    df.withColumn(
        "case_opened_date", to_timestamp(col("case_opened_date"), "M/d/yy H:mm")
    )
    .withColumn(
        "case_closed_date", to_timestamp(col("case_closed_date"), "M/d/yy H:mm")
    )
    .withColumn("case_due_date", to_timestamp(col("case_due_date"), "M/d/yy H:mm"))
)

In [6]:
df = df.withColumn('request_address', lower(trim(col('request_address'))))

In [7]:
df = df.withColumn('zipcode', regexp_extract(col('request_address'), r'\d+$', 0))

In [8]:
# case lifetime
df = (
    df.withColumn("case_age", datediff(current_timestamp(), "case_opened_date"))
    .withColumn("days_to_closed", datediff("case_closed_date", "case_opened_date"))
    .withColumn(
        "case_lifetime",
        when(col("case_closed"), col("days_to_closed")).otherwise(col("case_age")),
    )
    .drop("case_age", "days_to_closed")
)

In [9]:
# get depts and sources
depts = spark.read.csv('data/dept.csv', header=True, inferSchema=True)
depts.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



In [11]:
sources = spark.read.csv('data/source.csv', header=True, inferSchema=True)
sources.show(5)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
|   119403|  Betty De Hoyos|
|   119555|  Socorro Quiara|
+---------+----------------+
only showing top 5 rows



In [12]:
df = df.join(depts, 'dept_division', 'left').join(sources, 'source_id', 'left')

In [14]:
df.count()

855269

In [15]:
df.createOrReplaceTempView('df')

In [16]:
df.show(5)

+---------+----------------+----------+-------------------+-------------------+-------------------+---------+-------------------+-----------+--------------------+-----------+-----------+--------------------+----------------+-------+-------------+--------------------+----------------------+-------------------+---------------+
|source_id|   dept_division|   case_id|   case_opened_date|   case_closed_date|      case_due_date|case_late|      num_days_late|case_closed|service_request_type|   SLA_days|case_status|     request_address|council_district|zipcode|case_lifetime|           dept_name|standardized_dept_name|dept_subject_to_SLA|source_username|
+---------+----------------+----------+-------------------+-------------------+-------------------+---------+-------------------+-----------+--------------------+-----------+-----------+--------------------+----------------+-------+-------------+--------------------+----------------------+-------------------+---------------+
| svcCRMLS|Field Op

In [19]:
# how old is the latest currently open issue? (terms of days past SLA)
spark.sql('''
SELECT DATEDIFF(current_timestamp, case_due_date) AS days_past_due
FROM df
WHERE NOT case_closed
ORDER BY days_past_due DESC
LIMIT 5
''').show()

+-------------+
|days_past_due|
+-------------+
|         1258|
|         1258|
|         1258|
|         1258|
|         1258|
+-------------+



In [20]:
# how many Stray Animals cases are there?
df.filter(df.service_request_type == 'Stray Animal').count()

27361

In [22]:
# How many service requests that are assigned to the Field Operations department
# are not classified as "Officer Standby" request type (service_request_type)?

(
    df.filter(expr("dept_division == 'Field Operations'"))
      .filter(expr('service_request_type != "Officer Standby"'))
      .count()
)

116295

In [23]:
# extract the year from the case_closed_date column
df.select('case_closed_date', year('case_closed_date')).show(5)

+-------------------+----------------------+
|   case_closed_date|year(case_closed_date)|
+-------------------+----------------------+
|2018-01-01 12:29:00|                  2018|
|2018-01-03 08:11:00|                  2018|
|2018-01-02 07:57:00|                  2018|
|2018-01-02 08:13:00|                  2018|
|2018-01-01 13:29:00|                  2018|
+-------------------+----------------------+
only showing top 5 rows



In [24]:
df.select('case_closed_date', month('case_closed_date')).show(3)

+-------------------+-----------------------+
|   case_closed_date|month(case_closed_date)|
+-------------------+-----------------------+
|2018-01-01 12:29:00|                      1|
|2018-01-03 08:11:00|                      1|
|2018-01-02 07:57:00|                      1|
+-------------------+-----------------------+
only showing top 3 rows



In [25]:
df.select('case_closed_date', month('case_closed_date'), year('case_closed_date')).show(5)

+-------------------+-----------------------+----------------------+
|   case_closed_date|month(case_closed_date)|year(case_closed_date)|
+-------------------+-----------------------+----------------------+
|2018-01-01 12:29:00|                      1|                  2018|
|2018-01-03 08:11:00|                      1|                  2018|
|2018-01-02 07:57:00|                      1|                  2018|
|2018-01-02 08:13:00|                      1|                  2018|
|2018-01-01 13:29:00|                      1|                  2018|
+-------------------+-----------------------+----------------------+
only showing top 5 rows



In [26]:
# convert num_days from days to hours in new column num_hours_late
(df.withColumn('num_hours_late', df.num_days_late * 24)
   .select('num_days_late', 'num_hours_late')
   .show()
)

+-------------------+-------------------+
|      num_days_late|     num_hours_late|
+-------------------+-------------------+
| -998.5087616000001|     -23964.2102784|
|-2.0126041669999997|-48.302500007999996|
|       -3.022337963|      -72.536111112|
|       -15.01148148|      -360.27555552|
|0.37216435200000003|  8.931944448000001|
|       -29.74398148| -713.8555555199999|
|       -14.70673611|      -352.96166664|
|       -14.70662037|      -352.95888888|
|       -14.70662037|      -352.95888888|
|       -14.70649306|      -352.95583344|
|       -14.70649306|      -352.95583344|
|       -14.70636574|      -352.95277776|
|          -14.70625|-352.95000000000005|
|       -14.70636574|      -352.95277776|
|       -14.70623843|-352.94972232000003|
|-14.705891199999998|-352.94138879999997|
|       -14.70600694|      -352.94416656|
|       -14.70576389|      -352.93833336|
|       -14.70576389|      -352.93833336|
|       -14.70564815|       -352.9355556|
+-------------------+-------------

In [27]:
# cases without request source?
(
df.select(df.source_id.isNull().cast('int').alias('is_null'))
    .agg(sum('is_null'))
    .show()
)

+------------+
|sum(is_null)|
+------------+
|           0|
+------------+



In [28]:
df.filter(col('source_id').isNull()).show(vertical=True)

(0 rows)



In [29]:
# the top 10 service request types by #s
(df.groupBy('service_request_type').count().sort(col('count').desc()).show(10, truncate=False))

+--------------------------------+-----+
|service_request_type            |count|
+--------------------------------+-----+
|No Pickup                       |89210|
|Overgrown Yard/Trash            |66403|
|Bandit Signs                    |32968|
|Damaged Cart                    |31163|
|Front Or Side Yard Parking      |28920|
|Stray Animal                    |27361|
|Aggressive Animal(Non-Critical) |25492|
|Cart Exchange Request           |22608|
|Junk Vehicle On Private Property|21649|
|Pot Hole Repair                 |20827|
+--------------------------------+-----+
only showing top 10 rows

