In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Obtain the SparkSession used by every cell below (getOrCreate reuses an
# already-running session when one exists instead of starting another).
spark = SparkSession.builder.getOrCreate()

## Data Acquisition


In [2]:
# Load source.csv into a Spark DataFrame: comma-separated, first row used as
# the header, column types inferred from the data.
source = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv("source.csv")
)

In [None]:
# Load case.csv into a Spark DataFrame: comma-separated, first row used as
# the header, column types inferred from the data.
case = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv("case.csv")
)

In [None]:
# Load dept.csv into a Spark DataFrame: comma-separated, first row used as
# the header, column types inferred from the data.
dept = (
    spark.read
    .options(sep=",", header=True, inferSchema=True)
    .csv("dept.csv")
)

In [None]:
# Preview the first five rows of source.
source.show(n=5)

In [None]:
# Preview three case records, untruncated and printed one field per line.
case.show(n=3, truncate=False, vertical=True)

In [None]:
# Preview the first five rows of the departments table.
dept.show(n=5)

In [None]:
## Inspect the data types Spark inferred for the columns of `case`
## (.dtypes lists each column name together with its type).

case.dtypes

In [None]:
# Inspect the data types Spark inferred for the columns of `dept`.
dept.dtypes

In [None]:
# Inspect the data types Spark inferred for the columns of `source`.
source.dtypes

### Data Preparation

Things to do:

Rename Columns:
* SLA_due_date -> case_due_date

Correct Data Types:
* case_closed and case_late to boolean
* council_district as a string
* case_opened_date, case_closed_date and case_due_date to datetime format

Data Transformation:
* request_address: trim and lowercase
* format council district with leading zeros
* convert the number of days a case is late to a number of weeks

New features:
* zip_code : extract from address
* case_age
* days_to_closed
* case_lifetime

Join cases data with department data:

In [None]:
# The due-date column arrives as 'SLA_due_date'; give it the name
# 'case_due_date' so it matches the other case_* date columns.
case = case.withColumnRenamed(
    existing='SLA_due_date',
    new='case_due_date',
)

In [None]:
# Show one full record vertically to confirm the renamed column.
case.show(n=1, truncate=False, vertical=True)

Correct Data Types:

In [None]:
# Look at case_closed and case_late as loaded, before converting them to
# booleans in the next step.
case.select(["case_closed", "case_late"]).show(n=5)

In [None]:
# Overwrite both flag columns with the result of comparing their string
# value against "YES", which turns them into boolean columns.
case = (
    case
    .withColumn('case_closed', expr('case_closed == "YES"'))
    .withColumn('case_late', expr('case_late == "YES"'))
)

In [None]:
# Re-check the two flag columns after the boolean conversion.
case.select(["case_closed", "case_late"]).show(n=5)

In [None]:
# Look at council_district before casting it to a string.
case.select(['council_district']).show(n=4)

In [None]:
# Store council_district as a string: it identifies a district rather than
# measuring a quantity.
case = case.withColumn(
    'council_district',
    col('council_district').cast('string'),
)

In [None]:
# Confirm how council_district looks after the cast.
case.select(['council_district']).show(n=4)

In [None]:
# Check the column data types of `case` again after the conversions above.
case.dtypes

In [None]:
# Look at the three date columns as loaded, before they are parsed into
# timestamps in the next step.
case.select(
    ['case_opened_date', 'case_closed_date', 'case_due_date']
).show(n=5)

In [None]:
# Parse the three date columns into timestamps. `fmt` is the pattern handed
# to to_timestamp: month/day/two-digit-year, then hour:minute.
fmt = "M/d/yy H:mm"

case = (
    case
    .withColumn('case_opened_date', to_timestamp('case_opened_date', format=fmt))
    .withColumn('case_closed_date', to_timestamp('case_closed_date', format=fmt))
    .withColumn('case_due_date', to_timestamp('case_due_date', format=fmt))
)

In [None]:
# Re-inspect the three date columns after parsing.
case.select(
    ['case_opened_date', 'case_closed_date', 'case_due_date']
).show(n=5)

Data Transformation

In [None]:
# Look at request_address (untruncated) before it is trimmed and lowercased.
case.select(['request_address']).show(n=5, truncate=False)

In [None]:
# Normalise request_address: lowercase it, then strip surrounding whitespace.
case = case.withColumn(
    'request_address',
    trim(lower(case['request_address'])),
)

# Show the cleaned addresses without truncation.
case.select(['request_address']).show(n=5, truncate=False)

In [None]:
# Add num_weeks_late: num_days_late divided by 7.
case = case.withColumn(
    'num_weeks_late',
    expr('num_days_late/7'),
)

# Compare the day and week figures side by side.
case.select(["num_days_late", "num_weeks_late"]).show(n=5)

In [None]:
# Zero-pad council_district to three digits. The column is cast back to int
# so that the '%03d' pattern can format it.
case = case.withColumn(
    'council_district',
    format_string('%03d', col('council_district').cast('int')),
)

In [None]:
# Confirm the zero-padded council_district values.
case.select(['council_district']).show(n=5)

New Features: 

In [None]:
# Add a zipcode column holding the run of digits at the very end of
# request_address (capture group 1 of the pattern).
case = case.withColumn(
    'zipcode',
    regexp_extract('request_address', r"(\d+$)", 1),
)

case.select(['zipcode']).show(n=5)

In [None]:
# Show one full record vertically to review the columns added so far.
case.show(n=1, truncate=False, vertical=True)

case_age: How old the case is; the difference in days between when the case was opened and the current day

days_to_closed: The number of days between when the case was opened and when it was closed

case_lifetime: Number of days between when the case was opened and when it was closed; if the case is still open, the number of days since the case was opened instead

In [None]:
# Add three duration features, all measured in days:
#   case_age       - from case_opened_date to the current timestamp
#   days_to_closed - from case_opened_date to case_closed_date
#   case_lifetime  - case_age when case_closed is false, otherwise days_to_closed
case = (
    case
    .withColumn("case_age", datediff(current_timestamp(), "case_opened_date"))
    .withColumn("days_to_closed", datediff("case_closed_date", "case_opened_date"))
    .withColumn(
        "case_lifetime",
        when(expr("! case_closed"), col("case_age"))
        .otherwise(col("days_to_closed")),
    )
)

In [None]:
# The same case_lifetime rule written as a SQL CASE expression; this only
# displays the result and does not modify `case`.
(
    case
    .select(
        expr('CASE WHEN case_closed == False THEN case_age ELSE days_to_closed END')
        .alias('case_lifetime')
    )
    .show(n=3)
)

In [None]:
# Show one full record vertically to review the new duration columns.
case.show(n=1, truncate=False, vertical=True)

Joining dept on case

In [None]:
# Enrich `case` with department information:
#   1. left-join `dept` on the shared 'dept_division' key,
#   2. drop dept_division and dept_name,
#   3. rename standardized_dept_name to department,
#   4. turn dept_subject_to_SLA into a boolean by comparing it with "YES".
case = (
    case
    .join(dept, on="dept_division", how="left")
    # NOTE(review): joining on a column name should already leave a single
    # dept_division column, so one of the two dept_division drops below is
    # probably redundant - confirm before removing either.
    .drop(dept["dept_division"])
    .drop(dept["dept_name"])
    .drop(case["dept_division"])
    .withColumnRenamed("standardized_dept_name", "department")
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
)

In [None]:
# Preview one record of the joined DataFrame, untruncated and vertical.
case.show(n=1, truncate=False, vertical=True)

### Data Exploration