In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Creating spark session
spark = SparkSession.builder.getOrCreate()

In [7]:
df = spark.read.csv("data/case.csv", header=True, inferSchema=True)
df.printSchema()
df.show(5, vertical=True)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  


## Have some data prep. Lets rename some columns

In [8]:
df = df.withColumnRenamed('SLA_due_date', 'case_due_date')

In [9]:
df.explain()

== Physical Plan ==
*(1) Project [case_id#217, case_opened_date#218, case_closed_date#219, SLA_due_date#220 AS case_due_date#316, case_late#221, num_days_late#222, case_closed#223, dept_division#224, service_request_type#225, SLA_days#226, case_status#227, source_id#228, request_address#229, council_district#230]
+- FileScan csv [case_id#217,case_opened_date#218,case_closed_date#219,SLA_due_date#220,case_late#221,num_days_late#222,case_closed#223,dept_division#224,service_request_type#225,SLA_days#226,case_status#227,source_id#228,request_address#229,council_district#230] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/georgearredondo/Desktop/codeup-data-science/spark-exercises/data/ca..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<case_id:int,case_opened_date:string,case_closed_date:string,SLA_due_date:string,case_late:...




In [10]:
# Want to covert to booleans
df.groupby('case_late', 'case_closed').count().show()

+---------+-----------+------+
|case_late|case_closed| count|
+---------+-----------+------+
|       NO|        YES|735616|
|      YES|        YES| 87978|
|       NO|         NO| 11585|
|      YES|         NO|  6525|
+---------+-----------+------+



In [13]:
# Bools for case closed and late
df = df.withColumn("case_closed", expr('case_closed == "YES"')).withColumn(
    "case_late", expr('case_late == "YES"')
)

df.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|     true|
+-----------+---------+
only showing top 5 rows



In [19]:
# council_district
df = df.withColumn("council_district", col("council_district").cast("string"))

In [21]:
df.explain()

== Physical Plan ==
*(1) Project [case_id#217, case_opened_date#218, case_closed_date#219, SLA_due_date#220 AS case_due_date#316, (case_late#221 = YES) AS case_late#382, num_days_late#222, (case_closed#223 = YES) AS case_closed#367, dept_division#224, service_request_type#225, SLA_days#226, case_status#227, source_id#228, request_address#229, cast(council_district#230 as string) AS council_district#456]
+- FileScan csv [case_id#217,case_opened_date#218,case_closed_date#219,SLA_due_date#220,case_late#221,num_days_late#222,case_closed#223,dept_division#224,service_request_type#225,SLA_days#226,case_status#227,source_id#228,request_address#229,council_district#230] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/georgearredondo/Desktop/codeup-data-science/spark-exercises/data/ca..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<case_id:int,case_opened_date:string,case_closed_date:string,SLA_due_date:string,case_late:...




In [22]:
# Converting date/times to timestamp. Format use is java for date and time. 
df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

fmt = "M/d/yy H:mm"
df = (
    df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_opened_date", fmt))
)

print("--- After")
df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows

--- After
+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|      case_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 00:42:00|2018-01-01 00:42:00|
|2018-01-01 00:46:00|2018-01-01 00:46:00|2018-01-01 00:46:00|
|2018-01-01 00:48:00|2018-01-01 00:48:00|2018-01-01 00:48:00|
|2018-01-01 01:29:00|2018-01-01 01:29:00|2018-01-01 01:29:00|
|2018-01-01 01:34:00|2018-01-01 01:34:00|2018-01-01 01:34:00|
+------------------

In [23]:
df.explain()

== Physical Plan ==
*(1) Project [case_id#217, gettimestamp(case_opened_date#218, M/d/yy H:mm, Some(America/Chicago)) AS case_opened_date#490, gettimestamp(gettimestamp(case_opened_date#218, M/d/yy H:mm, Some(America/Chicago)), M/d/yy H:mm, Some(America/Chicago)) AS case_closed_date#505, gettimestamp(gettimestamp(case_opened_date#218, M/d/yy H:mm, Some(America/Chicago)), M/d/yy H:mm, Some(America/Chicago)) AS case_due_date#520, (case_late#221 = YES) AS case_late#382, num_days_late#222, (case_closed#223 = YES) AS case_closed#367, dept_division#224, service_request_type#225, SLA_days#226, case_status#227, source_id#228, request_address#229, cast(council_district#230 as string) AS council_district#456]
+- FileScan csv [case_id#217,case_opened_date#218,case_late#221,num_days_late#222,case_closed#223,dept_division#224,service_request_type#225,SLA_days#226,case_status#227,source_id#228,request_address#229,council_district#230] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryF

In [25]:
# truncate = False allows us to see the entire address
df.select(
    col('request_address'),
    lower(trim('request_address'))
).show(truncate=False)

+----------------------------------------+----------------------------------------+
|request_address                         |lower(trim(request_address))            |
+----------------------------------------+----------------------------------------+
|2315  EL PASO ST, San Antonio, 78207    |2315  el paso st, san antonio, 78207    |
|2215  GOLIAD RD, San Antonio, 78223     |2215  goliad rd, san antonio, 78223     |
|102  PALFREY ST W, San Antonio, 78223   |102  palfrey st w, san antonio, 78223   |
|114  LA GARDE ST, San Antonio, 78223    |114  la garde st, san antonio, 78223    |
|734  CLEARVIEW DR, San Antonio, 78228   |734  clearview dr, san antonio, 78228   |
|BANDERA RD and BRESNAHAN                |bandera rd and bresnahan                |
|10133  FIGARO CANYON, San Antonio, 78251|10133  figaro canyon, san antonio, 78251|
|10133  FIGARO CANYON, San Antonio, 78251|10133  figaro canyon, san antonio, 78251|
|10133  FIGARO CANYON, San Antonio, 78251|10133  figaro canyon, san antonio,

In [29]:
df = df.withColumn('request_address', lower(trim('request_address')))
df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 00:42:00  
 case_due_date        | 2018-01-01 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-01 00:46:00  
 case_due_date        | 2018-01-01 00:46:00  
 case_late            | false                
 num_days_late        | -2.0126041

In [42]:
df = df.withColumn("zipcode", regexp_extract("request_address", r"\d+$", 0))

df.select("zipcode").show(5)

+-------+
|zipcode|
+-------+
|  78207|
|  78223|
|  78223|
|  78223|
|  78228|
+-------+
only showing top 5 rows



In [32]:
# Going to pull the zip code out of the address
print("--- Before")
df.select("request_address").show(5,truncate=False)

df = df.withColumn("request_address", trim(lower(df.request_address)))

print("--- After")
df.select("request_address").show(5, truncate=False)

--- Before
+-------------------------------------+
|request_address                      |
+-------------------------------------+
|2315  el paso st, san antonio, 78207 |
|2215  goliad rd, san antonio, 78223  |
|102  palfrey st w, san antonio, 78223|
|114  la garde st, san antonio, 78223 |
|734  clearview dr, san antonio, 78228|
+-------------------------------------+
only showing top 5 rows

--- After
+-------------------------------------+
|request_address                      |
+-------------------------------------+
|2315  el paso st, san antonio, 78207 |
|2215  goliad rd, san antonio, 78223  |
|102  palfrey st w, san antonio, 78223|
|114  la garde st, san antonio, 78223 |
|734  clearview dr, san antonio, 78228|
+-------------------------------------+
only showing top 5 rows



In [39]:
# Going to see how long cases were open/closed. Case lifetime
# - if case is closed then diff between close and opn dates, day_to_close
# - else diff between open date and now -- case_age

df = (
    df.withColumn(
        "case_age", datediff(current_timestamp(), "case_opened_date")
    )
    .withColumn(
        "days_to_closed", datediff("case_closed_date", "case_opened_date")
    )
    .withColumn(
        "case_lifetime",
        when(expr("! case_closed"), col("case_age")).otherwise(
            col("days_to_closed")
        ),
    )
)

df.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("case_closed")).show(5)

df.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("! case_closed")).show(5)

+-----------+-------------------+-------------------+--------+--------------+-------------+
|case_closed|   case_opened_date|   case_closed_date|case_age|days_to_closed|case_lifetime|
+-----------+-------------------+-------------------+--------+--------------+-------------+
|       true|2018-01-01 00:42:00|2018-01-01 00:42:00|    1064|             0|            0|
|       true|2018-01-01 00:46:00|2018-01-01 00:46:00|    1064|             0|            0|
|       true|2018-01-01 00:48:00|2018-01-01 00:48:00|    1064|             0|            0|
|       true|2018-01-01 01:29:00|2018-01-01 01:29:00|    1064|             0|            0|
|       true|2018-01-01 01:34:00|2018-01-01 01:34:00|    1064|             0|            0|
+-----------+-------------------+-------------------+--------+--------------+-------------+
only showing top 5 rows

+-----------+-------------------+-------------------+--------+--------------+-------------+
|case_closed|   case_opened_date|   case_closed_date|ca

In [41]:
df

DataFrame[case_id: int, case_opened_date: timestamp, case_closed_date: timestamp, case_due_date: timestamp, case_late: boolean, num_days_late: double, case_closed: boolean, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: string, case_age: int, days_to_closed: int, case_lifetime: int]

In [40]:
dept = spark.read.csv("data/dept.csv", header=True, inferSchema=True)
dept.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



In [43]:
df = (
    df
    # left join on dept_division
    .join(dept, "dept_division", "left")
    # drop all the columns except for standardized name, as it has much fewer unique values
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .drop(df.dept_division)
    .withColumnRenamed("standardized_dept_name", "department")
    # convert to a boolean
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
)

df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 00:42:00  
 case_due_date        | 2018-01-01 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 5                    
 case_age             | 1064                 
 days_to_closed       | 0                    
 case_lifetime        | 0                    
 zipcode              | 78207                
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
-RECORD 1------------------------------------
 case_id              | 1014127333

In [44]:
train, validate, test = df.randomSplit([0.6, 0.2, 0.2])

In [47]:
train.count(), validate.count(), test.count()

(505064, 168286, 168354)