In [2]:
import pandas as pd
import requests
from sodapy import Socrata
from sqlalchemy import create_engine
from time import time
import argparse
#import psycopg2
from tenacity import retry, wait_exponential, stop_after_attempt
import requests
import pyspark


## Extract Via API

In [3]:
# Socrata client for the NYC open-data portal. The app token is read from the
# environment instead of being hard-coded; the token previously committed here in
# plain text should be considered leaked and rotated.
# sodapy accepts None as the token (unauthenticated, more heavily throttled access).
import os
client = Socrata("data.cityofnewyork.us", os.environ.get("SOCRATA_APP_TOKEN"))

In [4]:
@retry(
    wait=wait_exponential(multiplier=2, min=2, max=16),
    stop=stop_after_attempt(5),
    # Without reraise=True tenacity raises tenacity.RetryError after the final attempt,
    # so the `except requests.exceptions.RequestException` below could never trigger.
    reraise=True,
)
def get_data_from_api(client, data_set, limit_rows):
    """Fetch up to ``limit_rows`` records of the Socrata dataset ``data_set``.

    Retried up to 5 attempts with exponential back-off (2s..16s). After the last
    attempt the original exception from ``client.get`` is re-raised unchanged.

    Returns whatever ``client.get`` returns; the rest of the notebook treats it as a
    list of JSON record dicts.
    """
    return client.get(data_set, limit=limit_rows)


try:
    results = get_data_from_api(client, "8m42-w767", 10000)
    print("Connected to API")
except requests.exceptions.RequestException as e:
    print(f"Failed to fetch data from API: {e}")
    # Every later cell needs `results`; stop here instead of failing with a NameError later.
    raise

Connected to API


In [None]:
import os
import json
# Scratch folder ("temp" under the working directory) that receives the raw API dump.
current_dir = os.getcwd()
temp_folder = os.path.join(current_dir, "temp")

In [None]:
# Create the temp folder if it doesn't exist. exist_ok=True replaces the separate
# os.path.exists check and avoids a race if the folder appears between check and create.
os.makedirs(temp_folder, exist_ok=True)

In [None]:
# Target JSON file, inside the temp folder, for the raw API records dumped by the next cell.
file_path = os.path.join(temp_folder, "output.json")

In [None]:
# Serialise the raw API records to file_path as pretty-printed (indent=4) UTF-8 JSON.
with open(file_path, mode="w", encoding="utf-8") as file:
    file.write(json.dumps(results, indent=4))

## Pandas Transformations ##

In [5]:
# One pandas row per API record dict.
df = pd.DataFrame.from_records(data=results)

In [7]:
# First ten rows of the raw pandas frame.
df.iloc[:10]

