### ETL PROTOTYPE 

- Alejandro Giorgio

In [1]:
import requests
import pandas as pd
import io
import json

In [2]:
# Load environment variables from a .env file (python-dotenv); the Redshift
# connection settings are later read from os.environ.
from dotenv import load_dotenv
load_dotenv()

True

In [3]:
# JSON endpoint of dataset wg3w-h783 on data.sfgov.org; it is paged with
# the $limit/$offset query parameters by the download loop.
base_url = "https://data.sfgov.org/resource/wg3w-h783.json"

In [4]:
# Page size requested per call (the API returns at most 50000 rows at a time)
limit = 50_000

# Paging cursor: index of the first record of the next page to request
offset = 0

La API tiene un límite de devolver 50000 filas de datos por vez. Si queremos obtener más de 50000 registros, tenemos que manejar esta limitación en nuestro código.

Podemos usar los parámetros ‘$offset’ y ‘$limit’ que provee la API para obtener todos los registros. Empezamos con un offset de 0 y un límite de 50000 (o cualquier otro número hasta el límite máximo permitido por la API). Enviamos un pedido GET a la API con estos parámetros y obtenemos los primeros 50000 registros. Después aumentamos el offset por el límite (50000 en este caso) y enviamos otro pedido para obtener los siguientes 50000 registros. Repetimos este proceso hasta que la API no devuelva datos, lo que significa que hemos obtenido todos los registros.

In [5]:
# Start from an empty frame; the pages fetched by the download loop are concatenated onto it
df = pd.DataFrame()

In [6]:
# Page through the endpoint until an empty page (or an error) ends the loop.
# Pages are collected in a list and concatenated once at the end: calling
# pd.concat inside the loop re-copies every row already fetched on each pass.
pages = []

while True:
    # $order=:id gives the rows a stable order, so consecutive $limit/$offset
    # windows neither overlap nor skip records.
    url = f"{base_url}?$limit={limit}&$offset={offset}&$order=:id"

    # The timeout keeps a stalled connection from hanging the notebook;
    # network errors are reported and stop the loop instead of raising, so
    # the pages already downloaded are kept.
    try:
        response = requests.get(url, timeout=120)
    except requests.RequestException as exc:
        print(f"Failed to fetch data. Request error: {exc}")
        break

    # Any non-200 answer stops the download
    if response.status_code != 200:
        print(f"Failed to fetch data. Status code: {response.status_code}")
        break

    # Parse the JSON payload of this page into a DataFrame
    data = pd.read_json(io.StringIO(response.text))

    # An empty page means every record has been fetched
    if data.empty:
        break

    pages.append(data)

    # Move the cursor to the start of the next page
    offset += limit

    print(f"offset is {offset}")

# Single concatenation; skipped when nothing was downloaded so df is left as it was
if pages:
    df = pd.concat([df, *pages], ignore_index=True)

offset is 50000
offset is 100000
offset is 150000
offset is 200000
offset is 250000
offset is 300000
offset is 350000
offset is 400000
offset is 450000
offset is 500000
offset is 550000
offset is 600000
offset is 650000
offset is 700000
offset is 750000
offset is 800000
offset is 850000


### Modeling the data

In [7]:
# The lists of date and text column names live in the project's constants module
from scripts.constants import date_columns, string_columns

# Parse every listed date column into pandas datetimes
for date_col in date_columns:
    parsed = pd.to_datetime(df[date_col])
    df[date_col] = parsed

# Normalise the listed text columns: lower-case first, then trim surrounding whitespace
for text_col in string_columns:
    lowered = df[text_col].str.lower()
    df[text_col] = lowered.str.strip()

In [8]:
# Keep only the columns that have non-null values in at least 60% of the rows
threshold = int(len(df) * 0.6)
df = df.dropna(axis="columns", thresh=threshold)

In [9]:
# Remove the ':' and '@' characters from the column names
df.columns = df.columns.str.replace(':', '').str.replace('@', '')

In [10]:
# Serialise the 'point' values to JSON text (presumably dict objects coming
# from the API -- confirm). Only dict/list values are encoded: json.dumps
# would turn a missing value (NaN) into the literal string 'NaN' instead of
# leaving it null, and would quote an already-encoded string a second time
# if this cell were re-run.
df['point'] = df['point'].apply(
    lambda value: json.dumps(value) if isinstance(value, (dict, list)) else value
)

