#Data Loading, Cleaning, and Normalization
We need to load the data from .csv into Postgres.  We also need to normalize the data to make analysis easy.  We'll use Pandas to deal with the .csv loading and data storage.

Files we need to load:
- cfs_2014_inmain.csv (CFS data)
- cfs_xxx2014_incilog.csv (CFS event data -- one for each month)
- cfs_2014_lwmain.csv (incident data)
- cfs_2014_lwmodop.csv (incident modus operandi data)
- LWMAIN.THING.csv (incident lookup tables, where THING is one of the following: CSSTATUS, EMDIVISION, EMSECTION, EMUNIT, INSTSTATS, PREMISE, or WEAPON)

We'll use pandas and sqlalchemy to stuff the data into a local instance of postgres.

In [3]:
import pandas as pd
from sqlalchemy import create_engine # database connection
import datetime as dt
from IPython.display import display
from functools import reduce

In [2]:
display(pd.read_csv('../csv_data/cfs_2014_inmain.csv', nrows=2).head())

Unnamed: 0,inci_id,calltime,calldow,case_id,callsource,primeunit,firstdisp,streetno,streetonly,street,...,secs2tr,secsar2tr,lastclr,secs2lc,secsar2lc,secstr2lc,timeclose,reptaken,closecode,closecomm
0,2014000002,1/1/14 0:00:22,4,,PHONE,BK2,BK2,301,S ELM ST,301 S ELM ST,...,0,0,1/1/14 0:04:20,238,0,0,1/1/14 0:04:22,,10,
1,2014000003,1/1/14 0:00:40,4,14000001.0,SELF,B200,B200,1610,GUESS RD,1610 GUESS RD,...,0,0,1/1/14 0:15:57,918,917,0,1/1/14 0:15:59,B200,1,


We need to create the tables before touching the data so they have all the proper constraints.  Pandas' to_sql method, while helpful, won't handle the constraints automatically.

#Database DDL

Code to create the database schema is below.

In [5]:
engine = create_engine('postgresql://localhost/cfs')

In [7]:
def reset_db():
    """
    Remove and recreate tables to prepare for reloading the db
    """
    engine.execute("DROP TABLE IF EXISTS note CASCADE;")
    engine.execute("DROP TABLE IF EXISTS call CASCADE;")
    
    engine.execute("""
    CREATE TABLE call
    (
      call_id bigint NOT NULL,
      call_time timestamp without time zone,
      call_dow bigint,
      case_id text,
      call_source text,
      primary_unit text,
      first_dispatched text,
      street_num text,
      street_name text,
      city_desc text,
      zip text,
      crossroad1 text,
      crossroad2 text,
      geox double precision,
      geoy double precision,
      service text,
      agency text,
      beat text,
      district text,
      sector text,
      business text,
      nature_code text,
      nature_desc text,
      priority text,
      report_only bigint,
      cancelled bigint,
      time_enroute timestamp without time zone,
      time_finished timestamp without time zone,
      first_unit_dispatch timestamp without time zone,
      first_unit_enroute timestamp without time zone,
      first_unit_arrive timestamp without time zone,
      first_unit_transport timestamp without time zone,
      last_unit_clear timestamp without time zone,
      time_closed timestamp without time zone,
      reporting_unit text,
      close_code text,
      close_comm text,
      CONSTRAINT call_id_pkey PRIMARY KEY (call_id)
    );
    """)
    
    engine.execute("""
    CREATE TABLE note
    (
      note_id serial NOT NULL,
      text text,
      "timestamp" timestamp without time zone,
      author text,
      call_id integer,
      CONSTRAINT note_pkey PRIMARY KEY (note_id),
      CONSTRAINT note_call_id_fkey FOREIGN KEY (call_id) REFERENCES call (call_id)
    );
    """)
    
    engine.execute("""
    CREATE TABLE call_log
    (
      call_log_id bigint NOT NULL,
      transaction_code text,
      transaction_desc text,
      "timestamp" timestamp without time zone,
      call_id bigint,
      unit_code text,
      radio_or_event text,
      unitper_id bigint,
      close_code text,
      CONSTRAINT call_log_pkey PRIMARY KEY (call_log_id),
      CONSTRAINT call_log_call_id_fkey FOREIGN KEY (call_id) REFERENCES call (call_id)
    );
    """)