Unnamed: 0,starfire_incident_id,incident_datetime,alarm_box_borough,alarm_box_number,alarm_box_location,incident_borough,zipcode,policeprecinct,citycouncildistrict,communitydistrict,...,first_activation_datetime,incident_close_datetime,valid_dispatch_rspns_time_indc,valid_incident_rspns_time_indc,incident_response_seconds_qy,incident_travel_tm_seconds_qy,engines_assigned_quantity,ladders_assigned_quantity,other_units_assigned_quantity,first_on_scene_datetime
0,2100404460110002,2021-01-04T00:01:00.000,MANHATTAN,446,3 AVE & ST. MARKS PL,MANHATTAN,10003,9,2,103,...,2021-01-04T00:02:00.000,2021-01-04T00:07:00.000,N,N,0,0,1,0,0,
1,2100433250140001,2021-01-04T00:01:00.000,BROOKLYN,3325,AVENUE O & E 13 ST,BROOKLYN,11230,70,48,314,...,2021-01-04T00:02:00.000,2021-01-04T00:32:00.000,N,Y,170,165,1,0,0,2021-01-04T00:04:00.000
2,2100411280150003,2021-01-04T00:01:00.000,QUEENS,1128,MOTT AVE & DICKENS ST,QUEENS,11691,101,31,414,...,2021-01-04T00:02:00.000,2021-01-04T00:05:00.000,N,N,0,0,1,0,0,
3,2100416590110004,2021-01-04T00:02:00.000,MANHATTAN,1659,BROADWAY & 153 ST,MANHATTAN,10031,30,7,109,...,2021-01-04T00:02:00.000,2021-01-04T00:31:00.000,N,Y,318,314,1,0,0,2021-01-04T00:07:00.000
4,2100413490110006,2021-01-04T00:02:00.000,MANHATTAN,1349,5 AVE & 112 ST,MANHATTAN,10026,28,9,110,...,2021-01-04T00:03:00.000,2021-01-04T00:18:00.000,N,Y,871,834,1,0,0,2021-01-04T00:17:00.000
5,2100412610150005,2021-01-04T00:06:00.000,QUEENS,1261,ALMEDA AVE & B63 ST,QUEENS,11692,100,31,414,...,2021-01-04T00:07:00.000,2021-01-04T00:18:00.000,N,Y,341,336,1,0,0,2021-01-04T00:12:00.000
6,2100479410150006,2021-01-04T00:07:00.000,QUEENS,7941,B'WAY & BAXTER AVE,QUEENS,11373,110,25,404,...,2021-01-04T00:08:00.000,2021-01-04T00:42:00.000,N,Y,307,267,2,2,1,2021-01-04T00:12:00.000
7,2100425290120007,2021-01-04T00:10:00.000,BRONX,2529,M.L.KING JR BLVD & W 165 ST,BRONX,10452,44,16,204,...,2021-01-04T00:11:00.000,2021-01-04T00:16:00.000,N,N,0,0,1,0,0,
8,2100402410130006,2021-01-04T00:10:00.000,RICHMOND / STATEN ISLAND,241,BAY & BROAD STS,RICHMOND / STATEN ISLAND,10304,120,49,501,...,2021-01-04T00:11:00.000,2021-01-04T00:16:00.000,N,Y,132,127,1,0,0,2021-01-04T00:13:00.000
9,2100417350140004,2021-01-04T00:15:00.000,BROOKLYN,1735,NEW JERSEY AVE & FULTON ST,BROOKLYN,11207,75,37,305,...,2021-01-04T00:16:00.000,2021-01-04T00:45:00.000,N,Y,208,159,1,1,0,2021-01-04T00:18:00.000


In [None]:
# Inspect the pandas dtypes inferred from the raw API records.
df.dtypes

In [None]:
# 1-based rows 390..400 (inclusive), i.e. 0-based positions 389..399.
subset = df.iloc[390 - 1:400]

In [None]:
# Display the rows selected in the previous cell.
subset

In [None]:
# Convert the pandas columns to their proper dtypes.
# Datetime columns are parsed with pd.to_datetime.
# NOTE(review): first_on_scene_datetime is not converted here (its conversion was
# commented out) — confirm whether that is intended.
for date_column in ["incident_datetime",
                    "first_assignment_datetime",
                    "first_activation_datetime",
                    "incident_close_datetime"]:
    df[date_column] = pd.to_datetime(df[date_column])

# Response-time and unit-count columns are cast to float.
for numeric_column in ["dispatch_response_seconds_qy",
                       "incident_response_seconds_qy",
                       "incident_travel_tm_seconds_qy",
                       "engines_assigned_quantity",
                       "ladders_assigned_quantity",
                       "other_units_assigned_quantity"]:
    df[numeric_column] = df[numeric_column].astype(float)



In [None]:
# Confirm the conversions above; a bare last expression gets Jupyter's rich display
# instead of print()'s plain text.
df.dtypes

## PySpark Transformations ##

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
from pyspark.sql.functions import to_timestamp, to_date
# Spark session shared by every PySpark transformation below.
spark = (
    SparkSession.builder
    .appName("Transformations_NYC_Fire_Incidents")
    .getOrCreate()
)
print(spark.version)

In [None]:
# Build the Spark DataFrame from real JSON text, one JSON document per API record.
# DataFrameReader.json expects an RDD of JSON strings; the previous form handed it a
# single Python list object, which is not JSON text.
df = spark.read.json(
    spark.sparkContext.parallelize([json.dumps(record) for record in results])
)

In [None]:
# Print a preview of the first rows of the Spark DataFrame.
df.show()

