# Data Wrangling with Spark

In [1]:
import pyspark
from pyspark.sql.functions import *

In [2]:
# to get back the original sum function
from builtins import sum as builtin_sum

In [3]:
# The wildcard import from pyspark.sql.functions shadowed Python's built-in
# `sum`; evaluating it shows the Spark column function instead.
sum

<function pyspark.sql.functions._create_function.<locals>._(col)>

In [4]:
# The aliased built-in still sums plain Python iterables.
builtin_sum([1, 2, 3, 4, 5])

15

In [5]:
# Start a SparkSession, or reuse the one already running in this kernel.
# This is the simplest possible setup. Cluster deployments usually chain
# builder options (master URL, app name, .config(...) settings) before
# getOrCreate(), which is where this step can get complicated.
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [6]:
# Inspect the session object.
spark

## Reading / Writing Data

- `spark.read.csv`
- `spark.read.json`

Both functions return a Spark DataFrame.

Important options:

- `header=True`
- `inferSchema=True`

Instead of using `inferSchema`, you can supply a custom schema. Why?

- Speed: `inferSchema` requires an extra pass over the data to guess the column types
- Programmatic documentation of the data's structure

Writing data:

- `df.write.csv(filename)`
- `df.write.json(filename)`

## Data Prep

In [7]:
# Load the case data. header=True takes column names from the first row;
# inferSchema=True asks Spark to guess each column's type from the data.
df = spark.read.csv('data/case.csv', header=True, inferSchema=True)

In [8]:
# vertical=True prints one field per line (easier to read for wide rows);
# truncate=False shows full cell values.
df.show(5, vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 1/1/18 0:42                           
 case_closed_date     | 1/1/18 12:29                          
 SLA_due_date         | 9/26/20 0:42                          
 case_late            | NO                                    
 num_days_late        | -998.5087616000001                    
 case_closed          | YES                                   
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  EL PASO ST, San Antonio, 78207  
 council_district     | 5                                     
-RECORD 1----------------------------------------------

In [9]:
# Rename the due-date column to match the other case_*_date columns.
df = df.withColumnRenamed('SLA_due_date', 'case_due_date')

In [10]:
# `df` was already rebound with the renamed column in the previous cell, so
# renaming 'SLA_due_date' again would be a silent no-op; just display it.
df.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



In [11]:
# Cross-tab of lateness vs. closure: one row per case_late value, one column
# per distinct case_closed value, and each cell holds a row count.
df.groupby('case_late').pivot('case_closed').count().show()

+---------+-----+------+
|case_late|   NO|   YES|
+---------+-----+------+
|      YES| 6525| 87978|
|       NO|11585|735616|
+---------+-----+------+



In [12]:
# .withColumn to transform columns: comparing to 'YES' turns the string flag
# into a boolean. This is only a preview; the result is not assigned.
df.withColumn('case_late', col('case_late') == 'YES').show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



In [13]:
# Spark DataFrames are immutable, so nothing changes "in place". Each
# .withColumn returns a new frame, and rebinding `df` replaces each YES/NO
# string flag with a boolean column of the same name.
for flag_column in ('case_late', 'case_closed'):
    df = df.withColumn(flag_column, col(flag_column) == 'YES')

In [14]:
# Both flags should now display as true/false.
df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 case_due_date        | 1/5/18 8:30          
 case_late            | false                
 num_days_late        | -2.0126041

In [15]:
# Peek at the raw council_district values.
df.select('council_district').show()

+----------------+
|council_district|
+----------------+
|               5|
|               3|
|               3|
|               3|
|               7|
|               7|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
+----------------+
only showing top 20 rows



In [16]:
# Number of cases per council district (rows come back in no particular order).
df.groupby('council_district').count().show()

+----------------+------+
|council_district| count|
+----------------+------+
|               1|119309|
|               6| 74095|
|               3|102706|
|               5|114609|
|               9| 40916|
|               4| 93778|
|               8| 42345|
|               7| 72445|
|              10| 62926|
|               2|114745|
|               0|  3830|
+----------------+------+



In [17]:
# Tabular (horizontal) view of the first 5 rows.
df.show(5)

+----------+----------------+----------------+-------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|case_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+-------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|    false| -998.5087616000001|       true|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|    false|-2.0126041669999997|       true|     Storm Water|Removal Of Obstru...|4.322222

In [18]:
# Zero-pad district numbers to four characters (5 -> '0005'). This also turns
# council_district into a string column.
df = df.withColumn('council_district', format_string('%04d', col('council_district')))

In [19]:
# The date columns still hold strings like '1/1/18 0:42' (month/day/2-digit year hour:minute).
df.select('case_opened_date', 'case_closed_date', 'case_due_date').show()

+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
|     1/1/18 6:28|    1/1/18 14:38| 1/31/18 8:30|
|     1/1/18 6:57|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 6:58|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 6:58|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 6:59|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 7:00|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 7:02|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 7:02|    1/2/18 15:33| 1/17/18 8:30|
|     1/1/18 7:03|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 7:04|    1/2/18 15:33| 1/17/18 8:30|
|     1/1/18 7:04|    1/2/18 15:33| 1/17/18 8:30|
|     1/1/18 7:05|    1/2/18 15:33| 1/17/18 8:30|


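Timestamp format patterns: [Java SimpleDateFormat](https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html) (Spark 2.x); Spark 3.x uses its own [datetime pattern reference](https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html).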

In [20]:
# Convert the three string date columns to Spark timestamps. Pattern letters:
# M = month, d = day of month, yy = two-digit year, H = hour (0-23), mm = minute.
CASE_DATE_PATTERN = 'M/d/yy H:mm'
for date_column in ('case_opened_date', 'case_closed_date', 'case_due_date'):
    df = df.withColumn(date_column, to_timestamp(col(date_column), CASE_DATE_PATTERN))

In [21]:
# Dates now render as full timestamps.
df.show(3, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 0005                 
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05 08:30:00  
 case_late            | false                
 num_days_late        | -2.0126041

In [22]:
# Check the column types after the conversions above.
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('case_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

In [23]:
# Python's str.strip removes only leading/trailing whitespace; inner spaces stay.
'    a   b    c    '.strip()

'a   b    c'

In [24]:
# Normalize addresses: trim leading/trailing whitespace, then lowercase.
df = df.withColumn('request_address', lower(trim(col('request_address'))))

In [25]:
# Inner double spaces remain; only the ends were trimmed.
df.select('request_address').show(truncate=False)

+----------------------------------------+
|request_address                         |
+----------------------------------------+
|2315  el paso st, san antonio, 78207    |
|2215  goliad rd, san antonio, 78223     |
|102  palfrey st w, san antonio, 78223   |
|114  la garde st, san antonio, 78223    |
|734  clearview dr, san antonio, 78228   |
|bandera rd and bresnahan                |
|10133  figaro canyon, san antonio, 78251|
|10133  figaro canyon, san antonio, 78251|
|10133  figaro canyon, san antonio, 78251|
|10133  figaro canyon, san antonio, 78251|
|10133  figaro canyon, san antonio, 78251|
|10133  figaro canyon, san antonio, 78251|
|10129  boxing pass, san antonio, 78251  |
|10129  boxing pass, san antonio, 78251  |
|10129  boxing pass, san antonio, 78251  |
|834  barrel point, san antonio, 78251   |
|834  barrel point, san antonio, 78251   |
|834  barrel point, san antonio, 78251   |
|834  barrel point, san antonio, 78251   |
|834  barrel point, san antonio, 78251   |
+----------

In [26]:
(
    # Preview the zip code pulled off the end of each address. The last
    # argument of regexp_extract is the capture-group index; group 0 is the
    # entire match of the trailing digit run. Addresses with no trailing
    # digits (e.g. intersections) yield an empty string.
    df.select(
        regexp_extract(col('request_address'), r'\d+$', 0).alias('zipcode'),
        'request_address',
    )
    .show(truncate=False)
)

+-------+----------------------------------------+
|zipcode|request_address                         |
+-------+----------------------------------------+
|78207  |2315  el paso st, san antonio, 78207    |
|78223  |2215  goliad rd, san antonio, 78223     |
|78223  |102  palfrey st w, san antonio, 78223   |
|78223  |114  la garde st, san antonio, 78223    |
|78228  |734  clearview dr, san antonio, 78228   |
|       |bandera rd and bresnahan                |
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10129  boxing pass, san antonio, 78251  |
|78251  |10129  boxing pass, san antonio, 78251  |
|78251  |10129  boxing pass, san antonio, 78251  |
|78251  |834  barrel point, san antonio, 78251   |
|78251  |834  barrel point, san

In [27]:
# Keep the trailing digits of each address as a new zipcode column
# (an empty string when the address has no trailing digits).
df = df.withColumn('zipcode', regexp_extract(col('request_address'), r'\d+$', 0))

In [28]:
# Full (untruncated) view including the new zipcode column.
df.show(3, vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 2018-01-01 00:42:00                   
 case_closed_date     | 2018-01-01 12:29:00                   
 case_due_date        | 2020-09-26 00:42:00                   
 case_late            | false                                 
 num_days_late        | -998.5087616000001                    
 case_closed          | true                                  
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  el paso st, san antonio, 78207  
 council_district     | 0005                                  
 zipcode              | 78207                          

**Case lifetime**: for a closed case, the number of days it took to close; for an open case, the number of days it has been open so far.

In [29]:
(
    # case_age: whole days from opening until now (current_timestamp).
    df.withColumn('case_age', datediff(current_timestamp(), 'case_opened_date'))
    # days_to_closed: whole days from opening to closing (null while still open).
    .withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
    # case_lifetime: days_to_closed for closed cases, otherwise case_age.
    .withColumn('case_lifetime', when(col('case_closed'), col('days_to_closed')).otherwise(col('case_age')))
    # Show only open cases, to confirm that they fall back to case_age.
    .filter(~ col('case_closed'))
    .select('case_opened_date', 'case_closed_date', 'case_age', 'days_to_closed', 'case_lifetime')
    .show()
)

+-------------------+----------------+--------+--------------+-------------+
|   case_opened_date|case_closed_date|case_age|days_to_closed|case_lifetime|
+-------------------+----------------+--------+--------------+-------------+
|2018-01-02 09:39:00|            null|     888|          null|          888|
|2018-01-02 10:49:00|            null|     888|          null|          888|
|2018-01-02 13:45:00|            null|     888|          null|          888|
|2018-01-02 14:09:00|            null|     888|          null|          888|
|2018-01-02 14:34:00|            null|     888|          null|          888|
|2018-01-02 15:22:00|            null|     888|          null|          888|
|2018-01-02 15:58:00|            null|     888|          null|          888|
|2018-01-03 08:04:00|            null|     887|          null|          887|
|2018-01-03 09:18:00|            null|     887|          null|          887|
|2018-01-03 09:26:00|            null|     887|          null|          887|

In [30]:
# Persist case_lifetime in days: time to close for closed cases, age so far
# for open ones. The helper columns from the preview are inlined into the
# when/otherwise expression, so nothing has to be dropped afterwards.
df = df.withColumn(
    'case_lifetime',
    when(col('case_closed'), datediff('case_closed_date', 'case_opened_date'))
    .otherwise(datediff(current_timestamp(), 'case_opened_date')),
)

In [31]:
# case_lifetime now appears as the last column.
df.show(3, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 0005                 
 zipcode              | 78207                
 case_lifetime        | 0                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05

In [32]:
# Peek at a ~1% random sample of cases. The fixed seed makes the sample
# reproducible across Restart & Run All.
df.sample(fraction=.01, seed=123).show(5, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127576           
 case_opened_date     | 2018-01-01 14:51:00  
 case_closed_date     | 2018-01-21 21:08:00  
 case_due_date        | 2018-01-06 14:51:00  
 case_late            | true                 
 num_days_late        | 15.26155093          
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Animal Neglect       
 SLA_days             | 5.0                  
 case_status          | Closed               
 source_id            | 124405               
 request_address      | 342  freiling dr,... 
 council_district     | 0001                 
 zipcode              | 78213                
 case_lifetime        | 20                   
-RECORD 1------------------------------------
 case_id              | 1014127648           
 case_opened_date     | 2018-01-01 16:57:00  
 case_closed_date     | 2018-01-02 16:13:00  
 case_due_date        | 2018-01-06

In [33]:
# Department lookup table; it shares the dept_division column with the case data.
dept = spark.read.csv('data/dept.csv', header=True, inferSchema=True)
dept.show()

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|                null|  DSD/Code Enforcement|                YES|
|   Dangerous Premise|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Dangerous Premise...|Code Enforcement ...|

In [34]:
# A left join keeps every case, even ones whose dept_division has no match in
# dept. The join key becomes the first column of the result.
df = df.join(dept, 'dept_division', 'left')

In [35]:
# Inspect the combined schema.
df

DataFrame[dept_division: string, case_id: int, case_opened_date: timestamp, case_closed_date: timestamp, case_due_date: timestamp, case_late: boolean, num_days_late: double, case_closed: boolean, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: string, zipcode: string, case_lifetime: int, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]

In [37]:
# data splitting with spark
# randomSplit assigns each row to a split at random, so the split sizes are only
# approximately 80/20. The fixed seed makes the split reproducible across runs.

train, test = df.randomSplit([.8, .2], seed=123)

In [38]:
# Roughly 80% of the rows.
train.count()

673484

In [39]:
# Roughly 20% of the rows.
test.count()

168220

In [46]:
# Three-way ~70/15/15 split with a fixed seed for reproducibility. This rebinds
# train and test, replacing the two-way split above.
train, validate, test = df.randomSplit([.7, .15, .15], seed=123)