# Data Wrangling

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

Data comes from calls to San Antonio's 311 call center.

## Reading Data

In [2]:
df = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)

The above code could also be written like so:

In [3]:
(
    spark.read.format("csv")
    .option("sep", ",")
    .option("inferSchema", True)
    .option("header", True)
    .load("source.csv")
)

DataFrame[source_id: string, source_username: string]

### Data Schemas

Common Spark Data Types:

- `StringType`
- `IntegerType`
- `FloatType`

StuctType and StructField wrap the above.

- `StructType`
    - `StructField`: `StringType`
    - `StructField`: `IntegerType`

In [4]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField("source_id", StringType()),
        StructField("source_username", StringType()),
    ]
)

spark.read.csv("source.csv", header=True, schema=schema)

DataFrame[source_id: string, source_username: string]

### Writing Data

In [5]:
# for demo purposes
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))

mpg.write.json("mpg_json", mode="overwrite")

# like much else in spark, there's multiple ways we could do this:
(
    mpg.write.format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save("mpg_csv")
)

## Data Preparation

In [6]:
df = spark.read.csv("case.csv", header=True, inferSchema=True)
df.show(2)

+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616000001|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO|-2.0126041669999997|        YES|     Storm Water|Removal Of Obstru...|4.322222222| 

In [7]:
df.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 SLA_due_date         | 9/26/20 0:42                         
 case_late            | NO                                   
 num_days_late        | -998.5087616000001                   
 case_closed          | YES                                  
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
-RECORD 1----------------------------------------------------
 case_id

### Renaming Columns

In [8]:
df = df.withColumnRenamed("SLA_due_date", "case_due_date")

In [9]:
df.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 case_due_date        | 9/26/20 0:42                         
 case_late            | NO                                   
 num_days_late        | -998.5087616000001                   
 case_closed          | YES                                  
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
-RECORD 1----------------------------------------------------
 case_id

### Casting Data Types

In [10]:
# demonstrating we only have yes/no in each field
df.groupBy("case_closed", "case_late").count().show()

+-----------+---------+------+
|case_closed|case_late| count|
+-----------+---------+------+
|         NO|      YES|  6525|
|        YES|      YES| 87978|
|         NO|       NO| 11585|
|        YES|       NO|735616|
+-----------+---------+------+



In [11]:
# demo simple arithmetic
df.select(expr('num_days_late * SLA_days / 3')).show(5)

+--------------------------------+
|((num_days_late * SLA_days) / 3)|
+--------------------------------+
|                 -332503.4176128|
|              -2.899640818232399|
|              -4.352901263088489|
|              -81.52178450660524|
|            0.015506848000000002|
+--------------------------------+
only showing top 5 rows



In [12]:
df = df.withColumn("case_closed", expr('case_closed == "YES"')).withColumn(
    "case_late", expr('case_late == "YES"')
)

df.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|     true|
+-----------+---------+
only showing top 5 rows



In [13]:
df.groupBy("council_district").count().show()

+----------------+------+
|council_district| count|
+----------------+------+
|               1|119309|
|               6| 74095|
|               3|102706|
|               5|114609|
|               9| 40916|
|               4| 93778|
|               8| 42345|
|               7| 72445|
|              10| 62926|
|               2|114745|
|               0|  3830|
+----------------+------+



In [14]:
# convert council_district to a string
df = df.withColumn("council_district", col("council_district").cast("string"))

In [15]:
df.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 case_due_date        | 9/26/20 0:42                         
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
-RECORD 1----------------------------------------------------
 case_id

In [16]:
df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- case_due_date: string (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)



`to_timestamp` for date conversion

