In [3]:
#import spark for python! 
import pyspark

#create the spark session
spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark

In [21]:
# imports
import pandas as pd
import numpy as np
import os
import env
from pydataset import data
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from sqlalchemy import text, create_engine
from pyspark.sql.types import StructType, StructField, StringType

In [5]:
case = spark.read.csv("case.csv", sep=',', header=True, inferSchema=True)
dept = spark.read.csv("dept.csv", sep=',', header=True, inferSchema=True)
source = spark.read.csv("source.csv", sep=',', header=True, inferSchema=True)

                                                                                

In [6]:
case.show(2,vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

In [7]:
case.columns

['case_id',
 'case_opened_date',
 'case_closed_date',
 'SLA_due_date',
 'case_late',
 'num_days_late',
 'case_closed',
 'dept_division',
 'service_request_type',
 'SLA_days',
 'case_status',
 'source_id',
 'request_address',
 'council_district']

In [8]:
source.show(2,vertical=True)

-RECORD 0---------------------------
 source_id       | 100137           
 source_username | Merlene Blodgett 
-RECORD 1---------------------------
 source_id       | 103582           
 source_username | Carmen Cura      
only showing top 2 rows



In [9]:
source.write.json("data/sources_json", mode="overwrite")

In [10]:
source.write.csv("data/sources_csv", header=True, mode="overwrite")

In [11]:
os.listdir("data")

['sources_csv', 'sources_json']

In [12]:
os.listdir("data/sources_csv")

['.part-00000-1ab04b47-ff47-47b4-9b2b-10e3eb6ba5ef-c000.csv.crc',
 'part-00000-1ab04b47-ff47-47b4-9b2b-10e3eb6ba5ef-c000.csv',
 '._SUCCESS.crc',
 '_SUCCESS']

In [13]:
os.listdir("data/sources_json")

['._SUCCESS.crc',
 'part-00000-ca477cc1-06e9-4158-b903-d819fd63d8a2-c000.json',
 '.part-00000-ca477cc1-06e9-4158-b903-d819fd63d8a2-c000.json.crc',
 '_SUCCESS']

In [14]:
case.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [15]:
case.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



In [16]:
#rename col
case = case.withColumnRenamed("SLA_due_date", "case_due_date")

In [17]:
#convert data types

case = (
    case.withColumn("case_late", col("case_late")=="YES")
    .withColumn("case_closed", col("case_closed")=="YES")
)

case = case.withColumn("council_district", format_string("%03d", col("council_district")))

In [22]:
fmt = "M/d/yy H:mm" #saved to the underlying stored data

case = (
    case.withColumn('case_opened_date', to_timestamp(col('case_opened_date'), fmt))
    .withColumn('case_closed_date', to_timestamp(col('case_closed_date'), fmt))
    .withColumn('case_due_date', to_timestamp(col('case_due_date'), fmt))

SyntaxError: unexpected EOF while parsing (922237865.py, line 6)

In [19]:
# Cleanup text data
case = case.withColumn('request_address', lower(trim(col('request_address'))))

In [20]:
case.select("case_closed_date").sort(desc("case_closed_date")).first()

                                                                                

Row(case_closed_date='9/9/17 9:56')

In [23]:
case.select(
    datediff(to_timestamp(lit("2018-08-08 10:38:00")), case.case_opened_date)
).show(3)

+-------------------------------------------------------------+
|datediff(to_timestamp(2018-08-08 10:38:00), case_opened_date)|
+-------------------------------------------------------------+
|                                                         null|
|                                                         null|
|                                                         null|
+-------------------------------------------------------------+
only showing top 3 rows



In [24]:
case.select(
    datediff(to_timestamp(current_timestamp()), case.case_opened_date)
).show(3)

+-------------------------------------------------------------+
|datediff(to_timestamp(current_timestamp()), case_opened_date)|
+-------------------------------------------------------------+
|                                                         null|
|                                                         null|
|                                                         null|
+-------------------------------------------------------------+
only showing top 3 rows



In [25]:
# Extract zipcode
case = case.withColumn('zipcode', regexp_extract(col('request_address'), r'\d+$', 0))

# Create a `case_lifetime` feature
case = (
    case.withColumn('case_age', datediff(to_timestamp(lit('2018-08-08 10:38:00')), 'case_opened_date'))
    .withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
    .withColumn('case_lifetime', when(
        col('case_closed'), col('days_to_closed')).otherwise(col('case_age')))
    .withColumn('curr_case_age', datediff(current_timestamp(), 'case_opened_date'))
    .drop('case_age', 'days_to_closed')
)

In [26]:
case.show(2, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 case_due_date        | 9/26/20 0:42                         
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  el paso st, san antonio, 78207 
 council_district     | 005                                  
 zipcode              | 78207                                
 case_li

In [27]:
case.filter("! case_closed")\
    .select('case_id', 'case_lifetime', 'case_opened_date', 'case_closed_date', 'num_days_late')\
    .sort(desc("num_days_late")).show(5)

                                                                                

+----------+-------------+----------------+----------------+------------------+
|   case_id|case_lifetime|case_opened_date|case_closed_date|     num_days_late|
+----------+-------------+----------------+----------------+------------------+
|1013225646|         null|    1/1/17 13:48|            null|       348.6458333|
|1013225651|         null|    1/1/17 13:57|            null|       348.6458333|
|1013226813|         null|    1/2/17 11:26|            null|348.52356480000003|
|1013229328|         null|    1/3/17 10:01|            null|347.58256939999995|
|1013236238|         null|    1/5/17 14:39|            null|       345.3894213|
+----------+-------------+----------------+----------------+------------------+
only showing top 5 rows



In [28]:
case.filter("! case_closed")\
    .select('case_id', 'case_lifetime', 'curr_case_age', 'case_opened_date', 'case_closed_date', 'num_days_late')\
    .sort(desc("num_days_late")).show(5)

                                                                                

+----------+-------------+-------------+----------------+----------------+------------------+
|   case_id|case_lifetime|curr_case_age|case_opened_date|case_closed_date|     num_days_late|
+----------+-------------+-------------+----------------+----------------+------------------+
|1013225646|         null|         null|    1/1/17 13:48|            null|       348.6458333|
|1013225651|         null|         null|    1/1/17 13:57|            null|       348.6458333|
|1013226813|         null|         null|    1/2/17 11:26|            null|348.52356480000003|
|1013229328|         null|         null|    1/3/17 10:01|            null|347.58256939999995|
|1013236238|         null|         null|    1/5/17 14:39|            null|       345.3894213|
+----------+-------------+-------------+----------------+----------------+------------------+
only showing top 5 rows



In [29]:
case.filter(case.service_request_type == lit("Stray Animal")).count()

                                                                                

26760

In [30]:
case.filter(case.dept_division == "Field Operations").filter(case.service_request_type != "Officer Standby").count()

                                                                                

113902

In [31]:
case = case.withColumn('council_district', col('council_district').cast('string'))

In [32]:
case.withColumn("year", year("case_closed_date")).select("service_request_type", 'case_closed_date', 'year')\
.show(5)

+--------------------+----------------+----+
|service_request_type|case_closed_date|year|
+--------------------+----------------+----+
|        Stray Animal|    1/1/18 12:29|null|
|Removal Of Obstru...|     1/3/18 8:11|null|
|Removal Of Obstru...|     1/2/18 7:57|null|
|Front Or Side Yar...|     1/2/18 8:13|null|
|Animal Cruelty(Cr...|    1/1/18 13:29|null|
+--------------------+----------------+----+
only showing top 5 rows



In [33]:
case.withColumn("num_hours_late", round(expr("num_days_late * 24"), 1))\
.select("num_days_late", 'num_hours_late').show(5)

+-------------------+--------------+
|      num_days_late|num_hours_late|
+-------------------+--------------+
| -998.5087616000001|      -23964.2|
|-2.0126041669999997|         -48.3|
|       -3.022337963|         -72.5|
|       -15.01148148|        -360.3|
|0.37216435200000003|           8.9|
+-------------------+--------------+
only showing top 5 rows



In [34]:
df = case.join(source, "source_id", "left").drop(source.source_id)

df.show(2, vertical=True)

-RECORD 0------------------------------------
 source_id            | svcCRMLS             
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 zipcode              | 78207                
 case_lifetime        | null                 
 curr_case_age        | null                 
 source_username      | svcCRMLS             
-RECORD 1------------------------------------
 source_id            | svcCRMSS             
 case_id              | 1014127333

In [35]:
df = df.join(dept, "dept_division", "left")\
    .drop(dept.dept_division)\
    .drop(dept.dept_name)\
    .withColumnRenamed('standardized_dept_name', 'department')\
    .withColumn('dept_subject_to_SLA', col('dept_subject_to_SLA') == 'YES')
df.show(2, vertical=True)

-RECORD 0------------------------------------
 dept_division        | Field Operations     
 source_id            | svcCRMLS             
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 zipcode              | 78207                
 case_lifetime        | null                 
 curr_case_age        | null                 
 source_username      | svcCRMLS             
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
-RECORD 1-------------------------

In [36]:
df.filter(df.source_id == "null").count()

0

In [37]:
df.filter(df.source_id == "").count()

0

23/07/10 17:48:32 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 946261 ms exceeds timeout 120000 ms
23/07/10 17:48:32 WARN SparkContext: Killing executors is not supported by current scheduler.
23/07/10 18:04:18 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:322)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:641)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1111)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:244)
	at sc