## Spark Wrangle Exercises:
### Corey Solitaire
`11.30.2020`

In [1]:
# Create Spark Session
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

'''at a company this would be more complex, and include
commands to set up specific enfiroments'''

spark = SparkSession.builder.getOrCreate()

### To schema or not to schema

 - Schema: a way of specifying the datatypes / data shape to spark   
 
 - Why:
   - Preformance
   - Data Integrity

In [2]:
# Call in DF with specified structures
# header = True (calls in headers from CSF)
# inferSchema = True (infers datatype of columns in csv)
source = spark.read.csv('source.csv', header = True, inferSchema = True)
dept = spark.read.csv('dept.csv', header = True, inferSchema = True)
case = spark.read.csv('case.csv', header = True, inferSchema = True)

***
## Exercises:
- This exercises uses the case.csv, dept.csv, and source.csv files from the san antonio 311 call dataset.   

`You might have noticed that the latest date in the dataset is fairly far off from the present day. To account for this, replace any occurances of the current time with the maximum date from the dataset.`
***

### 1. Read the case, department, and source data into their own spark dataframes.

In [4]:
print(source.show(5)),print(dept.show(5)),print(case.show(5))

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
|   119403|  Betty De Hoyos|
|   119555|  Socorro Quiara|
+---------+----------------+
only showing top 5 rows

None
+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+------------

(None, None, None)

### 2. Let's see how writing to the local disk works in spark:

- Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json

- Inspect your folder structure. What do you notice?

In [5]:
source.write.csv("source_csv", mode="overwrite")
dept.write.csv("dept_csv", mode="overwrite")
case.write.csv("case_csv", mode="overwrite")
source.write.json("source_json", mode="overwrite")
dept.write.json("dept_json", mode="overwrite")
case.write.json("case_json", mode="overwrite")

***
`Inside the folder the csv and json objects are stored using unique identifiers`
***

### 3. Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [9]:
source.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



In [10]:
dept.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



In [11]:
case.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



### Changes to case data frame

In [14]:
# rename to match other columns
case = case.withColumnRenamed("SLA_due_date", "case_due_date")

# change case_closed and case_late to boolean values
case = case.withColumn("case_closed", expr('case_closed == "YES"')).withColumn(
    "case_late", expr('case_late == "YES"')
)

# change city coulncil district to string not int (categorical)
case = case.withColumn("council_district", col("council_district").cast("string"))
# add 00 before number
case = case.withColumn("council_district", col("council_district").cast("int"))

# '%03d' means at least 3 digits, pad with 0s
#
# In order to use the format_string function the way we are, we'll need to
# convert council_district back to an integer temporarily, but the final output
# will be a string.
case = case.withColumn(
    "council_district",
    format_string("%03d", col("council_district").cast("int")),
)

# Now we will handle the 3 columns that have dates in them. We'll use spark's to_timestamp function for this.
fmt = "M/d/yy H:mm"
case = (
    case.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_opened_date", fmt))
)

# standardize address column, trim, removes whitespace, lowercase
case = case.withColumn("request_address", trim(lower(case.request_address)))

# convert number of days late to weeks
case = case.withColumn(
    "num_weeks_late", expr("num_days_late / 7 AS num_weeks_late")
)

# new features (zipcode)
case = case.withColumn("zipcode", regexp_extract("request_address", r"\d+$", 0))

''' 
Next we will create several new, related columns:

- case_age: How old the case is; the difference in days between when the case was opened and the current day
- days_to_closed: The number of days between when the case was opened and when it was closed
- case_lifetime: Number of days between when the case was opened and when it was closed, if the case is still open, the number of days since the case was opened

'''

case = (
    case.withColumn(
        "case_age", datediff(current_timestamp(), "case_opened_date")
    )
    .withColumn(
        "days_to_closed", datediff("case_closed_date", "case_opened_date")
    )
    .withColumn(
        "case_lifetime",
        when(expr("! case_closed"), col("case_age")).otherwise(
            col("days_to_closed")
        ),
    )
)

In [16]:
#Joint modified data frames

df = (
    case
    # left join on dept_division
    .join(dept, "dept_division", "left")
    # drop all the columns except for standardized name, as it has much fewer unique values
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .drop(case.dept_division)
    .withColumnRenamed("standardized_dept_name", "department")
    # convert to a boolean
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
)

In [18]:
# Check to see if it works
df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 00:42:00  
 case_due_date        | 2018-01-01 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 num_weeks_late       | -142.6441088         
 zipcode              | 78207                
 case_age             | 1064                 
 days_to_closed       | 0                    
 case_lifetime        | 0                    
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
-RECORD 1-------------------------

### Train / Test Split

In [20]:
train, test = df.randomSplit([0.8, 0.2])
train, validate, test = df.randomSplit([0.6, 0.2, 0.2])

In [23]:
# verify split train
print((train.count(), len(train.columns)))

(504655, 20)


In [24]:
# verify split validate
print((validate.count(), len(validate.columns)))

(168701, 20)


In [25]:
# verify split test
print((test.count(), len(test.columns)))

(168348, 20)


*** 
### 1.  How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?
 

   
### 2. How many Stray Animal cases are there?

### 3. How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

### 4. Convert the council_district column to a string column.   

### 5. Extract the year from the case_closed_date column.

### 6. Convert num_days_late from days to hours in new columns num_hours_late.

### 7. Join the case data with the source and department data.

### 8. Are there any cases that do not have a request source?

### 9. What are the top 10 service request types in terms of number of requests?

### 10. What are the top 10 service request types in terms of average days late?

### 11. Does number of days late depend on department?

### 12. How do number of days late depend on department and request type?