##cfs_2014_inmain.csv

In [14]:
import re

timestamp_expr = re.compile("\[(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) (.+?)\]")

def split_notes_dict(notes,call_id):
    """
    Return a list of dicts.  Each dict represents a single note and contains the corresponding call_id,
    the timestamp, the note-taker, and the text of the note.
    """
    dicts = []
    regex_split = timestamp_expr.split(notes)[:-1]  # get rid of the last empty string created by the split
    for i in range(0,len(regex_split),3):
        text = regex_split[i].strip()
        timestamp = dt.datetime.strptime(regex_split[i+1], "%m/%d/%y %H:%M:%S")
        author = regex_split[i+2]
        dicts.append({"text": text, "timestamp": timestamp, "author": author, "call_id": call_id})
    return dicts

def split_notes(notes):
    """
    Return a list of tuples.  Each tuple represents a single note and contains the corresponding call_id,
    the timestamp, the note-taker, and the text of the note.
    """
    notes = str(notes)
    tuples = []
    regex_split = timestamp_expr.split(notes)[:-1]  # get rid of the last empty string created by the split
    for i in range(0,len(regex_split),3):
        text = regex_split[i].strip()
        timestamp = dt.datetime.strptime(regex_split[i+1], "%m/%d/%y %H:%M:%S")
        author = regex_split[i+2]
        tuples.append((text, timestamp, author))
    return tuples

start = dt.datetime.now()
# load the data in chunks so we don't use too much memory
chunksize = 20000
j = 0

# We need to map the inmain columns to the renamed columns in the call table
# if an inmain column isn't in this dict, it means we need to drop it
call_mappings = {
    "inci_id": "call_id",
    "calltime": "call_time",
    "calldow": "call_dow",
    "case_id": "case_id",
    "callsource": "call_source",
    "primeunit": "primary_unit",
    "firstdisp": "first_dispatched",
    "streetno": "street_num",
    "streetonly": "street_name",
    "citydesc": "city_desc",
    "zip": "zip",
    "crossroad1": "crossroad1",
    "crossroad2": "crossroad2",
    "geox": "geox",
    "geoy": "geoy",
    "service": "service",
    "agency": "agency",
    "statbeat": "beat",
    "district": "district",
    "ra": "sector",
    "business": "business",
    "naturecode": "nature_code",
    "nature": "nature_desc",
    "priority": "priority",
    "rptonly": "report_only",
    "cancelled": "cancelled",
    "timeroute": "time_enroute",
    "timefini": "time_finished",
    "firstdtm": "first_unit_dispatch",
    "firstenr": "first_unit_enroute",
    "firstarrv": "first_unit_arrive",
    "firsttran": "first_unit_transport",
    "lastclr": "last_unit_clear",
    "timeclose": "time_closed",
    "reptaken": "reporting_unit",
    "closecode": "close_code",
    "closecomm": "close_comm"
}

keep_columns = set(call_mappings.keys())

