In [1]:
import pyspark
from pyspark.sql.functions import *
import ignore
spark = pyspark.sql.SparkSession.builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/18 22:23:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/18 22:23:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:

df = case = spark.read.csv('case.csv', header=True, inferSchema=True)
print('nrows:', df.count())
# stray_animal_cases = df.filter(df.service_request_type == 'Stray Animal').count()
# print('stray animal cases:', stray_animal_cases)

[Stage 2:>                                                          (0 + 4) / 4]

nrows: 841704


                                                                                

In [5]:
# Rename column
df = df.withColumnRenamed('SLA_due_date', 'case_due_date')

# Convert to better data types
df = (
    df.withColumn('case_late', col('case_late') == 'YES')
    .withColumn('case_closed', col('case_closed') == 'YES')
)
df = df.withColumn('council_district', format_string('%04d', col('council_district')))
df = (
    df.withColumn('case_opened_date', to_timestamp(col('case_opened_date'), 'M/d/yy H:mm'))
    .withColumn('case_closed_date', to_timestamp(col('case_closed_date'), 'M/d/yy H:mm'))
    .withColumn('case_due_date', to_timestamp(col('case_due_date'), 'M/d/yy H:mm'))
)

# Cleanup text data
df = df.withColumn('request_address', lower(trim(col('request_address'))))
# Extract zipcode
df = df.withColumn('zipcode', regexp_extract(col('request_address'), r'\d+$', 0))

# Create a `case_lifetime` feature
df = (
    df.withColumn('case_age', datediff(current_timestamp(), 'case_opened_date'))
    .withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
    .withColumn('case_lifetime', when(col('case_closed'), col('days_to_closed')).otherwise(col('case_age')))
    .drop('case_age', 'days_to_closed')
)


# Join departments and sources
depts = spark.read.csv('dept.csv', header=True, inferSchema=True)
sources = spark.read.csv('source.csv', header=True, inferSchema=True)

df = df.join(depts, 'dept_division', 'left').join(sources, 'source_id', 'left')


In [6]:
df.count()

                                                                                

855269

In [None]:
# How old is the latest (in terms of days past SLA) currently open issue?
spark.sql('''
SELECT DATEDIFF(current_timestamp, case_due_date) AS days_past_due
FROM df
WHERE NOT case_closed
ORDER BY days_past_due DESC
LIMIT 15
''').show()

In [None]:
# Another way:
(
    df.select(datediff(current_timestamp(), 'case_due_date')
    .alias('days_past_due'))
    .where(df.case_closed == False)
    .sort(col('days_past_due').desc())
    .show(5)
    
)

In [None]:
# How long has the oldest (in terms of days since opened) currently opened issue been open?
spark.sql('''
SELECT DATEDIFF(current_timestamp, case_opened_date) AS days_past_opened
FROM df
WHERE NOT case_closed
ORDER BY days_past_opened DESC
LIMIT 15
''').show()

In [None]:
# How many Stray Animal cases are there?
df.filter(df.service_request_type == 'Stray Animal').count()

In [None]:
(
    df.groupBy('service_request_type')
    .count()
    .filter(expr('service_request_type == "Stray Animal"'))
    .show()
)

In [None]:
# How many service requests that are assigned to the Field Operations department (dept_division)
# are not classified as "Officer Standby" request type (service_request_type)?
(
    df.filter(df.dept_division == 'Field Operations')
    .filter(df.service_request_type != 'Officer Standby')
    .count()
)

In [None]:
# Another way to do it
(
    df.filter(expr("dept_division == 'Field Operations'"))
    .filter(expr('service_request_type != "Officer Standby"'))
    .count()
)

In [None]:
# Extract the year from the case_closed_date column.
df.select('case_closed_date', year('case_closed_date')).show(5)

In [None]:
# Convert num_days_late from days to hours in new columns num_hours_late.
(
    df.withColumn('num_hours_late', df.num_days_late * 24)
    .select('num_days_late', 'num_hours_late')
    .show()
)

In [None]:

# What are the top 10 service request types in terms of number of requests?
(
    df.groupby('service_request_type')
    .count()
    .sort(col('count').desc())
    .show(10, truncate=False)
)

In [None]:
# What are the top 10 service request types in terms of average days late?
# - just the late cases
# - for the late cases:
#   - what is the average number of days late by request type?
(
    df.where('case_late') # just the rows where case_late == true
    .groupBy('service_request_type')
    .agg(mean('num_days_late').alias('n_days_late'), count('*').alias('n_cases'))
    .sort(desc('n_days_late'))
    .show(10, truncate=False)
)

In [None]:
# Does number of days late depend on department?
(
    df.filter('case_late')
    .groupby('dept_name')
    .agg(mean('num_days_late').alias('days_late'), count('num_days_late').alias('n_cases_late'))
    .sort('days_late')
    .withColumn('days_late', round(col('days_late'), 1))
    .show(truncate=False)
)

In [None]:
# How do number of days late depend on department and request type?
(
    df.filter("case_closed")
#     .filter("case_late")
    .groupby("standardized_dept_name", "service_request_type")
    .agg(avg("num_days_late").alias("days_late"), count("*").alias("n_cases"))
    .withColumn("days_late", round(col("days_late"), 1))
    .where(col('days_late') > 0)
    .sort(desc("days_late"))
    .show(40, truncate=False)
)