In [73]:
import pyspark.sql
from pyspark.sql.functions import *

In [74]:
# firing up our spark session

spark = pyspark.sql.SparkSession.builder.getOrCreate()

> Read the case, department, and source data into their own spark dataframes.

In [75]:
# reading the case data
case_df = spark.read.csv('case.csv', header = True, inferSchema = True)

# reading the department data
department_df = spark.read.csv('dept.csv', header = True, inferSchema = True)


# reading the source data
source_df = spark.read.csv('source.csv', header = True, inferSchema = True)

In [76]:
# lets take a look at these dataframes
case_df.show(3,vertical = True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

In [77]:
source_df.show(3, vertical = True)

-RECORD 0---------------------------
 source_id       | 100137           
 source_username | Merlene Blodgett 
-RECORD 1---------------------------
 source_id       | 103582           
 source_username | Carmen Cura      
-RECORD 2---------------------------
 source_id       | 106463           
 source_username | Richard Sanchez  
only showing top 3 rows



In [78]:
department_df.show(10, vertical = True)

-RECORD 0--------------------------------------
 dept_division          | 311 Call Center      
 dept_name              | Customer Service     
 standardized_dept_name | Customer Service     
 dept_subject_to_SLA    | YES                  
-RECORD 1--------------------------------------
 dept_division          | Brush                
 dept_name              | Solid Waste Manag... 
 standardized_dept_name | Solid Waste          
 dept_subject_to_SLA    | YES                  
-RECORD 2--------------------------------------
 dept_division          | Clean and Green      
 dept_name              | Parks and Recreation 
 standardized_dept_name | Parks & Recreation   
 dept_subject_to_SLA    | YES                  
-RECORD 3--------------------------------------
 dept_division          | Clean and Green N... 
 dept_name              | Parks and Recreation 
 standardized_dept_name | Parks & Recreation   
 dept_subject_to_SLA    | YES                  
-RECORD 4-------------------------------

In [79]:
# writing the data to the local machine
# case_df.write.json('cases_json', mode = 'overwrite')

# source_df.write.csv('sources_csv')

**The format of these files written to the local machine are different. The write function creates a folder, inside of which there are multiple files.**

In [80]:
case_df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



In [81]:
department_df.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



In [82]:
source_df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



- for department and source df, the data types look appropriate but for the case_df, we can make some changes 

In [83]:
case_df.printSchema()

case_df.show(3, vertical = True)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  


Things we can change in case_df
    - change the data type for case id from a integer to string
    - change the data type of the date to datatime stamp
    - change the column name for SLA_due date to case due date
    - convert case late and case closed to boolean data type

In [84]:
# to rename a column use the withColumnRenamed(col_to_rename, new_name)

case_df = case_df.withColumnRenamed( 'SLA_due_date', 'case_due_date')

In [85]:
case_df.show(2, vertical = True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 case_due_date        | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

In [86]:
# lets change the case late and case closed to boolean

case_df = case_df.withColumn('case_late', expr("case_late == 'Yes'"))

case_df = case_df.withColumn('case_closed', expr("case_closed == 'Yes'"))

In [87]:
# lets change the date time format
fmt = "M/d/yy H:mm"

case_df = case_df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
case_df = case_df.withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
case_df = case_df.withColumn("case_due_date", to_timestamp("case_due_date", fmt))

In [91]:

# formatting the address column
case_df=case_df.withColumn('request_address', lower(trim('request_address')))

In [93]:
case_df.show(vertical = True, truncate = False)

-RECORD 0--------------------------------------------------------
 case_id              | 1014127332                               
 case_opened_date     | 2018-01-01 00:42:00                      
 case_closed_date     | 2018-01-01 12:29:00                      
 case_due_date        | 2020-09-26 00:42:00                      
 case_late            | false                                    
 num_days_late        | -998.5087616000001                       
 case_closed          | false                                    
 dept_division        | Field Operations                         
 service_request_type | Stray Animal                             
 SLA_days             | 999.0                                    
 case_status          | Closed                                   
 source_id            | svcCRMLS                                 
 request_address      | 2315  el paso st, san antonio, 78207     
 council_district     | 5                                        
-RECORD 1-

In [98]:
# lets create a new column to hold the zipcode only

case_df = case_df.withColumn('zip_code', regexp_extract('request_address', r"(\d+)$", 1))

In [99]:

# format the council district to hold 3 digits with padding
case_df = case_df.withColumn("council_district", format_string("%03d", col("council_district")))


In [101]:

# create columns case_age, days_to_close, and case_lifetime

# create a case age column whihc is the difference in time between the current date and the date the case was opened
case_df = case_df.withColumn("case_age", datediff(current_timestamp(), "case_opened_date"))

# create days to close which is the difference in time between the day case was opened and the day the case was closed
case_df = case_df.withColumn("days_to_close", datediff("case_closed_date", "case_opened_date"))


# case_lifetime column is created by:
    # if the case is closed, this column holds the value from days_to_close columns
    # if the case in not closed, this column holds the value of case age
case_df = case_df.withColumn("case_lifetime", when(col("case_closed"), col("days_to_close")).otherwise(col("case_age")))

In [102]:
case_df.show(3, vertical = True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | false                
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 zip_code             | 78207                
 case_age             | 1064                 
 days_to_close        | 0                    
 case_lifetime        | 1064                 
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01

> How many Stray Animal cases are there?


In [88]:
(case_df
.where(col('service_request_type') == 'Stray Animal')
 .count()
)

26760