for call in pd.read_csv('../csv_data/cfs_2014_inmain.csv', chunksize=chunksize, iterator=True, encoding='ISO-8859-1',
                       low_memory=False):
    
    """
    nice, clean iterative algorithm for separating out the notes data -- unfortunately, it's prohibitively slow
    (~3 mins per 25k record or thereabouts)
    """
    #for index, row in call.iterrows():
    #    note = note.append(pd.DataFrame(split_notes_dict(str(row['notes']), row['inci_id'])))
        #if call.iloc[i]['naturecode'] not in nature_set:
        #    nature_set.add(call.iloc[i]['naturecode'])
        #    nature = nature.append(pd.DataFrame({"nature_code": [call.iloc[i]['naturecode']],
        #                                "nature_desc": [call.iloc[i]['nature']]}))
   
    """
    Horrid ugly algorithm for separating out the notes data -- it's faster by about 10x though
    Pandas is really slow when iterating on rows, so we have to do all the transformations to a whole series/list
    at a time
    """
    # Create a new series, which is (for each call) a list of tuples containing the text, author, and timestamp
    # of that call:
    # ex. Series(["one long string with text, author, timestamp for all remarks"]) -> 
    #     Series([(text, author, timestamp), (text2, author2, timestamp2)])
    call['collected_notes'] = call['notes'].apply(split_notes)
    
    # Combine the previous series with the inci_id of each row, preserving the relationship between inci_id
    # and each individual remark, then convert it to a list so we can reduce and map
    # ex. Series([(text, author, timestamp), (text2, author2, timestamp2)]) ->
    #     [((text, author, timestamp), inci_id), ((text2, author2, timestamp2), inci_id2)]
    combined_notes = call['collected_notes'].combine(call['inci_id'],
                                                          lambda x,y: [(e,y) for e in x]).tolist()
    
    # Reduce the list of lists using extend; instead of a list of lists of tuples, we have one long list of
    # nested tuples
    # ex. [[((text, author, timestamp), inci_id)], [((text2, author2, timestamp2), inci_id2)]] ->
    #     [((text, author, timestamp), inci_id), ((text2, author2, timestamp2), inci_id2)]
    extended_notes = []
    for l in combined_notes:
        extended_notes.extend(l)
    
    # Flatten the tuples, so we have a list of non-nested tuples
    # ex. [((text, author, timestamp), inci_id), ((text2, author2, timestamp2), inci_id2)] ->
    #     [(text, author, timestamp, inci_id), (text2, author2, timestamp2, inci_id2)]
    extended_notes = map(lambda x: (x[0][0],x[0][1],x[0][2],x[1]), extended_notes)
    
    # Create a dataframe from the list of tuples (whew)
    note = pd.DataFrame.from_records(extended_notes, columns=['text','timestamp','author','call_id'])
    
    # drop unnecessary columns
    for c in call.columns:
        if c not in keep_columns:
            call = call.drop(c, axis=1)   
    
    # rename to the CFS Analytics column names
    call.rename(columns=call_mappings, inplace=True)
    
    ##### USING DPD COLUMN NAMES ABOVE #########
    ##### USING CFS ANALYTICS COLUMN NAMES BELOW ######
    
    # Perform datetime conversions
    call['call_time'] = pd.to_datetime(call['call_time'])
    call['time_enroute'] = pd.to_datetime(call['time_enroute'])
    call['time_finished'] = pd.to_datetime(call['time_finished'])
    call['first_unit_dispatch'] = pd.to_datetime(call['first_unit_dispatch'])
    call['first_unit_enroute'] = pd.to_datetime(call['first_unit_enroute'])
    call['first_unit_arrive'] = pd.to_datetime(call['first_unit_arrive'])
    call['first_unit_transport'] = pd.to_datetime(call['first_unit_transport'])
    call['last_unit_clear'] = pd.to_datetime(call['last_unit_clear'])
    call['time_closed'] = pd.to_datetime(call['time_closed'])

    # progress update
    j+=1
    print('{} seconds: completed {} rows'.format((dt.datetime.now() - start).seconds, j*chunksize))
    

    # store in the database
    call.to_sql('call', engine, index=False, if_exists='append')
    note.to_sql('note', engine, index=False, if_exists='append')

8 seconds: completed 20000 rows
34 seconds: completed 40000 rows
60 seconds: completed 60000 rows
86 seconds: completed 80000 rows
112 seconds: completed 100000 rows
139 seconds: completed 120000 rows
165 seconds: completed 140000 rows
192 seconds: completed 160000 rows
220 seconds: completed 180000 rows
247 seconds: completed 200000 rows
275 seconds: completed 220000 rows
301 seconds: completed 240000 rows
328 seconds: completed 260000 rows
355 seconds: completed 280000 rows
383 seconds: completed 300000 rows
410 seconds: completed 320000 rows
437 seconds: completed 340000 rows
465 seconds: completed 360000 rows
489 seconds: completed 380000 rows


#cfs_xxx2014_incilog.csv
There is one of these for each month, so we have to load them separately.