In [11]:
# Remove rows that are exact duplicates of an earlier row across all columns
df = df.drop_duplicates()

In [12]:
# Show two random rows, transposed so each column reads as a line
df.sample(n=2).T

Unnamed: 0,22671,700288
incident_datetime,2023-04-25 19:09:00,2019-10-27 21:30:00
incident_date,2023-04-25 00:00:00,2019-10-27 00:00:00
incident_time,2024-04-02 19:09:00,2024-04-02 21:30:00
incident_year,2023,2019
incident_day_of_week,tuesday,sunday
report_datetime,2023-04-25 19:11:00,2019-10-27 21:38:00
row_id,126975164020,86341515200
incident_id,1269751,863415
incident_number,230288141,190813307
report_type_code,ii,ii


### INTEGRATION WITH REDSHIFT

In [13]:
import os
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from sqlalchemy import create_engine

In [18]:
# Read the Redshift connection settings from the environment.
# A missing variable raises KeyError.
host, dbname, user, password, port = (
    os.environ[variable]
    for variable in (
        "REDSHIFT_HOST",
        "REDSHIFT_DATABASE",
        "REDSHIFT_USER",
        "REDSHIFT_PWD",
        "REDSHIFT_PORT",
    )
)


In [32]:
# NOTE(review): this replaces the REDSHIFT_HOST value read from the environment
# with a hard-coded cluster endpoint -- confirm whether the override is still
# needed; otherwise the environment variable has no effect.
host = "data-engineer-cluster.cyhh5bfevlmn.us-east-1.redshift.amazonaws.com"

In [34]:
# Open a psycopg2 connection to Redshift with the settings defined above
conn = psycopg2.connect(
    host=host,
    port=port,
    dbname=dbname,
    user=user,
    password=password,
)

### Creación de la tabla

In [35]:
# Switch the connection to autocommit so each statement is committed as soon
# as it runs, without an explicit conn.commit().
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

# Cursor used to run the DDL statement below
cur = conn.cursor()

# DDL for the destination table; IF NOT EXISTS leaves an existing table untouched.
# NOTE(review): incident_time is declared TIME, while the sample output earlier
# in the notebook shows that column holding full timestamps -- confirm the
# load handles that as intended.
create_table_command = """
CREATE TABLE IF NOT EXISTS sf_police_incidents (
    incident_datetime TIMESTAMP,
    incident_date DATE,
    incident_time TIME,
    incident_year INT,
    incident_day_of_week VARCHAR(255),
    report_datetime TIMESTAMP,
    row_id BIGINT,
    incident_id INT,
    incident_number INT,
    report_type_code VARCHAR(255),
    report_type_description VARCHAR(255),
    incident_code INT,
    incident_category VARCHAR(255),
    incident_subcategory VARCHAR(255),
    incident_description VARCHAR(255),
    resolution VARCHAR(255),
    police_district VARCHAR(255),
    cad_number FLOAT,
    intersection VARCHAR(255),
    cnn FLOAT,
    analysis_neighborhood VARCHAR(255),
    supervisor_district FLOAT,
    supervisor_district_2012 FLOAT,
    latitude FLOAT,
    longitude FLOAT,
    point VARCHAR(255),
    computed_region_26cr_cadq FLOAT,
    computed_region_qgnn_b9vv FLOAT,
    computed_region_jwn9_ihcz FLOAT
)
"""


# Run the CREATE TABLE statement
cur.execute(create_table_command)

Carga del DataFrame en la tabla una vez finalizado el proceso de ETL

In [36]:
# Local import: quote is only needed to build the connection URL below
from urllib.parse import quote

# The psycopg2 cursor and connection are closed before the load, which goes
# through a SQLAlchemy engine instead.
cur.close()
conn.close()

# Build the SQLAlchemy engine. User and password are percent-encoded because
# characters such as '@', ':' or '/' inside a credential would otherwise be
# parsed as part of the URL structure and break the connection string.
engine = create_engine(
    f"postgresql+psycopg2://{quote(user, safe='')}:{quote(password, safe='')}"
    f"@{host}:{port}/{dbname}"
)

# Append the DataFrame rows to the Redshift table.
# Note: with if_exists="append", re-running this cell inserts the same rows again.
df.to_sql("sf_police_incidents", engine, if_exists="append", index=False)