### Data Wrangling

https://docs.google.com/presentation/d/1v54Tr4POZj9K4zsaHOn7U22rzdSlTXsyCgE5lfhd-rA/edit?usp=sharing

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

#### Read data 

We will use San Antonio's 311 call data for this lesson and exercises. Dowload the CSV data from Google classroom

https://classroom.google.com/u/0/w/Mzg3MTg5NzU1Njk1/tc/Mzg3MTg5NzU1NzE1

In [2]:
# Read in CSV file 
source = (spark.read.csv("source.csv",
                     sep=",",
                     header=True,
                     inferSchema=True)
     )

In [3]:
# Another way to read in data:

(
    spark.read.format("csv")
    .option("sep", ",")
    .option("inferSchema", True)
    .option("header", True)
    .load("source.csv")
)

DataFrame[source_id: string, source_username: string]

#### Data Schemas
Spark includes a concept of a data schema  
Specify the types of our data ahead of time

In [4]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField("source_id", StringType()),
        StructField("source_username", StringType()),
    ]
)


# Read csv, but now we specify the schema:

source = spark.read.csv("source.csv", header=True, schema=schema)

In [5]:
source.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



#### Writing data

In [6]:
# write data to a destination using .write property

source.write.json("source_json", mode="overwrite")

#### Data Preparation

In [7]:
# Read the case.csv file

df = spark.read.csv("case.csv", header=True, inferSchema=True)

In [8]:
# shape of dataframe
len(df.columns), df.count() 

(14, 841704)

In [9]:
# look at first three records


In [10]:
#datatypes?


**Things to do:**

1. **Rename Columns:**
    - 'SLA_due_date -> case_due_date



2. **Correct Data Types:**
    - case_closed and case_late to boolean
    - council_district as a string
    - case_opened_date, case_closed_date and case_due_date to datetime format


3. **Data Transformation:**
    - request_address: trim and lowercase
    - format council district with leading zeros
    - convert the number of days a case is late to a number of weeks
    
    
4. **New features:**
    - zip_code : extract from address
    - case_age 
    - days_to_closed
    - case_lifetime
    
    
5. **Join cases data with department data:**


#### Rename Columns:

In [11]:
# Rename 'SLA_due_date' to 'case_due_date' using .withColumnRenamed



#### Correct Data Types:

In [12]:
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [13]:
# correct data types: case_closed and case_late to boolean

df.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|        YES|       NO|
|        YES|       NO|
|        YES|       NO|
|        YES|       NO|
|        YES|      YES|
+-----------+---------+
only showing top 5 rows



In [14]:
# use .withColumn to change columns from string to boolean values



In [15]:
#check the columns
df.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|        YES|       NO|
|        YES|       NO|
|        YES|       NO|
|        YES|       NO|
|        YES|      YES|
+-----------+---------+
only showing top 5 rows



In [16]:
# council_district cast as string
df.select('council_district').show(4)

+----------------+
|council_district|
+----------------+
|               5|
|               3|
|               3|
|               3|
+----------------+
only showing top 4 rows



In [17]:
# council_district as a string instead of int


In [18]:
# view the column



In [19]:
# check datatypes
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [20]:
# convert case_opened_date, case_closed_date and case_due_date to datetime format

df.select('case_opened_date', 'case_closed_date', 'case_due_date').show(5)


AnalysisException: cannot resolve 'case_due_date' given input columns: [SLA_days, SLA_due_date, case_closed, case_closed_date, case_id, case_late, case_opened_date, case_status, council_district, dept_division, num_days_late, request_address, service_request_type, source_id];
'Project [case_opened_date#65, case_closed_date#66, 'case_due_date]
+- Relation [case_id#64,case_opened_date#65,case_closed_date#66,SLA_due_date#67,case_late#68,num_days_late#69,case_closed#70,dept_division#71,service_request_type#72,SLA_days#73,case_status#74,source_id#75,request_address#76,council_district#77] csv


In [None]:
# to_timestamp, fmt

fmt = "M/d/yy H:mm"



In [None]:
# check the three columns again



#### Data Transformation

In [None]:
# request_address: trim and lowercase

df.select('request_address').show(5, False)

In [None]:
# request_address: trim and lowercase



In [None]:
# convert the number of days a case is late to a number of weeks



In [None]:
# use format_string function to pad zeros for council_district


#### New features:

In [None]:
# create a new column for zipcode:



case_age: How old the case is; the difference in days between when the case was opened and the current day  
days_to_closed: The number of days between when the case was opened and when it was closed  
case_lifetime: Number of days between when the case was opened and when it was closed, if the case is still open, the number of days since the case was opened  

In [None]:
#create three new columns 'case_age', 'days_to_closed', 'case_lifetime'

df = (
    df.withColumn(
        "case_age", datediff(current_timestamp(), "case_opened_date")
    )
    .withColumn(
        "days_to_closed", datediff("case_closed_date", "case_opened_date")
    )
    .withColumn(
        "case_lifetime",
        when(expr("! case_closed"), col("case_age")).otherwise(
            col("days_to_closed")
        ),
    )
)

In [None]:
df.show(1, False, True)

In [None]:
# read the dept.csv file:

dept = spark.read.csv("dept.csv", header=True, inferSchema=True)
dept.show(5, False, True)

In [None]:
# dept.select('dept_division').distinct().show()
# dept.select('dept_name').show()

In [None]:
# join the df and dept dataframe using 'dept_division' as common key
# drop columns as needed (keep standardized_dept_name)
# convert dept_subject_to_SLA to boolean

df = (
    df
    # left join on dept_division
    .join(dept, "dept_division", "left")
    # drop all the columns except for standardized name, as it has much fewer unique values
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .withColumnRenamed("standardized_dept_name", "department")
    # convert to a boolean
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
)

In [None]:
df.show(1, False, True)