In [None]:
months = ("jan", "feb", "mar", "apr", "may", "jun", "jul", "aug", "sep", "oct", "nov", "dec")

for month in months:
    start = dt.datetime.now()
    print("Starting load for month: %s" % (month))
    # load the data in chunks so we don't use too much memory
    chunksize = 20000
    j = 0

    # We need to map the incilog columns to the renamed columns in the call_log table
    # if an incilog column isn't in this dict, it means we need to drop it
    call_log_mappings = {
        "incilogid": "call_log_id",
        "transtype": "transaction_code",
        "descript": "transaction_desc",
        "timestamp": "timestamp",
        "inci_id": "call_id",
        "unitcode": "unit_code",
        "radorev": "radio_or_event",
        "unitperid": "unitper_id",
        "closecode": "close_code"
    }
    
    keep_columns = set(call_log_mappings.keys())

    for call_log in pd.read_csv('../csv_data/cfs_%s2014_incilog.csv' % (month), chunksize=chunksize, 
                           iterator=True, encoding='ISO-8859-1', low_memory=False):
        for c in call_log.columns:
            if c not in keep_columns:
                call_log = call_log.drop(c, axis=1)

        # rename to the CFS Analytics column names
        call_log.rename(columns=call_log_mappings, inplace=True)

        ##### USING DPD COLUMN NAMES ABOVE #########
        ##### USING CFS ANALYTICS COLUMN NAMES BELOW ######
        
        # january has some bogus call_ids, so we have to filter them
        if month == "jan":
            call_log = call_log[call_log.call_id > 2014000000]
            
        # Perform datetime conversions
        call_log['timestamp'] = pd.to_datetime(call_log['timestamp'])
        
        # progress update
        j+=1
        print('{} seconds: completed {} rows'.format((dt.datetime.now() - start).seconds, j*chunksize))

        # store in the database
        call_log.to_sql('call_log', engine, index=False, if_exists='append')

#cfs_2014_lwmain.csv


In [11]:
start = dt.datetime.now()
# load the data in chunks so we don't use too much memory
chunksize = 20000
j = 0

# We need to map the incilog columns to the renamed columns in the call_log table
# if an incilog column isn't in this dict, it means we need to drop it
incident_mappings = {
    "lwmainid": "incident_id",
    "inci_id": "call_id",
    "time": "time_filed",
    "streetnbr": "street_num",
    "street": "street_name",
    "city": "city",
    "zip": "zip",
    "geox": "geox",
    "geoy": "geoy",
    "tract": "beat",
    "district": "district",
    "reportarea": "sector",
    "premise": "premise_code",
    "weapon": "weapon_code",
    "domestic": "domestic",
    "juvenile": "juvenile",
    "gangrelat": "gang_related",
    "emunit": "emp_bureau_code",
    "emdivision": "emp_division_code",
    "emsection": "emp_unit_code",
    "asst_offcr": "num_officers",
    "invststats": "investigation_status_code",
    "investunit": "investigator_unit_code",
    "csstatus": "case_status_code",
    "lwchrgid": "lwchrgid",
    "chrgcnt": "charge_seq",
    "ucr_code": "ucr_code",
    "arr_chrg": "ucr_short_desc",
    "attm_comp": "attempted_or_committed"
}

keep_columns = set(incident_mappings.keys())

ucr_desc = pd.DataFrame({"ucr_short_desc": [], "ucr_long_desc": []})

for incident in pd.read_csv('../csv_data/cfs_2014_lwmain.csv', chunksize=chunksize, 
                       iterator=True, encoding='ISO-8859-1', low_memory=False):
    
    ucr_desc = ucr_desc.append(pd.concat([ incident['arr_chrg'],
                                           incident['chrgdesc'] ],
                                        axis=1, keys=['ucr_short_desc', 'ucr_long_desc']))
    
    date_filed = dt.datetime.strptime(incident['date_rept'], "%m/%d/%y")
    time_filed = dt.datetime.strptime(incident['time'], "%I:%M %p")
    
    # Perform datetime conversions
    incident['time_filed'] = dt.datetime(date_filed.year, date_filed.month, date_filed.day,
                                         time_filed.hour, time_filed.minute)
    
    for c in incident.columns:
        if c not in keep_columns:
            incident = incident.drop(c, axis=1)

    # rename to the CFS Analytics column names
    incident.rename(columns=incident_mappings, inplace=True)

    ##### USING DPD COLUMN NAMES ABOVE #########
    ##### USING CFS ANALYTICS COLUMN NAMES BELOW ######
    
    # Drop duplicate ucr_descs
    ucr_desc = ucr_desc.drop_duplicates()
    
    # progress update
    j+=1
    print('{} seconds: completed {} rows'.format((dt.datetime.now() - start).seconds, j*chunksize))

    # store in the database
    incident.to_sql('incident', engine, index=False, if_exists='append')

