In [1]:
import mysql.connector
from mysql.connector import errorcode
from config import rds_config

In [2]:
CONFIG_USER = rds_config["user"]
CONFIG_PASSWORD = rds_config["password"]
CONFIG_HOST = rds_config["host"]

conn = mysql.connector.connect(
    user=CONFIG_USER, 
    password=CONFIG_PASSWORD,
    host=CONFIG_HOST,
    buffered=True
)

cursor = conn.cursor()

In [3]:
DB_NAME = 'sf_fires_testing'

In [4]:
def create_database(cursor):
    try:
        statement = f"CREATE DATABASE {DB_NAME} DEFAULT CHARACTER SET 'utf8'"
        cursor.execute(statement)
    except mysql.connector.Error as err:
        print(f"Failed creating database: {err}")

try:
    cursor.execute(f"USE {DB_NAME}")
    print(f"Using {DB_NAME}")
except mysql.connector.Error as err:
    print(f"Database {DB_NAME} does not exist")
    if err.errno == mysql.connector.errorcode.ER_BAD_DB_ERROR:
        create_database(cursor)
        print(f"Database {DB_NAME} created successfully.")
        conn.database = DB_NAME
    else:
        print(err)

Using sf_fires_testing


In [5]:
query = "show databases"
cursor.execute(query)
for db in cursor:
     print(db[0])

information_schema
mysql
performance_schema
sf_fires_testing
sys


### Create Table

In [6]:
import json

In [6]:
import pickle

with open('../Files/rs_data_types.pickle', 'rb') as handle:
    columns = pickle.load(handle)

In [7]:
print(list(columns.keys()))

