In [19]:
import requests
import datetime
import pandas as pd
import sqlalchemy
from sqlalchemy.orm import sessionmaker
import sqlite3

DATABASE_LOCATION = "sqlite:///san_diego_traffic_incidents.sqlite"

# Extract stage
# San Diego Coordinates for Bounding Box
upper_left_coord = "32.977413436141305,-117.27876235556089"
lower_right_coord = "32.689025547308106,-117.0256323049526"

# Request traffic incident information in San Diego, CA
# Use HERE Api for information
request_url = "https://traffic.ls.hereapi.com/traffic/6.3/incidents.json?apiKey=&bbox={0};{1}".format(upper_left_coord, lower_right_coord)
response = requests.get(request_url)

ids = []
start_times = []
end_times = []
criticality = []
origins_latitude = []
origins_longitude = []
ends = []
ends_latitude = []
ends_longitude = []
road_closed = []
event_code = []
details = []
descriptions = []

response_json = response.json()

for traffic_item in response_json['TRAFFIC_ITEMS']['TRAFFIC_ITEM']:
    ids.append(traffic_item['TRAFFIC_ITEM_ID'])
    start_times.append(datetime.datetime.strptime(traffic_item['START_TIME'], "%m/%d/%Y %H:%M:%S"))
    end_times.append(datetime.datetime.strptime(traffic_item['END_TIME'], "%m/%d/%Y %H:%M:%S"))
    criticality.append(traffic_item['CRITICALITY']['DESCRIPTION'])
    origins_latitude.append(traffic_item['LOCATION']['GEOLOC']['ORIGIN']['LATITUDE'])
    origins_longitude.append(traffic_item['LOCATION']['GEOLOC']['ORIGIN']['LONGITUDE'])
    ends_latitude.append(traffic_item['LOCATION']['GEOLOC']['TO'][0]['LATITUDE'])
    ends_longitude.append(traffic_item['LOCATION']['GEOLOC']['TO'][0]['LONGITUDE'])
    road_closed.append(traffic_item['TRAFFIC_ITEM_DETAIL']['ROAD_CLOSED'])
    event_code.append(traffic_item['TRAFFIC_ITEM_DETAIL']['ALERTC'][0]['EVENT_CODE'])
    descriptions.append(traffic_item['TRAFFIC_ITEM_DESCRIPTION'][0]['value'])

traffic_dict = {
    "ID" : ids,
    "start_time" : start_times,
    "end_time" : end_times,
    "criticality" : criticality,
    "origin_latitude" : origins_latitude,
    "origin_longitude" : origins_longitude,
    "end_latitude" : ends_latitude,
    "end_longitude" : ends_longitude,
    "road_closed" : road_closed,
    "event_code" : event_code,
    "description" : descriptions
}

traffic_df = pd.DataFrame(traffic_dict, columns=["ID", "start_time", "end_time",
                                                "criticality", "origin_latitude",
                                                "origin_longitude", "end_latitude",
                                                "end_longitude", "road_closed",
                                                "event_code", "description"])
print(traffic_df.head())

                    ID          start_time            end_time criticality  \
0  2125868212748945033 2021-09-28 19:03:41 2021-11-16 01:00:00       minor   
1  4406328831143562376 2021-09-28 19:03:41 2024-01-01 07:59:00       minor   
2  2363172757814374937 2021-09-28 19:03:41 2021-11-19 01:00:00    critical   
3   121353197452981545 2021-09-28 19:03:41 2021-11-16 22:00:00       minor   
4  2273006450211101389 2021-09-28 19:03:41 2022-02-24 01:00:00    critical   

   origin_latitude  origin_longitude  end_latitude  end_longitude  \
0        32.757240       -117.138850     32.757240    -117.138641   
1        32.755750       -117.146250     32.756360    -117.146240   
2        32.874043       -117.214249     32.874060    -117.214753   
3        32.716980       -117.164740     32.715770    -117.164730   
4        32.722980       -117.168300     32.722971    -117.167534   

   road_closed  event_code                                        description  
0        False         803  Between 

In [20]:
# Transform stage

def verify_df(df: pd.DataFrame) -> bool:
    # Check if DataFrame is empty
    if df.empty:
        print("No traffic incidents downloaded. Finishing execution.")
        return False
    
    # Check if there are any duplicate IDs
    if df['ID'].is_unique:
        pass
    else:
        raise Exception("Primary Key check is violated")
    
    # Check for any Null values
    if df.isnull().values.any():
        raise Exception("Null values found in data")
    return True   

# Verify DataFrame
print(verify_df(traffic_df))

True


In [21]:
# Load Stage

engine = sqlalchemy.create_engine(DATABASE_LOCATION)
conn = sqlite3.connect('san_diego_traffic_incidents.sqlite')
cursor = conn.cursor()

# Create the table structure
sql_query = """
CREATE TABLE IF NOT EXISTS san_diego_traffic_incidents(
    ID VARCHAR(20),
    start_time DATETIME,
    end_time DATETIME,
    criticality VARCHAR(8),
    origin_latitude REAL,
    origin_longitude REAL,
    end_latitude REAL,
    end_longitude REAL,
    road_closed VARCHAR(5),
    event_code SMALLINT,
    description VARCHAR(400),
    CONSTRAINT primary_key_constraint PRIMARY KEY (ID)
    )
"""

cursor.execute(sql_query)
print("Opened database successfully")

# Insert our data
# To avoid duplicates, we read the SQL DB for the IDs
current_ids = pd.read_sql('SELECT ID FROM san_diego_traffic_incidents', conn)
current_ids = current_ids['ID'].tolist()
current_ids = [int(i) for i in current_ids]

# Then we filter out the incidents with already existing IDs
new_incidents = traffic_df[~traffic_df['ID'].isin(current_ids)]

# Lastly, we append these incidents to our database
try:
    new_incidents.to_sql('san_diego_traffic_incidents', con=conn, index=False, if_exists='append')
except:
    print("EXCEPTION RAISED")
    
conn.close()
print("Databse closed successfully")

Opened database successfully
Databse closed successfully