[Java's `SimpleDateFormat`][1] for the format string

[1]: https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html

In [17]:
print("--- Before handling dates")
df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

--- Before handling dates
+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows



In [18]:
fmt = "M/d/yy H:mm"
df = (
    df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_opened_date", fmt))
)

In [19]:
print("--- After")
df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

--- After
+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|      case_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 00:42:00|2018-01-01 00:42:00|
|2018-01-01 00:46:00|2018-01-01 00:46:00|2018-01-01 00:46:00|
|2018-01-01 00:48:00|2018-01-01 00:48:00|2018-01-01 00:48:00|
|2018-01-01 01:29:00|2018-01-01 01:29:00|2018-01-01 01:29:00|
|2018-01-01 01:34:00|2018-01-01 01:34:00|2018-01-01 01:34:00|
+-------------------+-------------------+-------------------+
only showing top 5 rows



### Data Transformations

Normalize the address

In [20]:
print("--- Before")
df.select("request_address").show(5)

--- Before
+--------------------+
|     request_address|
+--------------------+
|2315  EL PASO ST,...|
|2215  GOLIAD RD, ...|
|102  PALFREY ST W...|
|114  LA GARDE ST,...|
|734  CLEARVIEW DR...|
+--------------------+
only showing top 5 rows



In [21]:
df = df.withColumn("request_address", trim(lower(df.request_address)))

In [22]:
print("--- After")
df.select("request_address").show(5)

--- After
+--------------------+
|     request_address|
+--------------------+
|2315  el paso st,...|
|2215  goliad rd, ...|
|102  palfrey st w...|
|114  la garde st,...|
|734  clearview dr...|
+--------------------+
only showing top 5 rows



Number of days to number of weeks

In [23]:
df = df.withColumn(
    "num_weeks_late", expr("num_days_late / 7 AS num_weeks_late")
)

df.select("num_days_late", "num_weeks_late").show(5)

+-------------------+--------------------+
|      num_days_late|      num_weeks_late|
+-------------------+--------------------+
| -998.5087616000001|        -142.6441088|
|-2.0126041669999997|-0.28751488099999994|
|       -3.022337963|-0.43176256614285713|
|       -15.01148148| -2.1444973542857144|
|0.37216435200000003|         0.053166336|
+-------------------+--------------------+
only showing top 5 rows



String formatting

1 -> 0001
3 -> 0003
12 -> 0012

lets user know this is a not a number to be used for arithmetic

In [24]:
# '%03d' means at least 3 digits, pad with 0s
#
# In order to use the format_string function the way we are, we'll need to
# convert council_district back to an integer temporarily, but the final output
# will be a string.
df = df.withColumn(
    "council_district",
    format_string("%03d", col("council_district").cast("int")),
)

df.select("council_district").show(5)

+----------------+
|council_district|
+----------------+
|             005|
|             003|
|             003|
|             003|
|             007|
+----------------+
only showing top 5 rows



In [25]:
df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)
 |-- num_weeks_late: double (nullable = true)



### New Features

In [26]:
df = df.withColumn("zipcode", regexp_extract("request_address", r"\d+$", 0))

df.select("zipcode").show(5)

+-------+
|zipcode|
+-------+
|  78207|
|  78223|
|  78223|
|  78223|
|  78228|
+-------+
only showing top 5 rows



In [27]:
df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)
 |-- num_weeks_late: double (nullable = true)
 |-- zipcode: string (nullable = true)



- `case_age`: How old the case is; the difference in days between when the case was opened and the current day
- `days_to_closed`: The number of days between when the case was opened and when it was closed
- `case_lifetime`: Number of days between when the case was opened and when it was closed, if the case is still open, the number of days since the case was opened

In [28]:
# Chaining methods
#df.select(
#    when('con1', 'col_a').otherwise(when('con2', col_b).otherwise('col_c'))
#)

In [29]:
df = (
    df.withColumn(
        "case_age", datediff(current_timestamp(), "case_opened_date")
    )
    .withColumn(
        "days_to_closed", datediff("case_closed_date", "case_opened_date")
    )
    .withColumn(
        "case_lifetime",
        when(expr("! case_closed"), col("case_age")).otherwise(
            col("days_to_closed")
        ),
    )
)

df.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("case_closed")).show(5)

df.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("! case_closed")).show(5)

+-----------+-------------------+-------------------+--------+--------------+-------------+
|case_closed|   case_opened_date|   case_closed_date|case_age|days_to_closed|case_lifetime|
+-----------+-------------------+-------------------+--------+--------------+-------------+
|       true|2018-01-01 00:42:00|2018-01-01 00:42:00|    1407|             0|            0|
|       true|2018-01-01 00:46:00|2018-01-01 00:46:00|    1407|             0|            0|
|       true|2018-01-01 00:48:00|2018-01-01 00:48:00|    1407|             0|            0|
|       true|2018-01-01 01:29:00|2018-01-01 01:29:00|    1407|             0|            0|
|       true|2018-01-01 01:34:00|2018-01-01 01:34:00|    1407|             0|            0|
+-----------+-------------------+-------------------+--------+--------------+-------------+
only showing top 5 rows

+-----------+-------------------+-------------------+--------+--------------+-------------+
|case_closed|   case_opened_date|   case_closed_date|ca

In [30]:
print('Complete')
df.select('*').where('case_closed').show(1, vertical=True, truncate=True)
print('-----------------')
print('Incomplete')
df.select('*').where('! case_closed').show(1, vertical=True, truncate=True)

Complete
-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 00:42:00  
 case_due_date        | 2018-01-01 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 num_weeks_late       | -142.6441088         
 zipcode              | 78207                
 case_age             | 1407                 
 days_to_closed       | 0                    
 case_lifetime        | 0                    
only showing top 1 row

-----------------
Incomplete
-RECORD 0---------

In [31]:
df.groupBy('dept_division').count().show()