In [None]:
# Fill nulls in location columns with a per-borough default value.
def clean_null_values(df, null_fields_and_values, aggregate_field, boroughs):
    """Replace nulls in several columns with a value chosen by the row's borough.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Input frame.
    null_fields_and_values : dict[str, list]
        Maps each column to clean to its replacement values. The i-th value is used
        for rows whose ``aggregate_field`` equals ``boroughs[i]``.
    aggregate_field : str
        Column holding the grouping (borough) value.
    boroughs : list
        Grouping values, positionally aligned with every replacement list.

    Returns
    -------
    pyspark.sql.DataFrame
        ``df`` with nulls filled where the borough matches. Non-null values, and rows
        whose borough is null or not listed, are left unchanged.

    Raises
    ------
    ValueError
        If a replacement list's length differs from ``len(boroughs)``.
    """
    for field, replacements in null_fields_and_values.items():
        if len(replacements) != len(boroughs):
            raise ValueError(
                f"{field}: {len(replacements)} replacement values for {len(boroughs)} boroughs"
            )
        replacement_expr = None
        # One WHEN branch per (borough, value) pair, driven by the `boroughs` argument
        # rather than a notebook global, and not limited to exactly five branches.
        for borough, replacement in zip(boroughs, replacements):
            is_target = col(field).isNull() & (col(aggregate_field) == borough)
            if replacement_expr is None:
                replacement_expr = when(is_target, replacement)
            else:
                replacement_expr = replacement_expr.when(is_target, replacement)
        if replacement_expr is None:
            # No boroughs given: nothing to fill for this column.
            continue
        df = df.withColumn(field, replacement_expr.otherwise(col(field)))
    return df


null_fields_and_values = {"zipcode":[10451,11201,10001,11004,10301],
                          "policeprecinct":[40,60,1,100,120],
                          "citycouncildistrict":[8,33,1,19,49],
                          "communitydistrict":[201,301,101,401,501],
                          "communityschooldistrict":[7,13,1,7,31],
                          "congressionaldistrict":[13,7,7,3,11]}

aggregate_field = "alarm_box_borough"
aggregate_values = ["BRONX","BROOKLYN","MANHATTAN","QUEENS","RICHMOND / STATEN ISLAND"]

df = clean_null_values(df, null_fields_and_values, aggregate_field, aggregate_values)


In [None]:
# Validation: remaining null count per cleaned column, all shown at once (previously
# only the last of five counts was displayed). Nulls can legitimately remain where the
# borough itself is null or not one of the listed boroughs.
{field: df.where(col(field).isNull()).count() for field in null_fields_and_values}

In [None]:
# Cast the datetime string columns to Spark timestamps.
def convert_to_date_time(df, date_fields_to_convert):
    """Return ``df`` with every column in ``date_fields_to_convert`` cast via ``to_timestamp``."""
    converted = df
    for column_name in date_fields_to_convert:
        converted = converted.withColumn(column_name, to_timestamp(col(column_name)))
    return converted

date_fields_to_convert = ["incident_datetime",
                          "first_assignment_datetime",
                          "first_activation_datetime",
                          "incident_close_datetime"]

df = convert_to_date_time(df, date_fields_to_convert)

In [None]:
# Cast numeric string columns to Spark floats.
def convert_to_float(df, date_fields_to_convert):
    """Return ``df`` with each listed column cast to ``float``.

    NOTE(review): despite its name, ``date_fields_to_convert`` holds numeric column
    names here; the parameter name is kept so existing callers keep working.
    """
    for column_name in date_fields_to_convert:
        df = df.withColumn(column_name, col(column_name).cast("float"))
    return df

numerical_fields_to_convert = [
    "dispatch_response_seconds_qy",
    "incident_response_seconds_qy",
    "incident_travel_tm_seconds_qy",
    "engines_assigned_quantity",
    "ladders_assigned_quantity",
    "other_units_assigned_quantity",
]

df = convert_to_float(df, numerical_fields_to_convert)

In [None]:
#Function to categorize the response times and other quantity type fields. This will be used to aggregate data for OLAP usage.
from pyspark.sql.functions import max
from pyspark.sql.functions import min

