# Wrangle Exercises
## Data Acquisition
These exercises should go in a notebook or script named ```wrangle```. Add, commit, and push your changes.

This exercise uses the ```cases```, ```dept```, and ```source``` tables from the ```311_data``` database on the Codeup MySQL server.

In [36]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd

# The SparkSession is where you would specify the JDBC driver and additional connection details.
# We'll use pd.read_sql to simplify here so we can focus on the Spark API and not the IT setup.
# When using Spark on the job, you'll work with the operations team to install the right Java drivers and configure your connection
spark = SparkSession.builder.getOrCreate()

# ------------- #
# Local Imports #
# ------------- #

# importing sys
import sys

# adding 00_helper_files to the system path
sys.path.insert(0, '/Users/qmcbt/codeup-data-science/00_helper_files')

# env containing sensitive access credentials
import env
from env import user, password, host
from env import get_db_url
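
The `env` module is not shown because it holds credentials. For readers without that file, here is a minimal sketch of what a helper like `get_db_url` could look like. The `mysql+pymysql` driver prefix, the keyword parameters, and the placeholder credentials are all assumptions for illustration, not the contents of the real file.

```python
# Hypothetical stand-in for env.py; the real file is private and may differ.
user = "example_user"          # placeholder, not a real credential
password = "example_password"  # placeholder, not a real credential
host = "db.example.com"        # placeholder host name


def get_db_url(database, user=user, password=password, host=host):
    """Build a SQLAlchemy-style connection URL for a MySQL database."""
    # Assumes the pymysql driver; change the prefix if another driver is installed.
    return f"mysql+pymysql://{user}:{password}@{host}/{database}"


print(get_db_url("311_data"))
```

Keeping the credentials in a separate module that is listed in `.gitignore` means they never end up in the pushed notebook.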

## 1. Read the ```cases```, ```dept```, and ```source``` data into their own spark dataframes.

In [7]:
# Read in source to a spark dataframe
query = """SELECT * FROM source"""
url = get_db_url("311_data")
source_df = pd.read_sql(query, url)
source_df = spark.createDataFrame(source_df)
source_df.show(5)

+-----+---------+----------------+
|index|source_id| source_username|
+-----+---------+----------------+
|    0|   100137|Merlene Blodgett|
|    1|   103582|     Carmen Cura|
|    2|   106463| Richard Sanchez|
|    3|   119403|  Betty De Hoyos|
|    4|   119555|  Socorro Quiara|
+-----+---------+----------------+
only showing top 5 rows



In [8]:
# Read in cases to a spark dataframe
query = """SELECT * FROM cases"""
url = get_db_url("311_data")
cases_df = pd.read_sql(query, url)
cases_df = spark.createDataFrame(cases_df)
cases_df.show(5)

23/01/22 13:00:59 WARN TaskSetManager: Stage 3 contains a task of very large size (18865 KiB). The maximum recommended task size is 1000 KiB.
23/01/22 13:01:04 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 3 (TID 3): Attempting to kill Python Worker
+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8

                                                                                

In [18]:
cases_df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616         
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11  



In [9]:
# Read in dept to a spark dataframe
query = """SELECT * FROM dept"""
url = get_db_url("311_data")
dept_df = pd.read_sql(query, url)
dept_df = spark.createDataFrame(dept_df)
dept_df.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



## 2. Let's see how writing to the local disk works in spark:

* Write the code necessary to store the source data in both CSV and JSON format; store these as ```sources_csv``` and ```sources_json```.  
### ANSWER:

In [13]:
# Write the source data as JSON to data/source_json, overwriting any existing output at that path
source_df.write.json("data/source_json", mode="overwrite")

# Write the source data as CSV (with a header row) to data/source_csv, overwriting any existing output at that path
(
    source_df.write.format("csv")
    .mode("overwrite")
    .option("header", "true")
    .save("data/source_csv")
)

                                                                                

In [17]:
!ls -a

.                         .ipynb_checkpoints        data
..                        NOTES_spark.ipynb         spark-wrangle.ipynb
.git                      NOTES_spark_wrangle.ipynb spark101.ipynb
.gitignore                README.md


In [16]:
!ls data

mpg_csv     mpg_json    source_csv  source_json


* Inspect your folder structure. What do you notice?  
### ANSWER: The two write calls created ```source_csv``` and ```source_json``` inside the ```data``` folder, because the paths in the code above start with ```data/```. Each output is a directory rather than a single file: Spark writes one ```part-*``` file per partition, plus a ```_SUCCESS``` marker, into that directory.

## 3. Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

#### source_df

In [19]:
# Show one record vertically, similar to a transposed (.T) pandas DataFrame
source_df.show(1, vertical=True)


-RECORD 0---------------------------
 index           | 0                
 source_id       | 100137           
 source_username | Merlene Blodgett 
only showing top 1 row



                                                                                

In [10]:
# The .schema attribute shows the data types that Spark has inferred from the source
source_df.schema

StructType([StructField('index', LongType(), True), StructField('source_id', StringType(), True), StructField('source_username', StringType(), True)])

#### cases_df

In [20]:
# Show one record vertically, similar to a transposed (.T) pandas DataFrame
cases_df.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616         
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



                                                                                

In [24]:
# Rename Columns
cases_df = cases_df.withColumnRenamed("SLA_due_date", "case_due_date")
cases_df = cases_df.withColumnRenamed('SLA_days', 'case_days')
cases_df.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616         
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 case_days            | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



                                                                                

In [28]:
# The .schema attribute shows the data types that Spark has inferred from the source
cases_df.schema

StructType([StructField('case_id', LongType(), True), StructField('case_opened_date', StringType(), True), StructField('case_closed_date', StringType(), True), StructField('case_due_date', StringType(), True), StructField('case_late', StringType(), True), StructField('num_days_late', DoubleType(), True), StructField('case_closed', StringType(), True), StructField('dept_division', StringType(), True), StructField('service_request_type', StringType(), True), StructField('case_days', DoubleType(), True), StructField('case_status', StringType(), True), StructField('source_id', StringType(), True), StructField('request_address', StringType(), True), StructField('council_district', LongType(), True)])

In [40]:
# Convert the date columns from string to timestamp:
# 'case_opened_date', 'case_closed_date', 'case_due_date'
fmt = "M/d/yy H:mm"
cases_df = (cases_df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
            .withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
            .withColumn("case_due_date", to_timestamp("case_due_date", fmt))
           )
cases_df.select("case_opened_date", "case_due_date", "case_closed_date").show(5)

+-------------------+-------------------+-------------------+
|   case_opened_date|      case_due_date|   case_closed_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2020-09-26 00:42:00|2018-01-01 12:29:00|
|2018-01-01 00:46:00|2018-01-05 08:30:00|2018-01-03 08:11:00|
|2018-01-01 00:48:00|2018-01-05 08:30:00|2018-01-02 07:57:00|
|2018-01-01 01:29:00|2018-01-17 08:30:00|2018-01-02 08:13:00|
|2018-01-01 01:34:00|2018-01-01 04:34:00|2018-01-01 13:29:00|
+-------------------+-------------------+-------------------+
only showing top 5 rows



                                                                                

In [41]:
# Show count of every possible crosstab combination of values between case_closed and case_late
cases_df.groupBy("case_closed", "case_late").count().show()


+-----------+---------+------+
|case_closed|case_late| count|
+-----------+---------+------+
|         NO|      YES|  6525|
|        YES|      YES| 87978|
|         NO|       NO| 11585|
|        YES|       NO|735616|
+-----------+---------+------+



                                                                                

In [45]:
# expr() evaluates a SQL expression; the comparison returns true where the value is 'YES' and false for any other non-null value
cases_df = cases_df.withColumn("case_closed", expr('case_closed == "YES"')
                              ).withColumn("case_late", expr('case_late == "YES"'))

cases_df.groupBy("case_closed", "case_late").count().show()


+-----------+---------+------+
|case_closed|case_late| count|
+-----------+---------+------+
|       true|    false|735616|
|       true|     true| 87978|
|      false|    false| 11585|
|      false|     true|  6525|
+-----------+---------+------+



                                                                                

In [47]:
# Cast 'num_days_late' and 'case_days' to int.
# Note: casting a double to int drops the fractional part (e.g. -998.5087616 becomes -998).
cases_df = cases_df.withColumn("num_days_late", col("num_days_late").cast("int")
                              ).withColumn("case_days", col("case_days").cast("int"))

cases_df.schema

StructType([StructField('case_id', LongType(), True), StructField('case_opened_date', TimestampType(), True), StructField('case_closed_date', TimestampType(), True), StructField('case_due_date', TimestampType(), True), StructField('case_late', BooleanType(), True), StructField('num_days_late', IntegerType(), True), StructField('case_closed', BooleanType(), True), StructField('dept_division', StringType(), True), StructField('service_request_type', StringType(), True), StructField('case_days', IntegerType(), True), StructField('case_status', StringType(), True), StructField('source_id', StringType(), True), StructField('request_address', StringType(), True), StructField('council_district', LongType(), True)])

#### dept_df

In [21]:
# Show one record vertically, similar to a transposed (.T) pandas DataFrame
dept_df.show(1, vertical=True)

-RECORD 0----------------------------------
 dept_division          | 311 Call Center  
 dept_name              | Customer Service 
 standardized_dept_name | Customer Service 
 dept_subject_to_SLA    | YES              
only showing top 1 row



In [12]:
# The .schema attribute shows the data types that Spark has inferred from the source
dept_df.schema

StructType([StructField('dept_division', StringType(), True), StructField('dept_name', StringType(), True), StructField('standardized_dept_name', StringType(), True), StructField('dept_subject_to_SLA', StringType(), True)])

In [48]:
dept_df.groupBy("dept_subject_to_SLA").count().show()


+-------------------+-----+
|dept_subject_to_SLA|count|
+-------------------+-----+
|                YES|   31|
|                 NO|    8|
+-------------------+-----+



                                                                                

In [49]:
# expr() evaluates a SQL expression; the comparison returns true where the value is 'YES' and false for any other non-null value
dept_df = dept_df.withColumn("dept_subject_to_SLA", expr('dept_subject_to_SLA == "YES"'))

dept_df.groupBy("dept_subject_to_SLA").count().show()

+-------------------+-----+
|dept_subject_to_SLA|count|
+-------------------+-----+
|               true|   31|
|              false|    8|
+-------------------+-----+



### 1. How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently open issue been open?

### 2. How many Stray Animal cases are there?

### 3. How many service requests that are assigned to the Field Operations department (```dept_division```) are not classified as "Officer Standby" request type (```service_request_type```)?

### 4. Convert the ```council_district``` column to a ```string``` column.

### 5. Extract the year from the ```case_closed_date``` column.

### 6. Convert ```num_days_late``` from days to hours in a new column ```num_hours_late```.

### 7. Join the case data with the source and department data.

### 8. Are there any cases that do not have a request source?

### 9. What are the top 10 service request types in terms of number of requests?

### 10. What are the top 10 service request types in terms of average days late?

### 11. Does number of days late depend on department?

### 12. How does the number of days late depend on department and request type?

## You might have noticed that the latest date in the dataset is fairly far off from the present day. To account for this, replace any occurrences of the current time with the maximum date from the dataset.