## Spark Data Wrangling Exercises

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
spark = SparkSession.builder.getOrCreate()

### 1) Read the case, department, and source data into their own spark dataframes.

In [3]:
dept = spark.read.csv('dept.csv', header=True, inferSchema=True)

In [21]:
case = spark.read.csv('case.csv', header=True, inferSchema=True)

In [5]:
source = spark.read.csv('source.csv', header=True, inferSchema=True)

### 2) Let's see how writing to the local disk works in spark:

- Write the code necessary to store the source data in both csv and json format, store these as `sources_csv` and `sources_json`
- Inspect your folder structure. What do you notice?

In [7]:
source.write.format('csv').mode('overwrite').option('header', 'true').save('sources_csv')
source.write.format('json').mode('overwrite').option('header', 'true').save('sources_json')

Woah, it made two new directories with the associated files.

### 3) Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [8]:
dept.dtypes

[('dept_division', 'string'),
 ('dept_name', 'string'),
 ('standardized_dept_name', 'string'),
 ('dept_subject_to_SLA', 'string')]

In [9]:
dept.show(2, vertical=True)

-RECORD 0--------------------------------------
 dept_division          | 311 Call Center      
 dept_name              | Customer Service     
 standardized_dept_name | Customer Service     
 dept_subject_to_SLA    | YES                  
-RECORD 1--------------------------------------
 dept_division          | Brush                
 dept_name              | Solid Waste Manag... 
 standardized_dept_name | Solid Waste          
 dept_subject_to_SLA    | YES                  
only showing top 2 rows



In [10]:
dept.groupBy('dept_subject_to_SLA').count().show()

+-------------------+-----+
|dept_subject_to_SLA|count|
+-------------------+-----+
|                YES|   31|
|                 NO|    8|
+-------------------+-----+



In [11]:
dept = dept.withColumn('dept_subject_to_SLA', col('dept_subject_to_SLA') == 'Yes')

In [12]:
dept.show(2, vertical=True)

-RECORD 0--------------------------------------
 dept_division          | 311 Call Center      
 dept_name              | Customer Service     
 standardized_dept_name | Customer Service     
 dept_subject_to_SLA    | false                
-RECORD 1--------------------------------------
 dept_division          | Brush                
 dept_name              | Solid Waste Manag... 
 standardized_dept_name | Solid Waste          
 dept_subject_to_SLA    | false                
only showing top 2 rows



In [13]:
dept.dtypes

[('dept_division', 'string'),
 ('dept_name', 'string'),
 ('standardized_dept_name', 'string'),
 ('dept_subject_to_SLA', 'boolean')]

In [14]:
case.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

In [15]:
case.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [16]:
case = (
    case.withColumn('case_opened_date', to_timestamp(col('case_opened_date'), 'M/d/yy H:mm'))
    .withColumn('case_closed_date', to_timestamp(col('case_closed_date'), 'M/d/yy H:mm'))
    .withColumn('SLA_due_date', to_timestamp(col('SLA_due_date'), 'M/d/yy H:mm'))
    .withColumn('case_late', col('case_late') == 'Yes')
    .withColumn('case_closed', col('case_closed') == 'Yes')
    .withColumn('council_district', format_string('%04d', col('council_district')))
)

In [22]:
case = case.withColumn('case_id', format_string('%04d', col('case_id')))

In [23]:
case.dtypes

[('case_id', 'string'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [24]:
source.show(2, vertical=True)

-RECORD 0---------------------------
 source_id       | 100137           
 source_username | Merlene Blodgett 
-RECORD 1---------------------------
 source_id       | 103582           
 source_username | Carmen Cura      
only showing top 2 rows



In [25]:
source.dtypes

[('source_id', 'string'), ('source_username', 'string')]

### 4) How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

In [46]:
# max_date = F.lit(case.select(F.max('case_closed_date')).head()[0])
# min_date = F.lit(case.select(F.max('case_opened_date')).head()[0])
# overdue_time = F.when(F.isnull('case_closed_date'), F.datediff(max_date, case.SLA_due_date))\
#                 .otherwise(F.datediff(case.case_closed_date  , case.SLA_due_date)).alias('overdue_time')
# case.select('*', overdue_time).where(overdue_time == F.lit(case.select(F.max(overdue_time)).head()[0])).show(1, vertical = True)

max_date = lit(case.select(max('case_closed_date')).show())
min_date = lit(case.select(min('case_opened_date')).show())

# overdue = when(isnull('case_closed_date'), datediff(max_date, case.SLA_due_date)).otherwise(datediff(case.case_closed_date, case.SLA_due_date)).alias('overdue')

# case.select('*', overdue).where(overdue == lit(case.select(max(overdue)).head()[0])).show(1, vertical=True)

+---------------------+
|max(case_closed_date)|
+---------------------+
|          9/9/17 9:56|
+---------------------+

+---------------------+
|min(case_opened_date)|
+---------------------+
|         1/1/17 10:03|
+---------------------+

