In [1]:
from mpi import create_distinct_view, create_identity_view, link
import pandas as pd
import os
from utils.db import get_session, get_mongo
from utils.models import *

## Ingest
Bring .csv into the system

In [41]:
from ingest import load_file
os.listdir('assets/data')

['dws_wages.csv',
 'usbe_students.csv',
 'ushe_students.csv',
 'ustc_students.csv']

In [42]:
load_file('assets/data/dws_wages.csv', 'dws_wages')
load_file('assets/data/usbe_students.csv', 'usbe_students')
load_file('assets/data/ustc_students.csv', 'ustc_students')
load_file('assets/data/ushe_students.csv', 'ushe_students')

## Linking Steps

Process of Record Linkage

0. Load
1. Preprocess
2. Index
3. Compare
4. Classify
5. Evaluate
6. Update (Post MPI - Adding new information, new MPI, to MPI Master Record)

### Master Person Architectures
Visualize data architecture store of MPI raw records.  NOTE: data may not be present in all database options.  Use whichever cell aligns with configured loading behavior.

**SQL** View

In [47]:
with get_session() as session:
    result = session.execute(
        'SELECT * FROM master_person_long LIMIT 5'
    ).fetchall()
    count_mpi = session.execute(
        'SELECT COUNT(DISTINCT(mpi)) FROM master_person_long'
    ).fetchone()
columns = ('mpi', 'field', 'value', 'score', 'guid')
pd.DataFrame(result, columns=columns)

Unnamed: 0,mpi,field,value,score,guid
0,15343154-13459944-13878525-11385390,ushe_student_id_pool,469629.0,1.0,1772797024894460804
1,15343154-13459944-13878525-11385390,last_name_pool,Steed,1.0,1772797024894460804
2,15343154-13459944-13878525-11385390,first_name_pool,Sakayla,1.0,1772797024894460804
3,15343154-13459944-13878525-11385390,middle_name_pool,Conder,1.0,1772797024894460804
4,15343154-13459944-13878525-11385390,birth_date_pool,6/20/2859,1.0,1772797024894460804


In [None]:
print('Total MPI in system: ', count_mpi[0])

**NoSQL** View

In [39]:
import json

db = get_mongo()
count_docs = db.raw.count_documents({})
x = db.raw.find_one({'mpi': 1})
x['_id'] = str(x['_id'])

print(json.dumps(x, indent=2))