ucr_desc.to_sql('ucr_desc', engine, index=False, if_exists='append')

TypeError: must be str, not Series

#Initial Exploration

Initial exploration of the Durham PD CFS data using non-robust .csv reading code.  Has windows line endings, so have to open the file in universal mode to account for that.

In [44]:
from pprint import pprint

first = True
incilog_header = ""
incilog = []

with open("cfs_mar2015_incilog.csv","rU") as f:
    for line in f.readlines():
        if first:
            incilog_header = line
            first = False
        else:
            incilog.append([datum.strip() for datum in line.split(',')])

In [50]:
pprint(incilog[0])

['63260886',
 'RPTO',
 'Report Only',
 '3/27/15 15:22:41',
 '55361',
 '2014412231',
 'B125',
 'R',
 '997150',
 '']


In [6]:
first = True
inmain_header = ""
inmain = []

with open("cfs_mar2015_inmain.csv","rU") as f:
    for line in f.readlines():
        if first:
            inmain_header = line
            first = False
        else:
            inmain.append([datum.strip() for datum in line.split(',')])

In [49]:
pprint(inmain[0])

['2015087068',
 '3/1/15 0:00:32',
 '1',
 '',
 'E911',
 'C413',
 'C424',
 '617',
 'HOPE AVE',
 '617 HOPE AVE',
 'DURHAM',
 '27707',
 'ANACOSTA ST',
 'LINCOLN ST',
 '2030390.25',
 '807470.19',
 'LAW',
 'DPD',
 '412',
 'D4',
 'STH',
 '',
 'ASSIST',
 'ASSIST PERSON',
 '4',
 '0',
 '0',
 'actve dist...child advised mom and aunt aruging  [03/01/15 00:01:14 SMITHK]  WRLS  [03/01/15 00:01:19 SMITHK]  NO PHASE 2.....EHX SHOWS 500 MAHONE POSS APT1  [03/01/15 00:04:09 SMITHK]  [EPD] Aborted by Law Priority with code: 1. Caller hung up  [03/01/15 00:07:42 SMITHK]  {C413} NEED BETTER LOCATION  [03/01/15 00:09:50 ROSSA]',
 '3/1/15 0:04:11',
 '219',
 '3/1/15 0:08:11',
 '459',
 '3/1/15 0:04:53',
 '261',
 '42',
 '0',
 '3/1/15 0:04:53',
 '261',
 '0',
 '3/1/15 0:09:32',
 '540',
 '279',
 'NULL',
 '0',
 '0',
 '3/1/15 0:34:42',
 '2050',
 '1510',
 '0',
 '3/1/15 0:34:43',
 '',
 '10',
 '']


The dispatcher's remarks are all concatenated together, separated by brackets containing what appear to be timestamps and names.  We'll use regexes to pull these apart.

In [51]:
import re

timestamp_expr = re.compile("\[(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) (.+?)\]")

test_str = "actve dist...child advised mom and aunt aruging  [03/01/15 00:01:14 SMITHK]  \
WRLS  [03/01/15 00:01:19 SMITHK]  \
NO PHASE 2.....EHX SHOWS 500 MAHONE POSS APT1  [03/01/15 00:04:09 SMITHK]  \
[EPD] Aborted by Law Priority with code: 1. Caller hung up  [03/01/15 00:07:42 SMITHK]  \
{C413} NEED BETTER LOCATION  [03/01/15 00:09:50 ROSSA]"

