In [1]:
import findspark
findspark.init("/Users/DOU2274/spark/spark-3.1.1-bin-hadoop2.7")
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("Fire Service Incidents").getOrCreate()

In [2]:
fire_schema = StructType([StructField('CallNumber', IntegerType(), True), StructField('UnitID', StringType(), True),
                    StructField('IncidentNumber', IntegerType(), True),
                    StructField('CallType', StringType(), True),
                    StructField('CallDate', StringType(), True),
                    StructField('WatchDate', StringType(), True),
                    StructField('CallFinalDisposition', StringType(), True),
                    StructField('AvailableDtTm', StringType(), True),
                    StructField('Address', StringType(), True),
                    StructField('City', StringType(), True),
                    StructField('Zipcode', IntegerType(), True),
                    StructField('Battalion', StringType(), True),
                    StructField('StationArea', StringType(), True),
                    StructField('Box', StringType(), True),
                    StructField('OriginalPriority', StringType(), True),
                    StructField('Priority', StringType(), True),
                    StructField('FinalPriority', IntegerType(), True),
                    StructField('ALSUnit', BooleanType(), True),
                    StructField('CallTypeGroup', StringType(), True),
                    StructField('NumAlarms', IntegerType(), True),
                    StructField('UnitType', StringType(), True),
                    StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                    StructField('FirePreventionDistrict', StringType(), True),
                    StructField('SupervisorDistrict', StringType(), True),
                    StructField('Neighborhood', StringType(), True),
                    StructField('Location', StringType(), True),
                    StructField('RowID', StringType(), True),
                    StructField('Delay', FloatType(), True)])



In [3]:
file_path ="/Users/DOU2274/Desktop/LearningProjects/pyspark/sf-fire-calls.csv"

fire_df = spark.read.csv(file_path, header=True, schema=fire_schema)

write_2db =(fire_df.select("CallType","IncidentNumber" ).where(col("CallType").isNotNull()).distinct())

#write_2db.show()
(write_2db
.write \
.format("jdbc") \
.option("url", "jdbc:postgresql:sparklearning") \
.option("dbtable", "public.actor") \
.option("user", "postgres") \
.option("password", "Oracle987").load())

In [6]:
few_fire_df = (fire_df
.select("IncidentNumber", "AvailableDtTm", "CallType") .where(col("CallType") != "Medical Incident"))
few_fire_df.show(5, truncate=False)

+--------------+----------------------+--------------+
|IncidentNumber|AvailableDtTm         |CallType      |
+--------------+----------------------+--------------+
|2003235       |01/11/2002 01:51:44 AM|Structure Fire|
|2003250       |01/11/2002 04:16:46 AM|Vehicle Fire  |
|2003259       |01/11/2002 06:01:58 AM|Alarms        |
|2003279       |01/11/2002 08:03:26 AM|Structure Fire|
|2003301       |01/11/2002 09:46:44 AM|Alarms        |
+--------------+----------------------+--------------+
only showing top 5 rows



In [8]:
(fire_df.select("CallType")
.where(col("CallType").isNotNull()).agg(countDistinct("CallType").alias("DistinctCallTypes")) .show())

[Stage 2:>                                                        (0 + 11) / 11][Stage 2:=====>                                                   (1 + 10) / 11]

+-----------------+
|DistinctCallTypes|
+-----------------+
|               30|
+-----------------+





In [9]:
(fire_df.select("CallType").where(col("CallType").isNotNull()).distinct().show(5,truncate=False))

+-----------------------------------+
|CallType                           |
+-----------------------------------+
|Elevator / Escalator Rescue        |
|Marine Fire                        |
|Aircraft Emergency                 |
|Confined Space / Structure Collapse|
|Administrative                     |
+-----------------------------------+
only showing top 5 rows





In [10]:
(fire_df.select("CallType").where(col("CallType").isNotNull()) .distinct().count())

30

In [11]:
new_fire_df = fire_df.withColumnRenamed("Delay", "ResponseDelayedinMins") 

(new_fire_df.select("ResponseDelayedinMins") .where(col("ResponseDelayedinMins") > 5) .show(5, False))

+---------------------+
|ResponseDelayedinMins|
+---------------------+
|5.35                 |
|6.25                 |
|5.2                  |
|5.6                  |
|7.25                 |
+---------------------+
only showing top 5 rows



In [12]:
fire_ts_df = (new_fire_df
.withColumn("IncidentDate", to_timestamp(col("CallDate"), "MM/dd/yyyy")) .drop("CallDate")
.withColumn("OnWatchDate", to_timestamp(col("WatchDate"), "MM/dd/yyyy")) .drop("WatchDate")
.withColumn("AvailableDtTS", to_timestamp(col("AvailableDtTm"), "MM/dd/yyyy hh:mm:ss a")).drop("AvailableDtTm"))

In [13]:
(fire_ts_df
.select("IncidentDate", "OnWatchDate", "AvailableDtTS") .show(5, False))


+-------------------+-------------------+-------------------+
|IncidentDate       |OnWatchDate        |AvailableDtTS      |
+-------------------+-------------------+-------------------+
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 01:51:44|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 03:01:18|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 02:39:50|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 04:16:46|
|2002-01-11 00:00:00|2002-01-10 00:00:00|2002-01-11 06:01:58|
+-------------------+-------------------+-------------------+
only showing top 5 rows



In [14]:
(fire_ts_df .select(year('IncidentDate')) .distinct() .orderBy(year('IncidentDate')) .show())

