# Data Wrangling
In this lesson, we will acquire and prepare the data used in the rest of this module.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

# Reading Data
Spark lets us read data from a variety of data sources using what it calls a `DataFrameReader`. We access it through the `read` property of our `spark` object, set any options, and then read from a data source.

In [2]:
df = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)

In [3]:
df.show()

+---------+--------------------+
|source_id|     source_username|
+---------+--------------------+
|   100137|    Merlene Blodgett|
|   103582|         Carmen Cura|
|   106463|     Richard Sanchez|
|   119403|      Betty De Hoyos|
|   119555|      Socorro Quiara|
|   119868| Michelle San Miguel|
|   120752|      Eva T. Kleiber|
|   124405|           Lori Lara|
|   132408|       Leonard Silva|
|   135723|        Amy Cardenas|
|   136202|    Michelle Urrutia|
|   136979|      Leticia Garcia|
|   137943|    Pamela K. Baccus|
|   138605|        Marisa Ozuna|
|   138650|      Kimberly Green|
|   138650|Kimberly Green-Woods|
|   138793| Guadalupe Rodriguez|
|   138810|       Tawona Martin|
|   139342|     Jessica Mendoza|
|   139344|        Isis Mendoza|
+---------+--------------------+
only showing top 20 rows



## Data Schemas
Spark lets us define a data schema, which specifies the column names and types of our data ahead of time. Doing so guarantees the structure of the resulting DataFrame and can significantly speed up loading, because inferring the schema requires an extra pass over the data, which is costly for large datasets.

The `pyspark.sql.types` module provides classes for the common column types, including:

- `StringType`
- `DoubleType`
- `IntegerType`
- `LongType`
- `ShortType`
- `TimestampType`
- `FloatType`
- `DateType`

Each column is described by a `StructField` that holds its name and one of these types. A list of `StructField`s is wrapped in a `StructType`, and the resulting object is our data schema.

In [4]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField("source_id", StringType()),
        StructField("source_username", StringType()),
    ]
)

spark.read.csv("source.csv", header=True, schema=schema)

DataFrame[source_id: string, source_username: string]

Notice that instead of `inferSchema=True`, we pass the `schema` object in to the `read.csv` call.

## Writing Data
A Spark DataFrame can be written out using its `.write` property, which returns a `DataFrameWriter`. Several common output formats are:
- `csv`: for writing to CSV file(s)
- `parquet`: for writing to Parquet, a very popular columnar storage format from the Hadoop ecosystem
- `json`: for writing to JSON file(s)
- `jdbc`: for writing to a SQL database table

# Follow Up Lesson
## Data Prep

In [5]:
df2 = spark.read.csv('case.csv', header=True, inferSchema=True)
df2.printSchema()
df2.show(5, vertical=True)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  


In [6]:
df2.withColumnRenamed('SLA_due_date', 'case_due_date').show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



In [7]:
df2 = df2.withColumnRenamed('SLA_due_date', 'case_due_date') # The rename is recorded in df2's plan, but won't execute until an action (like .show()) needs the output

In [8]:
df2.explain() # Notice that SLA_due_date is now aliased AS case_due_date (in the *(1) Project line at the top of the output)

