- imports the pyspark module, which provides the Python API for Apache Spark
- creates a SparkSession object named spark. SparkSession is the entry point for programming Spark with the Dataset and DataFrame API.
- calls spark.range(5), which generates a DataFrame with a single column named "id" holding the values 0 through 4 (the end value 5 is excluded), then calls show() on that DataFrame to display its contents

In [1]:
from pyspark.sql.functions import col, to_timestamp

In [2]:
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark.range(5).show()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/22 12:30:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



                                                                                

- imports the module pyspark.sql.functions and assigns it an alias F.

In [3]:
import pyspark.sql.functions as F

In [4]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [5]:
spark

In [6]:
import pandas as pd
import numpy as np

np.random.seed(456)

pandas_dataframe = pd.DataFrame(
    dict(n=np.arange(20), group=np.random.choice(list("abc"), 20))
)
pandas_dataframe

   n group
0  0     b
1  1     b
2  2     c
3  3     a
4  4     c
5  5     c
6  6     a
7  7     b
8  8     a
9  9     b


- creates a DataFrame in Apache Spark using a pandas DataFrame as input.

In [None]:
df = spark.createDataFrame(pandas_dataframe)
df

In [None]:
df.show(5)

In [None]:
df.describe()

- The .describe() method computes summary statistics for the columns of the data frame: count, mean, standard deviation, minimum, and maximum. It does not report quartiles; the separate .summary() method adds the 25%, 50%, and 75% percentiles. On its own, .describe() returns a new DataFrame and prints nothing.
- The .show() method displays the result of describe(). It prints the summary statistics to the console or output area.

- Chained together, df.describe().show() calculates the summary statistics of the data frame df and displays them. This is useful for understanding the basic characteristics of the data, such as the range of values, central tendency, and spread.

In [None]:
df.describe().show()
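
The five statistics that describe() reports can be cross-checked without a Spark session. The sketch below recomputes them in plain Python for the n column of the example data (the integers 0 through 19). The helper name `describe_column` is hypothetical, and it assumes Spark's stddev is the sample standard deviation.

```python
import statistics

def describe_column(values):
    """Return the five statistics that describe() reports for a numeric column."""
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        "stddev": statistics.stdev(values),  # sample standard deviation (n - 1 denominator)
        "min": min(values),
        "max": max(values),
    }

# The n column of the example DataFrame holds the integers 0..19
n_stats = describe_column(list(range(20)))
print(n_stats)
```

Spark returns these values as strings in a DataFrame with a leading summary column, so the comparison is on the numbers rather than the exact formatting.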

- The pydataset library is a package that provides easy access to various popular datasets for data analysis and modeling. The data function is used to load a specific dataset by passing its name as an argument.

- In this case, the "mpg" dataset is loaded with the data("mpg") call. The "mpg" dataset contains fuel economy data for a set of car models, with columns such as manufacturer, model, displ (engine displacement), cyl (number of cylinders), cty (city miles per gallon), and hwy (highway miles per gallon).

- The spark.createDataFrame() method is then used to create a Spark DataFrame from the loaded dataset. Spark is a distributed computing system that provides high-performance processing for large-scale data. By creating a DataFrame, you can perform various operations and transformations on the data using Spark's capabilities.

- Finally, the mpg.show(5) statement is used to display the first 5 rows of the DataFrame mpg. This command will output the data in a tabular format, showing the values for each column in the DataFrame.

In [None]:
from pydataset import data

mpg = spark.createDataFrame(data("mpg"))
mpg.show(5)

- The next cell reuses the `mpg` DataFrame created above, so no new import is needed.

- The `mpg.select(mpg.hwy, mpg.cty, mpg.model)` statement selects specific columns from the dataset called `mpg`. The columns selected are `hwy`, `cty`, and `model`.

- The `show(10)` function is then applied to the selected columns. This function displays the first 10 rows of the selected columns from the `mpg` dataset. The output will be presented as a table-like structure.

In [None]:
mpg.select(mpg.hwy, mpg.cty, mpg.model).show(10)

- The code `mpg.select(mpg.hwy, mpg.hwy + 1)` selects two columns from the `mpg` dataset: `mpg.hwy` and `mpg.hwy + 1`. 

- The `mpg.hwy` column represents the highway miles per gallon values in the dataset.

- The expression `mpg.hwy + 1` adds 1 to each value in the `mpg.hwy` column.

- The `show(5)` function is called on the selected columns, which displays the first 5 rows of the resulting dataset.

In [None]:
mpg.select(mpg.hwy, mpg.hwy + 1).show(5)

In [None]:
mpg.select(mpg.hwy.alias("highway_mileage")).show(5)
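
A plain-Python sketch of what these `select` calls produce, using a few illustrative rows rather than the real mpg data. Spark names a computed column after its expression, here `(hwy + 1)`, unless `alias` supplies another name. The row values and variable names below are assumptions for illustration only.

```python
# Illustrative rows standing in for the mpg DataFrame (values are made up)
rows = [
    {"model": "a4", "cty": 18, "hwy": 29},
    {"model": "a4", "cty": 21, "hwy": 29},
    {"model": "malibu", "cty": 22, "hwy": 30},
]

# select(mpg.hwy, mpg.hwy + 1): keep hwy and add a derived column
with_plus_one = [{"hwy": r["hwy"], "(hwy + 1)": r["hwy"] + 1} for r in rows]

# select(mpg.hwy.alias("highway_mileage")): same values under a new column name
renamed = [{"highway_mileage": r["hwy"]} for r in rows]

print(with_plus_one)
print(renamed)
```

As with `select` in Spark, the source rows are left untouched; each call builds a new result.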

> New Section

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [8]:
# Read 311.csv and create a Spark DataFrame named "df1"
spark = SparkSession.builder.getOrCreate()
df1 = spark.read.csv('311.csv', header=True, inferSchema=True)

                                                                                

In [9]:
# Write to local disk in JSON format
df1.write.json('311_sources_json')
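
`DataFrameWriter.json` writes a directory of part files in JSON Lines format, one JSON object per row, and by default raises an error if the target path already exists. The sketch below shows the per-row layout using only the standard library. The rows are abbreviated to two columns, and `to_json_lines` is a hypothetical helper, not part of Spark.

```python
import json

def to_json_lines(rows):
    """Serialize each row dict as one JSON object per line (JSON Lines)."""
    return "\n".join(json.dumps(row) for row in rows)

# Abbreviated rows: only two of the 311 columns are included
rows = [
    {"case_id": 1014127332, "dept_division": "Field Operations"},
    {"case_id": 1014127333, "dept_division": "Storm Water"},
]

text = to_json_lines(rows)
print(text)

# Each line parses back into the original row
parsed = [json.loads(line) for line in text.splitlines()]
```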

                                                                                

In [10]:
print("--- Before handling dates")
df1.select("SLA_due_date", "case_closed_date", "case_opened_date").show(5)

--- Before handling dates
+------------+----------------+----------------+
|SLA_due_date|case_closed_date|case_opened_date|
+------------+----------------+----------------+
|9/26/20 0:42|    1/1/18 12:29|     1/1/18 0:42|
| 1/5/18 8:30|     1/3/18 8:11|     1/1/18 0:46|
| 1/5/18 8:30|     1/2/18 7:57|     1/1/18 0:48|
|1/17/18 8:30|     1/2/18 8:13|     1/1/18 1:29|
| 1/1/18 4:34|    1/1/18 13:29|     1/1/18 1:34|
+------------+----------------+----------------+
only showing top 5 rows



In [11]:
fmt = "M/d/yy H:mm"
df1 = (
    df1.withColumn("SLA_due_date", to_timestamp("SLA_due_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
    .withColumn("case_opened_date", to_timestamp("case_opened_date", fmt)))
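
The pattern `M/d/yy H:mm` accepts one- or two-digit months, days, and hours, followed by a two-digit year. As a quick cross-check outside Spark, the same strings can be parsed with `datetime.strptime`. The format string `%m/%d/%y %H:%M` is an assumed standard-library counterpart, not something the notebook itself uses, and `parse_case_date` is a hypothetical helper.

```python
from datetime import datetime

PY_FMT = "%m/%d/%y %H:%M"  # assumed stdlib counterpart of Spark's "M/d/yy H:mm"

def parse_case_date(text):
    """Parse a date string such as '9/26/20 0:42' into a datetime."""
    return datetime.strptime(text, PY_FMT)

# Sample values taken from the "Before handling dates" output
for raw in ["9/26/20 0:42", "1/1/18 12:29", "1/17/18 8:30"]:
    print(raw, "->", parse_case_date(raw))
```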

In [13]:
print("--- After")
df1.select("SLA_due_date", "case_opened_date", "case_closed_date").show(5)

--- After
+-------------------+-------------------+-------------------+
|       SLA_due_date|   case_opened_date|   case_closed_date|
+-------------------+-------------------+-------------------+
|2020-09-26 00:42:00|2018-01-01 00:42:00|2018-01-01 12:29:00|
|2018-01-05 08:30:00|2018-01-01 00:46:00|2018-01-03 08:11:00|
|2018-01-05 08:30:00|2018-01-01 00:48:00|2018-01-02 07:57:00|
|2018-01-17 08:30:00|2018-01-01 01:29:00|2018-01-02 08:13:00|
|2018-01-01 04:34:00|2018-01-01 01:34:00|2018-01-01 13:29:00|
+-------------------+-------------------+-------------------+
only showing top 5 rows



In [15]:
# Inspect the data in your dataframes
df1.show()

+----------+-------------------+-------------------+-------------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|   case_opened_date|   case_closed_date|       SLA_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+-------------------+-------------------+-------------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|2018-01-01 00:42:00|2018-01-01 12:29:00|2020-09-26 00:42:00|       NO| -998.5087616|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|2018-01-01 00:46:00|2018-01-03 08:11:00|2018-01-05 08:30:00|       NO| -2.012604167|        YES|     Storm Water

In [16]:
# Check the data types in the DataFrame
df1.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



In [17]:
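# Cast values to the appropriate types. case_id and council_district are already
# integers, so those two casts change nothing; double -> float reduces precision.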
df1 = df1.withColumn("case_id", col("case_id").cast("integer"))
df1 = df1.withColumn("case_late", col("case_late").cast("boolean"))
df1 = df1.withColumn("num_days_late", col("num_days_late").cast("float"))
df1 = df1.withColumn("case_closed", col("case_closed").cast("boolean"))
df1 = df1.withColumn("SLA_days", col("SLA_days").cast("float"))
df1 = df1.withColumn("council_district", col("council_district").cast("integer"))
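
The boolean casts work because Spark recognizes a small set of strings when casting to boolean. To my understanding, in the default (non-ANSI) mode the accepted spellings are `t`, `true`, `y`, `yes`, `1` and `f`, `false`, `n`, `no`, `0`, compared case-insensitively, and any other string becomes null. The sketch below mimics that rule in plain Python. `cast_to_boolean` is a hypothetical helper, and the string sets are an assumption that should be checked against the Spark version in use.

```python
TRUE_STRINGS = {"t", "true", "y", "yes", "1"}
FALSE_STRINGS = {"f", "false", "n", "no", "0"}

def cast_to_boolean(value):
    """Mimic a string-to-boolean cast: True, False, or None for unrecognized input."""
    if value is None:
        return None
    text = value.strip().lower()
    if text in TRUE_STRINGS:
        return True
    if text in FALSE_STRINGS:
        return False
    return None  # unrecognized strings become null

for raw in ["YES", "NO", "maybe", None]:
    print(repr(raw), "->", cast_to_boolean(raw))
```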

In [18]:
# Updated data types
df1.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: float (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: float (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)
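
Casting `num_days_late` from double to float narrows it to 32-bit precision, which is why the value first shown as `-998.5087616` appears as `-998.5088` after the cast. The sketch below reproduces that narrowing with the standard `struct` module. `to_float32` is a hypothetical helper.

```python
import struct

def to_float32(x):
    """Round a 64-bit Python float to the nearest 32-bit float value."""
    return struct.unpack("f", struct.pack("f", x))[0]

original = -998.5087616          # value shown before the cast
narrowed = to_float32(original)  # value representable after .cast("float")

print(original, narrowed)
```

Keeping the column as double avoids this loss if the extra digits matter downstream.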



In [19]:
df1.show(5, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 SLA_due_date         | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5088            
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 SLA_due_date         | 2018-01-05 08:30:00  
 case_late            | false                
 num_days_late        | -2.0126042

> Dept

In [20]:
# Read dept.csv and create a Spark DataFrame named "df2"
spark = SparkSession.builder.getOrCreate()
df2 = spark.read.csv('dept.csv', header=True, inferSchema=True)

In [21]:
# Write to local disk in JSON format
df2.write.json('dept_sources_json')

In [22]:
# Inspect the data in your dataframes
df2.show()

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|                null|  DSD/Code Enforcement|                YES|
|   Dangerous Premise|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Dangerous Premise...|Code Enforcement ...|

In [23]:
# Check the data types in the DataFrame
df2.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



In [24]:
df2 = df2.withColumn("dept_division", col("dept_division").cast("string"))
df2 = df2.withColumn("dept_name", col("dept_name").cast("string"))
df2 = df2.withColumn("standardized_dept_name", col("standardized_dept_name").cast("string"))
df2 = df2.withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA").cast("boolean"))

In [25]:
# Check the data types in the DataFrame
df2.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: boolean (nullable = true)



In [26]:
df2.show()

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|               true|
|               Brush|Solid Waste Manag...|           Solid Waste|               true|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|               true|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|               true|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|               true|
|Code Enforcement ...|Code Enforcement ...|  DSD/Code Enforcement|               true|
|Code Enforcement ...|                null|  DSD/Code Enforcement|               true|
|   Dangerous Premise|Code Enforcement ...|  DSD/Code Enforcement|               true|
|Dangerous Premise...|Code Enforcement ...|

> Join dept (df2) with 311 (df1) on dept_division

In [27]:
# Join df2 and df1 on dept_division column
joined_df = df2.join(df1, "dept_division")
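
`df2.join(df1, "dept_division")` is an inner equi-join by default: only rows whose `dept_division` appears in both DataFrames survive, and the join column appears once in the result. A minimal plain-Python sketch of that behavior follows. The rows are abbreviated, the `Unknown Division` row with case id 0 is made up to show a non-match, and `inner_join` is a hypothetical helper.

```python
def inner_join(left_rows, right_rows, key):
    """Inner equi-join of two lists of dicts on a shared key column."""
    # Index the right side by key so each left row can find its matches
    index = {}
    for row in right_rows:
        index.setdefault(row[key], []).append(row)

    joined = []
    for left in left_rows:
        for right in index.get(left[key], []):
            # Key first, then the remaining left columns, then the remaining right columns
            merged = {key: left[key]}
            merged.update({k: v for k, v in left.items() if k != key})
            merged.update({k: v for k, v in right.items() if k != key})
            joined.append(merged)
    return joined

dept_rows = [
    {"dept_division": "Field Operations", "dept_name": "Animal Care Services"},
    {"dept_division": "311 Call Center", "dept_name": "Customer Service"},
]
case_rows = [
    {"case_id": 1014127332, "dept_division": "Field Operations"},
    {"case_id": 0, "dept_division": "Unknown Division"},  # made-up non-matching row
]

joined = inner_join(dept_rows, case_rows, "dept_division")
print(joined)
```

Rows without a partner on the other side are dropped; passing `how="left"` to Spark's `join` would keep them instead.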

In [28]:
# Show the resulting DataFrame
joined_df.show(5)

+----------------+--------------------+----------------------+-------------------+----------+-------------------+-------------------+-------------------+---------+-------------+-----------+--------------------+---------+-----------+---------+--------------------+----------------+
|   dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|   case_id|   case_opened_date|   case_closed_date|       SLA_due_date|case_late|num_days_late|case_closed|service_request_type| SLA_days|case_status|source_id|     request_address|council_district|
+----------------+--------------------+----------------------+-------------------+----------+-------------------+-------------------+-------------------+---------+-------------+-----------+--------------------+---------+-----------+---------+--------------------+----------------+
|Field Operations|Animal Care Services|  Animal Care Services|               true|1014127332|2018-01-01 00:42:00|2018-01-01 12:29:00|2020-09-26 00:42:00|    

In [30]:
df3 = joined_df

In [31]:
df3.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: boolean (nullable = true)
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: float (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: float (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



In [32]:
df3.show(2, vertical=True)

-RECORD 0--------------------------------------
 dept_division          | Field Operations     
 dept_name              | Animal Care Services 
 standardized_dept_name | Animal Care Services 
 dept_subject_to_SLA    | true                 
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 SLA_due_date           | 2020-09-26 00:42:00  
 case_late              | false                
 num_days_late          | -998.5088            
 case_closed            | true                 
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 source_id              | svcCRMLS             
 request_address        | 2315  EL PASO ST,... 
 council_district       | 5                    
-RECORD 1--------------------------------------
 dept_division          | Storm Water          
 dept_name              | Trans & Cap Im

In [33]:
from pyspark.sql.functions import current_date, current_timestamp, datediff, max, min, when

In [56]:
# Get the maximum date from the dataset
max_date = df3.select(max(df3.case_closed_date)).collect()[0][0]

# Calculate the latest days past SLA for currently open issues
latest_days_past_sla = (
    df3.filter((df3.case_closed == False) & (df3.SLA_due_date < current_date()))
    .select(datediff(current_date(), df3.SLA_due_date).alias("days_past_sla"))
    .agg(max("days_past_sla"))
    .collect()[0][0]
)

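# Days since opened for currently open issues. Note that min() returns the
# smallest value, which belongs to the most recently opened open issue; the
# oldest open issue would be the largest value, i.e. max("days_since_opened").
oldest_days_since_opened = (
    df3.filter(df3.case_closed == False)
    .select(datediff(current_date(), df3.case_opened_date).alias("days_since_opened"))
    .agg(min("days_since_opened"))
    .collect()[0][0]
)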
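
A small standard-library sketch of the two day counts, using made-up open cases and a fixed reference date so the result is reproducible. It also shows that the oldest open case corresponds to the largest days-since-opened value, while the smallest value belongs to the most recently opened case. All names and dates here are illustrative assumptions.

```python
from datetime import date

today = date(2020, 1, 1)  # fixed, hypothetical reference date

# Hypothetical open cases as (opened, SLA due) pairs
open_cases = [
    (date(2019, 1, 1), date(2019, 1, 11)),
    (date(2019, 6, 1), date(2019, 6, 15)),
    (date(2019, 12, 20), date(2020, 1, 10)),  # not yet past its SLA
]

# Days past SLA, only for cases whose due date has already passed
days_past_sla = [(today - due).days for _, due in open_cases if due < today]

# Days since each open case was opened
days_since_opened = [(today - opened).days for opened, _ in open_cases]

latest_days_past_sla = max(days_past_sla)
oldest_open_days = max(days_since_opened)  # oldest open case
newest_open_days = min(days_since_opened)  # most recently opened case

print(latest_days_past_sla, oldest_open_days, newest_open_days)
```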

In [57]:
print("Latest days past SLA for currently open issues:", latest_days_past_sla)
print("Oldest days since opened for currently open issues:", oldest_days_since_opened)

Latest days past SLA for currently open issues: 1959
Oldest days since opened for currently open issues: 1896


In [48]:
stray_animal_cases = df3.filter(df3.service_request_type == "Stray Animal").count()
print("Number of Stray Animal cases:", stray_animal_cases)

Number of Stray Animal cases: 3497


In [49]:
field_operations_requests = df3.filter((df3.dept_division == "Field Operations") & (df3.service_request_type != "Officer Standby")).count()
print("Number of service requests assigned to Field Operations department and not classified as Officer Standby:", field_operations_requests)

Number of service requests assigned to Field Operations department and not classified as Officer Standby: 15094


In [50]:
df3 = df3.withColumn("council_district", df3.council_district.cast("string"))

In [51]:
df3 = df3.withColumn("num_hours_late", df3.num_days_late * 24)

In [52]:
df3.show(5, vertical=True)

-RECORD 0--------------------------------------
 dept_division          | Field Operations     
 dept_name              | Animal Care Services 
 standardized_dept_name | Animal Care Services 
 dept_subject_to_SLA    | true                 
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 SLA_due_date           | 2020-09-26 00:42:00  
 case_late              | false                
 num_days_late          | -998.5088            
 case_closed            | true                 
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 source_id              | svcCRMLS             
 request_address        | 2315  EL PASO ST,... 
 council_district       | 5                    
 num_hours_late         | -23964.21            
-RECORD 1--------------------------------------
 dept_division          | Storm Water   

In [43]:
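# Intended: replace current time with the maximum date.
# Note: current_timestamp() is the time the query runs, so this equality
# essentially never matches and case_closed_date is left unchanged.
df3 = df3.withColumn(
    "case_closed_date",
    when(df3.case_closed_date == current_timestamp(), max_date).otherwise(df3.case_closed_date),
)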
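
`when(condition, value).otherwise(column)` behaves like a row-wise if/else. The sketch below contrasts two conditions in plain Python with made-up values: an equality test against the current time, which no stored close date satisfies, and a null check, which would give still-open cases the maximum date. Whether the null check matches the notebook's intent is an assumption.

```python
from datetime import datetime

max_date = datetime(2018, 8, 8, 10, 38)  # hypothetical stand-in for the dataset maximum
now = datetime(2023, 5, 22, 12, 30)      # hypothetical stand-in for current_timestamp()

# Hypothetical close dates; None marks a case that is still open
closed_dates = [datetime(2018, 1, 1, 12, 29), None, datetime(2018, 1, 3, 8, 11)]

# Equality with "now": no stored value matches, so nothing changes
unchanged = [max_date if d == now else d for d in closed_dates]

# Null check: open cases receive max_date, closed cases keep their date
filled = [max_date if d is None else d for d in closed_dates]

print(unchanged)
print(filled)
```

In Spark the null-check variant would use `df3.case_closed_date.isNull()` as the condition.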

In [44]:
df3.show(5, vertical=True)

-RECORD 0--------------------------------------
 dept_division          | Field Operations     
 dept_name              | Animal Care Services 
 standardized_dept_name | Animal Care Services 
 dept_subject_to_SLA    | true                 
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 SLA_due_date           | 2020-09-26 00:42:00  
 case_late              | false                
 num_days_late          | -998.5088            
 case_closed            | true                 
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 source_id              | svcCRMLS             
 request_address        | 2315  EL PASO ST,... 
 council_district       | 5                    
 num_hours_late         | -23964.21            
-RECORD 1--------------------------------------
 dept_division          | Storm Water   