In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Obtain the Spark session used by every later cell (created here if none exists yet).
spark = SparkSession.builder.getOrCreate()

In [2]:
# Load the source lookup table, treating the first row as a header and letting
# Spark infer the column types from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("data/source.csv")
)
df.printSchema()
df.show()


root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)

+---------+--------------------+
|source_id|     source_username|
+---------+--------------------+
|   100137|    Merlene Blodgett|
|   103582|         Carmen Cura|
|   106463|     Richard Sanchez|
|   119403|      Betty De Hoyos|
|   119555|      Socorro Quiara|
|   119868| Michelle San Miguel|
|   120752|      Eva T. Kleiber|
|   124405|           Lori Lara|
|   132408|       Leonard Silva|
|   135723|        Amy Cardenas|
|   136202|    Michelle Urrutia|
|   136979|      Leticia Garcia|
|   137943|    Pamela K. Baccus|
|   138605|        Marisa Ozuna|
|   138650|      Kimberly Green|
|   138650|Kimberly Green-Woods|
|   138793| Guadalupe Rodriguez|
|   138810|       Tawona Martin|
|   139342|     Jessica Mendoza|
|   139344|        Isis Mendoza|
+---------+--------------------+
only showing top 20 rows



In [3]:
from pyspark.sql.types import StructType, StructField, StringType

# Declare both columns as strings explicitly instead of relying on schema inference.
schema = StructType(
    [StructField(column, StringType()) for column in ("source_id", "source_username")]
)

# Re-read the same file with the explicit schema applied.
df = spark.read.schema(schema).option("header", True).csv("data/source.csv")
df.show()

+---------+--------------------+
|source_id|     source_username|
+---------+--------------------+
|   100137|    Merlene Blodgett|
|   103582|         Carmen Cura|
|   106463|     Richard Sanchez|
|   119403|      Betty De Hoyos|
|   119555|      Socorro Quiara|
|   119868| Michelle San Miguel|
|   120752|      Eva T. Kleiber|
|   124405|           Lori Lara|
|   132408|       Leonard Silva|
|   135723|        Amy Cardenas|
|   136202|    Michelle Urrutia|
|   136979|      Leticia Garcia|
|   137943|    Pamela K. Baccus|
|   138605|        Marisa Ozuna|
|   138650|      Kimberly Green|
|   138650|Kimberly Green-Woods|
|   138793| Guadalupe Rodriguez|
|   138810|       Tawona Martin|
|   139342|     Jessica Mendoza|
|   139344|        Isis Mendoza|
+---------+--------------------+
only showing top 20 rows



In [4]:
# Persist the source lookup table as JSON under the "spark-json" path.
# Spark's default save mode raises an error when the target path already exists,
# so overwrite explicitly; otherwise this cell fails on every re-run of the notebook.
df.write.json("spark-json", mode="overwrite")

In [5]:
# Load the case data with a header row and inferred column types, then inspect
# the schema and the first five records (one field per line).
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('data/case.csv')
)
df.printSchema()
df.show(5, vertical=True)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  


In [6]:
# Rename SLA_due_date so it follows the case_*_date naming of the other date columns.
df = df.withColumnRenamed("SLA_due_date", "case_due_date")

In [7]:
# Print the physical plan Spark has built for df so far.
df.explain()