def categorize_float_fields(df, fields_to_categorize):
    """Add a ``category_<field>`` label column for every field in ``fields_to_categorize``.

    The labels are used to aggregate response times and unit counts for OLAP usage.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame containing the numeric columns to bucket.
    fields_to_categorize : dict[str, list[str]]
        Maps each numeric column to 6 labels. Index 0 is for a value of exactly 0.
        Indices 1-5 are for five equal-width bins spanning the column's observed
        [min, max] range, lowest bin first.

    Returns
    -------
    pyspark.sql.DataFrame
        ``df`` with one ``category_<field>`` string column appended per field.
        Null inputs get a null label instead of falling through to the top label.
        An all-null column gets an all-null label column.
    """
    # Qualified import keeps Spark's min/max distinct from Python's built-ins.
    from pyspark.sql import functions as F

    for field, labels in fields_to_categorize.items():
        target_column = "category_" + field

        # Fetch both bounds in a single aggregation job instead of one job each.
        bounds = df.agg(
            F.min(field).alias("min_quantity"),
            F.max(field).alias("max_quantity"),
        ).collect()[0]
        min_quantity = bounds["min_quantity"]
        max_quantity = bounds["max_quantity"]

        if min_quantity is None:
            # The column is entirely null, so there is no range to split.
            df = df.withColumn(target_column, F.lit(None).cast("string"))
            continue

        # Width of each of the 5 equal bins between the observed min and max.
        category_interval = (max_quantity - min_quantity) / 5

        value = F.col(field)
        category = (
            F.when(value.isNull(), F.lit(None).cast("string"))
            .when(value == 0, labels[0])
        )
        # Bin edges are offset from the minimum so the 5 bins cover exactly [min, max].
        # WHEN branches are evaluated in order, so each branch only needs an upper edge.
        for bin_number in range(1, 5):
            upper_edge = min_quantity + category_interval * bin_number
            category = category.when(value <= upper_edge, labels[bin_number])
        df = df.withColumn(target_column, category.otherwise(labels[5]))

    return df

fields_to_categorize = {"dispatch_response_seconds_qy":["None","Very Low","Low","Medium","High","Very High"],
    "incident_response_seconds_qy":["None","Very Low","Low","Medium","High","Very High"],
    "incident_travel_tm_seconds_qy":["None","Very Low","Low","Medium","High","Very High"],
    "engines_assigned_quantity":["None","Minimal","Limited","Moderate","Substantial","Abundant"],
    "ladders_assigned_quantity":["None","Minimal","Limited","Moderate","Substantial","Abundant"],
    "other_units_assigned_quantity":["None","Minimal","Limited","Moderate","Substantial","Abundant"]}


df = categorize_float_fields(df, fields_to_categorize)


In [None]:
from pyspark.sql.functions import avg
# Rename a column and cast it to float.
def clean_column(df, column_name_before, column_name_after):
    """Rename ``column_name_before`` to ``column_name_after`` and cast it to float."""
    df = df.withColumnRenamed(column_name_before, column_name_after)
    return df.withColumn(column_name_after, col(column_name_after).cast("float"))

# Per-borough averages of the response-time fields, attached to every row.
def calculate_averages(df, fields_to_calculate_averages, aggregate_field):
    """Append ``total_avg_<field>_per_borough`` float columns to ``df``.

    All averages are computed in one ``groupBy(aggregate_field)`` and joined back once
    with a left join. The previous version did one aggregation plus one join per field,
    and applied ``.alias`` to the DataFrame rather than to the average column.
    Rows whose ``aggregate_field`` is null get null averages, because join keys
    never match null.
    """
    if not fields_to_calculate_averages:
        return df
    averages = df.groupBy(aggregate_field).agg(
        *[
            avg(field).cast("float").alias(f"total_avg_{field}_per_borough")
            for field in fields_to_calculate_averages
        ]
    )
    return df.join(averages, on=aggregate_field, how="left")


aggregate_field = "alarm_box_borough"
fields_to_calculate_averages = ["dispatch_response_seconds_qy","incident_travel_tm_seconds_qy","incident_response_seconds_qy"]

df = calculate_averages(df, fields_to_calculate_averages, aggregate_field)

