# Data Wrangling

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

22/05/20 10:29:26 WARN Utils: Your hostname, Zach-Guldes-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.10.2.175 instead (on interface en0)
22/05/20 10:29:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/20 10:29:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Data comes from calls to San Antonio's 311 call center and can be found in the google classroom.

## Reading Data

In [2]:
df = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)

Like most things spark, there's multiple ways to do it:

In [3]:
(
    spark.read.format("csv")
    .option("sep", ",")
    .option("inferSchema", True)
    .option("header", True)
    .load("source.csv")
)

DataFrame[source_id: string, source_username: string]

In [4]:
df.show()

+---------+--------------------+
|source_id|     source_username|
+---------+--------------------+
|   100137|    Merlene Blodgett|
|   103582|         Carmen Cura|
|   106463|     Richard Sanchez|
|   119403|      Betty De Hoyos|
|   119555|      Socorro Quiara|
|   119868| Michelle San Miguel|
|   120752|      Eva T. Kleiber|
|   124405|           Lori Lara|
|   132408|       Leonard Silva|
|   135723|        Amy Cardenas|
|   136202|    Michelle Urrutia|
|   136979|      Leticia Garcia|
|   137943|    Pamela K. Baccus|
|   138605|        Marisa Ozuna|
|   138650|      Kimberly Green|
|   138650|Kimberly Green-Woods|
|   138793| Guadalupe Rodriguez|
|   138810|       Tawona Martin|
|   139342|     Jessica Mendoza|
|   139344|        Isis Mendoza|
+---------+--------------------+
only showing top 20 rows



### Data Schemas

Common Spark Data Types:

- `StringType`
- `IntegerType`
- `FloatType`

StuctType and StructField wrap the above.

- `StructType`
    - `StructField`: `StringType`
    - `StructField`: `IntegerType`

In [5]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField("source_id", StringType()),
        StructField("source_username", StringType()),
    ]
)

spark.read.csv("source.csv", header=True, schema=schema)

DataFrame[source_id: string, source_username: string]

### Writing Data

In [6]:
# for demo purposes
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))

mpg.write.json("mpg_json", mode="overwrite")

# Or...
(
    mpg.write.format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save("mpg_csv")
)

                                                                                

In [7]:
spark.read.json("mpg_json").count()

234

## Data Preparation

In [8]:
df = spark.read.csv("case.csv", header=True, inferSchema=True)
df.show(2, vertical=True, truncate=False)

                                                                                

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 SLA_due_date         | 9/26/20 0:42                         
 case_late            | NO                                   
 num_days_late        | -998.5087616000001                   
 case_closed          | YES                                  
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
-RECORD 1----------------------------------------------------
 case_id

In [9]:
df.limit(3).toPandas()

Unnamed: 0,case_id,case_opened_date,case_closed_date,SLA_due_date,case_late,num_days_late,case_closed,dept_division,service_request_type,SLA_days,case_status,source_id,request_address,council_district
0,1014127332,1/1/18 0:42,1/1/18 12:29,9/26/20 0:42,NO,-998.508762,YES,Field Operations,Stray Animal,999.0,Closed,svcCRMLS,"2315 EL PASO ST, San Antonio, 78207",5
1,1014127333,1/1/18 0:46,1/3/18 8:11,1/5/18 8:30,NO,-2.012604,YES,Storm Water,Removal Of Obstruction,4.322222,Closed,svcCRMSS,"2215 GOLIAD RD, San Antonio, 78223",3
2,1014127334,1/1/18 0:48,1/2/18 7:57,1/5/18 8:30,NO,-3.022338,YES,Storm Water,Removal Of Obstruction,4.320729,Closed,svcCRMSS,"102 PALFREY ST W, San Antonio, 78223",3


### Renaming Columns

In [10]:
df = df.withColumnRenamed("SLA_due_date", "case_due_date")

In [11]:
df

DataFrame[case_id: int, case_opened_date: string, case_closed_date: string, case_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: int]

### Casting Data Types

In [12]:
# demonstrating we only have yes/no in each field
df.groupBy("case_closed", "case_late").count().show()

[Stage 15:>                                                       (0 + 10) / 10]

+-----------+---------+------+
|case_closed|case_late| count|
+-----------+---------+------+
|         NO|      YES|  6525|
|        YES|      YES| 87978|
|         NO|       NO| 11585|
|        YES|       NO|735616|
+-----------+---------+------+



                                                                                

In [13]:
expr('case_closed == "YES"')

Column<'(case_closed = YES)'>

In [14]:
df = df.withColumn("case_closed", expr('case_closed == "YES"')).withColumn(
    "case_late", expr('case_late == "YES"')
)

df.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|     true|
+-----------+---------+
only showing top 5 rows



In [15]:
df.groupBy("council_district").count().show()

[Stage 19:>                                                       (0 + 10) / 10]

+----------------+------+
|council_district| count|
+----------------+------+
|               1|119309|
|               6| 74095|
|               3|102706|
|               5|114609|
|               9| 40916|
|               4| 93778|
|               8| 42345|
|               7| 72445|
|              10| 62926|
|               2|114745|
|               0|  3830|
+----------------+------+



                                                                                

