# Get Data

In [1]:
# Auto-format code cells with Black via the lab_black extension
%load_ext lab_black
# Re-import edited modules automatically before each cell execution
%load_ext autoreload
%autoreload 2

In [2]:
import configparser
import os
import shutil
from glob import glob
from io import BytesIO
from urllib.parse import quote
from zipfile import ZipFile

import pandas as pd
import requests
from sqlalchemy import create_engine, text

In [3]:
# Read the database connection settings from the project-level INI file
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it did parse, so an empty result means the settings file is missing
if not config.read("../sql.ini"):
    raise FileNotFoundError("Could not read database settings from ../sql.ini")
default_cfg = config["default"]

In [4]:
# Unpack the connection settings from the [default] section of the INI file,
# one module-level constant per option (names match the option keys)
(
    DB_TYPE,
    DB_DRIVER,
    DB_USER,
    DB_PASS,
    DB_HOST,
    DB_PORT,
    DB_NAME,
) = (
    default_cfg[option]
    for option in (
        "DB_TYPE",
        "DB_DRIVER",
        "DB_USER",
        "DB_PASS",
        "DB_HOST",
        "DB_PORT",
        "DB_NAME",
    )
)

In [5]:
# Percent-encode the credentials so that reserved characters (e.g. "@", ":" or "/")
# in the user name or password cannot corrupt the connection URI
_credentials = f"{quote(DB_USER, safe='')}:{quote(DB_PASS, safe='')}"
_server = f"{DB_TYPE}+{DB_DRIVER}://{_credentials}@{DB_HOST}:{DB_PORT}"

# Connect to the server without selecting a database (required to create database)
URI_NO_DB = _server

# Connect to a single database (required to perform CRUD operations and submit
# queries)
URI = f"{_server}/{DB_NAME}"

## About

