In [1]:
# imports

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd

import env

In [2]:
# spark object

spark = SparkSession.builder.getOrCreate()

# When using Spark on the job, you'll work with the operations 
 # team to install the right Java drivers and configure your connection

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/08 12:08:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/08 12:08:15 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/03/08 12:08:15 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/03/08 12:08:15 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


## 1 / Read the cases, department, and source data into their own spark dataframes.

In [3]:
# importing 'case'

query = """SELECT * FROM cases """
url = env.get_connection("311_data")

case_df = pd.read_sql(query, url)
case_df = spark.createDataFrame(case_df)

In [4]:
case_df.show(4, vertical = True)

[Stage 0:>                                                          (0 + 0) / 1][Stage 0:>                                                          (0 + 1) / 1]

23/03/08 12:11:19 WARN TaskSetManager: Stage 0 contains a task of very large size (18865 KiB). The maximum recommended task size is 1000 KiB.
23/03/08 12:11:23 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 0 (TID 0): Attempting to kill Python Worker
-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616         
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1-------------------------

                                                                                

In [5]:
# importing 'source'

query = """SELECT * FROM dept"""
url = env.get_connection("311_data")

dept_df = pd.read_sql(query, url)
dept_df = spark.createDataFrame(dept_df)
dept_df.show(4)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 4 rows



In [6]:
# importing 'source'

query = """SELECT * FROM source"""
url = env.get_connection("311_data")

source_df = pd.read_sql(query, url)
source_df = spark.createDataFrame(source_df)
source_df.show(4)

+-----+---------+----------------+
|index|source_id| source_username|
+-----+---------+----------------+
|    0|   100137|Merlene Blodgett|
|    1|   103582|     Carmen Cura|
|    2|   106463| Richard Sanchez|
|    3|   119403|  Betty De Hoyos|
+-----+---------+----------------+
only showing top 4 rows



## 2 / Let's see how writing to the local disk works in spark:

    Write the code necessary to store the source data in both csv and json format, 
     store these as sources_csv and sources_json.
    Inspect your folder structure. What do you notice?



In [7]:
# storing cases as .json

case_df.write.json('data/cases_json', mode = 'overwrite')

23/03/08 12:11:28 WARN TaskSetManager: Stage 4 contains a task of very large size (18865 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

In [9]:
# storing cases as .csv

(case_df.write.format('csv')
 .mode('overwrite')
 .option('header', 'true')
 .save('data/case_csv'))

23/03/08 12:17:13 WARN TaskSetManager: Stage 6 contains a task of very large size (18865 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

In [10]:
# read .csv

c = spark.read.json('data/cases_json')

                                                                                

In [11]:
c.show(1, vertical = True)

-RECORD 0------------------------------------
 SLA_days             | 7.0                  
 SLA_due_date         | 10/16/17 9:21        
 case_closed          | YES                  
 case_closed_date     | 10/18/17 14:42       
 case_id              | 1013935522           
 case_late            | YES                  
 case_opened_date     | 10/9/17 9:21         
 case_status          | Closed               
 council_district     | 1                    
 dept_division        | Waste Collection     
 num_days_late        | 2.222581019          
 request_address      | 1803  POPLAR ST W... 
 service_request_type | Automation Proper... 
 source_id            | 138810               
only showing top 1 row



In [36]:
# how to get headers to appear when reading in .csv files

cc = spark.read.csv('data/case_csv', header = True)

In [37]:
cc.show(1, vertical = True)

-RECORD 0------------------------------------
 case_id              | 1013935522           
 case_opened_date     | 10/9/17 9:21         
 case_closed_date     | 10/18/17 14:42       
 SLA_due_date         | 10/16/17 9:21        
 case_late            | YES                  
 num_days_late        | 2.222581019          
 case_closed          | YES                  
 dept_division        | Waste Collection     
 service_request_type | Automation Proper... 
 SLA_days             | 7.0                  
 case_status          | Closed               
 source_id            | 138810               
 request_address      | 1803  POPLAR ST W... 
 council_district     | 1                    
only showing top 1 row



In [14]:
# storing dept as .json

dept_df.write.json('data/dept_json', mode = 'overwrite')

In [15]:
# storing dept as .csv

dept_df.write.csv('data/dept_csv', mode = 'overwrite')

In [16]:
# storing sources as json

(source_df.write.format('json')
 .mode('overwrite')
 .option('header', 'true')
 .save('data/source_json'))


In [17]:
# storing sources as csv

(source_df.write.format('csv')
 .mode('overwrite')
 .option('header', 'true')
 .save('data/source_csv'))

## 3 / Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [18]:
# source_df dtypes

source_df.printSchema()

root
 |-- index: long (nullable = true)
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



These data types look appropriate, though 'index' could be recast as a ShortType.

In [19]:
# recasting 'index'

source_df = source_df.withColumn("index", col("index").cast("short"))
source_df.printSchema()

root
 |-- index: short (nullable = true)
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



In [20]:
# case_df dtypes

case_df.printSchema()

root
 |-- case_id: long (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: long (nullable = true)



The dates are all strings. They should be changed to datetime format.  
case_id could also be changed to ShortType and 'council_district' to string.

In [21]:
# recasting 'case_id' to ShortType and 'council_district' to string

case_df = case_df.withColumn("case_id", col("case_id").cast("short"))
case_df = case_df.withColumn('council_district', col('council_district').cast('string'))


In [22]:
case_df.printSchema()

root
 |-- case_id: short (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)



In [28]:
# changing to DateTime format


cur_fmt = 'M/d/yy H:mm'

case_df = (
            case_df.withColumn('case_opened_date', to_timestamp('case_opened_date', cur_fmt))
                         .withColumn('case_closed_date', to_timestamp('case_closed_date', cur_fmt))
                         .withColumn('SLA_due_date', to_timestamp('SLA_due_date', cur_fmt))
            )

In [29]:
case_df.select("case_opened_date", "case_closed_date", "SLA_due_date").show(1)

23/03/08 12:23:45 WARN TaskSetManager: Stage 16 contains a task of very large size (18865 KiB). The maximum recommended task size is 1000 KiB.


[Stage 16:>                                                         (0 + 1) / 1]

23/03/08 12:23:50 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 16 (TID 72): Attempting to kill Python Worker
+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|       SLA_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|2020-09-26 00:42:00|
+-------------------+-------------------+-------------------+
only showing top 1 row



                                                                                

In [30]:
# dept_df dtypes

dept_df.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



These all look fine.

# 1 / How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?