['incident_number', 'exposure_number', 'suppression_units', 'suppression_personnel', 'ems_units', 'ems_personnel', 'other_units', 'other_personnel', 'estimated_property_loss', 'estimated_contents_loss', 'fire_fatalities', 'fire_injuries', 'civilian_fatalities', 'civilian_injuries', 'number_of_alarms', 'floor_of_fire_origin', 'number_of_floors_with_minimum_damage', 'number_of_floors_with_significant_damage', 'number_of_floors_with_heavy_damage', 'number_of_floors_with_extreme_damage', 'number_of_sprinkler_heads_operating', 'incident_date', 'alarm_dttm', 'arrival_dttm', 'id', 'address', 'call_number', 'close_dttm', 'city', 'zipcode', 'battalion', 'station_area', 'box', 'first_unit_on_scene', 'primary_situation', 'mutual_aid', 'action_taken_primary', 'action_taken_secondary', 'action_taken_other', 'detector_alerted_occupants', 'property_use', 'area_of_fire_origin', 'ignition_cause', 'ignition_factor_primary', 'ignition_factor_secondary', 'heat_source', 'item_first_ignited', 'human_factors

In [8]:
for i, (k, v) in enumerate(columns.items()):
    print(k, v)
    if i > 5: break

incident_number INTEGER
exposure_number SMALLINT
suppression_units SMALLINT
suppression_personnel SMALLINT
ems_units SMALLINT
ems_personnel SMALLINT
other_units SMALLINT


In [9]:
# Redshift
create_str = "CREATE TABLE IF NOT EXISTS sf_fires.sf_fires ("
for k, v in columns.items():
    create_str += f"{k.lower()} {v},"
create_str += "PRIMARY KEY (id)) DISTKEY(id)"
create_str += " COMPOUND SORTKEY(incident_date, battalion, neighborhood_district)"

# incident_date, 

# MySQL
# create_str = "CREATE TABLE IF NOT EXISTS sf_fires ("
# for k, v in columns.items():
#     create_str += f"`{k.lower()}` {v},"
# create_str += "PRIMARY KEY (id)) ENGINE=InnoDB"

In [10]:
create_str

'CREATE TABLE IF NOT EXISTS sf_fires.sf_fires (incident_number INTEGER,exposure_number SMALLINT,suppression_units SMALLINT,suppression_personnel SMALLINT,ems_units SMALLINT,ems_personnel SMALLINT,other_units SMALLINT,other_personnel SMALLINT,estimated_property_loss DECIMAL(12,3),estimated_contents_loss DECIMAL(12,3),fire_fatalities SMALLINT,fire_injuries SMALLINT,civilian_fatalities SMALLINT,civilian_injuries SMALLINT,number_of_alarms SMALLINT,floor_of_fire_origin DECIMAL(7,2),number_of_floors_with_minimum_damage DECIMAL(7,2),number_of_floors_with_significant_damage DECIMAL(7,2),number_of_floors_with_heavy_damage DECIMAL(7,2),number_of_floors_with_extreme_damage DECIMAL(7,2),number_of_sprinkler_heads_operating DECIMAL(7,2),incident_date TIMESTAMP,alarm_dttm TIMESTAMP,arrival_dttm TIMESTAMP,id VARCHAR,address VARCHAR,call_number VARCHAR,close_dttm VARCHAR,city VARCHAR,zipcode VARCHAR,battalion VARCHAR,station_area VARCHAR,box VARCHAR,first_unit_on_scene VARCHAR,primary_situation VARCHAR

In [11]:
drop_table_query = f"DROP TABLE IF EXISTS `sf_fires`"
cursor.execute(drop_table_query)

In [12]:
create_str = "CREATE TABLE IF NOT EXISTS sf_fires ("
for k, v in columns.items():
    create_str += f"`{k.lower()}` {v},"
create_str += "PRIMARY KEY (id)) ENGINE=InnoDB"

try:
    cursor.execute(create_str)
    print("table created")
except mysql.connector.Error as err:
    if err.errno == errorcode.ER_TABLE_EXISTS_ERROR:
        print("This table already exists")
    else:
        print(err.msg)

table created


In [13]:
select_query = "SELECT * FROM information_schema.tables"
cursor.execute(select_query)
tables = cursor.fetchall()
print(tables[0])

('def', 'mysql', 'innodb_table_stats', 'BASE TABLE', 'InnoDB', 10, 'Dynamic', 11, 1489, 16384, 0, 0, 4194304, None, datetime.datetime(2021, 2, 23, 20, 24, 43), datetime.datetime(2021, 11, 29, 20, 11, 53), None, 'utf8_bin', None, 'row_format=DYNAMIC stats_persistent=0', '')


### Populate Table

In [14]:
import json

In [15]:
with open('../Data/data.json') as json_file:
    data = json.load(json_file)

In [16]:
data_short = data[:1000]

In [17]:
# building INSERT query
insert_query = "INSERT INTO sf_fires ("
for col in columns:
    insert_query += f"{col}, "
insert_query = f"{insert_query[:-2]}) VALUES ("

for col in columns:
    insert_query += f"%({col})s, "
insert_query = f"{insert_query[:-2]})"
insert_query

'INSERT INTO sf_fires (incident_number, exposure_number, suppression_units, suppression_personnel, ems_units, ems_personnel, other_units, other_personnel, estimated_property_loss, estimated_contents_loss, fire_fatalities, fire_injuries, civilian_fatalities, civilian_injuries, number_of_alarms, floor_of_fire_origin, number_of_floors_with_minimum_damage, number_of_floors_with_significant_damage, number_of_floors_with_heavy_damage, number_of_floors_with_extreme_damage, number_of_sprinkler_heads_operating, incident_date, alarm_dttm, arrival_dttm, id, address, call_number, close_dttm, city, zipcode, battalion, station_area, box, first_unit_on_scene, primary_situation, mutual_aid, action_taken_primary, action_taken_secondary, action_taken_other, detector_alerted_occupants, property_use, area_of_fire_origin, ignition_cause, ignition_factor_primary, ignition_factor_secondary, heat_source, item_first_ignited, human_factors_associated_with_ignition, structure_type, structure_status, fire_spread,

In [18]:
for item in data_short:
    insert_query_values = { k:(item[k] if k in item.keys() else None) for k in columns }
    insert_query_values["point"] = str(item["point"]["coordinates"])
    cursor.execute(insert_query, insert_query_values)
conn.commit()

In [21]:
import pandas as pd
from sqlalchemy import create_engine

In [22]:
df = pd.DataFrame(data)

In [23]:
for col, data_type in columns.items():
    if "INT" in data_type:
        df[col] = pd.to_numeric(df[col], downcast="integer")
    elif "TIME" in data_type:
        df[col] = pd.to_datetime(df[col])

In [24]:
conn.close()

In [25]:
SQL_USER = rds_config["user"]
SQL_PASSWORD = rds_config["password"]
HOST = rds_config["host"]
DB_NAME = "sf_fires_testing"
MYSQL_CONN = "mysqlconnector"
sql_config = f"mysql+{MYSQL_CONN}://{SQL_USER}:{SQL_PASSWORD}@{HOST}/{DB_NAME}"
sql_engine = create_engine(sql_config)
conn = sql_engine.connect()

In [30]:
df.at[0, "point"]

{'type': 'Point', 'coordinates': [-122.41837339, 37.74208979]}

In [34]:
import numpy as np

In [39]:
np.isnan(df.at[91011, "point"])

True

In [41]:
df["point"][df["point"].isna()][:5]

86396     NaN
88761     NaN
91011     NaN
248997    NaN
259669    NaN
Name: point, dtype: object

In [50]:
def get_coords(point):
    if not point:
        return None
    else:
        return str(dict(point)["coordinates"])

df["point"] = df["point"].apply(get_coords)

ValueError: dictionary update sequence element #0 has length 1; 2 is required

In [51]:
for i in df.index:
    value = df.at[i, "point"]
    try:
        df.at[i, "point"] = str(value["coordinates"])
    except:
        df.at[i, "point"] = None

In [76]:
try:
    df.to_sql("sf_fires", conn, if_exists="replace")
except Exception as err:
    print(err)

In [None]:
# building INSERT query
insert_query = "INSERT INTO sf_fires ("
for col in columns:
    insert_query += f"{col}, "
insert_query = f"{insert_query[:-2]}) VALUES "

for item in data_short:
    insert_query_values = [ item[k] if k in item.keys() and k != "point" else None for k in columns ]
    insert_query_values.append(str(item["point"]["coordinates"]))
    insert_query += str(tuple(insert_query_values)) + ", "

Error: Session cannot generate requests

In [None]:
insert_query = insert_query[:-2]
insert_query[-50:]

" Hunters Point', None, '[-122.372228, 37.728104]')"