In [None]:
# Total resources assigned to an incident: engines + ladders + other units.
def sum_fields(df, sum_field_name, fields_to_sum):
    """Add ``sum_field_name`` as the row-wise sum of the columns in ``fields_to_sum``.

    Works for any number (>= 1) of columns. The previous version summed exactly the
    first three and silently ignored any extras. As with Spark's ``+``, a null in any
    summand makes the total null.

    Raises ValueError if ``fields_to_sum`` is empty.
    """
    if not fields_to_sum:
        raise ValueError("fields_to_sum must name at least one column")
    total = col(fields_to_sum[0])
    for field in fields_to_sum[1:]:
        total = total + col(field)
    return df.withColumn(sum_field_name, total)

sum_field_name = "total_resources_assigned_quantity"
fields_to_sum = ["engines_assigned_quantity","ladders_assigned_quantity","other_units_assigned_quantity"]
df = sum_fields(df, sum_field_name, fields_to_sum)

In [None]:
# Inspect the Spark (column, type) pairs after the transformations above.
df.dtypes

In [None]:
# Print the Spark schema as a tree, including nullability.
df.printSchema()

In [None]:
def _temp_file_path():
    """Return the shared scratch file path ``<cwd>/temp/temp.json``."""
    return os.path.join(os.getcwd(), "temp", "temp.json")

def write_temp_file(results):
    """Write ``results`` as indented JSON to ``<cwd>/temp/temp.json``, creating the folder if needed."""
    file_path = _temp_file_path()
    # exist_ok avoids the check-then-create race of os.path.exists + os.makedirs.
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "w", encoding="utf-8") as file:
        json.dump(results, file, indent=4)

def read_temp_file():
    """Load and return the JSON content of ``<cwd>/temp/temp.json``.

    Raises FileNotFoundError if the file has not been written yet.
    """
    print("Reading JSON Temp File to json_data variable")
    current_dir = os.getcwd()
    print(current_dir)

    file_path = _temp_file_path()
    print("Printing file path: ")
    print(file_path)

    with open(file_path, "r", encoding="utf-8") as file:
        json_data = json.load(file)

    return json_data

def remove_temp_file():
    """Delete ``<cwd>/temp/temp.json``; raises FileNotFoundError if it does not exist."""
    file_path = _temp_file_path()
    os.remove(file_path)
    print(f"Temporary file deleted: {file_path}")


#### ^^ End of the official transformations (all cells above). The cells below, up to "Postgres Load", are exploratory checks.

In [None]:
from pyspark.sql.functions import max
from pyspark.sql.functions import min

In [None]:
# Exploratory: re-bin dispatch_response_seconds_qy into five buckets of width
# (max - min) / 5 measured from 0 (Very Low .. Very High), with no separate bucket for 0.
# NOTE(review): this overwrites the category_dispatch_response_seconds_qy column created
# by categorize_float_fields, using different thresholds and labels.
from pyspark.sql.functions import lit

# Both bounds in one aggregation job, each under its own correctly named alias.
dispatch_bounds = df.agg(
    max("dispatch_response_seconds_qy").alias("max_dispatch_response_seconds_qy"),
    min("dispatch_response_seconds_qy").alias("min_dispatch_response_seconds_qy"),
).collect()[0]
max_dispatch_response_seconds_qy = dispatch_bounds["max_dispatch_response_seconds_qy"]
min_dispatch_response_seconds_qy = dispatch_bounds["min_dispatch_response_seconds_qy"]

category_ranges_dispatch_response_seconds_qy = (max_dispatch_response_seconds_qy - min_dispatch_response_seconds_qy) / 5

dispatch_seconds = col("dispatch_response_seconds_qy")
bucket_width = category_ranges_dispatch_response_seconds_qy
df = df.withColumn(
    "category_dispatch_response_seconds_qy",
    # Nulls stay null; otherwise they fail every comparison and land in "Very High".
    when(dispatch_seconds.isNull(), lit(None).cast("string"))
    .when((dispatch_seconds >= 0) & (dispatch_seconds <= bucket_width), "Very Low")
    .when((dispatch_seconds > bucket_width) & (dispatch_seconds <= bucket_width * 2), "Low")
    .when((dispatch_seconds > bucket_width * 2) & (dispatch_seconds <= bucket_width * 3), "Medium")
    .when((dispatch_seconds > bucket_width * 3) & (dispatch_seconds <= bucket_width * 4), "High")
    .otherwise("Very High")
)