In [16]:
df = df.withColumn("council_district", col("council_district").cast("string"))

In [17]:
df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- case_due_date: string (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)



In [18]:
# taking the average of a boolean column
# requires an explicit type change in spark
# (pandas converts automatically under the hood)

case_closed_as_1s_and_0s = when(df.case_closed, 1).otherwise(0)

df.select(mean(case_closed_as_1s_and_0s)).show()

[Stage 22:>                                                       (0 + 10) / 10]

+--------------------------------------------+
|avg(CASE WHEN case_closed THEN 1 ELSE 0 END)|
+--------------------------------------------+
|                          0.9784841226844592|
+--------------------------------------------+



                                                                                

In [19]:
df.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 case_due_date        | 9/26/20 0:42                         
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
-RECORD 1----------------------------------------------------
 case_id

`to_timestamp` for date conversion

[Java's `SimpleDateFormat`][1] for the format string

[1]: https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html

In [20]:
print("--- Before handling dates")
df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

fmt = "M/d/yy H:mm"
df = (
    df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
#     .withColumn("case_closed_date", to_timestamp("case_opened_date", fmt))
     .withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_due_date", fmt))
)

print("--- After")
df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

--- Before handling dates
+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows

--- After
+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|      case_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|2018-01-01 00:42:00|
|2018-01-01 00:46:00|2018-01-03 08:11:00|2018-01-01 00:46:00|
|2018-01-01 00:48:00|2018-01-02 07:57:00|2018-01-01 00:48:00|
|2018-01-01 01:29:00|2018-01-02 08:13:00|2018-01-01 01:29:00|
|2018-01-01 01:34:00|2018-01-01 13:29:00|2018-01-01 01:

### Data Transformations

Normalize the address

In [21]:
print("--- Before")
df.select("request_address").show(5)

df = df.withColumn("request_address", trim(lower(df.request_address)))

print("--- After")
df.select("request_address").show(5)

--- Before
+--------------------+
|     request_address|
+--------------------+
|2315  EL PASO ST,...|
|2215  GOLIAD RD, ...|
|102  PALFREY ST W...|
|114  LA GARDE ST,...|
|734  CLEARVIEW DR...|
+--------------------+
only showing top 5 rows

--- After
+--------------------+
|     request_address|
+--------------------+
|2315  el paso st,...|
|2215  goliad rd, ...|
|102  palfrey st w...|
|114  la garde st,...|
|734  clearview dr...|
+--------------------+
only showing top 5 rows



Number of days to number of weeks

In [22]:
df = df.withColumn(
    "num_weeks_late", expr("num_days_late / 7 AS num_weeks_late")
)

df.select("num_days_late", "num_weeks_late").show(5)

+-------------------+--------------------+
|      num_days_late|      num_weeks_late|
+-------------------+--------------------+
| -998.5087616000001|        -142.6441088|
|-2.0126041669999997|-0.28751488099999994|
|       -3.022337963|-0.43176256614285713|
|       -15.01148148| -2.1444973542857144|
|0.37216435200000003|         0.053166336|
+-------------------+--------------------+
only showing top 5 rows



String formatting

- 1 -> 0001
- 3 -> 0003
- 12 -> 0012

In [23]:
# '%03d' means at least 3 digits, pad with 0sk
#
# In order to use the format_string function the way we are, we'll need to
# convert council_district back to an integer temporarily, but the final output
# will be a string.
df = df.withColumn(
    "council_district",
    format_string("%03d", col("council_district").cast("int")),
)

df.select("council_district").show(5)

+----------------+
|council_district|
+----------------+
|             005|
|             003|
|             003|
|             003|
|             007|
+----------------+
only showing top 5 rows



In [24]:
df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)
 |-- num_weeks_late: double (nullable = true)



In [25]:
df.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2018-01-01 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  el paso st, san antonio, 78207 
 council_district     | 005                                  
 num_weeks_late       | -142.6441088                         
-RECORD 

### New Features

In [26]:
df = df.withColumn("zipcode", regexp_extract("request_address", r"\d+$", 0))

df.select("zipcode").show(5)

+-------+
|zipcode|
+-------+
|  78207|
|  78223|
|  78223|
|  78223|
|  78228|
+-------+
only showing top 5 rows



In [27]:
df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)
 |-- num_weeks_late: double (nullable = true)
 |-- zipcode: string (nullable = true)



- `case_age`: How old the case is; difference between case opened date and today
- `days_to_closed`: The number of days between when the case was opened and when it was closed
- `case_lifetime`: Number of days between when the case was opened and when it was closed, if the case is still open, the number of days since the case was opened

In [28]:
df = (
    df.withColumn(
        "case_age", datediff(current_timestamp(), "case_opened_date")
    )
    .withColumn(
        "days_to_closed", datediff("case_closed_date", "case_opened_date")
    )
    .withColumn(
        "case_lifetime",
        when(expr("! case_closed"), col("case_age")).otherwise(
            col("days_to_closed")
        ),
    )
)

df.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("case_closed")).show(5)

df.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("! case_closed")).show(5)

