In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Obtain the Spark entry point: reuse an existing session or create a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/15 09:24:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load the three CSV inputs; each has a header row and column types are inferred.
source_df = spark.read.csv(
    "source.csv", header=True, inferSchema=True, sep=","
)
case_df = spark.read.csv(
    "case.csv", header=True, inferSchema=True, sep=","
)
dept_df = spark.read.csv(
    "dept.csv", header=True, inferSchema=True, sep=","
)

                                                                                

In [3]:
# Inspect the column types Spark inferred for source.csv.
source_df.dtypes

[('source_id', 'string'), ('source_username', 'string')]

In [4]:
# Inspect the column types Spark inferred for case.csv.
case_df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [5]:
# Rename 'SLA_due_date' to 'case_due_date' so it matches the other case_* date columns.
case_df = case_df.withColumnRenamed(
    "SLA_due_date",
    "case_due_date",
)

In [6]:
# Replace the string flags with booleans: true where the value equals "YES".
case_df = (
    case_df
    .withColumn('case_closed', expr('case_closed == "YES"'))
    .withColumn('case_late', expr('case_late == "YES"'))
)

In [7]:
# Store council_district as a string rather than an int.
case_df = case_df.withColumn(
    "council_district",
    col("council_district").cast("string"),
)

In [8]:
# Parse the three date columns from strings into timestamps.
# Pattern: month/day/two-digit year, then hour:minute.
fmt = "M/d/yy H:mm"

case_df = (
    case_df
    .withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_due_date", fmt))
)

In [9]:
# Normalize request_address: trim surrounding whitespace, then lowercase.
case_df = case_df.withColumn(
    "request_address",
    lower(trim("request_address")),
)

In [10]:
# Express lateness in weeks: the number of days late divided by 7.
case_df = case_df.withColumn(
    "num_weeks_late",
    expr("(num_days_late) / 7"),
)

In [11]:
# Zero-pad council_district to three digits with format_string (e.g. 5 -> "005").
# format_string renders a NULL argument as the literal text "null", so only
# format rows whose district casts to an int; missing or non-numeric districts
# stay NULL instead of becoming a fake string value.
case_df = case_df.withColumn(
    'council_district',
    when(
        col('council_district').cast('int').isNotNull(),
        format_string('%03d', col('council_district').cast('int')),
    ),
)

In [12]:
# Derive zipcode from the last five characters of request_address.
case_df = case_df.withColumn(
    "zipcode",
    expr("right(request_address, 5)"),
)

In [13]:
# Add three day-count columns:
#   case_age       - days between case_opened_date and the current timestamp
#   days_to_closed - days between case_opened_date and case_closed_date
#   case_lifetime  - case_age when the case is not closed, otherwise days_to_closed
case_df = case_df.withColumn(
    "case_age",
    datediff(current_timestamp(), "case_opened_date"),
)
case_df = case_df.withColumn(
    "days_to_closed",
    datediff("case_closed_date", "case_opened_date"),
)
case_df = case_df.withColumn(
    "case_lifetime",
    when(expr("! case_closed"), col("case_age")).otherwise(col("days_to_closed")),
)

In [14]:
# dept.csv was already loaded into dept_df earlier in the notebook with the same
# header and schema-inference options (the comma separator is the CSV default),
# so reuse that DataFrame instead of reading and inferring the file a second time.
dept = dept_df

# Preview the first five rows, untruncated, one field per line.
dept.show(n=5, truncate=False, vertical=True)

-RECORD 0-----------------------------------------------
 dept_division          | 311 Call Center               
 dept_name              | Customer Service              
 standardized_dept_name | Customer Service              
 dept_subject_to_SLA    | YES                           
-RECORD 1-----------------------------------------------
 dept_division          | Brush                         
 dept_name              | Solid Waste Management        
 standardized_dept_name | Solid Waste                   
 dept_subject_to_SLA    | YES                           
-RECORD 2-----------------------------------------------
 dept_division          | Clean and Green               
 dept_name              | Parks and Recreation          
 standardized_dept_name | Parks & Recreation            
 dept_subject_to_SLA    | YES                           
-RECORD 3-----------------------------------------------
 dept_division          | Clean and Green Natural Areas 
 dept_name              | Parks

In [15]:
case_df = (
    case_df
    # Left join on the shared dept_division key so that cases without a
    # matching department row are kept.
    .join(dept, "dept_division", "left")
    # Joining on the column name already yields a single dept_division column,
    # so only dept_name needs to be dropped. standardized_dept_name is kept
    # (renamed to "department") because it has far fewer unique values, and
    # dept_subject_to_SLA is kept as well.
    .drop(dept.dept_name)
    .withColumnRenamed("standardized_dept_name", "department")
    # Convert the flag to a boolean: true where the value equals "YES".
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
)

In [16]:
# Preview one row after the join, untruncated, one field per line.
case_df.show(n=1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------
 dept_division        | Field Operations                     
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  el paso st, san antonio, 78207 
 council_district     | 005                                  
 num_weeks_late       | -142.6441088                         
 zipcode

## 1. How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently open issue been open?

In [17]:
# Preview the first three cases, one field per line.
case_df.show(n=3, vertical=True)

-RECORD 0------------------------------------
 dept_division        | Field Operations     
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 num_weeks_late       | -142.6441088         
 zipcode              | 78207                
 case_age             | 1506                 
 days_to_closed       | 0                    
 case_lifetime        | 0                    
 department           | Animal Care Services 
 dept_subject_to_SLA  | true      

In [18]:
# Preview three cases whose status is "Open", one field per line.
(
    case_df
    .filter(case_df["case_status"] == "Open")
    .show(n=3, vertical=True)
)

-RECORD 0------------------------------------
 dept_division        | 311 Call Center      
 case_id              | 1014128388           
 case_opened_date     | 2018-01-02 09:39:00  
 case_closed_date     | null                 
 case_due_date        | 2018-01-09 09:39:00  
 case_late            | true                 
 num_days_late        | 211.5974884          
 case_closed          | false                
 service_request_type | Complaint            
 SLA_days             | 7.0                  
 case_status          | Open                 
 source_id            | mt13131              
 request_address      | 7326  westglade p... 
 council_district     | 006                  
 num_weeks_late       | 30.22821262857143    
 zipcode              | 78227                
 case_age             | 1505                 
 days_to_closed       | null                 
 case_lifetime        | 1505                 
 department           | Customer Service     
 dept_subject_to_SLA  | true      

In [23]:
# Preview three cases whose status is "Open", one field per line.
(
    case_df
    .filter(case_df["case_status"] == "Open")
    .show(n=3, vertical=True)
)

-RECORD 0------------------------------------
 dept_division        | 311 Call Center      
 case_id              | 1014128388           
 case_opened_date     | 2018-01-02 09:39:00  
 case_closed_date     | null                 
 case_due_date        | 2018-01-09 09:39:00  
 case_late            | true                 
 num_days_late        | 211.5974884          
 case_closed          | false                
 service_request_type | Complaint            
 SLA_days             | 7.0                  
 case_status          | Open                 
 source_id            | mt13131              
 request_address      | 7326  westglade p... 
 council_district     | 006                  
 num_weeks_late       | 30.22821262857143    
 zipcode              | 78227                
 case_age             | 1505                 
 days_to_closed       | null                 
 case_lifetime        | 1505                 
 department           | Customer Service     
 dept_subject_to_SLA  | true      