{
  "_id": "5fbe592b6499310813ee0b0a",
  "mpi": 1,
  "sources": [
    {
      "guid": 11,
      "score": 0.0,
      "fields": [
        {
          "fieldname": "last_name",
          "value": null
        },
        {
          "fieldname": "first_name",
          "value": "elvis"
        }
      ]
    },
    [
      {
        "guid": 11,
        "score": 0.0,
        "fields": [
          {
            "fieldname": "last_name",
            "value": null
          },
          {
            "fieldname": "first_name",
            "value": "elvis"
          }
        ]
      }
    ],
    [
      {
        "guid": 12,
        "score": 0.0,
        "fields": [
          {
            "fieldname": "last_name",
            "value": "presley"
          },
          {
            "fieldname": "first_name",
            "value": "elvis"
          }
        ]
      }
    ],
    [
      {
        "guid": 13,
        "score": 0.0,
        "fields": [
          {
            "fieldname": "last_name

In [28]:
print('Total MPI in system: ', count_docs)

Total MPI in system:  1


## Prepare Data

Prepare identity view (MPI vectors) and data view (distinct mapped columns from source)

In [1]:
source_tablename = 'ustc_students'

In [3]:
from utils.db import query_db, get_db
from mpi.prepare import create_data_view, create_identity_view
import logging

preplogger = logging.getLogger('notebook')

In [6]:
raw, dview = create_data_view(source_tablename)

In [9]:
iview = create_identity_view()
iview

TypeError: create_identity_view() missing 1 required positional argument: 'mapped_columns'

Check here for potential for match.  If NO matching columns exist, mpis will need to be created for the dview.

In [30]:
# Check for match availability.  If not, halt process and create MPIs
from mpi.link import is_match_available
from mpi.update import generate_mpi, write_mpi_data, gen_mpi_insert

if not is_match_available(dview, iview):
    print('Match unavailable.  Generated MPIs for data view.')
    temp = generate_mpi(dview)
    write_mpi_data(gen_mpi_insert(temp))
    
    # Recreate a view from the MPI table with valid identity data
    iview = create_identity_view(dview.columns.tolist())
else:
    print('Match available.  Proceed with linking process.')

Match available.  Proceed with linking process.


## Building record linkage and mpi classification

In [31]:
from mpi.link import clean_raw, match_dtype

### Preprocessing

Standardize data across data and identity views.

In [32]:
# Match Dtypes - Align data types prior to cleaning.
#    This helps the cleaner by segmenting string/object and numeric fields

# t2 identity table is a multi-index table.
#    If the table is empty, it will not have the value & score attributes
#    The if/else just checks whether or not the pivot of the mpi view yielded value and score fields.

if hasattr(iview, 'value'):
    ivalue = iview.value
else:
    ivalue = iview
    
if hasattr(iview, 'score'):
    iscore = iview.score

# Cast columns to matching datatypes for comparisons later on
source_data, id_data = match_dtype(dview, ivalue)  

# Clean data and re-index comparison.
subset = clean_raw(subset)
source_data = clean_raw(source_data)
id_data = clean_raw(id_data)
id_data = id_data.reset_index(level='mpi')

In [33]:
source_data.head(1)

Unnamed: 0,birth_date_pool,first_name_pool,gender_pool,ustc_student_id_pool,last_name_pool,middle_name_pool,ssn_pool,guid
0,4/15/2110,lajan,f,253355.0,spidle flaminio,maxwell weaver,133782248,930450171738989444


In [34]:
id_data.head(1)

field,mpi,birth_date_pool,first_name_pool,gender_pool,guid,last_name_pool,middle_name_pool,ssn_pool
0,10002511-6185763-7269861-14177405,7/3/2028,arella,n,1772797024894460804,forsland clos,margaret doris mae,374057445


In [35]:
iscore.head(1)

field,birth_date_pool,first_name_pool,gender_pool,guid,last_name_pool,middle_name_pool,ssn_pool
mpi,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
10002511-6185763-7269861-14177405,1,1,1,1,1,1,1


## Indexing

Make record pairs - pair rows needing match to potential identity candidates.

Indexing serves two purposes:

1. Create the list of pairs to check (candidate link).  Example: row 1 from table 1 to row 199 from table 2.

2. Reduce the potential number of pairs to check (candidates).

In [36]:
from mpi.link import build_indexer, match_columns

In [37]:
# Create indexer on dataview
#    Indexer is a set of rules to generate 
#    candidate matches from data -> identities

source_matched, id_matched = match_columns(source_data, id_data)

indexer = build_indexer(source_matched)

# Check index algorithms (generated from data view columns)
indexer.algorithms

[<SortedNeighbourhood left_on='middle_name_pool', right_on='middle_name_pool'>,
 <SortedNeighbourhood left_on='last_name_pool', right_on='last_name_pool'>,
 <SortedNeighbourhood left_on='first_name_pool', right_on='first_name_pool'>,
 <Block left_on='ssn_pool', right_on='ssn_pool'>]

In [38]:
# Run indexer on dataview, identity view
candidates = indexer.index(source_matched, id_matched)

# Full indexing is a cross join of data and all possible identities.

# Demonstrating full indexing size:
print('Full Index Length: ', len(source_data) * len(id_data))

# Examine multi indices.  On the left is the data view index.  Right identity.
print('Algorithmic Index Length: ', len(candidates))

# Estimate Savings
print('Savings: ', (1- len(candidates)/(len(source_data) * len(id_data))) * 100)

# Preview indices:
for pair in candidates[0:5]:
    print(f'Data-row {pair[0]}', f'ID-row {pair[1]}')

Full Index Length:  100000000
Algorithmic Index Length:  57599
Savings:  99.942401
Data-row 0 ID-row 1849
Data-row 0 ID-row 6377
Data-row 0 ID-row 7005
Data-row 0 ID-row 9087
Data-row 0 ID-row 9412


## Comparing

Indexing does not normally store the outcome of its findings.  Indexing algorithms are meant to be fast, can be error prone.  Algorithms can be tuned for string (many), numeric, and time/date fields.

The output of comparison is a clean feature matrix for the classifier to train/predict on.

In [39]:
from mpi.link import build_comparator

In [40]:
# Create comparator on dataview
#    Comparator is a set of algorithms for each feature to be compared.
#    These are genearlly much more expensive compared to indexing functions
cmp = build_comparator(source_matched)

# Check comparison algorithms and fields
cmp.features

[<Numeric 'ssn_pool'>,
 <String 'middle_name_pool'>,
 <String 'last_name_pool'>,
 <String 'first_name_pool'>,
 <String 'gender_pool'>]

In [41]:
# Compute comparisons
#    Gives clean match dataset for classification
comparisons = cmp.compute(candidates, source_data, id_data)
comparisons.head()

Unnamed: 0,Unnamed: 1,ssn_pool,middle_name_pool,last_name_pool,first_name_pool,gender_pool
0,1849,0.0,0.0,0.0,0.0,1.0
0,6377,0.0,0.0,0.0,0.0,0.0
0,7005,0.0,0.0,0.0,0.0,0.0
0,9087,0.0,0.0,1.0,0.0,1.0
0,9412,0.0,0.0,1.0,0.0,0.0


## Classification

Score candidates for match.  

#### Two approaches: Supervised vs Unsupervised
 * **Supervised** approach requires a training set.
 * **Unsupervised** does not require a training set and operates on only on the comparison table itself.

In [42]:
from mpi.link import estimate_true, build_classifier

# Get estimated true linkages for supervised model
links_true = estimate_true(comparisons)

# Create classifier
clf = build_classifier('logistic', comparisons, match_index=links_true)

# Check probabilities (score) of each comparison -- NOT IN USE IN THIS VERSION
predictions = clf.prob(comparison_vectors=comparisons)
predictions

0     1849    0.000007
      6377    0.000001
      7005    0.000001
      9087    0.000271
      9412    0.000050
                ...   
9999  3443    0.000050
      3728    0.000039
      6632    0.000007
      8916    0.000189
      9871    0.000007
Length: 57599, dtype: float64

## Evaluate
Express classification quality and explore outliers

In [43]:
from recordlinkage import reduction_ratio
from recordlinkage import confusion_matrix

links_pred = clf.predict(comparison_vectors=comparisons)

rratio = reduction_ratio(links_pred, source_data)
cmatrix = confusion_matrix(links_true, links_pred, candidates)

In [44]:
# Review confusion matrix
#    TP-FN
#    |  |
#    FP-TN
print(cmatrix)

# Review reduction ratio
print(rratio)

[[  674     0]
 [    0 56925]]
0.9999865186518652


The confusion matrix may not be particularly useful here as generation of true links is prone to error. The reduction ratio is more sensitive than binary predictions in this case.

In [45]:
# Review findings
#   Interesting that the logistic predicted an MPI indices for each given an incomplete target list.

# Is the relationship 1,1?
split_list = lambda x: ([ix[0] for ix in x], [ix[1] for ix in x])
i1, i2 = split_list(links_pred)
len(list(set(i1))), len(list(set(i2)))

(674, 674)

## Update

Append matched MPIs and match score to data view and merge to original data.

In [46]:
from mpi.link import expand_match_to_raw

# Join data view (DISTINCT identities in source table), now containing matched and generated MPIs, to raw table.
#    This can be done a few ways.  Here, the data view (whose columns have been renamed and processed)
#    is joined to the original subset (whose columns were just renamed).  The subset is then indexed back 
#    unto the raw table so original column names and source formatting are preserved.


raw, matched, unmatched = expand_match_to_raw(raw, subset, source_data, id_data, links_pred)
raw.head(1)

Unnamed: 0,BIA,DISABLED,DISPLACEDHOMEMAKER,DWS,ECON,LEP,PELL,SINGLEPARENT,U_AGE,U_BIRTH_DT,...,U_START_DATE,U_STATE_ORIGIN,U_STOP_DATE,U_SUBJ,U_SUFFIX,U_TITLE,U_YEAR,id,guid,mpi
0,0,1,1,0,0,1,1,0,42,4/15/2110,...,20162189,DE,58081754,FLUID POWER HYDRAULICS,,WASHINGTON CO. ECONOMIC TRAINING,2011,7547,930450171738989444,9197423-1362941-5290047-14787063


### Adding missing data, repeated data

When MPIs are generated for unmatched records, all data is used to populate the MPI table.  For matched MPIs, an update to missing information now needs to be performed to capture any new data.

What information is populated can have large performance and storage ramifications.
- If repeat data is kept, the initial identity view will need consolidation to reduce the number of possible indexes and comparisons.
- If repead data is NOT kept, significant information for background updates in match quality will be lost.

In [28]:
# TODO

### De-Identification

Create de-identified table while match available in memory or as referenced temp table.

In [29]:
from assets.mapping import colmap
from utils.db import dataframe_to_db
from di import simple_di

dataframe_to_db(
    simple_di(raw), 
    tablename=source_tablename + '_di'
)

'ushe_students_di'