+------------------+
|year(IncidentDate)|
+------------------+
|              2000|
|              2001|
|              2002|
|              2003|
|              2004|
|              2005|
|              2006|
|              2007|
|              2008|
|              2009|
|              2010|
|              2011|
|              2012|
|              2013|
|              2014|
|              2015|
|              2016|
|              2017|
|              2018|
+------------------+



In [15]:
#what were the most common types of fire calls?
(fire_ts_df
.select("CallType") .where(col("CallType").isNotNull()) .groupBy("CallType")
.count()
.orderBy("count", ascending=False) .show(n=10, truncate=False))

+-------------------------------+------+
|CallType                       |count |
+-------------------------------+------+
|Medical Incident               |113794|
|Structure Fire                 |23319 |
|Alarms                         |19406 |
|Traffic Collision              |7013  |
|Citizen Assist / Service Call  |2524  |
|Other                          |2166  |
|Outside Fire                   |2094  |
|Vehicle Fire                   |854   |
|Gas Leak (Natural and LP Gases)|764   |
|Water Rescue                   |755   |
+-------------------------------+------+
only showing top 10 rows



In [16]:
import pyspark.sql.functions as F 
(fire_ts_df.select(F.sum("NumAlarms"), F.avg("ResponseDelayedinMins"),F.min("ResponseDelayedinMins"), F.max("ResponseDelayedinMins")) .show())

+--------------+--------------------------+--------------------------+--------------------------+
|sum(NumAlarms)|avg(ResponseDelayedinMins)|min(ResponseDelayedinMins)|max(ResponseDelayedinMins)|
+--------------+--------------------------+--------------------------+--------------------------+
|        176170|         3.892364154521585|               0.016666668|                   1844.55|
+--------------+--------------------------+--------------------------+--------------------------+



In [17]:
(fire_ts_df
.select("CallType") .where(col("CallType").isNotNull())
.where(year('IncidentDate') == "2018").distinct().show(n=20, truncate=False))

+-------------------------------+
|CallType                       |
+-------------------------------+
|Elevator / Escalator Rescue    |
|Alarms                         |
|Odor (Strange / Unknown)       |
|Citizen Assist / Service Call  |
|HazMat                         |
|Explosion                      |
|Vehicle Fire                   |
|Suspicious Package             |
|Other                          |
|Outside Fire                   |
|Traffic Collision              |
|Assist Police                  |
|Gas Leak (Natural and LP Gases)|
|Water Rescue                   |
|Electrical Hazard              |
|Structure Fire                 |
|Medical Incident               |
|Fuel Spill                     |
|Smoke Investigation (Outside)  |
|Train / Rail Incident          |
+-------------------------------+



In [20]:
(fire_ts_df
.select(weekofyear('IncidentDate').alias("weekofyear"), "CallType")
.where(col("CallType").isNotNull())
.where(weekofyear('IncidentDate') == "2018") 
.groupBy("weekofyear")
.count()
.orderBy("count", ascending=False) .show(n=10, truncate=False))

+----------+-----+
|weekofyear|count|
+----------+-----+
+----------+-----+



Which neighborhood in San Francisco generated the most fire calls in 2018? 
Which neighborhoods had the worst response times to fire calls in 2018? 
Which week in the year in 2018 had the most fire calls?
Is there a correlation between neighborhood, zip code, and number of fire calls? 
How can we use Parquet files or SQL tables to store this data and read it back?


In [21]:
#spark.catalog.listDatabases() 
spark.catalog.listTables() #spark.catalog.listColumns("us_delay_flights_tbl")


[]

In [32]:
to_s3 = fire_ts_df.select("CallNumber", "CallType","Address", "City", "Neighborhood")

#fire_ts_df.show()

to_s3.write.parquet("s3a://salesanalyticssparkjob/Fire",mode="overwrite")

In [None]:
raw_bucket = 'salesanalyticssparkjob'
raw_path_dir = 'Fire'
raw_path = f"s3://{raw_bucket}/{raw_path_dir}"


to_s3 = wr.s3.write.(path =raw_path)
to_s3 = wr.s3.wr(path=raw_path, path_suffix=['.csv'],dataset=True)

s3 = boto3.resource('s3',
         aws_access_key_id='AKIA5CFINZPMT4D5PI6Y',
         aws_secret_access_key= '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')



In [None]:

from pyspark.sql.types import LongType


In [None]:
    # Create cubed function
def cubed(s):
        return s*s*s
    # Register UDF

#cubed(2)
spark.udf.register("cubed", cubed, LongType()) # Generate temporary view
spark.range(1, 9).createOrReplaceTempView("udf_test")

In [None]:
#spark.sql("select * from udf_test").show()
spark.sql("SELECT id, cubed(id) AS id_cubed FROM udf_test").show()


In [None]:
#Write data to database
#./bin/pyspark --packages org.postgresql:postgresql:42.1.1

jdbcDF1 = spark.read.format("jdbc").option("url", "jdbc:postgresql:dvdrental") .option("dbtable", "public.actor") .option("user", "postgres") .option("password", "Oracle987").load()
        
jdbcDF2 = (spark.read.format("jdbc")\
        .option("url", "jdbc:postgresql://localhost:5432/dvdrental") .option("dbtable", "public.actor")\
        .option("user", "postgres") .option("password", "Oracle987")\
        .load())




In [None]:
#jdbcDF1 = spark.read.format("jdbc").option("url", "jdbc:postgresql:dvdrental") .option("dbtable", "public.actor") .option("user", "postgres") .option("password", "Oracle987").load()

In [None]:
fire_df.printSchema