pprint(timestamp_expr.split(test_str))

['actve dist...child advised mom and aunt aruging  ',
 '03/01/15 00:01:14',
 'SMITHK',
 '  WRLS  ',
 '03/01/15 00:01:19',
 'SMITHK',
 '  NO PHASE 2.....EHX SHOWS 500 MAHONE POSS APT1  ',
 '03/01/15 00:04:09',
 'SMITHK',
 '  [EPD] Aborted by Law Priority with code: 1. Caller hung up  ',
 '03/01/15 00:07:42',
 'SMITHK',
 '  {C413} NEED BETTER LOCATION  ',
 '03/01/15 00:09:50',
 'ROSSA',
 '']


This is a function we can use to get the data for each individual note.

In [52]:
def split_notes(notes):
    """
    Return a list of 3-tuples.  Each tuple represents a single note and contains the timestamp, the note-taker, and
    the text of the note.
    """
    tuples = []
    regex_split = timestamp_expr.split(notes)[:-1]  # get rid of the last empty string created by the split
    for i in range(0,len(regex_split),3):
        note = regex_split[i].strip()
        timestamp = dt.datetime.strptime(regex_split[i+1], "%m/%d/%y %H:%M:%S")
        notetaker = regex_split[i+2]
        tuples.append((note,timestamp,notetaker))
    return tuples

pprint(split_notes(test_str))

[('actve dist...child advised mom and aunt aruging',
  datetime.datetime(2015, 3, 1, 0, 1, 14),
  'SMITHK'),
 ('WRLS', datetime.datetime(2015, 3, 1, 0, 1, 19), 'SMITHK'),
 ('NO PHASE 2.....EHX SHOWS 500 MAHONE POSS APT1',
  datetime.datetime(2015, 3, 1, 0, 4, 9),
  'SMITHK'),
 ('[EPD] Aborted by Law Priority with code: 1. Caller hung up',
  datetime.datetime(2015, 3, 1, 0, 7, 42),
  'SMITHK'),
 ('{C413} NEED BETTER LOCATION',
  datetime.datetime(2015, 3, 1, 0, 9, 50),
  'ROSSA')]


Questions we need answered about some of the fields:

inmain
- can we get any more info about the cases from the case_id? (case_id: case number, if a report is generated from the call)
- callsources: E911, ALARM self-explanatory, but SELF, PHONE and RADIO?
- primeunit: what are the responsibilities of the prime unit?
- service is always LAW, agency is always DPD
- nature/naturecode: differences between HANG UP, HANG UP WIRELESS PHASE 1, and HANG UP WIRELESS PHASE 2?
- notes: need abbreviations used, can maybe get some of them from the nature codes
- meanings of closecodes?

incilog
- each unit = one officer? any additional info we can get from unitper table, such as officer pay to more accurately estimate cost?

assuming "code_agcy" for all since that matches up best with the data
lwmain.csstatus
- which code (code_fbi, code_sbi, code_agcy) is the one corresponding to the csstatus foreign key? (assuming code_agcy) are any columns other than descriptn informative?

same questions for lwmain.emdivision, emsection, emunit, invststats, premise, weapon

 (eventually) Here we'll create the database schema to store the CFS data in a more structured way.

In [None]:
"""
# I think we're actually going to use postgres -- maybe not worry about the specific db implementation for now

import sqlite3
conn = sqlite3.connect('dpd_cfs.db')
c = conn.cursor()

CREATE_INCIDENT = \
\"""
CREATE TABLE IF NOT EXISTS incident (
    inci_id INTEGER PRIMARY KEY,
    calltime TIMESTAMP,
    calldow INTEGER,
    case_id INTEGER,
    callsource VARCHAR,
    primeunit VARCHAR,
    firstdisp VARCHAR,
    streetno INTEGER,
    streetonly VARCHAR,
    street VARCHAR,
    citydesc VARCHAR,
    zip INTEGER,
    crossroad1 VARCHAR,
    crossroad2 VARCHAR,
    geox DOUBLE,
    geoy DOUBLE,
    service VARCHAR,
    agency VARCHAR,
    
    )
\"""

c.execute('')
"""