<div class="center">

### CSE4510: Activity 4
##### Spark DataFrames with PySpark
Grant Butler | [gbutler2020@my.fit.edu](mailto:gbutler2020@my.fit.edu) | [904.423.9358](tel:9044239358)
</div>

<center>

##### Table of Contents:
</center>

[Obtain Dataset](#obtain-dataset)
1. [Data Cleaning](#data-cleaning)
    - a. [Create Spark Context](#create-spark-context)
    - b. [Read Dataset](#read-dataset)
    - c. [Create *Delay* Column](#create-delay-column)
    - d. [Drop Columns and Arrange Schema](#drop-columns)

```python
# general imports
from datetime import datetime
import os
```

##### Obtain Dataset <a name="obtain-dataset"></a>

First, I downloaded the full *San Francisco Fire Department Calls for Service* dataset (~2.3GB) from [DataSF](https://data.sfgov.org/Public-Safety/Fire-Department-Calls-for-Service/nuek-vuh3) and saved it as `data/fire_dep_calls.csv`. This is the relative path the read step below uses.
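Because the file is large and the Spark read fails only when the first action runs, a quick existence and size check can save a confusing error later. This is a minimal sketch that assumes the relative path `data/fire_dep_calls.csv`. `dataset_size_gb` is a hypothetical helper and was not part of the original notebook.

```python
import os


def dataset_size_gb(path: str = "data/fire_dep_calls.csv") -> float:
    """Return the file size in GiB, raising early if the dataset is missing.

    Hypothetical helper: it only checks the file on disk, it does not open it.
    """
    if not os.path.isfile(path):
        raise FileNotFoundError(f"expected the dataset at {path!r}")
    # os.path.getsize reports bytes; 1024**3 bytes per GiB
    return os.path.getsize(path) / 1024**3
```

For example, `print(f"{dataset_size_gb():.2f} GiB")` reports the size of the downloaded file.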

##### 1. Data Cleaning <a name="data-cleaning"></a>

##### a. Create Spark Context <a name="create-spark-context"></a>

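I started by using the Python module `findspark`. It finds the Spark installation (from `SPARK_HOME` or a few common install locations) and adds PySpark to `sys.path` so the notebook can import it. In my experience this works more consistently than launching the notebook through the `pyspark` shell command. I then created a `SparkSession` named `cse4510_activity4` with master `local[*]`, which runs Spark locally with one worker thread per logical core.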

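```python
# using findspark instead of launching through the pyspark shell seems to work better
import findspark
findspark.init()

# importing pyspark and starting a session for the assignment
import pyspark
from pyspark.sql import SparkSession
# star import brings in col, to_timestamp, round, etc.; note that it shadows Python's built-in round()
from pyspark.sql.functions import *

# local[*]: run locally with one worker thread per logical core
spark = SparkSession.builder.master("local[*]") \
            .appName("cse4510_activity4") \
            .getOrCreate()
```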

```text
/usr/bin/hadoop


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/apache-spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
```


##### b. Read Dataset <a name="read-dataset"></a>

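Here I read the file with `spark.read`, which returns a `DataFrameReader`, and its `.csv()` method. I used the following options:

|    option     | value | effect |
| :-----------: | :---: | :----- |
| samplingRatio |  0.1  | infer the schema from a 10% sample of the rows |
|  inferSchema  | true  | guess column types instead of reading every column as a string (requires an extra pass over the data) |
|    header     | true  | use the first line as column names |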

```python
# opening the data in spark
fire_data = spark \
        .read \
        .option("samplingRatio", "0.1") \
        .option("inferSchema", "true") \
        .option("header", "true") \
        .csv("data/fire_dep_calls.csv")

# printing schema
fire_data.printSchema()
```



```text
root
 |-- Call Number: integer (nullable = true)
 |-- Unit ID: string (nullable = true)
 |-- Incident Number: integer (nullable = true)
 |-- Call Type: string (nullable = true)
 |-- Call Date: string (nullable = true)
 |-- Watch Date: string (nullable = true)
 |-- Received DtTm: string (nullable = true)
 |-- Entry DtTm: string (nullable = true)
 |-- Dispatch DtTm: string (nullable = true)
 |-- Response DtTm: string (nullable = true)
 |-- On Scene DtTm: string (nullable = true)
 |-- Transport DtTm: string (nullable = true)
 |-- Hospital DtTm: string (nullable = true)
 |-- Call Final Disposition: string (nullable = true)
 |-- Available DtTm: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Zipcode of Incident: integer (nullable = true)
 |-- Battalion: string (nullable = true)
 |-- Station Area: string (nullable = true)
 |-- Box: string (nullable = true)
 |-- Original Priority: string (nullable = true)
 |-- Priority: string (nullable
```

                                                                                

##### c. Create *Delay* Column <a name="create-delay-column"></a>

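Here I computed the response delay from the `Response DtTm` and `Received DtTm` columns. Both are stored as strings, so I parsed each one into a Spark timestamp with `to_timestamp` and the pattern `MM/dd/yyyy hh:mm:ss a` (12-hour clock with an AM/PM marker). I then cast the timestamps to `long`, which gives seconds since the Unix epoch, and subtracted them to get the delay in seconds. Dividing by 60 and rounding to 2 decimal places gives the delay in minutes, which I stored in a new `Delay` column. Finally, I filtered to the rows with `Call Number == 203350320` and displayed them.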
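The arithmetic behind `Delay` can be checked by hand in plain Python. This is a minimal sketch that assumes `'%m/%d/%Y %I:%M:%S %p'` is the `strptime` counterpart of Spark's `'MM/dd/yyyy hh:mm:ss a'`. The timestamps are made-up examples rather than rows from the dataset, and `delay_minutes` is a hypothetical helper.

```python
from datetime import datetime

# strptime counterpart of Spark's 'MM/dd/yyyy hh:mm:ss a' (%I = 12-hour clock, %p = AM/PM)
PY_FORMAT = "%m/%d/%Y %I:%M:%S %p"


def delay_minutes(response, received):
    """Minutes from call received to unit response, rounded to 2 decimal places.

    Hypothetical helper: parses both strings, takes the difference in seconds,
    divides by 60 and rounds. Returns None if either value is missing, similar
    to how a null propagates through the Spark expression.
    """
    if response is None or received is None:
        return None
    seconds = (datetime.strptime(response, PY_FORMAT)
               - datetime.strptime(received, PY_FORMAT)).total_seconds()
    return round(seconds / 60, 2)


# Hypothetical timestamps 2 min 51 s apart
print(delay_minutes("01/15/2021 09:08:01 AM", "01/15/2021 09:05:10 AM"))  # → 2.85
```

The `%I`/`%p` pair matters around midnight. For example, `12:01:30 AM` on one day is 2.5 minutes after `11:59:00 PM` on the previous day.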

<sub>*Note: I was sandboxing with the commented code until I figured it out with the final code.*</sub>

```python
from pyspark.sql.types import DoubleType, DateType, LongType

# # getting the rows
# response_dttm = fire_data.select('Response DtTm')
# received_dttm = fire_data.select('Received DtTm')

# # fixing time format
# response_dttm = response_dttm.withColumn('Response DtTm', to_timestamp('Response DtTm', 'MM/dd/yyyy hh:mm:ss a'))
# received_dttm = received_dttm.withColumn('Received DtTm', to_timestamp('Received DtTm', 'MM/dd/yyyy hh:mm:ss a'))

# # helper function to subtract and return time in minutes
# def get_delay_in_min(time_a: datetime, time_b: datetime):
#     difference = (time_a - time_b)
#     return int(difference)

# with_delay = fire_data.withColumn('Received DtTm', to_timestamp('Received DtTm', 'MM/dd/yyyy hh:mm:ss a'))
# with_delay = with_delay.withColumn('Response DtTm', to_timestamp('Response DtTm', 'MM/dd/yyyy hh:mm:ss a'))


# with_delay = with_delay.withColumn('Delay', 
#              round(((with_delay['Response DtTm']
#              .cast("long")
#              - with_delay['Received DtTm']
#              .cast("long")) /60, 2)))


# pattern for strings like "11/30/2020 06:24:..": hh = 12-hour clock, a = AM/PM marker
date_format = 'MM/dd/yyyy hh:mm:ss a'

# Delay in minutes = (response - received) in epoch seconds / 60, rounded to 2 DP;
# the result is null whenever either timestamp is missing
fire_data = fire_data.withColumn('Delay',
            round((to_timestamp(col('Response DtTm'), date_format).cast('long')
            - to_timestamp(col('Received DtTm'), date_format).cast('long'))
            /60, 2))

# every row (one per Unit ID) belonging to a single call
spec_call = fire_data.filter(col('Call Number') == 203350320)

spec_call.select(col('Call Number'), col('Response DtTm'), col('Received DtTm'), col('Delay')).show()
```

```text
+-----------+--------------------+--------------------+-----+
|Call Number|       Response DtTm|       Received DtTm|Delay|
+-----------+--------------------+--------------------+-----+
|  203350320|11/30/2020 06:24:...|11/30/2020 06:21:...| 2.85|
|  203350320|                null|11/30/2020 06:21:...| null|
|  203350320|                null|11/30/2020 06:21:...| null|
|  203350320|11/30/2020 06:23:...|11/30/2020 06:21:...|  2.7|
|  203350320|11/30/2020 06:22:...|11/30/2020 06:21:...| 1.52|
+-----------+--------------------+--------------------+-----+
```



##### d. Drop Columns and Arrange Schema <a name="drop-columns"></a>