In this notebook, we will download historical [Dinesafe inspections data](https://open.toronto.ca/dataset/dinesafe/) from WayBackMachine (internet web archive, [link](https://archive.org/)). These datasets are snapshots captured at various timestamps. We need these [snapshots](https://web.archive.org/web/*/http://opendata.toronto.ca/public.health/dinesafe/dinesafe.zip) since the version of this data on the Toronto Open Data portal covers a short period of time (approx. 18 months starting in Jan 2020). We will want to have access to as much data as possible to train an ML model to predict a critical infraction during future inspections.

All historical datasets will be processed (dropping any inspections that might be duplicated across multiple snapshots), concatenated and then appended to a local MySQL database.

## Database Administration

The inspections data will be stored locally in a MySQL database. We'll first create the `dinesafe` database (any existing database with the same name is dropped first).

In [7]:
# Connect using the URI that does not name a database, since the target database
# is (re)created in the next cell
engine = create_engine(URI_NO_DB)
conn = engine.connect()

In [8]:
# Start from a clean slate: any existing database with this name is deleted before
# it is re-created. SQL strings are wrapped in text() because SQLAlchemy 2.x no
# longer accepts raw strings in Connection.execute()
_ = conn.execute(text(f"DROP DATABASE IF EXISTS {DB_NAME};"))
_ = conn.execute(text(f"CREATE DATABASE IF NOT EXISTS {DB_NAME};"))

In [9]:
# Close the connection and dispose of the engine created from URI_NO_DB
conn.close()
engine.dispose()

## Create Database Table

In [10]:
# Connect again, this time using the URI that includes the database name
engine = create_engine(URI)
conn = engine.connect()

Create the `inspections` table in the `dinesafe` database

In [None]:
# Name of database table (used by the DROP TABLE / CREATE TABLE statements below
# and passed to retrieve_data() when the ETL workflow is run)
table_name = "inspections"

In [11]:
# Remove any previous version of the table; wrapped in text() because SQLAlchemy
# 2.x no longer accepts raw SQL strings in Connection.execute()
_ = conn.execute(text(f"DROP TABLE IF EXISTS {table_name}"))

In [12]:
# Column order here must match `cols_order_wanted`, which is used to re-order each
# processed DataFrame before it is appended to this table
create_table_query = f"""
                     CREATE TABLE IF NOT EXISTS {table_name} (
                         row_id INT,
                         establishment_id INT,
                         inspection_id INT,
                         establishment_name TEXT,
                         establishmenttype TEXT,
                         establishment_address TEXT,
                         latitude FLOAT,
                         longitude FLOAT,
                         establishment_status TEXT,
                         minimum_inspections_peryear INT,
                         infraction_details TEXT,
                         inspection_date DATE,
                         severity TEXT,
                         action TEXT,
                         court_outcome TEXT,
                         amount_fined FLOAT
                     )
                     """
# Wrapped in text() because SQLAlchemy 2.x no longer accepts raw SQL strings in
# Connection.execute()
_ = conn.execute(text(create_table_query))

In [13]:
# Close the connection and dispose of the engine created from URI; load() opens
# its own engine and connection when appending data
conn.close()
engine.dispose()

## Get Data and Populate Database

Retrieve DineSafe program data snapshots from WayBackMachine and append to the `dinesafe` database (drop duplicate inspections and change data types before appending to table)
- a helper function `retrieve_data()` is used to
  - download historical inspections data from WayBackMachine (`extract()`) and unzip contents into `data/raw`
  - process the raw inspections data (`process_data()`)
    - change `INSPECTION_DATE` to a `datetime`
    - change `MINIMUM_INSPECTIONS_PERYEAR` to an integer datatype
    - clean `AMOUNT_FINED` (remove commas) and convert to numerical datatype
    - append `LATITUDE` and `LONGITUDE` columns (if not present)
      - some inspections data files have these but others don't
      - since we'll be appending all files to the same database table, they will all need to have these columns even if the columns contain missing values
    - change all column names to lowercase and re-order the columns to match the database table
  - append the processed data to the `inspections` table in the `dinesafe` database (`load()`)

In [14]:
def extract(zip_filenames, timeout=60):
    """Retrieve dinesafe data snapshot XML files from WayBackMachine.

    Parameters
    ----------
    zip_filenames : iterable of str
        WayBackMachine snapshot timestamps; each one identifies an archived copy of
        dinesafe.zip and is also used as the name of the local target directory.
    timeout : float, optional
        Seconds to wait for the server before giving up on a download.

    Returns
    -------
    list of str
        Target directories (``data/raw/<timestamp>``), one per requested snapshot,
        in the order given.

    Raises
    ------
    requests.HTTPError
        If the archive responds with an HTTP error status.
    zipfile.BadZipFile
        If the downloaded content is not a valid ZIP archive.
    """
    available_files = []
    for zip_fname in zip_filenames:
        # Path to target dir, where extracted .XML file will be found
        target_dir = f"data/raw/{zip_fname}"
        # Only download snapshots that have not been extracted already
        if not os.path.exists(f"{target_dir}/dinesafe.xml"):
            # Assemble source URL
            url = (
                f"https://web.archive.org/web/{zip_fname}/"
                "http://opendata.toronto.ca/public.health/dinesafe/dinesafe.zip"
            )
            # Get zipped file containing .XML file; without a timeout a stalled
            # connection would block the notebook indefinitely
            r = requests.get(url, timeout=timeout)
            # Fail on HTTP errors here, rather than passing an error page to ZipFile
            r.raise_for_status()
            # Extract to target dir
            with ZipFile(BytesIO(r.content)) as zfile:
                zfile.extractall(target_dir)
        available_files.append(target_dir)
    return available_files


def read_data(filepath):
    """Parse the XML file at `filepath` and return its contents as a DataFrame."""
    snapshot_df = pd.read_xml(filepath)
    return snapshot_df


def process_data(df, cols_order_wanted):
    """Process inspections data.

    Parameters
    ----------
    df : pd.DataFrame
        Raw inspections data with upper-case column names. It is not modified.
    cols_order_wanted : list of str
        Lower-case column names, in the order wanted for the returned DataFrame.

    Returns
    -------
    pd.DataFrame
        New DataFrame with lower-case column names in the requested order, where
        the inspection date is a datetime, the minimum inspections per year is an
        integer and the fine amount, latitude and longitude are floats.
    """
    # Work on a copy so that the assignments below never alter the caller's data
    df = df.copy()
    # Datetime formatting
    df["INSPECTION_DATE"] = pd.to_datetime(df["INSPECTION_DATE"])
    # Change datatype 1/2
    df = df.astype({"MINIMUM_INSPECTIONS_PERYEAR": int})
    # Remove commas from column and convert string to float. Testing for "not
    # numeric" also covers pandas string dtypes, not only the object dtype
    if not pd.api.types.is_numeric_dtype(df["AMOUNT_FINED"]):
        df["AMOUNT_FINED"] = pd.to_numeric(
            df["AMOUNT_FINED"].astype(str).str.replace(",", ""), errors="coerce"
        )
    # Append latitude and longitude columns, if not found in the data
    for loc_col in ("LATITUDE", "LONGITUDE"):
        if loc_col not in df.columns:
            df[loc_col] = None
    # Change column names to lowercase, Re-order columns and Change datatype 2/2
    # (latitude and longitude columns; appended empty columns become NaN)
    return df.rename(columns=str.lower)[cols_order_wanted].astype(
        {"latitude": float, "longitude": float}
    )


def transform(available_files, cols_order_wanted):
    """Read and process the dinesafe.xml file found in each snapshot directory.

    Returns a list with one processed DataFrame per entry of `available_files`,
    in the same order.
    """
    return [
        process_data(read_data(f"{snapshot_dir}/dinesafe.xml"), cols_order_wanted)
        for snapshot_dir in available_files
    ]


def load(dfs, uri, table_name="inspections"):
    """Vertically concatenate list of DataFrames and Append to database.

    Parameters
    ----------
    dfs : list of pd.DataFrame
        Processed DataFrames to combine. Nothing is written if the list is empty.
    uri : str
        Database connection URI passed to `create_engine`.
    table_name : str, optional
        Name of the table that the combined rows are appended to.
    """
    # pd.concat() raises on an empty list, so there is nothing to do in that case
    if not dfs:
        return
    # Rows are only dropped when they are identical in every column
    # NOTE(review): this includes row_id, so the same inspection captured in two
    # snapshots is only de-duplicated if row_id is stable across snapshots - confirm
    dfs_all = pd.concat(dfs, ignore_index=True).drop_duplicates(keep="first")
    engine = create_engine(uri)
    # Release the connection and the engine's pool even if the insert fails
    try:
        with engine.connect() as conn:
            dfs_all.to_sql(name=table_name, con=conn, index=False, if_exists="append")
    finally:
        engine.dispose()


def retrieve_data(zip_filenames, uri, cols_order_wanted, table_name="inspections"):
    """Run the extract, transform and load steps for the requested snapshots.

    Returns the list of processed DataFrames (one per snapshot) that was handed to
    the load step.
    """
    # Extract the snapshots, then transform each extracted file
    processed_dfs = transform(extract(zip_filenames), cols_order_wanted)

    # Load the processed data into the database table
    load(processed_dfs, uri, table_name)
    return processed_dfs

Run the ETL workflow to retrieve historical inspections data files, process each file and append processed data to the `inspections` table of the `dinesafe` database

In [6]:
# Data file names to download (these are timestamps at which data snapshot was
# captured by WayBackMachine); each one is inserted into the archive URL and is
# also used as the name of the local directory under data/raw
zip_filenames = [
    "20130723222156",
    "20150603085055",
    "20151012004454",
    "20160129205023",
    "20160317045436",
    "20160915001010",
    "20170303162206",
    "20170330001043",
    "20170726115444",
    "20190116215713",
    "20190126084933",
    "20190614092848",
    "20210626163552",
]

# Order of DataFrame columns (to re-order raw data) in order to match column order
# in database table
cols_order_wanted = [
    "row_id",
    "establishment_id",
    "inspection_id",
    "establishment_name",
    "establishmenttype",
    "establishment_address",
    "latitude",
    "longitude",
    "establishment_status",
    "minimum_inspections_peryear",
    "infraction_details",
    "inspection_date",
    "severity",
    "action",
    "court_outcome",
    "amount_fined",
]

In [15]:
%%time
# Run the full extract-transform-load workflow. `table_name` is defined in the
# "Create Database Table" section, so that cell must have been run beforehand
dfs = retrieve_data(zip_filenames, URI, cols_order_wanted, table_name)

CPU times: user 1min 30s, sys: 2.97 s, total: 1min 33s
Wall time: 1min 46s


These datasets list each infraction for a single inspection on a separate row. We will now need to filter these infractions to only select relevant ones and then aggregate them by inspection, since each row (inspection) will be used as an independent observation by the ML model we train later.

In the next notebook (`2_sql_filter_transform.ipynb`), we will filter these infractions and aggregate them by inspection.