== Physical Plan ==
*(1) Project [case_id#66, case_opened_date#67, case_closed_date#68, SLA_due_date#69 AS case_due_date#165, case_late#70, num_days_late#71, case_closed#72, dept_division#73, service_request_type#74, SLA_days#75, case_status#76, source_id#77, request_address#78, council_district#79]
+- FileScan csv [case_id#66,case_opened_date#67,case_closed_date#68,SLA_due_date#69,case_late#70,num_days_late#71,case_closed#72,dept_division#73,service_request_type#74,SLA_days#75,case_status#76,source_id#77,request_address#78,council_district#79] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/cryptobandido/codeup-data-science/spark-exercises/data/case.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<case_id:int,case_opened_date:string,case_closed_date:string,SLA_due_date:string,case_late:...




In [8]:
# Count cases for every combination of the case_late and case_closed flags.
(
    df.groupBy('case_late', 'case_closed')
    .count()
    .show()
)

+---------+-----------+------+
|case_late|case_closed| count|
+---------+-----------+------+
|       NO|        YES|735616|
|      YES|        YES| 87978|
|       NO|         NO| 11585|
|      YES|         NO|  6525|
+---------+-----------+------+



In [9]:
# Replace the string flag with a boolean: true exactly where the value is 'YES'.
# NOTE(review): df is overwritten in place, so re-running this cell compares the
# already-converted column against 'YES' again.
df = df.withColumn("case_late", col("case_late") == "YES")

In [10]:
# Replace the string flag with a boolean: true exactly where the value is 'YES'.
# NOTE(review): df is overwritten in place, so re-running this cell compares the
# already-converted column against 'YES' again.
df = df.withColumn("case_closed", col("case_closed") == "YES")

In [11]:
# Check the schema and the first five records after the flag conversions.
df.printSchema()
df.show(5, vertical=True)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- case_due_date: string (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true               

In [12]:
# Preview council_district next to its "%03d"-formatted (zero-padded) form.
df.select(
    "council_district",
    format_string("%03d", col("council_district")),
).show()

+----------------+-------------------------------------+
|council_district|format_string(%03d, council_district)|
+----------------+-------------------------------------+
|               5|                                  005|
|               3|                                  003|
|               3|                                  003|
|               3|                                  003|
|               7|                                  007|
|               7|                                  007|
|               4|                                  004|
|               4|                                  004|
|               4|                                  004|
|               4|                                  004|
|               4|                                  004|
|               4|                                  004|
|               4|                                  004|
|               4|                                  004|
|               4|             

In [13]:
# Store council_district as a zero-padded, three-character string.
# NOTE(review): the schema printed later reports this column as non-nullable, so a
# missing district presumably ends up as some literal string rather than null — confirm.
df = df.withColumn(
    "council_district",
    format_string("%03d", "council_district"),
)

In [14]:
# Print the physical plan again to see how the column transformations are applied.
df.explain()


== Physical Plan ==
*(1) Project [case_id#66, case_opened_date#67, case_closed_date#68, SLA_due_date#69 AS case_due_date#165, (case_late#70 = YES) AS case_late#216, num_days_late#71, (case_closed#72 = YES) AS case_closed#231, dept_division#73, service_request_type#74, SLA_days#75, case_status#76, source_id#77, request_address#78, format_string(%03d, council_district#79) AS council_district#330]
+- FileScan csv [case_id#66,case_opened_date#67,case_closed_date#68,SLA_due_date#69,case_late#70,num_days_late#71,case_closed#72,dept_division#73,service_request_type#74,SLA_days#75,case_status#76,source_id#77,request_address#78,council_district#79] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/cryptobandido/codeup-data-science/spark-exercises/data/case.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<case_id:int,case_opened_date:string,case_closed_date:string,SLA_due_date:string,case_late:...




In [15]:
# Show the first five records, one field per line.
df.show(5, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 005                  
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 case_due_date        | 1/5/18 8:30          
 case_late            | false                
 num_days_late        | -2.0126041

In [16]:
# Pattern describing the raw date strings (e.g. "1/1/18 0:42").
fmt = "M/d/yy H:mm"

# Preview the parse on one column before applying it to df.
(
    df.select("case_opened_date", to_timestamp(col("case_opened_date"), fmt))
    .show(5)
)

+----------------+-----------------------------------------------+
|case_opened_date|to_timestamp(`case_opened_date`, 'M/d/yy H:mm')|
+----------------+-----------------------------------------------+
|     1/1/18 0:42|                            2018-01-01 00:42:00|
|     1/1/18 0:46|                            2018-01-01 00:46:00|
|     1/1/18 0:48|                            2018-01-01 00:48:00|
|     1/1/18 1:29|                            2018-01-01 01:29:00|
|     1/1/18 1:34|                            2018-01-01 01:34:00|
+----------------+-----------------------------------------------+
only showing top 5 rows



In [17]:
# Parse the three date columns from strings into timestamps using the shared pattern.
df = (
    df.withColumn("case_opened_date", to_timestamp(col("case_opened_date"), fmt))
    .withColumn("case_closed_date", to_timestamp(col("case_closed_date"), fmt))
    .withColumn("case_due_date", to_timestamp(col("case_due_date"), fmt))
)

In [18]:
# Check the schema and the first three records after the date conversion.
df.printSchema()
df.show(3, vertical=True)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true      

In [19]:
# Print the physical plan, which now includes the timestamp parsing.
df.explain()

== Physical Plan ==
*(1) Project [case_id#66, gettimestamp(case_opened_date#67, M/d/yy H:mm, Some(America/Chicago)) AS case_opened_date#429, gettimestamp(case_closed_date#68, M/d/yy H:mm, Some(America/Chicago)) AS case_closed_date#444, gettimestamp(SLA_due_date#69, M/d/yy H:mm, Some(America/Chicago)) AS case_due_date#459, (case_late#70 = YES) AS case_late#216, num_days_late#71, (case_closed#72 = YES) AS case_closed#231, dept_division#73, service_request_type#74, SLA_days#75, case_status#76, source_id#77, request_address#78, format_string(%03d, council_district#79) AS council_district#330]
+- FileScan csv [case_id#66,case_opened_date#67,case_closed_date#68,SLA_due_date#69,case_late#70,num_days_late#71,case_closed#72,dept_division#73,service_request_type#74,SLA_days#75,case_status#76,source_id#77,request_address#78,council_district#79] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/cryptobandido/codeup-data-science/spark-exercises/data/case.csv], Pa

In [20]:
# Preview the address with surrounding whitespace trimmed and letters lower-cased,
# without truncating long values in the output.
normalized_preview = df.select(
    "request_address",
    lower(trim(col("request_address"))),
)
normalized_preview.show(truncate=False)

+----------------------------------------+----------------------------------------+
|request_address                         |lower(trim(request_address))            |
+----------------------------------------+----------------------------------------+
|2315  EL PASO ST, San Antonio, 78207    |2315  el paso st, san antonio, 78207    |
|2215  GOLIAD RD, San Antonio, 78223     |2215  goliad rd, san antonio, 78223     |
|102  PALFREY ST W, San Antonio, 78223   |102  palfrey st w, san antonio, 78223   |
|114  LA GARDE ST, San Antonio, 78223    |114  la garde st, san antonio, 78223    |
|734  CLEARVIEW DR, San Antonio, 78228   |734  clearview dr, san antonio, 78228   |
|BANDERA RD and BRESNAHAN                |bandera rd and bresnahan                |
|10133  FIGARO CANYON, San Antonio, 78251|10133  figaro canyon, san antonio, 78251|
|10133  FIGARO CANYON, San Antonio, 78251|10133  figaro canyon, san antonio, 78251|
|10133  FIGARO CANYON, San Antonio, 78251|10133  figaro canyon, san antonio,

In [21]:
# Overwrite request_address with its trimmed, lower-cased form.
df = df.withColumn(
    "request_address",
    lower(trim(col("request_address"))),
)

In [22]:
# Check the schema and the first three records, with long values shown in full.
df.printSchema()
df.show(3, vertical=True, truncate=False)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 2018-01-01 00:42:00                   
 case_closed_date     | 2018-01-01 12:29:00                   
 case_due_date        | 2020-09-26 00:42:00                   
 case_late            | false            

In [23]:
# Preview the trailing run of digits in each address (the zip code when present).
# Addresses that do not end in digits produce an empty string.
(
    df.select(
        col("request_address"),
        regexp_extract(col("request_address"), r"(\d+)$", 1),
    )
    .show(truncate=False)
)

+----------------------------------------+------------------------------------------+
|request_address                         |regexp_extract(request_address, (\d+)$, 1)|
+----------------------------------------+------------------------------------------+
|2315  el paso st, san antonio, 78207    |78207                                     |
|2215  goliad rd, san antonio, 78223     |78223                                     |
|102  palfrey st w, san antonio, 78223   |78223                                     |
|114  la garde st, san antonio, 78223    |78223                                     |
|734  clearview dr, san antonio, 78228   |78228                                     |
|bandera rd and bresnahan                |                                          |
|10133  figaro canyon, san antonio, 78251|78251                                     |
|10133  figaro canyon, san antonio, 78251|78251                                     |
|10133  figaro canyon, san antonio, 78251|78251       

In [24]:
# Add a zipcode column holding the digits found at the very end of request_address.
df = df.withColumn(
    "zipcode",
    regexp_extract(col("request_address"), r"(\d+)$", 1),
)


In [25]:
# Display the DataFrame's column names and types.
df

DataFrame[case_id: int, case_opened_date: timestamp, case_closed_date: timestamp, case_due_date: timestamp, case_late: boolean, num_days_late: double, case_closed: boolean, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: string, zipcode: string]

In [26]:
# Show the first five records, one field per line, including the new zipcode column.
df.show(5, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 zipcode              | 78207                
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05 08:30:00  
 case_late            | false     

In [27]:
# Preview of case_lifetime before it is added to df:
#   - closed case: days between the opened and closed dates (days_to_close)
#   - open case:   days between the opened date and the current time (case_age)
(
    df.select("case_opened_date", "case_closed_date", "case_closed")
    .withColumn(
        "case_age",
        datediff(current_timestamp(), col("case_opened_date")),
    )
    .withColumn(
        "days_to_close",
        datediff(col("case_closed_date"), col("case_opened_date")),
    )
    .withColumn(
        "case_lifetime",
        when(col("case_closed"), col("days_to_close")).otherwise(col("case_age")),
    )
    .show(10)
)


+-------------------+-------------------+-----------+--------+-------------+-------------+
|   case_opened_date|   case_closed_date|case_closed|case_age|days_to_close|case_lifetime|
+-------------------+-------------------+-----------+--------+-------------+-------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|       true|    1064|            0|            0|
|2018-01-01 00:46:00|2018-01-03 08:11:00|       true|    1064|            2|            2|
|2018-01-01 00:48:00|2018-01-02 07:57:00|       true|    1064|            1|            1|
|2018-01-01 01:29:00|2018-01-02 08:13:00|       true|    1064|            1|            1|
|2018-01-01 01:34:00|2018-01-01 13:29:00|       true|    1064|            0|            0|
|2018-01-01 06:28:00|2018-01-01 14:38:00|       true|    1064|            0|            0|
|2018-01-01 06:57:00|2018-01-02 15:32:00|       true|    1064|            1|            1|
|2018-01-01 06:58:00|2018-01-02 15:32:00|       true|    1064|            1|            1|

In [28]:
# Add the three duration columns to df:
#   case_age      - days from the opened date to the current time
#   days_to_close - days from the opened date to the closed date
#   case_lifetime - days_to_close for closed cases, case_age otherwise
df = (
    df.withColumn(
        "case_age",
        datediff(current_timestamp(), col("case_opened_date")),
    )
    .withColumn(
        "days_to_close",
        datediff(col("case_closed_date"), col("case_opened_date")),
    )
    .withColumn(
        "case_lifetime",
        when(col("case_closed"), col("days_to_close")).otherwise(col("case_age")),
    )
)