# Loading CPDP Data Into the NPDC Index

This notebook processes the `unified_data` from [chicago-police-data](https://github.com/invinst/chicago-police-data/tree/master/data) and loads it into the NPDC index database.

Data is only inserted once, so it's safe to run this multiple times.
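
The re-run safety relies on uniqueness constraints in the database: inserting the same rows a second time fails, and the failure is treated as "already loaded". A minimal sketch of that pattern, using an in-memory SQLite database as a stand-in for the project's Postgres database (the `incident` table layout and the `insert_if_missing` helper here are hypothetical, not the project's schema):

```python
import sqlite3


def insert_if_missing(conn, rows):
    """Insert all rows in one transaction; skip the batch on a duplicate."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.executemany(
                "INSERT INTO incident (source, source_id) VALUES (?, ?)", rows
            )
        return True
    except sqlite3.IntegrityError:
        print("Already created incident records")
        return False


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE incident ("
    "id INTEGER PRIMARY KEY, source TEXT, source_id TEXT, "
    "UNIQUE (source, source_id))"
)

rows = [("cpdp", "1000001"), ("cpdp", "1000002")]  # made-up IDs
first = insert_if_missing(conn, rows)   # inserts both rows
second = insert_if_missing(conn, rows)  # hits the unique constraint
row_count = conn.execute("SELECT COUNT(*) FROM incident").fetchone()[0]
```

After both calls the table still holds two rows, which is the property that makes a second run harmless.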

## Setup

Install project dependencies and Jupyter:

```bash
pip3 install jupyter
pip3 install -r requirements/dev_unix.txt
```

This notebook should be run from the repo root, and `unified_data_path` should point to the `unified_data` directory of the `chicago-police-data` repo.

The database should be running and the tables should be up to date. You can use Docker to reset the application to a clean state:

```bash
# Stop services and remove volumes, rebuild images, start the database, create tables, run seeds, and follow logs
docker-compose down -v && docker-compose up --build -d db api && docker-compose logs -f
```

Then open the notebook with either [VSCode](https://code.visualstudio.com/) or `jupyter notebook`.

You can run the notebook from the command line as well:

```bash
jupyter nbconvert --to notebook --execute backend/scraper/cpdp.ipynb --output cpdp
```

In [1]:
import os
if os.getcwd().endswith("scraper"):
    # Run this notebook from the repo root
    os.chdir("../..")
import sys
import math
import numpy as np
import pandas as pd
import sqlalchemy
import psycopg2
from IPython.display import display, HTML
from collections import namedtuple
from backend.database import db, Incident, Officer, Accusation, Victim

In [2]:
from backend.api import create_app
app = create_app("development")

unified_data_path = "../chicago-police-data/data/unified_data"
if not os.path.exists(unified_data_path):
    raise Exception(f"{unified_data_path} does not exist. Working directory should be the police-data-trust repo root and cpdp repo should be checked out in a sibling directory")

## Read Data Files

Data is organized into CSVs under `data/unified_data` (paths here are relative to the `chicago-police-data` repo root). CSV file naming is described in `data/README.md`. CSV columns are described in `data/unified_data/data-dictionary/data-dictionary.yaml` as well as `data/complaints_general-summary.html`.
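
ID columns in these files can arrive as floating-point text (for example `100001.0`) or as empty cells, which is why the reader below uses converters. A small sketch of the effect on a made-up two-row CSV (the values are illustrative, not taken from the dataset):

```python
import io
import math

import pandas as pd


def float_to_int_str(x):
    """Turn '100001.0' into '100001'; empty strings become NaN."""
    return str(int(float(x))) if x else math.nan


csv_text = "cr_id,UID\n0012345,100001.0\n0012346,\n"

df = pd.read_csv(
    io.StringIO(csv_text),
    dtype={"cr_id": str},  # keep leading zeros instead of parsing as int
    converters={"UID": float_to_int_str},  # called with each cell's raw string
)
print(df)
```

Reading `cr_id` as a string preserves leading zeros, and the converter normalizes `UID` so it compares equal across files that format the same number differently.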

In [3]:
def data_path(*args):
    return os.path.join(unified_data_path, *args)


def float_to_int_str(x):
    """Converts a floating-point string into an integer string.

    Empty strings are converted to nan
    """
    return str(int(float(x))) if x else math.nan


def read_csv(*path):
    return pd.read_csv(
        data_path(*path),
        dtype={
            "cr_id": str,
        },
        converters={
            "link_UID": float_to_int_str,
            "birth_year": float_to_int_str,
            "investigator_ID": float_to_int_str,
            "UID": float_to_int_str,
        },
        low_memory=False,
    )


def read_complaint(name):
    return read_csv("complaints", f"complaints-{name}.csv.gz")


def check_defined(df, column):
    assert df[column].isnull().values.sum() == 0


# Each record is one complaint, primary key cr_id
complaints = read_complaint("complaints").set_index(
    "cr_id", drop=False, verify_integrity=True
)

# Each record is an officer, deduplicated across data sources, primary key UID
profiles = read_csv("profiles", "final-profiles.csv.gz").set_index(
    "UID", drop=False, verify_integrity=True
)

# Each record is an accusation against one officer in one complaint, composite
# key (cr_id, UID)
accused = read_complaint("accused").set_index(
    ["cr_id", "UID"], drop=False, verify_integrity=True
)

# Each record is a person that filed a complaint. Many-to-1 with cr_id
complainants = read_complaint("complainants")
check_defined(complainants, "cr_id")

# Each record is a person assigned to investigate a particular complaint.
# Many-to-1 with (cr_id, investigator_ID). investigator_ID includes officer
# UID's and non-officer investigators. The same investigator may be assigned to
# the same complaint at different times, resulting in multiple records.
investigators = read_complaint("investigators")
check_defined(investigators, ["cr_id", "investigator_ID"])

# Each record is a victim in a complaint. Many-to-1 with cr_id. victims_v3
# contains injury information which is lost in the merged victims table.
victims_v2 = read_complaint("victims_2000-2016_2016-11")
victims_v3 = read_complaint("victims_2000-2018_2018-03")
victims_unified = read_complaint("victims")
check_defined(victims_v2, "cr_id")
check_defined(victims_v3, "cr_id")
check_defined(victims_unified, "cr_id")


In [4]:
def isnan(x):
    return isinstance(x, float) and math.isnan(x)


def nan_to_none(x):
    return None if isnan(x) else x


def strip_nan(r):
    return r._make([nan_to_none(e) for e in r])


def to_orm(instances, OrmClass):
    return [
        OrmClass(**strip_nan(i)._asdict())
        for i in instances.itertuples(index=False)
    ]


def to_dicts(instances):
    """Converts dataframe rows into dicts, converting NaN to None"""
    return [strip_nan(i)._asdict() for i in instances.itertuples(index=False)]


def create_bulk(instances, chunk_size=1000):
    """Inserts ORM instances into the database"""
    for chunk in range(0, len(instances), chunk_size):
        db.session.add_all(instances[chunk : chunk + chunk_size])
        db.session.flush()
    db.session.commit()


def insert_bulk(dicts, OrmClass):
    """Inserts dicts into the database.

    This is 3x faster but does not implement ORM features.
    """
    with app.app_context():
        db.session.bulk_insert_mappings(OrmClass, dicts)
        db.session.commit()


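def insert_bulk_if_missing(dicts, OrmClass):
    """Inserts dicts unless doing so violates a unique constraint.

    A unique violation is taken to mean the records were loaded by a previous
    run. Any other integrity error is re-raised.
    """
    try:
        insert_bulk(dicts, OrmClass)
    except sqlalchemy.exc.IntegrityError as e:
        if isinstance(e.orig, psycopg2.errors.UniqueViolation):
            print(f"Already created {OrmClass.__name__} records")
        else:
            raise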


def add_date_of_birth(target, birth_year):
    # Default to 01-01 for birthday
    has_birth_year = ~birth_year.isna()
    target.loc[has_birth_year, "date_of_birth"] = (
        birth_year[has_birth_year] + "-01-01"
    )
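
pandas marks missing values with NaN, while Python database drivers represent SQL NULL as `None`, so rows are converted before insertion. A minimal sketch of that conversion on made-up rows (a simplified variant of the helpers above, not a replacement for them):

```python
import math

import pandas as pd


def nan_to_none(x):
    return None if isinstance(x, float) and math.isnan(x) else x


df = pd.DataFrame({"first_name": ["JANE", "JOHN"], "race": ["WHITE", math.nan]})

# One dict per row, with NaN replaced by None
records = [
    {key: nan_to_none(value) for key, value in row._asdict().items()}
    for row in df.itertuples(index=False)
]
print(records)
```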


## Load Incidents

In [5]:
incidents = complaints[["complaint_date", "closed_date"]].copy()
incidents["source"] = "cpdp"
incidents[["source_id", "time_of_incident"]] = complaints[
    ["cr_id", "incident_date"]
]
has_full_address = ~complaints["full_address"].isna()

# full address only contains street information, so add the city
incidents.loc[has_full_address, "location"] = (
    complaints.loc[has_full_address, "full_address"] + " CHICAGO ILLINOIS"
)

# Join the address components with single spaces, ignoring missing values and
# consolidating whitespace
address = complaints[~has_full_address]
incidents.loc[~has_full_address, "location"] = (
    address["add1"]
    .str.cat(address[["add2", "city"]], sep=" ", na_rep="")
    .str.strip()
    .str.replace(r"\s{2,}", " ", regex=True)
)

incident_dicts = to_dicts(incidents)


In [6]:
insert_bulk_if_missing(incident_dicts, Incident)


Already created Incident records
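
One way to join optional address components is `Series.str.cat` with `sep=" "` and `na_rep=""`, followed by whitespace cleanup. A sketch on made-up addresses (the column names mirror the complaints table, the values do not come from it):

```python
import pandas as pd

address = pd.DataFrame(
    {
        "add1": ["100", "200", None],
        "add2": ["N EXAMPLE ST", None, "W SAMPLE AVE"],
        "city": ["CHICAGO", "CHICAGO", None],
    }
)

location = (
    address["add1"]
    # Missing parts become empty strings, leaving extra spaces behind
    .str.cat(address[["add2", "city"]], sep=" ", na_rep="")
    # Remove the leftover leading/trailing and doubled spaces
    .str.strip()
    .str.replace(r"\s{2,}", " ", regex=True)
)
print(location.tolist())
```

Without `na_rep`, any row with a missing component would come out as NaN, and without `sep` the components would be glued together with no space between them.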


## Load Officers

In [7]:
officers = profiles[
    ["first_name", "last_name", "race", "gender", "appointed_date"]
].copy()
officers["source"] = "cpdp"
officers[["source_id", "rank", "star", "unit"]] = profiles[
    [
        "link_UID",
        "cleaned_rank",
        "current_star",
        "current_unit",
    ]
]
add_date_of_birth(officers, profiles["birth_year"])

officer_dicts = to_dicts(officers)


In [8]:
insert_bulk_if_missing(officer_dicts, Officer)


Already created Officer records


## Generate Mapping from CPDP to NPDC IDs

CPDP entities are linked together using incident and officer IDs. There is a one-to-many correspondence between incidents/officers and victims, investigations, participants, and accusations. In order to insert these entities into the database, we need to convert the CPDP IDs in the source data to their corresponding NPDC IDs.
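
The conversion amounts to a dictionary lookup applied to a column. A minimal sketch with a hypothetical mapping and made-up `cr_id` values, showing that IDs absent from the mapping come back as NaN and therefore need to be checked before insertion:

```python
import pandas as pd

# Hypothetical mapping from CPDP cr_id to NPDC primary key
incident_id_by_cr_id = {"1000001": 1, "1000002": 2}

accused = pd.DataFrame({"cr_id": ["1000001", "1000002", "9999999"]})
accused["incident_id"] = accused["cr_id"].map(incident_id_by_cr_id)

# Rows whose cr_id has no NPDC counterpart
unmapped = accused[accused["incident_id"].isna()]
print(unmapped)
```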

In [9]:
def get_source_npdc_id_map(OrmClass):
    """Returns a dict mapping non-null source ID's to NPDC id's (PK's)"""
    with app.app_context():
        ids = (
            db.session.query(OrmClass)
            .filter(OrmClass.source_id != None)
            .with_entities(OrmClass.source_id, OrmClass.id)
            .all()
        )
    return dict(ids)


officer_id_by_link_uid = get_source_npdc_id_map(Officer)
incident_id_by_cr_id = get_source_npdc_id_map(Incident)


## Load Accusations

Complaints may be made against multiple officers for a single incident. Each accusation links a single officer to a single incident.

In [10]:
accusations = pd.DataFrame()
accusations[["category", "category_code", "finding", "outcome"]] = accused[
    ["complaint_category", "complaint_code", "final_finding", "final_outcome"]
].copy()
accusations["incident_id"] = accused["cr_id"].map(incident_id_by_cr_id)
accusations["officer_id"] = accused["link_UID"].map(officer_id_by_link_uid)
check_defined(accusations, ["incident_id", "officer_id"])

insert_bulk_if_missing(to_dicts(accusations), Accusation)


Already created Accusation records


## List Officers by Number of Accusations

This matches the results at [cpdp.co](https://cpdp.co).
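
The ranking is a join between officers and accusations, grouped by officer and ordered by the row count. A self-contained sketch of the same query shape against an in-memory SQLite database (the two-column tables and the names in them are hypothetical stand-ins for the NPDC schema):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE officer (id INTEGER PRIMARY KEY, last_name TEXT);
    CREATE TABLE accusation (
        id INTEGER PRIMARY KEY,
        officer_id INTEGER REFERENCES officer (id)
    );
    INSERT INTO officer (id, last_name)
        VALUES (1, 'ALPHA'), (2, 'BETA'), (3, 'GAMMA');
    INSERT INTO accusation (officer_id) VALUES (1), (2), (2), (2), (1);
    """
)

ranking = conn.execute(
    """
    SELECT officer.last_name, count(*) AS num_accusations
    FROM officer JOIN accusation ON officer.id = accusation.officer_id
    GROUP BY officer.id
    ORDER BY count(*) DESC
    """
).fetchall()
print(ranking)
```

Because this is an inner join, an officer with no accusations (`GAMMA` here) does not appear in the result at all.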

In [11]:
desc = db.desc
count = db.func.count
with app.app_context():
    q = (
        db.session.query(
            Officer.source_id, Officer.first_name, Officer.last_name, count()
        )
        .join(Accusation)
        .group_by(Officer.id)
        .order_by(desc(count()))
    )
    print("Query:\n", str(q), "\n")
    df = pd.DataFrame(
        data=[
            (id, f"{first.title()} {last.title()}", num_accusations)
            for id, first, last, num_accusations in q
        ],
        columns=["id", "Officer Name", "Number of Accusations"],
    ).set_index("id")

display(df)


Query:
 SELECT officer.source_id AS officer_source_id, officer.first_name AS officer_first_name, officer.last_name AS officer_last_name, count(*) AS count_1 
FROM officer JOIN accusation ON officer.id = accusation.officer_id GROUP BY officer.id ORDER BY count(*) DESC 



| id | Officer Name | Number of Accusations |
| --- | --- | --- |
| 8562 | Jerome Finnigan | 175 |
| 21837 | Joe Parker | 137 |
| 17816 | Edward May | 136 |
| 8138 | Glenn Evans | 132 |
| 21468 | Kevin Osborn | 125 |
| ... | ... | ... |
| 28717 | Richard Topel | 1 |
| 3040 | Robert Brown | 1 |
| 15566 | John Lahti | 1 |
| 10854 | John Grode | 1 |


## Load Victims

Victims may or may not be the person making the complaint.

In [12]:
# About 25% of victim records reference a cr_id that is missing from the
# complaints table. CPDP still shows information for these complaints, and I'm
# not sure where that data comes from. For our purposes, drop victims with no
# associated complaint. Example: https://cpdp.co/complaint/1086131/
victims_v2 = victims_v2[victims_v2.cr_id.isin(incident_id_by_cr_id)]
victims_v3 = victims_v3[victims_v3.cr_id.isin(incident_id_by_cr_id)]

v2 = victims_v2[["gender", "race"]].copy()
v2["incident_id"] = victims_v2.cr_id.map(incident_id_by_cr_id)

v3 = victims_v3[
    ["gender", "race", "injury_condition", "injury_description"]
].copy()
v3["incident_id"] = victims_v3.cr_id.map(incident_id_by_cr_id)
add_date_of_birth(v3, victims_v3["birth_year"])
v3["deceased"] = v3["injury_condition"].str.match("deceased", case=False)

# Collect all victims in v3 as well as any in v2 that don't appear in v3.
# DataFrame.append was removed in pandas 2.0, so use pd.concat instead.
victims = pd.concat(
    [v2[~v2.incident_id.isin(v3.incident_id)], v3], ignore_index=True
)
check_defined(victims, "incident_id")
# Specify primary keys for victims so insertion into the database is idempotent
victims["id"] = 1 + np.arange(victims.shape[0])

insert_bulk_if_missing(to_dicts(victims), Victim)


Already created Victim records
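
The merge rule in the cell above (keep every v3 victim, plus v2 victims only for incidents that v3 does not cover) can be sketched on made-up rows with `pd.concat`. The frames below are illustrative and far smaller than the real tables:

```python
import pandas as pd

v2 = pd.DataFrame({"incident_id": [1, 2], "gender": ["F", "M"]})
v3 = pd.DataFrame(
    {
        "incident_id": [2, 3],
        "gender": ["M", "F"],
        "injury_condition": ["INJURED", None],
    }
)

# v2 rows for incidents that have no v3 record
v2_only = v2[~v2["incident_id"].isin(v3["incident_id"])]

victims = pd.concat([v2_only, v3], ignore_index=True)
# Explicit primary keys, so a repeated insert collides instead of duplicating
victims["id"] = range(1, len(victims) + 1)
print(victims)
```

Rows that come only from v2 have no injury columns, so those fields end up missing for them after the concat.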