+-----------+-------------------+-------------------+--------+--------------+-------------+
|case_closed|   case_opened_date|   case_closed_date|case_age|days_to_closed|case_lifetime|
+-----------+-------------------+-------------------+--------+--------------+-------------+
|       true|2018-01-01 00:42:00|2018-01-01 12:29:00|    1600|             0|            0|
|       true|2018-01-01 00:46:00|2018-01-03 08:11:00|    1600|             2|            2|
|       true|2018-01-01 00:48:00|2018-01-02 07:57:00|    1600|             1|            1|
|       true|2018-01-01 01:29:00|2018-01-02 08:13:00|    1600|             1|            1|
|       true|2018-01-01 01:34:00|2018-01-01 13:29:00|    1600|             0|            0|
+-----------+-------------------+-------------------+--------+--------------+-------------+
only showing top 5 rows

+-----------+-------------------+----------------+--------+--------------+-------------+
|case_closed|   case_opened_date|case_closed_date|case_age

### Joining Department Data

In [37]:
df.show(1, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2018-01-01 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  el paso st, san antonio, 78207 
 council_district     | 005                                  
 num_weeks_late       | -142.6441088                         
 zipcode              | 78207                                
 case_ag

In [36]:
dept = spark.read.csv("dept.csv", header=True, inferSchema=True)
dept.show(4, truncate=False)

+-----------------------------+----------------------+----------------------+-------------------+
|dept_division                |dept_name             |standardized_dept_name|dept_subject_to_SLA|
+-----------------------------+----------------------+----------------------+-------------------+
|311 Call Center              |Customer Service      |Customer Service      |YES                |
|Brush                        |Solid Waste Management|Solid Waste           |YES                |
|Clean and Green              |Parks and Recreation  |Parks & Recreation    |YES                |
|Clean and Green Natural Areas|Parks and Recreation  |Parks & Recreation    |YES                |
+-----------------------------+----------------------+----------------------+-------------------+
only showing top 4 rows



Join using `dept_division`.

In [32]:
# .join(other_spark_df, join_column, how)
(
    df.join(dept, "dept_division", "left")
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .drop(df.dept_division)
    .withColumnRenamed("standardized_dept_name", "department")
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
    .show(2, vertical=True, truncate=False)
)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2018-01-01 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  el paso st, san antonio, 78207 
 council_district     | 005                                  
 num_weeks_late       | -142.6441088                         
 zipcode              | 78207                                
 case_ag

In [33]:
df = (
    df
    # left join on dept_division
    .join(dept, "dept_division", "left")
    # drop all the columns except for standardized name, as it has much fewer unique values
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .drop(df.dept_division)
    .withColumnRenamed("standardized_dept_name", "department")
    # convert to a boolean
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
)

df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2018-01-01 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 num_weeks_late       | -142.6441088         
 zipcode              | 78207                
 case_age             | 1600                 
 days_to_closed       | 0                    
 case_lifetime        | 0                    
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
-RECORD 1-------------------------

## Train Test Split

In [38]:
df.count()

841704

In [34]:
train, test = df.randomSplit([0.8, 0.2], seed=123)

print('train', train.count(), 'x', len(train.columns))
print('test', test.count(), 'x', len(test.columns))

                                                                                

train 674266 x 20




test 167438 x 20


                                                                                

In [35]:
train, validate, test = df.randomSplit([0.6, 0.2, 0.2], seed=123)

print('train', train.count(), 'x', len(train.columns))
print('validate', validate.count(), 'x', len(validate.columns))
print('test', test.count(), 'x', len(test.columns))

                                                                                

train 505800 x 20


                                                                                

validate 168466 x 20


[Stage 62:=====>                                                   (1 + 9) / 10]

test 167438 x 20


                                                                                

## Extras

- a function for cleaning up column names
- referring to columns that are created as part of method chaining (within .select, also df.column)

In [40]:
import pandas as pd

In [41]:
df = spark.createDataFrame(pd.DataFrame({'x': [1, 2, 3]}))
df.show()

+---+
|  x|
+---+
|  1|
|  2|
|  3|
+---+



In [43]:
df.select(df.x, (df.x + 1).alias('y')).withColumn('z', col('y') * 2).show()

+---+---+---+
|  x|  y|  z|
+---+---+---+
|  1|  2|  4|
|  2|  3|  6|
|  3|  4|  8|
+---+---+---+



In [44]:
df.select(df.x, (df.x + 1).alias('y'), col('y') * 2).show()

AnalysisException: cannot resolve 'y' given input columns: [x];
'Project [x#1877L, (x#1877L + cast(1 as bigint)) AS y#1916L, unresolvedalias(('y * 2), Some(org.apache.spark.sql.Column$$Lambda$4392/0x000000080172c040@1d1d5eaa))]
+- LogicalRDD [x#1877L], false


In [47]:
df

DataFrame[x: bigint]

In [46]:
(
    df.withColumn('y', col('x') + 1)
    .withColumn('z', df.y * 2)
    .show()
)

AttributeError: 'DataFrame' object has no attribute 'y'

Takeaway: when chaining methods and creating columns, use the `col()` function or `expr`, not `df.column_name`.