## Data Wrangling

In this lesson, we will acquire and prepare the data we will use in the rest of this module.

In [41]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

## Reading Data

Spark lets us read data in from a variety of data sources using what it calls a `DataFrameReader`. We can access the read property of our spark object and then set various options and read from a data source.

In [42]:
df = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)
df

DataFrame[source_id: string, source_username: string]

In [43]:
(
    spark.read.format("csv")
    .option("sep", ",")
    .option("inferSchema", True)
    .option("header", True)
    .load("source.csv")
)

DataFrame[source_id: string, source_username: string]

### Data Schemas
Spark includes a concept of **a data schema**, which is a way to specify the types of our data ahead of time. Doing so lets us be sure about the structure of our data, and can significantly increase the speed of loading data (inferring the schema can be a costly operation for large datasets).

We'll import several things from the `pyspark.sql.types` module:

- StringType
- DoubleType
- IntegerType
- LongType
- ShortType
- TimestampType
- FloatType
- DateType

All of the above types will go inside of a `StructField`, which will be encapsulated in a `StructType`, and the resulting object will represent our data schema.

In [44]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField("source_id", StringType()),
        StructField("source_username", StringType()),
    ]
)

spark.read.csv("source.csv", header=True, schema=schema)

DataFrame[source_id: string, source_username: string]

Notice that instead of `inferSchema=True`, we pass the `schema` object in to the `read.csv` call.

### Writing Data

A spark dataframe can be written to a local destination using the `.write` property. Several common output formats are:

- `csv`: for writing to a local csv file(s)
- `parquet`: Parquet is a very popular columnar storage format for Hadoop.
- `json`: for writing to a local json file(s)
- `jdbc`: for writing to a SQL database table

In [45]:
# # for demo purposes
# from pydataset import data

# mpg = spark.createDataFrame(data("mpg"))

# mpg.write.json("data/mpg_json", mode="overwrite")

# # like much else in spark, there's multiple ways we could do this:
# (
#     mpg.write.format("csv")
#     .mode("overwrite")
#     .option("header", "true")
#     .save("data/mpg_csv")
# )

### Data Prep

In [46]:
df = spark.read.csv('case.csv', header=True, inferSchema=True)
df.printSchema()
df.show(2, vertical=True)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  


In [47]:
# Rename the column and reassign 

df = df.withColumnRenamed("SLA_due_date", "case_due_date")
df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 case_due_date        | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

In [48]:
df.explain()