df.select(df["category_dispatch_response_seconds_qy"]).show()

In [None]:
# Preview the other-units category labels.
df.select("category_other_units_assigned_quantity").show()

In [None]:
# List the current Spark (column, type) pairs.
df.dtypes

In [None]:
# Largest engine counts first, next to the category each was assigned.
df.orderBy(col("engines_assigned_quantity").desc()).select(
    "category_engines_assigned_quantity", "engines_assigned_quantity"
).show()

In [None]:
# List the current Spark (column, type) pairs.
df.dtypes

In [None]:
# Register the current DataFrame as the SQL view Temp_Tbl and query the dispatch-time bounds.
df.createOrReplaceTempView('Temp_Tbl')
max_dispatch_response_seconds_qy = spark.sql("SELECT max(dispatch_response_seconds_qy) FROM Temp_Tbl")
# Lower bound must use min(); selecting max() here made both results identical.
min_dispatch_response_seconds_qy = spark.sql("SELECT min(dispatch_response_seconds_qy) FROM Temp_Tbl")

In [None]:
# Display the single-row max(dispatch_response_seconds_qy) query result.
max_dispatch_response_seconds_qy.show()

In [None]:
# Display the query result assigned to min_dispatch_response_seconds_qy in the Temp_Tbl cell.
min_dispatch_response_seconds_qy.show()

In [None]:
# DataFrame.show() only prints and returns None, so dividing its result raised TypeError.
# Pull the scalar out of the single-row, single-column query result instead.
max_dispatch_response_seconds_qy.first()[0] / 4

In [None]:
# Preview the raw dispatch response seconds.
df.select("dispatch_response_seconds_qy").show()

In [None]:
from pyspark.sql.functions import length
# Find zipcode values longer than 5 characters.
df_with_char_count = df.withColumn("char_count", length(col("zipcode")))
df_with_char_count.filter(col("char_count") > 5).select("zipcode", "char_count").show()

In [None]:
# List the current Spark (column, type) pairs.
df.dtypes

In [None]:
# Preview the parsed first-assignment timestamps.
df.select(col("first_assignment_datetime")).show()

In [None]:
# (Re-)register the current Spark DataFrame as the SQL view Temp_Tbl for the queries below.
df.createOrReplaceTempView('Temp_Tbl')

In [None]:
# Rows whose zipcode is still null. Queries the Temp_Tbl view registered above;
# 'Fire_Incidents_Temp_Tbl' is not registered anywhere in this notebook.
query = spark.sql("SELECT * FROM Temp_Tbl where zipcode is null")

In [None]:
# Count the rows returned by `query` from the previous cell.
query.count()

In [None]:
# Look up one incident by id in the Temp_Tbl view registered above
# ('Fire_Incidents_Temp_Tbl' is not registered anywhere in this notebook).
query = spark.sql("SELECT starfire_incident_id,zipcode,alarm_box_borough FROM Temp_Tbl where starfire_incident_id = 2100422620120017")
query.show()

In [None]:
# Preview incident id, zipcode and borough side by side.
df.select(["starfire_incident_id", "zipcode", "alarm_box_borough"]).show()

In [None]:
# Same single-incident lookup, through the DataFrame API.
df.filter(col("starfire_incident_id") == 2100422620120017).select(
    "starfire_incident_id", "zipcode", "alarm_box_borough"
).show()

In [None]:
# NOTE(review): identical to the previous cell; it can be deleted.
df.where(df["starfire_incident_id"]==2100422620120017).select("starfire_incident_id","zipcode","alarm_box_borough").show()

In [None]:
# Number of rows whose zipcode is still null.
df.filter(col("zipcode").isNull()).select("starfire_incident_id", "zipcode", "alarm_box_borough").count()

## Postgres Load ##

In [None]:
from sqlalchemy import create_engine

In [None]:
# Postgres engine: postgresql://username:password@host:port/db_name
# Settings can be overridden through environment variables so credentials need not
# live in the notebook; the defaults are the previously hard-coded values.
import os
from urllib.parse import quote_plus