== Physical Plan ==
*(1) Project [case_id#51, case_opened_date#52, case_closed_date#53, SLA_due_date#54 AS case_due_date#236, case_late#55, num_days_late#56, case_closed#57, dept_division#58, service_request_type#59, SLA_days#60, case_status#61, source_id#62, request_address#63, council_district#64]
+- FileScan csv [case_id#51,case_opened_date#52,case_closed_date#53,SLA_due_date#54,case_late#55,num_days_late#56,case_closed#57,dept_division#58,service_request_type#59,SLA_days#60,case_status#61,source_id#62,request_address#63,council_district#64] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/adamgomez/codeup-data-science/spark-exercises/case.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<case_id:int,case_opened_date:string,case_closed_date:string,SLA_due_date:string,case_late:...




Are case_late and case_closed simple boolean representations?

In [9]:
df2.groupby('case_late', 'case_closed').count().show()

+---------+-----------+------+
|case_late|case_closed| count|
+---------+-----------+------+
|       NO|        YES|735616|
|      YES|        YES| 87978|
|       NO|         NO| 11585|
|      YES|         NO|  6525|
+---------+-----------+------+



Now that we have verified that YES and NO are the only values, let's convert both columns to booleans.

In [10]:
df2 = df2.withColumn("case_late", expr("case_late == 'YES'")) # Reassigns the YES to True and NO to False

In [11]:
df2 = df2.withColumn("case_closed", expr("case_closed == 'YES'"))

In [12]:
df2.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



Can we format council_district as a zero-padded, three-digit string?

In [13]:
df2.select(col('council_district'), format_string("%03d", col("council_district"))).show()

+----------------+-------------------------------------+
|council_district|format_string(%03d, council_district)|
+----------------+-------------------------------------+
|               5|                                  005|
|               3|                                  003|
|               3|                                  003|
|               3|                                  003|
|               7|                                  007|
|               7|                                  007|
|               4|                                  004|
|               4|                                  004|
|               4|                                  004|
|               4|                                  004|
|               4|                                  004|
|               4|                                  004|
|               4|                                  004|
|               4|                                  004|
|               4|             

In [14]:
df2.withColumn("council_district", format_string("%03d", col("council_district"))).show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 005                  
only showing top 1 row



In [15]:
df2 = df2.withColumn("council_district", format_string("%03d", col("council_district")))

In [16]:
df2.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 005                  
only showing top 1 row



In [17]:
df2.explain() # Still just one operation. Spark is combining all of our edits into a single Project

== Physical Plan ==
*(1) Project [case_id#51, case_opened_date#52, case_closed_date#53, SLA_due_date#54 AS case_due_date#236, (case_late#55 = YES) AS case_late#287, num_days_late#56, (case_closed#57 = YES) AS case_closed#302, dept_division#58, service_request_type#59, SLA_days#60, case_status#61, source_id#62, request_address#63, format_string(%03d, council_district#64) AS council_district#487]
+- FileScan csv [case_id#51,case_opened_date#52,case_closed_date#53,SLA_due_date#54,case_late#55,num_days_late#56,case_closed#57,dept_division#58,service_request_type#59,SLA_days#60,case_status#61,source_id#62,request_address#63,council_district#64] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/adamgomez/codeup-data-science/spark-exercises/case.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<case_id:int,case_opened_date:string,case_closed_date:string,SLA_due_date:string,case_late:...




Can we convert the date strings into timestamps?

In [18]:
fmt = "M/d/yy H:mm"

df2.select(
    "case_opened_date",
    to_timestamp("case_opened_date", fmt)).show(5)

+----------------+-----------------------------------------------+
|case_opened_date|to_timestamp(`case_opened_date`, 'M/d/yy H:mm')|
+----------------+-----------------------------------------------+
|     1/1/18 0:42|                            2018-01-01 00:42:00|
|     1/1/18 0:46|                            2018-01-01 00:46:00|
|     1/1/18 0:48|                            2018-01-01 00:48:00|
|     1/1/18 1:29|                            2018-01-01 01:29:00|
|     1/1/18 1:34|                            2018-01-01 01:34:00|
+----------------+-----------------------------------------------+
only showing top 5 rows



In [19]:
df2 = df2.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))

In [20]:
df2.show(1, vertical=True) # case_opened_date has been correctly converted to a timestamp

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 005                  
only showing top 1 row



In [21]:
df2 = df2.withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
df2 = df2.withColumn("case_due_date", to_timestamp("case_due_date", fmt))

In [22]:
df2.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 005                  
only showing top 1 row



Let's normalize the text data in the address column.

In [23]:
df2.select(col("request_address")).show(truncate=False)

+----------------------------------------+
|request_address                         |
+----------------------------------------+
|2315  EL PASO ST, San Antonio, 78207    |
|2215  GOLIAD RD, San Antonio, 78223     |
|102  PALFREY ST W, San Antonio, 78223   |
|114  LA GARDE ST, San Antonio, 78223    |
|734  CLEARVIEW DR, San Antonio, 78228   |
|BANDERA RD and BRESNAHAN                |
|10133  FIGARO CANYON, San Antonio, 78251|
|10133  FIGARO CANYON, San Antonio, 78251|
|10133  FIGARO CANYON, San Antonio, 78251|
|10133  FIGARO CANYON, San Antonio, 78251|
|10133  FIGARO CANYON, San Antonio, 78251|
|10133  FIGARO CANYON, San Antonio, 78251|
|10129  BOXING PASS, San Antonio, 78251  |
|10129  BOXING PASS, San Antonio, 78251  |
|10129  BOXING PASS, San Antonio, 78251  |
|834  BARREL POINT, San Antonio, 78251   |
|834  BARREL POINT, San Antonio, 78251   |
|834  BARREL POINT, San Antonio, 78251   |
|834  BARREL POINT, San Antonio, 78251   |
|834  BARREL POINT, San Antonio, 78251   |
+----------

In [24]:
df2.select(
    col("request_address"),
    lower(trim("request_address"))
).show(truncate=False) # Trim leading/trailing whitespace and lowercase the letters

+----------------------------------------+----------------------------------------+
|request_address                         |lower(trim(request_address))            |
+----------------------------------------+----------------------------------------+
|2315  EL PASO ST, San Antonio, 78207    |2315  el paso st, san antonio, 78207    |
|2215  GOLIAD RD, San Antonio, 78223     |2215  goliad rd, san antonio, 78223     |
|102  PALFREY ST W, San Antonio, 78223   |102  palfrey st w, san antonio, 78223   |
|114  LA GARDE ST, San Antonio, 78223    |114  la garde st, san antonio, 78223    |
|734  CLEARVIEW DR, San Antonio, 78228   |734  clearview dr, san antonio, 78228   |
|BANDERA RD and BRESNAHAN                |bandera rd and bresnahan                |
|10133  FIGARO CANYON, San Antonio, 78251|10133  figaro canyon, san antonio, 78251|
|10133  FIGARO CANYON, San Antonio, 78251|10133  figaro canyon, san antonio, 78251|
|10133  FIGARO CANYON, San Antonio, 78251|10133  figaro canyon, san antonio,

In [25]:
df2 = df2.withColumn("request_address", lower(trim("request_address")))

In [26]:
df2.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
only showing top 1 row



Can we extract the zip code from the address into its own column?

In [27]:
df2.select("request_address",
          regexp_extract("request_address", r"(\d+)$", 1)
).show(truncate=False)

+----------------------------------------+------------------------------------------+
|request_address                         |regexp_extract(request_address, (\d+)$, 1)|
+----------------------------------------+------------------------------------------+
|2315  el paso st, san antonio, 78207    |78207                                     |
|2215  goliad rd, san antonio, 78223     |78223                                     |
|102  palfrey st w, san antonio, 78223   |78223                                     |
|114  la garde st, san antonio, 78223    |78223                                     |
|734  clearview dr, san antonio, 78228   |78228                                     |
|bandera rd and bresnahan                |                                          |
|10133  figaro canyon, san antonio, 78251|78251                                     |
|10133  figaro canyon, san antonio, 78251|78251                                     |
|10133  figaro canyon, san antonio, 78251|78251       

In [28]:
df2 = df2.withColumn('zipcode', regexp_extract('request_address', r'(\d+)$', 1))

In [29]:
df2.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 zipcode              | 78207                
only showing top 1 row



In [30]:
# case_lifetime
#     - if the case is closed, use the difference between the close and open dates (days_to_close)
#     - otherwise, use the difference between the open date and now (case_age)

df2.select(
    "case_opened_date", 
    "case_closed_date", 
    "case_closed",
    datediff(current_timestamp(), "case_opened_date").alias("case_age"),
    datediff("case_closed_date", "case_opened_date").alias("days_to_close"),
).withColumn(
        "case_lifetime",
        when(col("case_closed"), col("days_to_close")).otherwise(col("case_age"))
).show(10)

+-------------------+-------------------+-----------+--------+-------------+-------------+
|   case_opened_date|   case_closed_date|case_closed|case_age|days_to_close|case_lifetime|
+-------------------+-------------------+-----------+--------+-------------+-------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|       true|    1064|            0|            0|
|2018-01-01 00:46:00|2018-01-03 08:11:00|       true|    1064|            2|            2|
|2018-01-01 00:48:00|2018-01-02 07:57:00|       true|    1064|            1|            1|
|2018-01-01 01:29:00|2018-01-02 08:13:00|       true|    1064|            1|            1|
|2018-01-01 01:34:00|2018-01-01 13:29:00|       true|    1064|            0|            0|
|2018-01-01 06:28:00|2018-01-01 14:38:00|       true|    1064|            0|            0|
|2018-01-01 06:57:00|2018-01-02 15:32:00|       true|    1064|            1|            1|
|2018-01-01 06:58:00|2018-01-02 15:32:00|       true|    1064|            1|            1|

In [31]:
df2 = df2.withColumn("case_age", datediff(current_timestamp(), "case_opened_date"))
df2 = df2.withColumn("days_to_close", datediff("case_closed_date", "case_opened_date"))
df2 = df2.withColumn("case_lifetime", when(col("case_closed"), col("days_to_close")).otherwise(col("case_age")))

In [32]:
df2.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 zipcode              | 78207                
 case_age             | 1064                 
 days_to_close        | 0                    
 case_lifetime        | 0                    
only showing top 1 row



In [33]:
df2.where(~col("case_closed")).show(3, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014128388           
 case_opened_date     | 2018-01-02 09:39:00  
 case_closed_date     | null                 
 case_due_date        | 2018-01-09 09:39:00  
 case_late            | true                 
 num_days_late        | 211.5974884          
 case_closed          | false                
 dept_division        | 311 Call Center      
 service_request_type | Complaint            
 SLA_days             | 7.0                  
 case_status          | Open                 
 source_id            | mt13131              
 request_address      | 7326  westglade p... 
 council_district     | 006                  
 zipcode              | 78227                
 case_age             | 1063                 
 days_to_close        | null                 
 case_lifetime        | 1063                 
-RECORD 1------------------------------------
 case_id              | 1014128790           
 case_opened_date     | 2018-01-02

Let's join the department data in dept.csv to df2.

In [34]:
spark.read.csv("dept.csv", header=True).show()

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|                null|  DSD/Code Enforcement|                YES|
|   Dangerous Premise|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Dangerous Premise...|Code Enforcement ...|

In [35]:
dept = spark.read.csv("dept.csv", header=True)

In [36]:
df2.join(dept, "dept_division", "left").show(5, vertical=True)

-RECORD 0--------------------------------------
 dept_division          | Field Operations     
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 case_due_date          | 2020-09-26 00:42:00  
 case_late              | false                
 num_days_late          | -998.5087616000001   
 case_closed            | true                 
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 source_id              | svcCRMLS             
 request_address        | 2315  el paso st,... 
 council_district       | 005                  
 zipcode                | 78207                
 case_age               | 1064                 
 days_to_close          | 0                    
 case_lifetime          | 0                    
 dept_name              | Animal Care Services 
 standardized_dept_name | Animal Care Se

In [37]:
df2 = df2.join(dept, "dept_division", "left")

In [38]:
df2

DataFrame[dept_division: string, case_id: int, case_opened_date: timestamp, case_closed_date: timestamp, case_due_date: timestamp, case_late: boolean, num_days_late: double, case_closed: boolean, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: string, zipcode: string, case_age: int, days_to_close: int, case_lifetime: int, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]

In [None]:
train, test = df2.randomSplit([.8, .2])