== Physical Plan ==
*(1) Project [case_id#1296, case_opened_date#1297, case_closed_date#1298, SLA_due_date#1299 AS case_due_date#1395, case_late#1300, num_days_late#1301, case_closed#1302, dept_division#1303, service_request_type#1304, SLA_days#1305, case_status#1306, source_id#1307, request_address#1308, council_district#1309]
+- FileScan csv [case_id#1296,case_opened_date#1297,case_closed_date#1298,SLA_due_date#1299,case_late#1300,num_days_late#1301,case_closed#1302,dept_division#1303,service_request_type#1304,SLA_days#1305,case_status#1306,source_id#1307,request_address#1308,council_district#1309] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/mms3-pro/codeup-data-science/spark-exercises/case.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<case_id:int,case_opened_date:string,case_closed_date:string,SLA_due_date:string,case_late:...




In [49]:
# Verify case_late and case_closed only contain yes and no values

df.groupBy('case_late', 'case_closed').count().show()

+---------+-----------+------+
|case_late|case_closed| count|
+---------+-----------+------+
|       NO|        YES|735616|
|      YES|        YES| 87978|
|       NO|         NO| 11585|
|      YES|         NO|  6525|
+---------+-----------+------+



In [50]:
# Convert the Yes/No to boolean
df = df.withColumn("case_late", expr("case_late == 'Yes'"))

In [51]:
# Convert the Yes/No to boolean
df = df.withColumn("case_closed", expr("case_closed == 'Yes'"))

In [52]:
df.printSchema()
df.show(2, vertical=True)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- case_due_date: string (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | false              

In [53]:
df = df.withColumn(("council_district"), format_string("%03d", col("council_district")))
df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | false                
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 005                  
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 case_due_date        | 1/5/18 8:30          
 case_late            | false                
 num_days_late        | -2.0126041

In [54]:
df.explain()

== Physical Plan ==
*(1) Project [case_id#1296, case_opened_date#1297, case_closed_date#1298, SLA_due_date#1299 AS case_due_date#1395, (case_late#1300 = Yes) AS case_late#1517, num_days_late#1301, (case_closed#1302 = Yes) AS case_closed#1532, dept_division#1303, service_request_type#1304, SLA_days#1305, case_status#1306, source_id#1307, request_address#1308, format_string(%03d, council_district#1309) AS council_district#1618]
+- FileScan csv [case_id#1296,case_opened_date#1297,case_closed_date#1298,SLA_due_date#1299,case_late#1300,num_days_late#1301,case_closed#1302,dept_division#1303,service_request_type#1304,SLA_days#1305,case_status#1306,source_id#1307,request_address#1308,council_district#1309] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/mms3-pro/codeup-data-science/spark-exercises/case.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<case_id:int,case_opened_date:string,case_closed_date:string,SLA_due_date:string,case_late:

In [55]:
# Java simple data format

fmt = "M/d/yy H:mm"

df.select(
    "case_opened_date", 
    to_timestamp("case_opened_date", fmt)
).show(5)

+----------------+-----------------------------------------------+
|case_opened_date|to_timestamp(`case_opened_date`, 'M/d/yy H:mm')|
+----------------+-----------------------------------------------+
|     1/1/18 0:42|                            2018-01-01 00:42:00|
|     1/1/18 0:46|                            2018-01-01 00:46:00|
|     1/1/18 0:48|                            2018-01-01 00:48:00|
|     1/1/18 1:29|                            2018-01-01 01:29:00|
|     1/1/18 1:34|                            2018-01-01 01:34:00|
+----------------+-----------------------------------------------+
only showing top 5 rows



In [56]:
df = df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
df = df.withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
df = df.withColumn("case_due_date", to_timestamp("case_due_date", fmt))

In [57]:
df.printSchema()
df.show(2, vertical=True)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | false     

In [59]:
df.explain()

== Physical Plan ==
*(1) Project [case_id#1296, gettimestamp(case_opened_date#1297, M/d/yy H:mm, Some(America/Chicago)) AS case_opened_date#1717, gettimestamp(case_closed_date#1298, M/d/yy H:mm, Some(America/Chicago)) AS case_closed_date#1732, gettimestamp(SLA_due_date#1299, M/d/yy H:mm, Some(America/Chicago)) AS case_due_date#1747, (case_late#1300 = Yes) AS case_late#1517, num_days_late#1301, (case_closed#1302 = Yes) AS case_closed#1532, dept_division#1303, service_request_type#1304, SLA_days#1305, case_status#1306, source_id#1307, request_address#1308, format_string(%03d, council_district#1309) AS council_district#1618]
+- FileScan csv [case_id#1296,case_opened_date#1297,case_closed_date#1298,SLA_due_date#1299,case_late#1300,num_days_late#1301,case_closed#1302,dept_division#1303,service_request_type#1304,SLA_days#1305,case_status#1306,source_id#1307,request_address#1308,council_district#1309] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/mms3-p

In [62]:
df.select(
    col("request_address"),
    lower(trim("request_address"))
).show(truncate=False)

+----------------------------------------+----------------------------------------+
|request_address                         |lower(trim(request_address))            |
+----------------------------------------+----------------------------------------+
|2315  EL PASO ST, San Antonio, 78207    |2315  el paso st, san antonio, 78207    |
|2215  GOLIAD RD, San Antonio, 78223     |2215  goliad rd, san antonio, 78223     |
|102  PALFREY ST W, San Antonio, 78223   |102  palfrey st w, san antonio, 78223   |
|114  LA GARDE ST, San Antonio, 78223    |114  la garde st, san antonio, 78223    |
|734  CLEARVIEW DR, San Antonio, 78228   |734  clearview dr, san antonio, 78228   |
|BANDERA RD and BRESNAHAN                |bandera rd and bresnahan                |
|10133  FIGARO CANYON, San Antonio, 78251|10133  figaro canyon, san antonio, 78251|
|10133  FIGARO CANYON, San Antonio, 78251|10133  figaro canyon, san antonio, 78251|
|10133  FIGARO CANYON, San Antonio, 78251|10133  figaro canyon, san antonio,

In [64]:
df = df.withColumn("request_address", lower(trim("request_address")))

In [65]:
df.printSchema()
df.show(2, vertical=True, truncate=False)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2020-09-26 00:42:00                  
 case_late            | false                 

In [68]:
# Add a new feature, zip code

df.select(
    "request_address",
    regexp_extract("request_address", r"(\d+)$", 1)
).show(2)

+--------------------+------------------------------------------+
|     request_address|regexp_extract(request_address, (\d+)$, 1)|
+--------------------+------------------------------------------+
|2315  el paso st,...|                                     78207|
|2215  goliad rd, ...|                                     78223|
+--------------------+------------------------------------------+
only showing top 2 rows



In [69]:
df = df.withColumn("zipcode", regexp_extract("request_address", r"(\d+)$", 1))

In [71]:
df.explain()

== Physical Plan ==
*(1) Project [case_id#1296, gettimestamp(case_opened_date#1297, M/d/yy H:mm, Some(America/Chicago)) AS case_opened_date#1717, gettimestamp(case_closed_date#1298, M/d/yy H:mm, Some(America/Chicago)) AS case_closed_date#1732, gettimestamp(SLA_due_date#1299, M/d/yy H:mm, Some(America/Chicago)) AS case_due_date#1747, (case_late#1300 = Yes) AS case_late#1517, num_days_late#1301, (case_closed#1302 = Yes) AS case_closed#1532, dept_division#1303, service_request_type#1304, SLA_days#1305, case_status#1306, source_id#1307, lower(trim(request_address#1308, None)) AS request_address#1854, format_string(%03d, council_district#1309) AS council_district#1618, regexp_extract(lower(trim(request_address#1308, None)), (\d+)$, 1) AS zipcode#1956]
+- FileScan csv [case_id#1296,case_opened_date#1297,case_closed_date#1298,SLA_due_date#1299,case_late#1300,num_days_late#1301,case_closed#1302,dept_division#1303,service_request_type#1304,SLA_days#1305,case_status#1306,source_id#1307,request_a

In [70]:
df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | false                
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 zipcode              | 78207                
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05 08:30:00  
 case_late            | false     

In [78]:
# case_lifetime
#  - if case is closed then diff between close and open dates, days_to_close
#  - else diff between open date and now -- case_age

df.select(
    "case_opened_date",
    "case_closed_date",
    "case_closed",
    datediff(current_timestamp(), 'case_opened_date').alias("case_age"),
    datediff("case_closed_date", "case_opened_date").alias("days_to_close"),
    when(col("case_closed"), "days_to_close").otherwise("case_age").alias("case_lifetime")
).show(5)

+-------------------+-------------------+-----------+--------+-------------+-------------+
|   case_opened_date|   case_closed_date|case_closed|case_age|days_to_close|case_lifetime|
+-------------------+-------------------+-----------+--------+-------------+-------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|      false|    1064|            0|     case_age|
|2018-01-01 00:46:00|2018-01-03 08:11:00|      false|    1064|            2|     case_age|
|2018-01-01 00:48:00|2018-01-02 07:57:00|      false|    1064|            1|     case_age|
|2018-01-01 01:29:00|2018-01-02 08:13:00|      false|    1064|            1|     case_age|
|2018-01-01 01:34:00|2018-01-01 13:29:00|      false|    1064|            0|     case_age|
+-------------------+-------------------+-----------+--------+-------------+-------------+
only showing top 5 rows



In [80]:
df.select(
    "case_opened_date",
    "case_closed_date",
    "case_closed",
    datediff(current_timestamp(), 'case_opened_date').alias("case_age"),
    datediff("case_closed_date", "case_opened_date").alias("days_to_close"),
).withColumn(
    "case_lifetime", 
    when(col("case_closed"), "days_to_close").otherwise(col("case_age"))
).show(5)

+-------------------+-------------------+-----------+--------+-------------+-------------+
|   case_opened_date|   case_closed_date|case_closed|case_age|days_to_close|case_lifetime|
+-------------------+-------------------+-----------+--------+-------------+-------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|      false|    1064|            0|         1064|
|2018-01-01 00:46:00|2018-01-03 08:11:00|      false|    1064|            2|         1064|
|2018-01-01 00:48:00|2018-01-02 07:57:00|      false|    1064|            1|         1064|
|2018-01-01 01:29:00|2018-01-02 08:13:00|      false|    1064|            1|         1064|
|2018-01-01 01:34:00|2018-01-01 13:29:00|      false|    1064|            0|         1064|
+-------------------+-------------------+-----------+--------+-------------+-------------+
only showing top 5 rows



In [82]:
dept = spark.read.csv("dept.csv", header=True)

In [84]:
df = df.join(dept, "dept_division", "left")

In [85]:
df

DataFrame[dept_division: string, case_id: int, case_opened_date: timestamp, case_closed_date: timestamp, case_due_date: timestamp, case_late: boolean, num_days_late: double, case_closed: boolean, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: string, zipcode: string, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]

In [86]:
# Split 

train, test = df.randomSplit([.8, .2])

In [87]:
train.count(), test.count()

(673162, 168542)

### spark documents --> MLlib