In [1]:
import pyspark
from pyspark.sql.functions import *

In [2]:
sum

<function pyspark.sql.functions._create_function.<locals>._(col)>

In [3]:
# to get original sum function
from builtins import sum as builtin_sum

In [4]:
builtin_sum([1, 2, 3, 4, 5])

15

In [5]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [6]:
spark

In [7]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField("source_id", StringType()),
        StructField("source_username", StringType()),
    ]
)
# this is programmatic documentation and makes the data reading process faster

In [8]:
df = spark.read.csv("data/case.csv", header=True)

In [9]:
df.show()

+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+------------------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|          SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+------------------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616000001|        YES|Field Operations|        Stray Animal|             999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO|-2.0126041669999997|        YES|     Storm Water|Remova

In [10]:
df.printSchema()

root
 |-- case_id: string (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: string (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: string (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)



In [11]:
# rename SLA_due_date to case_due_date

In [12]:
df.groupby('case_late').pivot('case_closed').count().show()

+---------+-----+------+
|case_late|   NO|   YES|
+---------+-----+------+
|      YES| 6525| 87978|
|       NO|11585|735616|
+---------+-----+------+



In [13]:
# .withColumn to transform columns
df.withColumn('case_late', col('case_late') == 'YES').show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



In [14]:
df = (
    df.withColumn('case_late', col('case_late') == 'YES')
    .withColumn('case_closed', col('case_closed') == 'YES')
)

In [15]:
type(df)

pyspark.sql.dataframe.DataFrame

In [16]:
df.dtypes

[('case_id', 'string'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'boolean'),
 ('num_days_late', 'string'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'string'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

In [17]:
# df.withColumn('council_district', format_string('%04d', col('council_district'))).show(2, vertical=True)

In [18]:
df.withColumn("case_opened_date", to_timestamp(col("case_opened_date"), "M/d/yy H:mm")).show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | false                
 num_days_late        | -2.0126041

In [19]:
df = (
    df.withColumn("case_opened_date", to_timestamp(col("case_opened_date"), "M/d/yy H:mm"))
    .withColumn("case_closed_date", to_timestamp(col("case_closed_date"), "M/d/yy H:mm"))
    .withColumn("SLA_due_date", to_timestamp(col("SLA_due_date"), "M/d/yy H:mm"))
)

In [20]:
df.dtypes

[('case_id', 'string'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('SLA_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'string'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'string'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

In [21]:
df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 SLA_due_date         | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 SLA_due_date         | 2018-01-05 08:30:00  
 case_late            | false                
 num_days_late        | -2.0126041

In [22]:
# .trim == .strip in base python
# removes leading and trailing whitespace
df = df.withColumn("request_address", lower(trim(col("request_address"))))

In [23]:
df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 SLA_due_date         | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 SLA_due_date         | 2018-01-05 08:30:00  
 case_late            | false                
 num_days_late        | -2.0126041

In [24]:
df.select("request_address").show(2, truncate=False)

+------------------------------------+
|request_address                     |
+------------------------------------+
|2315  el paso st, san antonio, 78207|
|2215  goliad rd, san antonio, 78223 |
+------------------------------------+
only showing top 2 rows



In [25]:
df.dtypes

[('case_id', 'string'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('SLA_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'string'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'string'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

In [26]:
df = df.withColumn("zipcode", regexp_extract(col("request_address"), r"\d+$", 0))

In [27]:
df.show(3, vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 2018-01-01 00:42:00                   
 case_closed_date     | 2018-01-01 12:29:00                   
 SLA_due_date         | 2020-09-26 00:42:00                   
 case_late            | false                                 
 num_days_late        | -998.5087616000001                    
 case_closed          | true                                  
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  el paso st, san antonio, 78207  
 council_district     | 5                                     
 zipcode              | 78207                          

Case lifetime: how long it took to close the case or how long hs the case been open

In [28]:
df = (
    df.withColumn("case_age", datediff(current_timestamp(), "case_opened_date"))
    .withColumn("days_to_closed", datediff("case_closed_date", "case_opened_date"))
    .withColumn("case_lifetime", when(col("case_closed"), col("days_to_closed")).otherwise(col("case_age")))
    .drop("case_age", "days_to_closed")
)

In [29]:
df.show(3, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 SLA_due_date         | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 5                    
 zipcode              | 78207                
 case_lifetime        | 0                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 SLA_due_date         | 2018-01-05

In [30]:
df.sample(fraction=.01).show(3, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 SLA_due_date         | 2018-01-05 08:30:00  
 case_late            | false                
 num_days_late        | -2.0126041669999997  
 case_closed          | true                 
 dept_division        | Storm Water          
 service_request_type | Removal Of Obstru... 
 SLA_days             | 4.322222222          
 case_status          | Closed               
 source_id            | svcCRMSS             
 request_address      | 2215  goliad rd, ... 
 council_district     | 3                    
 zipcode              | 78223                
 case_lifetime        | 2                    
-RECORD 1------------------------------------
 case_id              | 1014127467           
 case_opened_date     | 2018-01-01 12:28:00  
 case_closed_date     | 2018-01-01 12:44:00  
 SLA_due_date         | 2020-09-26

In [31]:
dept = spark.read.csv("data/dept.csv", header=True, inferSchema=True)
dept.show()

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|                null|  DSD/Code Enforcement|                YES|
|   Dangerous Premise|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Dangerous Premise...|Code Enforcement ...|

In [32]:
df = df.join(dept, "dept_division", "left")

In [33]:
df

DataFrame[dept_division: string, case_id: string, case_opened_date: timestamp, case_closed_date: timestamp, SLA_due_date: timestamp, case_late: boolean, num_days_late: string, case_closed: boolean, service_request_type: string, SLA_days: string, case_status: string, source_id: string, request_address: string, council_district: string, zipcode: string, case_lifetime: int, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]

In [34]:
df.show(3, vertical=True)

-RECORD 0--------------------------------------
 dept_division          | Field Operations     
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 SLA_due_date           | 2020-09-26 00:42:00  
 case_late              | false                
 num_days_late          | -998.5087616000001   
 case_closed            | true                 
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 source_id              | svcCRMLS             
 request_address        | 2315  el paso st,... 
 council_district       | 5                    
 zipcode                | 78207                
 case_lifetime          | 0                    
 dept_name              | Animal Care Services 
 standardized_dept_name | Animal Care Services 
 dept_subject_to_SLA    | YES                  
-RECORD 1-------------------------------

In [35]:
# data splitting with spark

train, test = df.randomSplit([.8, .2])

In [36]:
train.count()

673323

In [37]:
test.count()

168381

In [38]:
train, validate, test = df.randomSplit([.70, .15, .15], seed=123)