username = os.environ.get("FIRE_INCIDENTS_DB_USER", "root")
password = os.environ.get("FIRE_INCIDENTS_DB_PASSWORD", "root")
host = os.environ.get("FIRE_INCIDENTS_DB_HOST", "fire_incidents_db")
port = int(os.environ.get("FIRE_INCIDENTS_DB_PORT", 5432))
database = os.environ.get("FIRE_INCIDENTS_DB_NAME", "fire_incidents")
# URL-encode the credentials so characters such as '@', ':' or '/' cannot break the URL.
engine = create_engine(
    f"postgresql://{quote_plus(username)}:{quote_plus(password)}@{host}:{port}/{database}"
)

In [None]:
# pandas counterpart of convert_to_date_time: datetime columns must be parsed before loading.
def convert_to_date_time_using_pands(df, date_fields_to_convert):
    """Parse each column in ``date_fields_to_convert`` with ``pd.to_datetime``.

    Mutates ``df`` in place and also returns it.
    """
    for column_name in date_fields_to_convert:
        parsed = pd.to_datetime(df[column_name])
        df[column_name] = parsed
    return df

date_fields_to_convert = ["incident_datetime",
                          "first_assignment_datetime",
                          "first_activation_datetime",
                          "incident_close_datetime",
                          "first_on_scene_datetime"]

# `df` was rebound to a Spark DataFrame in the PySpark section, but this Postgres load
# (get_schema, head, iloc, to_sql) uses pandas APIs. Collect it to the driver as a pandas
# DataFrame first; a frame that is already pandas passes through unchanged.
if hasattr(df, "toPandas"):
    df = df.toPandas()
df = convert_to_date_time_using_pands(df, date_fields_to_convert)

In [None]:
# Preview the CREATE TABLE DDL that pandas would generate for `df` (table name
# fire_incidents_schema) in the engine's SQL dialect; the statement is only printed.
schema_ddl = pd.io.sql.get_schema(df, name='fire_incidents_schema', con=engine)
print(schema_ddl)

In [None]:
# Create fire_incidents_tbl in Postgres with df's columns but no rows; the batched
# appends below fill it. if_exists='replace' drops an existing table of that name first.
df.iloc[:0].to_sql(name='fire_incidents_tbl', con=engine, if_exists='replace')

In [None]:
# Default number of rows per batch.
batchsize = 1000

def create_batches_of_rows(dataframe, batchsize=1000):
    """Yield consecutive row slices of ``dataframe``, each with at most ``batchsize`` rows.

    Slices the ``dataframe`` argument itself (not a notebook global). It stops once every
    row has been yielded, so no trailing empty slice is produced when the row count is
    an exact multiple of ``batchsize``.

    Raises ValueError (on first iteration) if ``batchsize`` is not positive, since the
    loop would otherwise never advance.
    """
    if batchsize <= 0:
        raise ValueError(f"batchsize must be positive, got {batchsize}")
    for start in range(0, len(dataframe), batchsize):
        yield dataframe.iloc[start:start + batchsize]

In [None]:
# Materialise all 100-row slices of df up front so the load loop below can iterate them.
batches = [batch for batch in create_batches_of_rows(df, 100)]

In [None]:
# Append every batch to fire_incidents_tbl inside a single transaction. If any batch
# fails, all appends are rolled back instead of leaving a partially loaded table.
with engine.begin() as connection:
    for counter, batch in enumerate(batches, start=1):
        batch.to_sql(name='fire_incidents_tbl', con=connection, if_exists='append')
        print(f'Batch Loaded..... {counter}')
print(f'Committed {len(batches)} batches to fire_incidents_tbl')
    

In [None]:
# Print the psycopg2 connection URL for debugging, with the password masked so it never
# lands in the saved notebook output. Settings can be overridden through environment
# variables; the defaults are the previously hard-coded values.
import os

username = os.environ.get("FIRE_INCIDENTS_DB_USER", "root")
password = os.environ.get("FIRE_INCIDENTS_DB_PASSWORD", "root")
host = os.environ.get("FIRE_INCIDENTS_DB_HOST", "fire_incidents_db")
port = int(os.environ.get("FIRE_INCIDENTS_DB_PORT", 5432))
database = os.environ.get("FIRE_INCIDENTS_DB_NAME", "fire_incidents")
print(f'postgresql+psycopg2://{username}:***@{host}:{port}/{database}')