+--------------------+------+
|       dept_division| count|
+--------------------+------+
|       Miscellaneous| 45123|
|         Solid Waste|   813|
|    Field Operations|116915|
|             Streets| 38510|
|    Waste Collection|215122|
|          District 7|     2|
|Code Enforcement ...|  2189|
|              Vector|   538|
|        Reservations|     2|
|   Dangerous Premise| 15479|
|     311 Call Center|  2849|
|               Brush| 18212|
|Dangerous Premise...|    36|
|Traffic Engineeri...|  4334|
|Code Enforcement ...|   198|
|          District 2|     3|
|             Signals| 20700|
|Engineering Division|  1375|
|Director's Office...|   515|
|         Storm Water| 13769|
+--------------------+------+
only showing top 20 rows



### Joining Department Data

In [32]:
dept = spark.read.csv("dept.csv", header=True, inferSchema=True)
dept.show(100, truncate=False)

+-----------------------------+-------------------------+------------------------+-------------------+
|dept_division                |dept_name                |standardized_dept_name  |dept_subject_to_SLA|
+-----------------------------+-------------------------+------------------------+-------------------+
|311 Call Center              |Customer Service         |Customer Service        |YES                |
|Brush                        |Solid Waste Management   |Solid Waste             |YES                |
|Clean and Green              |Parks and Recreation     |Parks & Recreation      |YES                |
|Clean and Green Natural Areas|Parks and Recreation     |Parks & Recreation      |YES                |
|Code Enforcement             |Code Enforcement Services|DSD/Code Enforcement    |YES                |
|Code Enforcement (IntExp)    |Code Enforcement Services|DSD/Code Enforcement    |YES                |
|Code Enforcement (Internal)  |null                     |DSD/Code Enforce

Join using `dept_division`.

In [33]:
df.join(dept, "dept_division", "left").show(2, vertical=True, truncate=False)

-RECORD 0------------------------------------------------------
 dept_division          | Field Operations                     
 case_id                | 1014127332                           
 case_opened_date       | 2018-01-01 00:42:00                  
 case_closed_date       | 2018-01-01 00:42:00                  
 case_due_date          | 2018-01-01 00:42:00                  
 case_late              | false                                
 num_days_late          | -998.5087616000001                   
 case_closed            | true                                 
 service_request_type   | Stray Animal                         
 SLA_days               | 999.0                                
 case_status            | Closed                               
 source_id              | svcCRMLS                             
 request_address        | 2315  el paso st, san antonio, 78207 
 council_district       | 005                                  
 num_weeks_late         | -142.6441088  

Drop the unused or redundant columns

In [34]:
(
    df.join(dept, "dept_division", "left")
        .drop(dept.dept_division)
        .drop(dept.dept_name)
        .drop(df.dept_division)
    .show(2, vertical=True, truncate=False)
)

-RECORD 0------------------------------------------------------
 case_id                | 1014127332                           
 case_opened_date       | 2018-01-01 00:42:00                  
 case_closed_date       | 2018-01-01 00:42:00                  
 case_due_date          | 2018-01-01 00:42:00                  
 case_late              | false                                
 num_days_late          | -998.5087616000001                   
 case_closed            | true                                 
 service_request_type   | Stray Animal                         
 SLA_days               | 999.0                                
 case_status            | Closed                               
 source_id              | svcCRMLS                             
 request_address        | 2315  el paso st, san antonio, 78207 
 council_district       | 005                                  
 num_weeks_late         | -142.6441088                         
 zipcode                | 78207         

- Rename `standardized_dept_name` to `department`
- Convert `dept_subject_to_SLA` to boolean

In [35]:
(
    df.join(dept, "dept_division", "left")
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .drop(df.dept_division)
    .withColumnRenamed("standardized_dept_name", "department")
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
    .show(2, vertical=True, truncate=False)
)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 00:42:00                  
 case_due_date        | 2018-01-01 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  el paso st, san antonio, 78207 
 council_district     | 005                                  
 num_weeks_late       | -142.6441088                         
 zipcode              | 78207                                
 case_ag

In [36]:
df = (
    df
    # left join on dept_division
    .join(dept, "dept_division", "left")
    # drop all the columns except for standardized name, as it has much fewer unique values
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .drop(df.dept_division)
    .withColumnRenamed("standardized_dept_name", "department")
    # convert to a boolean
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
)

df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 00:42:00  
 case_due_date        | 2018-01-01 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 num_weeks_late       | -142.6441088         
 zipcode              | 78207                
 case_age             | 1407                 
 days_to_closed       | 0                    
 case_lifetime        | 0                    
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
-RECORD 1-------------------------

## Train Test Split

In [37]:
train, test = df.randomSplit([0.8, 0.2])

print('train', train.count(), 'x', len(train.columns))
print('test', test.count(), 'x', len(test.columns))

train 672930 x 20
test 168774 x 20


In [38]:
train, validate, test = df.randomSplit([0.6, 0.2, 0.2])

print('train', train.count(), 'x', len(train.columns))
print('validate', validate.count(), 'x', len(validate.columns))
print('test', test.count(), 'x', len(test.columns))

train 505364 x 20
validate 168004 x 20
test 168336 x 20
