# **Person linkage case study**

In this case study, we imagine running a person linkage to add unique identifiers to a simulated 2030 Census Unedited File (CUF).
We emulate the methods of the Census Bureau's Person Identification Validation System (PVS) using publicly available descriptions.
The PVS aims to give people in the input file (the CUF, in this case) unique identifiers that can be used to link them to other administrative records.

We approximate the (highly confidential) input data to PVS by using simulated data from our pseudopeople package.
See the `generate_simulated_data` notebook for details about this.

## Sources

These papers describe PVS as their main subject:

* Wagner and Layne. The Person Identification Validation System (PVS): Applying the Center for Administrative Records Research and Applications’ (CARRA) Record Linkage Software. 2014. https://www.census.gov/content/dam/Census/library/working-papers/2014/adrm/carra-wp-2014-01.pdf ([archived](https://web.archive.org/web/20230216043235/https://www.census.gov/content/dam/Census/library/working-papers/2014/adrm/carra-wp-2014-01.pdf))
* Layne, Wagner, and Rothhaas. Estimating Record Linkage False Match Rate for the Person Identification Validation System. 2014. https://www.census.gov/content/dam/Census/library/working-papers/2014/adrm/carra-wp-2014-02.pdf ([archived](https://web.archive.org/web/20220121051156/https://www.census.gov/content/dam/Census/library/working-papers/2014/adrm/carra-wp-2014-02.pdf))
* Alexander et al. Creating a Longitudinal Data Infrastructure at the Census Bureau. 2015. https://www.census.gov/content/dam/Census/library/working-papers/2015/adrm/2015-alexander.pdf ([archived](https://web.archive.org/web/20170723192315/http://www.census.gov/content/dam/Census/library/working-papers/2015/adrm/2015-alexander.pdf))
* Massey and O'Hara. Person Matching in Historical Files using the Census Bureau’s Person Validation System. 2014. https://www.census.gov/content/dam/Census/library/working-papers/2014/adrm/carra-wp-2014-11.pdf ([archived](https://web.archive.org/web/20221018074814/http://www.census.gov/content/dam/Census/library/working-papers/2014/adrm/carra-wp-2014-11.pdf))
* NORC. Assessment of the U.S. Census Bureau’s Person Identification Validation System. 2011. https://www.norc.org/content/dam/norc-org/pdfs/PVS%20Assessment%20Report%20FINAL%20JULY%202011.pdf ([archived](https://web.archive.org/web/20230705005935/https://www.norc.org/content/dam/norc-org/pdfs/PVS%20Assessment%20Report%20FINAL%20JULY%202011.pdf))

These apply PVS to some linking task, and in doing so describe it in some detail:

* Brown et al. Real-Time 2020 Administrative Record Census Simulation. 2023. https://www2.census.gov/programs-surveys/decennial/2020/program-management/evaluate-docs/EAE-2020-admin-records-experiment.pdf ([archived](https://web.archive.org/web/20230521191811/https://www2.census.gov/programs-surveys/decennial/2020/program-management/evaluate-docs/EAE-2020-admin-records-experiment.pdf))
* Massey et al. Linking the 1940 U.S. Census with Modern Data. 2018. https://www.ncbi.nlm.nih.gov/pmc/articles/PMC6530596/ ([DOI](https://doi.org/10.1080%2F01615440.2018.1507772))
* Luque and Wagner. Assessing Coverage and Quality of the 2007 Prototype Census Kidlink Database. 2015. https://www.census.gov/content/dam/Census/library/working-papers/2015/adrm/carra-wp-2015-07.pdf ([archived](https://web.archive.org/web/20220808205231/http://www.census.gov/content/dam/Census/library/working-papers/2015/adrm/carra-wp-2015-07.pdf))
* Bond et al. The Nature of the Bias When Studying Only Linkable Person Records: Evidence from the American Community Survey. 2014. https://www.census.gov/content/dam/Census/library/working-papers/2014/adrm/carra-wp-2014-08.pdf ([archived](https://web.archive.org/web/20220803083857/http://www.census.gov/content/dam/Census/library/working-papers/2014/adrm/carra-wp-2014-08.pdf))

Finally, this paper is about the address matching process (MAF Match) which acts as part of PVS:

* Brummet. Comparison of Survey, Federal, and Commercial Address Data Quality. 2014. https://www.census.gov/content/dam/Census/library/working-papers/2014/adrm/carra-wp-2014-06.pdf ([archived](https://web.archive.org/web/20220121213358/https://www.census.gov/content/dam/Census/library/working-papers/2014/adrm/carra-wp-2014-06.pdf))

All of these papers are referenced throughout this notebook (and other notebooks) by the authors' names, along with additional information like page numbers.
All page numbers represent those of the PDF files themselves, *not* page numbers printed in the documents.

## PVS overview

Brown et al., p. 28:

> The PVS uses probabilistic record linkage (Fellegi and Sunter, 1969) to match data from an
    incoming file (e.g., a survey or administrative record file) to reference files containing data on
    SSN applications from the NUMIDENT enhanced with address data obtained from other federal
    administrative records.

## Setup

In [None]:
# Query planning is now on by default, but it has some rough edges.
# See https://github.com/dask/dask/issues/10995 for general discussion
# and https://github.com/dask/dask-expr/issues/1066 for the particular
# issue I ran into.
import dask

dask.config.set({"dataframe.query-planning": False})

In [None]:
import re, copy, os, time, datetime
import pandas
import jellyfish

In [None]:
print(datetime.datetime.now())

In [None]:
%load_ext autoreload
%autoreload 1

In [None]:
from person_linkage_case_study_utils import distributed_compute, utils

In [None]:
# DO NOT EDIT if this notebook is not called 03_link_datasets.ipynb!
# This notebook is designed to be run with papermill; this cell is tagged 'parameters'
data_to_use = "small_sample"
input_dir = "output/02_generate_case_study_files"
output_dir = "output/03_link_datasets"

# The "compute engine" is what we use on the Python side
# for our case-study-specific operations,
# as opposed to the Splink engine
compute_engine = "pandas"
# Only matter if using a distributed compute engine
compute_engine_num_workers = 3
compute_engine_cpus_per_worker = 2
compute_engine_threads_per_worker = 1
compute_engine_memory_per_worker = "1GB"
# NOTE: This is, as Dask requests, a directory local to the compute node.
# But IHME's cluster doesn't support this very well -- it can be small-ish,
# full of stuff from other users, etc.
compute_engine_local_directory = (
    f"/tmp/{os.environ['USER']}_{int(time.time())}_person_linkage_case_study"
)
compute_engine_memory_constrained = True
scheduler = "slurm"

splink_engine = "duckdb"
# Only matter if using Spark
spark_local = False
spark_num_workers = 15
spark_memory_per_worker = "4GB"
spark_memory_master = "10GB"
# Only matter if using distributed (non-local) Spark
spark_cpus_master = 2
spark_cpus_per_worker = 2
spark_worker_memory_overhead_mb = 500

clear_intermediate = True

queue = None
account = None
walltime = None

log_directory = f"{output_dir}/{data_to_use}/logs"
conda_path = None
conda_env = None

In [None]:
if compute_engine != "pandas":
    utils.ensure_empty(compute_engine_local_directory)

In [None]:
output_dir = f"{output_dir}/{data_to_use}"

In [None]:
df_ops, pd = distributed_compute.start_compute_engine(
    compute_engine,
    num_workers=compute_engine_num_workers,
    cpus_per_worker=compute_engine_cpus_per_worker,
    threads_per_worker=compute_engine_threads_per_worker,
    memory_per_worker=compute_engine_memory_per_worker,
    worker_walltime=walltime,
    local_directory=compute_engine_local_directory,
    log_directory=log_directory,
    memory_constrained=compute_engine_memory_constrained,
    scheduler=scheduler,
    queue=queue,
    account=account,
)

In [None]:
if splink_engine == "spark":
    # NOTE: This directory needs to be on a shared filesystem.
    spark_checkpoints_dir = f"{output_dir}/tmpCheckpoints"
    utils.ensure_empty(spark_checkpoints_dir)

    spark, teardown_spark = distributed_compute.start_spark_cluster(
        local=spark_local,
        conda_path=conda_path,
        conda_env=conda_env,
        cpus_master=spark_cpus_master,
        memory_master=spark_memory_master,
        master_walltime=walltime,
        num_workers=spark_num_workers,
        cpus_per_worker=spark_cpus_per_worker,
        memory_per_worker=spark_memory_per_worker,
        worker_walltime=walltime,
        worker_memory_overhead_mb=spark_worker_memory_overhead_mb,
        partition=queue,
        account=account,
        checkpoint_directory=spark_checkpoints_dir,
        log_directory=log_directory,
        scheduler=scheduler,
    )
else:
    assert splink_engine == "duckdb"

    # Tell duckdb about Slurm resource limits, if present
    import duckdb

    if "SLURM_MEMORY_ON_NODE" in os.environ:
        memory_mb = os.environ["SLURM_MEMORY_ON_NODE"]
        duckdb.sql(f"SET memory_limit = '{max(memory_mb - 20_000, 5_000)}MB';")

    if "SLURM_CPUS_PER_TASK" in os.environ:
        cpus = os.environ["SLURM_CPUS_PER_TASK"]
        duckdb.sql(f"SET threads TO {cpus};")

# Load data

See code in `generate_simulated_data` directory for how we generated the files to link

In [None]:
census_2030 = df_ops.persist(
    df_ops.read_parquet(f"{input_dir}/{data_to_use}/simulated_census_2030.parquet")
)
geobase_reference_file = df_ops.persist(
    df_ops.read_parquet(
        f"{input_dir}/{data_to_use}/simulated_geobase_reference_file.parquet"
    )
)
name_dob_reference_file = df_ops.persist(
    df_ops.read_parquet(
        f"{input_dir}/{data_to_use}/simulated_name_dob_reference_file.parquet"
    )
)

# Pre-process input file

Wagner and Layne, p. 9:

> The first step of the PVS process is to edit data fields to make them homogenous for
comparisons between incoming and reference files.

In [None]:
# Input file before any processing; the final result will be this with the PIK column added
census_2030_raw_input = census_2030.copy()

## Name parsing and standardizing

Wagner and Layne, p. 9:

> The first edits are parsing and standardizing - parsing separates fields into
component parts, while standardizing guarantees key data elements are consistent (e.g.,
STREET, STR are both converted to ST). Name and address fields are parsed and
standardized as they are key linkage comparators. 

As noted in the data generation notebook, there is no parsing to be done here:
because the Census questionnnaire asks for first name, middle initial, and last name as
separate fields on the form, the name would be already parsed when running PVS on the CUF.

The real PVS name parser generates fields like prefix (e.g. Mr.) and suffix (e.g. Jr.)
but we never have these in names from pseudopeople.

More details about the name parser and standardizer (Wagner and Layne, p. 10):

> The PVS system incorporates a name
standardizer (McGaughey, 1994), which is a C language subroutine called as a function
within SAS. It performs name parsing and includes a nickname lookup table and outputs
name variants (standardized variations of first and last names). For example, Bill
becomes William, Chuck and Charlie becomes Charles, etc. The PVS keeps both the
original name (Bill) and the converted name (William) for matching. PVS also has a fake
name table to blank names such as “Queen of the House” or “Baby Girl.” The name data
are parsed, checked for nicknames, and standardized.

The key thing that wasn't clear to me from this description was what it meant to
"keep both original name and the converted name for matching."
I found this in Massey et al.:

> The Name Search Module also accounts for instances where census records contain a nickname.
> For these records, the preprocessing step of the Name Search module outputs two records for
> these observations, one record for the nickname and one record for the formal name.
> For example, if the input record has the name “Bill Smith,” the formatting program will add
> a formal name “William” to that record. This record will then output to both the B-S cut and to the W-S cut.

From this, I gather that nicknames work similar to alternate names in the reference file:
entire duplicate records are made with each name (nickname and formal).
However, I am still confused by calling this "the preprocessing step **of the Name Search module**."
In Wagner and Layne it does not seem like this is module-specific, and I don't see any
reason that would be desirable, so I have done this multi-record approach across all modules.

In [None]:
# Nickname processing
# Have not yet found a nickname list in PVS docs,
# so we do a minimal version for now -- could use
# another list such as the one in pseudopeople
# These examples all come directly from examples in the descriptions of PVS
nickname_standardizations = {
    "Bill": "William",
    "Chuck": "Charles",
    "Charlie": "Charles",
    "Cathy": "Catherine",
    "Matt": "Matthew",
}
has_nickname = df_ops.persist(
    census_2030.first_name.isin(nickname_standardizations.keys())
)
print(f"{df_ops.compute(has_nickname.sum()):,.0f} nicknames in the Census")

In [None]:
# Add extra rows for the normalized names
census_2030 = df_ops.concat(
    [
        census_2030,
        census_2030[has_nickname].assign(
            first_name=lambda df: df.first_name.replace(nickname_standardizations)
        ),
    ],
    ignore_index=True,
)

In [None]:
# Note: The above will introduce duplicates on record_id, so we redefine
# record_id to be unique (without getting rid of the original, input file record ID)
census_2030 = df_ops.add_unique_record_id(
    census_2030.rename(columns={"record_id": "record_id_raw_input_file"}),
    "census_2030_preprocessed",
)

In [None]:
# This list of fake names comes from the NORC report, p. 100-101
# It is what was used in PVS as of 2011
fake_names = pd.read_csv("data/fake-names.txt", header=None).rename(
    columns={0: "fake_name"}
)
fake_names = df_ops.persist(
    fake_names.assign(fake_name=fake_names.fake_name.str.strip().str.upper())
    .dropna(subset="fake_name")
    .pipe(df_ops.drop_duplicates)
    .pipe(df_ops.ensure_large_string_capacity)
)

assert df_ops.compute((fake_names.fake_name == fake_names.fake_name.str.upper()).all())
fake_names

In [None]:
for col in ["first_name", "last_name"]:
    census_2030 = census_2030.assign(
        **{f"{col}_upper": census_2030[col].str.upper()}
    ).merge(fake_names, left_on=f"{col}_upper", right_on="fake_name", how="left")
    has_fake_name = df_ops.persist(census_2030.fake_name.notnull())
    print(f"Found {df_ops.compute(has_fake_name.sum()):,.0f} fake names in {col}")
    census_2030 = df_ops.concat(
        [
            census_2030[~has_fake_name].drop(columns=["fake_name", f"{col}_upper"]),
            # NOTE: For reasons I don't really understand, assigning pandas.NA reverts it to a normal string instead of a large_string
            census_2030[has_fake_name]
            .assign(**{col: pandas.NA})
            .drop(columns=["fake_name", f"{col}_upper"])
            .pipe(df_ops.ensure_large_string_capacity),
        ],
        ignore_index=True,
    )

## Address parsing and standardizing

Wagner and Layne, p. 11:

> The PVS editing process also incorporates an address parser and standardizer,
written in the C language and called as a function within SAS (U.S. Census Bureau
Geography Division, 1995). It performs parsing of address strings into individual output
fields (see Figure 2), and standardizes the spelling of key components of the address
such as street type. The PVS also incorporates use of a commercial product to update
zip codes, and correct misspellings of address elements.

As noted in the data generation notebook, for now we haven't combined the address parts
into a single string that would need to be parsed.
We plan to add this in a future version of pseudopeople.

For now, we do some simple standardization in each of the address parts (street number,
street name, etc).

In [None]:
def standardize_address_part(column):
    return (
        column
        # Remove leading or trailing whitespace
        .str.strip()
        # Turn any strings of consecutive whitespace into a single space
        .str.replace("\s+", " ", regex=True)
        # Normalize case
        .str.upper()
        # Normalize the word street as described in the example quoted above
        # In reality, there would be many rules like this
        .str.replace("\b(STREET|STR)\b", "ST", regex=True)
        # Make sure missingness is represented consistently
        .replace("", pandas.NA)
    )


address_cols = [
    "street_number",
    "street_name",
    "unit_number",
    "city",
    "state",
    "zipcode",
]
for address_col in address_cols:
    census_2030 = census_2030.assign(
        **{address_col: census_2030[address_col].pipe(standardize_address_part)}
    )

census_2030 = df_ops.ensure_large_string_capacity(census_2030)

## MAFMatch

**General information about MAFMatch**

Wagner and Layne, p. 11:

> The PVS provides an additional address enhancement by matching records in the
incoming file to Census Bureau’s Master Address File (MAF) in order to assign a unique
address identifier, the MAF Identifier (MAFID), and other Census geographical codes
(e.g., Census tract and block). The MAFID is used in the PVS for search purposes and as a
linkage key for administrative files. Then, addresses are matched to the Census Bureau’s
Topologically Integrated Geographic Encoding and Referencing Database (TIGER) to
obtain Census geographical codes.

My understanding is that this step is useful for two reasons:
- Potentially adds an alternate version/way of formatting the same address
  (the way it is in the MAF, instead of the way it is in the input file).
  However, instead of using this alternate to match directly, it is boiled down
  to a MAFID, so it is only useful when an input file address and a reference file
  address (different to one another) both match to the same MAF address.
  (Presumably, the whole MAFMatch process described here needs to run on the reference files?)
- Adds Census geographies which could be used in blocking (but are they?)

I don't understand the TIGER part of this.
In particular, p. 12 describes a probabilistic/fuzzy match to TIGER, but I thought TIGER
would contain exactly the same addresses as the MAF.
I also don't understand what geocodes would be present in TIGER that wouldn't also be
present in the MAF.
Maybe these things are more out-of-sync with each other than I understood.

**MAFMatch in this case study**

In the decennial Census, the sampling frame is a subset of the MAF (Brown et al., p. 15, footnote 4).
That is, in the CUF, the address field (which is not supplied by the respondent)
would be the MAF address the questionnaire/NRFU was sent to.

Therefore, I don't believe it makes sense to do MAFMatch for the CUF, because all the addresses
are already the same.
I haven't fully confirmed this, but p. 14 of Wagner and Layne says

> The 2010 Census Unedited File (CUF), had 350 million records and processed through every PVS
module, excluding MAF match and SSN verification

which suggests that (like SSN verification) MAFMatch is not applicable to the CUF.

Given this, we skip MAFMatch here.
Also, if we wanted to add it, we would need to generate something like a MAF from pseudopeople.

## Drop records with insufficient information

In 2011 (NORC, p. 25):

> The initial edit process, described in the Introduction: PVS Background section, removes from consideration
incoming records that have no name data. Therefore, no record that is processed in PVS has blank first and last
names.

In 2014 it appears to be the same/similar, because in Table 2 of Wagner and Layne there is a row "NO SEARCH: Blank Name" (p. 18).

In 2023 (Brown et al., p. 28):

> Records containing sufficient PII to be linkable with some confidence, for example those
containing name and age, are sent through the linkage process.<sup>15</sup>

> <sup>15</sup> Records with names on the PVS invalid name list (e.g., “Mickey Mouse,” “householder,” or “son”) are excluded
from PVS search.

I'd prefer to use the 2023 information, but it is too vague:
"for example" means this is just an approximation, and it doesn't specify
parts of "name."
The footnote also seems to contradict earlier reports, which said fake names were simply
blanked out, which seems preferable.
Since the fake name step comes before this one, a fake first **and** last name would lead
to exclusion here, which is perhaps what the footnote was intending?

Here, we follow the blank-name approach.

In [None]:
census_2030 = census_2030[
    census_2030.first_name.notnull() | census_2030.last_name.notnull()
]

# Create derived variables for use in linkage

Here we create variables to be used as matching variables and blocking keys,
when those matching variables/blocking keys are defined in a way that is not already
present in the data at this point.

The variables needed here depend on the modules and passes described below -- see those
sections for more citations.

In [None]:
# We want to compare mailing address with physical address
geobase_reference_file = geobase_reference_file.rename(
    columns=lambda c: c.replace("mailing_address_", "")
)

In [None]:
# PVS uses DOB as separate fields for day, month, and year
def split_dob(df, date_format="%Y%m%d"):
    # Have to be floats because we want to treat as numeric for assessing similarity
    # Note that as of now, none of our pseudopeople noise types would change the punctuation ("/") in the date, but
    # they can insert non-numeric characters here or otherwise create invalid dates, in which case we fail to parse the date
    # and treat it as missing.
    dob = pd.to_datetime(df.date_of_birth, format=date_format, errors="coerce")
    df = df.assign(
        month_of_birth=dob.dt.month,
        year_of_birth=dob.dt.year,
        day_of_birth=dob.dt.day,
    )
    df = df.assign(
        month_of_birth=df.month_of_birth.fillna(pandas.NA),
        year_of_birth=df.year_of_birth.fillna(pandas.NA),
        day_of_birth=df.day_of_birth.fillna(pandas.NA),
    )
    return df.drop(columns=["date_of_birth"])


census_2030 = split_dob(census_2030, date_format="%m/%d/%Y")
geobase_reference_file = split_dob(geobase_reference_file)
name_dob_reference_file = split_dob(name_dob_reference_file)

In [None]:
# I don't fully understand the purpose of blocking on the geokey,
# as opposed to just blocking on its constituent columns.
# Maybe it is a way of dealing with missingness in those constituent
# columns (e.g. so an address with no unit number can still be blocked on geokey)?
def add_geokey(df):
    return df.assign(
        geokey=distributed_compute.add_strings(
            compute_engine,
            [
                df.street_number,
                " ",
                df.street_name,
                " ",
                df.unit_number,
                " ",
                df.city,
                " ",
                df.state,
                " ",
                df.zipcode,
            ],
            # Normalize the whitespace
        )
        .str.replace("\s+", " ", regex=True)
        .str.strip()
        .replace("", pandas.NA)
    ).pipe(df_ops.ensure_large_string_capacity)


geobase_reference_file = add_geokey(geobase_reference_file)
census_2030 = df_ops.persist(add_geokey(census_2030))

In [None]:
# HACK: Remove address information from GQ, so it isn't used for blocking
# nor for HHCompSearch.
# Currently we have massive GQ "households."
# NOTE: This will miss people who are actually in GQ who have had their address noised,
# but that doesn't create huge "households" anyway so it's not a big deal
# NOTE: We can't just throw out any household where anyone has a GQ housing_type
# since the housing_type column itself can be noised. We set a threshold (data size dependent)
# of how many people in a household need to have a GQ housing_type.
unlikely_due_to_noise_threshold = {
    "usa": 250,
    "ri": 250,
    "small_sample": 2,
}[data_to_use]
probable_gq_geokeys = df_ops.persist(
    census_2030[census_2030.housing_type.notnull()]
    .assign(reported_gq=lambda df: df.housing_type != "Household")
    .pipe(
        df_ops.groupby_agg_small_groups,
        by="geokey",
        agg_func=lambda x: x.reported_gq.agg(["sum", "mean"]),
    )
    .pipe(
        lambda df: df[
            (df["sum"] >= unlikely_due_to_noise_threshold) & (df["mean"] >= 0.70)
        ]
    )
    .reset_index()
    .assign(is_gq_geokey=True)
)

if data_to_use in ("usa", "ri"):
    num_shards = 334
    num_gq_types = 6
    if data_to_use == "usa":
        # Should be exact
        assert num_shards * num_gq_types == len(probable_gq_geokeys)
    else:
        # Should be close to 1/334 (1 million population)
        print(f"{len(probable_gq_geokeys):,.0f} probable GQ geokeys")
        assert (
            (num_shards * num_gq_types * 0.5) / 334
            <= len(probable_gq_geokeys)
            <= (num_shards * num_gq_types * 2) / 334
        )
else:
    print(f"{len(probable_gq_geokeys):,.0f} probable GQ geokeys")

In [None]:
census_2030 = census_2030.merge(
    probable_gq_geokeys[["geokey", "is_gq_geokey"]], on="geokey", how="left"
).assign(is_gq_geokey=lambda df: df.is_gq_geokey.fillna(False))

census_2030 = df_ops.ensure_large_string_capacity(
    df_ops.concat(
        [
            census_2030[~census_2030.is_gq_geokey]
            .assign(
                geokey_for_blocking=lambda df: df.geokey,
                street_number_for_blocking=lambda df: df.street_number,
                street_name_for_blocking=lambda df: df.street_name,
            )
            .drop(columns=["is_gq_geokey"]),
            census_2030[census_2030.is_gq_geokey]
            .assign(
                geokey_for_blocking=pandas.NA,
                street_number_for_blocking=pandas.NA,
                street_name_for_blocking=pandas.NA,
            )
            .drop(columns=["is_gq_geokey"]),
        ],
        ignore_index=True,
    )
)

In [None]:
geobase_reference_file = geobase_reference_file.merge(
    probable_gq_geokeys[["geokey", "is_gq_geokey"]], on="geokey", how="left"
).assign(is_gq_geokey=lambda df: df.is_gq_geokey.fillna(False))

geobase_reference_file = df_ops.ensure_large_string_capacity(
    df_ops.concat(
        [
            geobase_reference_file[~geobase_reference_file.is_gq_geokey]
            .assign(
                geokey_for_blocking=lambda df: df.geokey,
                street_number_for_blocking=lambda df: df.street_number,
                street_name_for_blocking=lambda df: df.street_name,
            )
            .drop(columns=["is_gq_geokey"]),
            geobase_reference_file[geobase_reference_file.is_gq_geokey]
            .assign(
                geokey_for_blocking=pandas.NA,
                street_number_for_blocking=pandas.NA,
                street_name_for_blocking=pandas.NA,
            )
            .drop(columns=["is_gq_geokey"]),
        ],
        ignore_index=True,
    )
)

In [None]:
%xdel probable_gq_geokeys

In [None]:
# Layne, Wagner, and Rothhaas p. 26: the name matching variables are
# First 15 characters First Name, First 15 characters Middle Name, First 12 characters Last Name
# Additionally, there are blocking columns for all of 1-3 initial characters of First/Last.
# We don't have a full middle name in pseudopeople (nor would that be present in a real CUF)
# so we have to stick to the first initial for middle.
def add_truncated_name_cols(df):
    df = df.assign(
        first_name_15=df.first_name.str[:15], last_name_12=df.last_name.str[:12]
    )

    if "middle_name" in df.columns and "middle_initial" not in df.columns:
        df = df.assign(middle_initial=df.middle_name.str[:1])

    for num_chars in [1, 2, 3]:
        df = df.assign(
            **{
                f"first_name_{num_chars}": df.first_name.str[:num_chars],
                f"last_name_{num_chars}": df.last_name.str[:num_chars],
            }
        )

    return df


census_2030 = add_truncated_name_cols(census_2030)
geobase_reference_file = add_truncated_name_cols(geobase_reference_file)
name_dob_reference_file = add_truncated_name_cols(name_dob_reference_file)

In [None]:
# Layne, Wagner, and Rothhaas p. 26: phonetics are used in blocking (not matching)
# - Soundex for Street Name
# - NYSIIS code for First Name
# - NYSIIS code for Last Name
# - Reverse Soundex for First Name
# - Reverse Soundex for Last Name


def nysiis(input_string):
    if pandas.isna(input_string):
        return pandas.NA
    result = jellyfish.nysiis(input_string)
    if pandas.isna(result):
        return pandas.NA

    return result


def soundex(input_string):
    if pandas.isna(input_string):
        return pandas.NA
    result = jellyfish.soundex(input_string)
    if pandas.isna(result):
        return pandas.NA

    return result


def reverse_soundex(input_string):
    if pandas.isna(input_string):
        return pandas.NA

    return soundex(input_string[::-1])


def add_name_phonetics(df):
    for col in ["first_name", "last_name"]:
        kwargs = {}
        if compute_engine.startswith("dask"):
            kwargs["meta"] = (col, "object")
        df = df.assign(**{f"{col}_nysiis": df[col].apply(nysiis, **kwargs)})
        df = df.assign(
            **{f"{col}_reverse_soundex": df[col].apply(reverse_soundex, **kwargs)}
        )

    return df_ops.ensure_large_string_capacity(df)


def add_address_phonetics(df):
    kwargs = {}
    if compute_engine.startswith("dask"):
        kwargs["meta"] = (col, "object")
    df = df.assign(
        street_name_for_blocking_soundex=df.street_name_for_blocking.apply(
            soundex, **kwargs
        )
    )
    return df_ops.ensure_large_string_capacity(df)


census_2030 = add_name_phonetics(census_2030)
census_2030 = add_address_phonetics(census_2030)

geobase_reference_file = add_address_phonetics(geobase_reference_file)

name_dob_reference_file = add_name_phonetics(name_dob_reference_file)

In [None]:
# Columns used to "cut the database": ZIP3 and a grouping of first and last initial
def add_zip3(df):
    return df.assign(zip3=lambda x: x.zipcode.str[:3])


def add_first_last_initial_categories(df):
    # Page 20 of the NORC report: "Name-cuts are defined by combinations of the first characters of the first and last names. The twenty letter groupings
    # for the first character are: A-or-blank, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, and U-Z."
    initial_cut = (
        lambda x: x.fillna("A")
        .str[0]
        .replace("A", "A-or-blank")
        .replace(["U", "V", "W", "X", "Y", "Z"], "U-Z")
    )
    return df.assign(
        first_initial_cut=initial_cut(df.first_name),
        last_initial_cut=initial_cut(df.last_name),
    )

In [None]:
census_2030 = add_zip3(census_2030)
census_2030 = add_first_last_initial_categories(census_2030)

geobase_reference_file = add_zip3(geobase_reference_file)

name_dob_reference_file = add_first_last_initial_categories(name_dob_reference_file)

In [None]:
census_2030, geobase_reference_file, name_dob_reference_file = df_ops.persist(
    census_2030, geobase_reference_file, name_dob_reference_file
)

# Data, ready to link

In [None]:
print(f"{len(census_2030):,.0f}")

In [None]:
df_ops.head(census_2030, n=100)

In [None]:
intermediate_data_dir = f"{output_dir}/intermediate_data"
if clear_intermediate:
    utils.ensure_empty(intermediate_data_dir)

In [None]:
# Can save a checkpoint here for resuming without re-running the prep
# df_ops.to_parquet(census_2030, f'{intermediate_data_dir}/census_2030_prepped.parquet')

In [None]:
print(f"{len(geobase_reference_file):,.0f}")

In [None]:
df_ops.head(geobase_reference_file, n=100)

In [None]:
df_ops.compute(geobase_reference_file.year_of_birth.isna().mean())

In [None]:
# df_ops.to_parquet(geobase_reference_file, f'{intermediate_data_dir}/geobase_reference_file_prepped.parquet')

In [None]:
print(f"{len(name_dob_reference_file):,.0f}")

In [None]:
df_ops.head(name_dob_reference_file, n=100)

In [None]:
df_ops.compute(name_dob_reference_file.year_of_birth.isna().mean())

In [None]:
# df_ops.to_parquet(name_dob_reference_file, f'{intermediate_data_dir}/name_dob_reference_file_prepped.parquet')

# Emulate Multi-Match with splink

Wagner and Layne, p. 8:

> The PVS employs its probabilistic record linkage software, Multi-Match (Wagner
2012), as an integral part of the PVS.

Wagner and Layne, p. 12:

> PVS uses the same Multi-Match engine for each probabilistic search type. For each
search module the analyst defines a parameter file, which is passed to Multi-Match. The
parameter file includes threshold value(s) for the number of passes, blocking keys, and
within each pass, the match variables, match comparison type, and matching weights...
>
> Records must first match exactly on the blocking keys before any comparisons
between the match variables are attempted. Each match variable is given an
m and
u
probability, which is translated by MultiMatch as agreement and disagreement weights.
The sum of all match variable comparison weights for a record pair is the composite
weight. All record pairs with a composite weight greater than or equal to the threshold
set in the parameter file are linked, and the records from the incoming file for these
linked cases are excluded from all remaining passes. All Numident records are always
available for linking in every pass. Any record missing data for any of the blocking fields
for a pass skips that pass and moves to the next pass.

[splink](https://github.com/moj-analytical-services/splink) is similar to Multi-Match
(both are based on the Fellegi-Sunter approach to record linkage).
It also implements:
- Exact blocking on specified keys
- Determining overall match probability based on conditional independence of individual field comparisons
  (it is equivalent to multiply the probability ratios, or sum the logarithmic "weights" as Multi-Match describes it),
  which each have m and u probabilities
- Setting a match threshold

However, splink does not include some additional logic built into Multi-Match,
specifically the ability to run multiple "passes," removing linked records from
subsequent passes.
We have to implement that ourselves, calling splink again in each pass.

## Estimate parameters (lambda, m, u) once for all modules

In Multi-Match, there is no lambda (prior probability of a link, before any comparisons are observed).
This is because Multi-Match works entirely in weight space, and sets different weight thresholds
in different modules/passes instead of changing the prior probability.

In Multi-Match parameters are not directly estimated from the data.
They are primarily set manually by analysts, with a different set of parameter files
maintained for each type of input file (e.g. survey, administrative).

All parameters except lambda are column-level parameters.
We estimate the column-level parameters using the GeoBase Reference File,
since it has all columns that are used for matching with any reference file.

We estimate lambda (manually, see below) separately by reference file,
since the number of records is different between the reference files but
the size of the underlying population (people we are trying to match) is constant.

### lambda (prior probability of a match)

Splink has a built-in method (estimate_probability_two_random_records_match)
for estimating this, but it did not seem to give me reasonable estimates.

We just make an informed guess here based on how much overlap we expect
in the files, how much unintentional duplication we expect in the files (e.g. someone
being enumerated twice), how much *intentional* duplication we have in the
files (e.g. a record for each nickname variant), and some simplifying assumptions.

Our assumptions:
- 5% of the enumerations in the CUF are unintentional duplicates
- 0.5% of the PIKs in the reference file are unintentional duplicates
  (that same person is also represented with a different PIK)
- 90% of the people in the CUF are represented in the reference files
- Being represented in both files is independent of being unintentionally
  or intentionally duplicated
- Being intentionally duplicated in one file is independent of being intentionally
  duplicated in the other (likely not true since having a name with lots of variants
  would cause intentional duplicates in both)

We do this first (before m and u probabilities) because having lambda estimated is
useful to the EM algorithm for estimating m and u.

In [None]:
def estimate_number_true_matches(input_file, reference_file):
    people_represented_in_input_file = (
        len(df_ops.drop_duplicates(input_file[["record_id_raw_input_file"]])) * 0.95
    )
    people_represented_in_reference_file = (
        len(df_ops.drop_duplicates(reference_file[["pik"]])) * 0.995
    )
    people_represented_in_both = people_represented_in_input_file * 0.9

    # Assuming independence conditions as noted above, the number of true
    # matches that should be found for *each* true person is the expected number
    # of records in one file times the expected number of records in the other
    input_file_records_per_person = people_represented_in_input_file / len(input_file)
    reference_file_records_per_person = people_represented_in_reference_file / len(
        reference_file
    )
    record_matches_per_person = (
        input_file_records_per_person * reference_file_records_per_person
    )

    return people_represented_in_both * record_matches_per_person


def probability_two_random_records_match(input_file, reference_file):
    cartesian_product = len(input_file) * len(reference_file)
    if cartesian_product == 0:
        # Does not matter
        return 0.5

    return estimate_number_true_matches(input_file, reference_file) / cartesian_product


probability_two_random_records_match(census_2030, geobase_reference_file)

### m and u probabilities

In [None]:
common_cols = [c for c in census_2030.columns if c in geobase_reference_file.columns]
common_cols

In [None]:
def prep_table_for_splink(df, dataset_name, columns, df_ops):
    result = df[[c for c in df.columns if c in columns]]
    if compute_engine == "pandas" or splink_engine == "duckdb":
        # Needs to all be in one process anyway
        return df_ops.compute(result)
    else:
        utils.ensure_empty(f"{intermediate_data_dir}/for_splink/{dataset_name}")
        file_path = f"{intermediate_data_dir}/for_splink/{dataset_name}/{dataset_name}_{int(time.time())}.parquet"
        df_ops.to_parquet(result, file_path, wait=True, write_index=False)
        assert splink_engine == "spark"
        return spark.read.parquet(file_path)


if len(geobase_reference_file) > 1_000_000:
    geobase_reference_file_for_training = geobase_reference_file.sample(
        frac=(1_000_000 / len(geobase_reference_file)), random_state=1234
    )
else:
    geobase_reference_file_for_training = geobase_reference_file

if len(census_2030) > 1_000_000:
    census_2030_for_training = census_2030.sample(
        frac=(1_000_000 / len(census_2030)), random_state=1234
    )
else:
    census_2030_for_training = census_2030

utils.ensure_empty(f"{intermediate_data_dir}/for_splink/")
tables_for_splink = [
    prep_table_for_splink(
        geobase_reference_file_for_training,
        "geobase_reference_file",
        common_cols,
        df_ops,
    ),
    prep_table_for_splink(census_2030_for_training, "census_2030", common_cols, df_ops),
]

#### Define comparison variables and levels

**Variables**

The most recent report, from 2023 (Brown et al., p. 29), says:

> [In] GeoSearch... The typical matching variables are name, DOB, sex, and
  various address fields...
>
> NameSearch... uses only name and DOB fields...
> 
> DOBSearch... compares name, sex, and DOB...
> 
> the Household Composition search module... attempts to find a match... based on name and DOB information

So, the total list: name, DOB, sex, and "various address fields."
These address fields are listed in the PVS report (p. 34) which is admittedly from 2011:

> variables used directly in matching the input file with the reference files [include]...
> street name, street name prefix and suffix, house number, rural route and box, and ZIP code

**Comparisons**

Massey et al. footnote 2: 
> The PVS string comparator was developed by Winkler (1995) and measures the distance
  between two strings on a scale from 0 to 900, where a distance score of 0 is given if
  there is no similarity between two text strings and a score of 900 is given for an exact match.
  The cutoff value for the string distance is set to 750 in the Name Search module.

Massey and O'Hara, p. 6 footnote 7:
> For the comparison of text strings, a prorated value
  between the chosen agreement score and chosen disagreement score is given depending on the Jaro-Winkler
  distance between the string in the input file and reference file.

Massey et al.:
> For numeric variables, such as year of birth, a maximum acceptable difference
> between the variable value in the input and reference record is set. This also
> allows for creation of an interval, or band, around year of birth to permit
> inexact matches. Within this band, prorated agreement and disagreement weights
> are assigned depending on the similarity of year of birth.

The "maximum acceptable distance" implies that the m probability beyond that distance is
zero; such pairs should never be linked.

Note that this continuous "prorated value," as opposed to categorizing comparison levels,
is not possible in splink and goes outside the traditional F-S framing of m and u probabilities!
We omit it for now.

In [None]:
if splink_engine == "duckdb":
    from splink.duckdb.comparison_library import (
        exact_match,
        jaro_winkler_at_thresholds,
    )
    import splink.duckdb.comparison_level_library as cll
elif splink_engine == "spark":
    from splink.spark.comparison_library import (
        exact_match,
        jaro_winkler_at_thresholds,
    )
    import splink.spark.comparison_level_library as cll


def numeric_column_comparison(col_name, human_name, maximum_inexact_match_difference):
    return {
        "output_column_name": col_name,
        "comparison_description": human_name,
        "comparison_levels": [
            cll.null_level(col_name),
            cll.exact_match_level(col_name),
            {
                "sql_condition": f"abs({col_name}_l - {col_name}_r) <= {maximum_inexact_match_difference}",
                "label_for_charts": "Inexact match",
            },
            cll.else_level(),
        ],
    }


numeric_columns = ["day_of_birth", "month_of_birth", "year_of_birth"]

settings = {
    "link_type": "link_only",
    "comparisons": [
        jaro_winkler_at_thresholds("first_name_15", 750 / 900),
        jaro_winkler_at_thresholds("last_name_12", 750 / 900),
        exact_match("middle_initial"),
        # NOTE: Three separate comparisons here can strongly violate conditional independence.
        # However, it seems to be well-attested in the PVS descriptions.
        numeric_column_comparison(
            "day_of_birth", "Day of birth", maximum_inexact_match_difference=5
        ),
        numeric_column_comparison(
            "month_of_birth", "Month of birth", maximum_inexact_match_difference=3
        ),
        numeric_column_comparison(
            "year_of_birth", "Year of birth", maximum_inexact_match_difference=5
        ),
        # NOTE: There are some places where PVS seems to claim to compare individual parts
        # of the address.
        # Address matching is not mentioned at all in the one source we have listing exact variables.
        # Comparing parts of the address separately strongly breaks conditional independence.
        # So we compare the entire geokey at once.
        jaro_winkler_at_thresholds("geokey", 650 / 900),
    ],
    "probability_two_random_records_match": probability_two_random_records_match(
        census_2030, geobase_reference_file
    ),
    "unique_id_column_name": "record_id",
    # Not sure exactly what this does, but it is necessary for some of the fancier graphs below
    "retain_intermediate_calculation_columns": True,
}

if splink_engine == "duckdb":
    from splink.duckdb.linker import DuckDBLinker

    linker = DuckDBLinker(
        tables_for_splink,
        settings,
        input_table_aliases=["reference_file", "census_2030"],
    )
elif splink_engine == "spark":
    from splink.spark.linker import SparkLinker

    linker = SparkLinker(
        tables_for_splink,
        settings,
        input_table_aliases=["reference_file", "census_2030"],
        spark=spark,
    )

    import warnings

    # PySpark triggers a lot of Pandas warnings
    warnings.filterwarnings("ignore", category=DeprecationWarning)
    warnings.filterwarnings("ignore", category=FutureWarning)

#### Estimate u probabilities

This method seems to work well:
- For almost all of the columns, an exact or even inexact match is relatively rare
- Exact matches are common on ZIP code (makes sense, given our sample data is all in one ZIP)
- Inexact matches are more common where you would expect (day and month of birth have highly constrained values)

I have tested that this estimation is reproducible when run multiple times (with the same seed).

In [None]:
%%time

linker.estimate_u_using_random_sampling(max_pairs=1e7, seed=1234)

In [None]:
# Ignore the green bars on the left, these are the m probabilities that haven't been estimated yet
linker.m_u_parameters_chart()

#### Estimate m probabilities

The EM algorithm implemented in Splink can be used to estimate all the parameters at once.
However, I've found this to be extremely unstable, and the lambda and u estimates I've made
above much better than what happens when I allow EM to mess with them.

These EM session blocking rules are the result of **lots** of trial and error.
I consistently had problems with the EM algorithm deciding that first names
were completely wrong ("All other comparisons") almost as often as they were
right.
I can think of two reasons this might be happening:
- The EM algorithm is just looking for two coherent clusters, and the assumption is that these
  clusters correspond to matches and non-matches.
  But in real life, and in our simulated data, matches and non-matches are not homogenous.
  In particular, people who are in the same family live together and tend to share a last name,
  and this could have been the cluster EM was finding instead of finding the exact same person.
- Conditional independence is grossly violated by some of our columns, especially the address parts
  and the DOB parts. This could be causing some pathological behavior.

Using the EM approach below, I have at least obtained reasonable-looking m and u probabilities.

In [None]:
blocking_rule_for_training = (
    "l.first_name_15 = r.first_name_15 and l.zipcode = r.zipcode"
)
em_session_1 = linker.estimate_parameters_using_expectation_maximisation(
    blocking_rule_for_training,
    # Fix lambda; u is fixed by default
    fix_probability_two_random_records_match=True,
)

In [None]:
em_session_1.m_u_values_interactive_history_chart()

In [None]:
em_session_1.match_weights_interactive_history_chart()

In [None]:
blocking_rule_for_training = "l.geokey = r.geokey"
em_session_2 = linker.estimate_parameters_using_expectation_maximisation(
    blocking_rule_for_training,
    # Fix lambda; u is fixed by default
    fix_probability_two_random_records_match=True,
)

In [None]:
em_session_2.m_u_values_interactive_history_chart()

In [None]:
linker.match_weights_chart()

In [None]:
linker.m_u_parameters_chart()

In [None]:
linker.parameter_estimate_comparisons_chart()

In [None]:
splink_settings = linker._settings_obj.as_dict()

In [None]:
import pickle

with open(f"{intermediate_data_dir}/splink_settings.pkl", "wb") as f:
    pickle.dump(splink_settings, f, protocol=-1)

In [None]:
with open(f"{intermediate_data_dir}/splink_settings.pkl", "rb") as f:
    splink_settings = pickle.load(f)

## Implement matching passes

In [None]:
from dataclasses import dataclass
import pathlib

# Calculate this once to save time -- mapping from record_id to record_id_raw_input_file
# There can be multiple records (with different record_id) for the same input file record
# (record_id_raw_input_file) because of the handling of nicknames by creating extra records.
record_id_raw_input_file_by_record_id = census_2030[
    ["record_id", "record_id_raw_input_file"]
]

all_piks = df_ops.persist(
    df_ops.concat(
        [
            name_dob_reference_file[["record_id", "pik"]],
            geobase_reference_file[["record_id", "pik"]],
        ],
        ignore_index=True,
    )
)

dates_of_death = df_ops.persist(
    df_ops.read_parquet(f"{input_dir}/{data_to_use}/simulated_census_numident.parquet")[
        ["pik", "date_of_death"]
    ].assign(
        date_of_death=lambda s: pd.to_datetime(
            s.date_of_death, format="%Y%m%d", errors="coerce"
        )
    )
)


class PersonLinkageCascade:
    def __init__(self):
        # This dataframe will accumulate the PIKs to attach to the input file
        self.confirmed_piks = df_ops.empty_dataframe(
            columns=["record_id_raw_input_file", "pik"], dtype=str
        )
        self.current_module = None

    def start_module(self, *args, **kwargs):
        assert self.current_module is None or self.current_module.confirmed
        self.current_module = PersonLinkageModule(
            *args, already_confirmed_piks=self.confirmed_piks, **kwargs
        )

    def run_matching_pass(self, *args, **kwargs):
        self.current_module.run_matching_pass(*args, **kwargs)

    def confirm_piks(self, *args, **kwargs):
        # Make sure we are not about to confirm PIKs for any of the same files we have
        # already PIKed
        assert (
            len(
                self.current_module.provisional_links.merge(
                    self.confirmed_piks, on="record_id_raw_input_file", how="inner"
                )
            )
            == 0
        )

        newly_confirmed_piks = self.current_module.confirm_piks_from_provisional_links()
        if compute_engine.startswith("dask"):
            # An implementation of "checkpointing" in Dask, so we can safely delete the source files
            # without risking permanent loss of data we will need
            dir_path = f"{output_dir}/intermediate_data/newly_confirmed_piks/newly_confirmed_piks_{self.current_module.name}"
            utils.ensure_empty(dir_path)
            file_path = f"{dir_path}/newly_confirmed_piks_{self.current_module.name}_{int(time.time())}.parquet"
            df_ops.to_parquet(newly_confirmed_piks, file_path, wait=True)
            newly_confirmed_piks = df_ops.read_parquet(file_path)

        # We are now done with the intermediate data inside this module
        dir_path = f"{output_dir}/intermediate_data/{self.current_module.name}/"
        utils.remove_path(dir_path)

        self.confirmed_piks = df_ops.persist(
            df_ops.concat(
                [
                    self.confirmed_piks,
                    newly_confirmed_piks,
                ],
                ignore_index=True,
            )
        )



@dataclass
class PersonLinkageModule:
    name: str
    reference_file: pd.DataFrame
    reference_file_name: str
    already_confirmed_piks: pd.DataFrame
    cut_columns: list[str]
    matching_columns: list[str]
    bayes_factor_cut_columns: float = 1

    def __post_init__(self):
        self.provisional_links = df_ops.empty_dataframe(
            columns=["record_id_census_2030"], dtype=str
        )
        self.confirmed = False
        utils.ensure_empty(f"{intermediate_data_dir}/for_splink/")
        self.reference_file_for_splink = prep_table_for_splink(
            self.reference_file,
            self.reference_file_name,
            self.reference_file.columns,
            df_ops,
        )
        if clear_intermediate:
            utils.ensure_empty(f"{output_dir}/intermediate_data/{self.name}/")
        self.census_to_match = df_ops.persist(
            census_2030
            # Only look for matches among records that have not received a confirmed PIK
            .merge(
                self.already_confirmed_piks[["record_id_raw_input_file"]].assign(
                    already_confirmed_dummy=1
                ),
                on="record_id_raw_input_file",
                how="left",
            )
            .pipe(lambda df: df[df.already_confirmed_dummy.isna()])
            .drop(columns=["already_confirmed_dummy"])
            .pipe(df_ops.rebalance)
        )

    def run_matching_pass(
        self,
        pass_name,
        blocking_columns,
        probability_threshold=0.97,
        input_data_transformation=lambda x: x,
    ):
        assert self.confirmed == False

        print(f"Running {pass_name} of {self.name}")

        columns_needed = utils.dedupe_list(
            ["record_id"] + self.cut_columns + blocking_columns + self.matching_columns
        )
        print(
            f"Files to link are {len(self.reference_file):,.0f} and {len(self.census_to_match):,.0f} records"
        )
        if splink_engine == "spark":
            import pyspark

            spark_df = pyspark.sql.DataFrame
        else:
            spark_df = None
        if splink_engine == "spark" and isinstance(
            self.reference_file_for_splink, spark_df
        ):
            reference_file_for_splink = self.reference_file_for_splink.select(
                columns_needed
            )
        else:
            reference_file_for_splink = self.reference_file_for_splink[columns_needed]

        tables_for_splink = [
            reference_file_for_splink,
            prep_table_for_splink(
                self.census_to_match.pipe(input_data_transformation),
                "census_2030",
                columns_needed,
                df_ops,
            ),
        ]

        blocking_rule_parts = [
            f"l.{col} = r.{col}" for col in self.cut_columns + blocking_columns
        ]
        blocking_rule = " and ".join(blocking_rule_parts)

        if splink_engine == "spark":
            blocking_rule = {
                "blocking_rule": blocking_rule,
                "salting_partitions": 10,
            }

        # We base our Splink linker for this pass on the one we trained above,
        # but limiting it to the relevant column comparisons and updating the pass-specific
        # settings
        pass_splink_settings = copy.deepcopy(splink_settings)
        pass_splink_settings["comparisons"] = [
            c
            for c in pass_splink_settings["comparisons"]
            if c["output_column_name"] in self.matching_columns
        ]
        prior_two_random_records_match = probability_two_random_records_match(
            census_2030,
            self.reference_file,
        )
        prior_odds_two_random_records_match = prior_two_random_records_match / (
            1 - prior_two_random_records_match
        )
        # If cut columns are not used for matching, need to adjust our prior for the fact
        # that all pairs compared will have the same values in the cut columns
        odds_two_random_records_match = (
            prior_odds_two_random_records_match * self.bayes_factor_cut_columns
        )
        pass_splink_settings["probability_two_random_records_match"] = (
            odds_two_random_records_match / (1 + odds_two_random_records_match)
        )
        pass_splink_settings["blocking_rules_to_generate_predictions"] = [blocking_rule]

        if splink_engine == "duckdb":
            linker = DuckDBLinker(
                tables_for_splink,
                pass_splink_settings,
                # Must match order of tables_for_splink
                input_table_aliases=["reference_file", "census_2030"],
            )
        else:
            linker = SparkLinker(
                tables_for_splink,
                pass_splink_settings,
                # Must match order of tables_for_splink
                input_table_aliases=["reference_file", "census_2030"],
                spark=spark,
            )

        num_comparisons = linker.count_num_comparisons_from_blocking_rule(blocking_rule)
        print(f"Number of pairs that will be compared: {num_comparisons:,.0f}")

        # https://moj-analytical-services.github.io/splink/demos/06_Visualising_predictions.html#comparison-viewer-dashboard
        # We also include some pairs below the threshold, for additional context.
        pairs_worth_inspecting = linker.predict(
            threshold_match_probability=probability_threshold - 0.2
        )

        if (
            linker.query_sql(
                f"select count(*) as num from {pairs_worth_inspecting.physical_name}"
            ).num.iloc[0]
            > 0
        ):
            dashboard_file_name = f"diagnostics/splink_reports/{data_to_use}/{self.name.replace(' ', '_')}__{pass_name.replace(' ', '_')}.html"
            pathlib.Path(dashboard_file_name).parent.mkdir(parents=True, exist_ok=True)
            linker.comparison_viewer_dashboard(
                pairs_worth_inspecting, dashboard_file_name, overwrite=True
            )
            from IPython.display import IFrame, display

            display(IFrame(src=f"./{dashboard_file_name}", width="100%", height=1200))

        if compute_engine == "pandas":
            new_provisional_links = pairs_worth_inspecting.as_pandas_dataframe()
            new_provisional_links = new_provisional_links[
                new_provisional_links.match_probability >= probability_threshold
            ]
        elif splink_engine == "spark":
            # We do a tiny bit of computation in Spark directly to avoid writing more to disk than we need to
            pairs_worth_inspecting = pairs_worth_inspecting.as_spark_dataframe()
            new_provisional_links = pairs_worth_inspecting.filter(
                pairs_worth_inspecting.match_probability >= probability_threshold
            )
            if (
                pairs_worth_inspecting.rdd.getNumPartitions()
                < compute_engine_num_workers * 5
            ):
                new_provisional_links = new_provisional_links.repartition(
                    compute_engine_num_workers * 5
                )
            # NOTE: We do *not* utils.ensure_empty across passes here, because we need the provisional links from all passes
            # to stick around
            dir_path = f"{output_dir}/intermediate_data/{self.name}/{pass_name}/new_provisional_links/"
            utils.ensure_empty(dir_path)
            file_path = f"{dir_path}/new_provisional_links_{int(time.time())}.parquet"
            new_provisional_links.write.mode("overwrite").parquet(file_path)
            utils.ensure_empty(
                spark_checkpoints_dir
            )  # We no longer need these checkpoints
            new_provisional_links = df_ops.persist(df_ops.read_parquet(file_path))
        else:
            dir_path = f"{output_dir}/intermediate_data/{self.name}/{pass_name}/pairs_worth_inspecting/"
            utils.ensure_empty(dir_path)
            file_path = f"{dir_path}/pairs_worth_inspecting_{int(time.time())}.parquet"
            pairs_worth_inspecting.to_parquet(file_path, overwrite=True)
            pairs_worth_inspecting = df_ops.persist(df_ops.read_parquet(file_path))
            new_provisional_links = pairs_worth_inspecting[
                pairs_worth_inspecting.match_probability >= probability_threshold
            ]
            del pairs_worth_inspecting

        if splink_engine == "spark":
            utils.ensure_empty(
                spark_checkpoints_dir
            )  # We are done with this Spark linker

        if len(new_provisional_links) > 0:
            new_provisional_links = df_ops.persist(
                label_pairs_with_dataset(new_provisional_links)
                .merge(
                    record_id_raw_input_file_by_record_id,
                    left_on="record_id_census_2030",
                    right_on="record_id",
                    how="left",
                )
                .drop(columns=["record_id"])
            )
            self.provisional_links = df_ops.persist(
                df_ops.concat(
                    [
                        self.provisional_links,
                        new_provisional_links.assign(
                            module_name=self.name, pass_name=pass_name
                        ).pipe(df_ops.ensure_large_string_capacity),
                    ],
                    ignore_index=True,
                )
            )

            self.census_to_match = df_ops.persist(
                self.census_to_match
                # Only look for matches among records that have not received a provisional link
                # NOTE: "records" here does not mean input file records -- a nickname record having
                # a provisional link does not prevent a canonical name record from the same input record
                # from continuing to match
                .merge(
                    df_ops.drop_duplicates(
                        new_provisional_links[["record_id_census_2030"]]
                    ),
                    left_on="record_id",
                    right_on="record_id_census_2030",
                    how="left",
                )
                .pipe(lambda df: df[df.record_id_census_2030.isna()])
                .drop(columns=["record_id_census_2030"])
            )

        print(
            f"Found {len(new_provisional_links):,.0f} matches; {len(self.census_to_match) / len(census_2030):.2%} still eligible to match"
        )

    def confirm_piks_from_provisional_links(self):
        assert not self.confirmed

        provisional_links = self.provisional_links.merge(
            all_piks,
            left_on="record_id_reference_file",
            right_on="record_id",
            how="left",
        ).drop(columns="record_id")

        # "After the initial set of links is created in GeoSearch, a post-search program is run to determine
        # which of the links are retained. A series of checks are performed: First the date of death
        # information from the Numident is checked and links to deceased persons are dropped. Next a
        # check is made for more than one SSN assigned to a source record. If more than one SSN is
        # assigned, the best link is selected based on match weights. If no best SSN is determined, all SSNs
        # assigned in the GeoSearch module are dropped and the input record cascades to the next
        # module. A similar post-search program is run at the end of all search modules."
        # - Layne et al. p. 5

        # Drop links to deceased people
        # NOTE: On p. 38 of Brown et al. (2023) it discusses at length the number of PVS matches to deceased
        # people, which should not be possible based on this process.
        # Even though this is more recent, I can't think of a reason why this check would have
        # been *removed* from PVS -- can we chalk this up to something experimental they were doing for
        # the AR Census in the 2023 report?
        provisional_links = df_ops.persist(
            provisional_links.merge(dates_of_death, on="pik", how="left")
        )
        # Census day 2030
        deceased_links = df_ops.persist(
            provisional_links.date_of_death.notnull()
            & (provisional_links.date_of_death <= pandas.to_datetime("2030-04-01"))
        )
        print(
            f"{df_ops.compute(deceased_links.sum()):,.0f} input records linked to deceased people, dropping links"
        )
        provisional_links = df_ops.persist(provisional_links[~deceased_links])

        # Check for multiple linkage to a single input file record
        max_probability = df_ops.groupby_agg_small_groups(
            provisional_links,
            by="record_id_raw_input_file",
            agg_func=lambda x: x.match_probability.max(),
        ).reset_index()
        piks_per_input_file = (
            provisional_links.merge(
                max_probability, on=["record_id_raw_input_file", "match_probability"]
            )
            .pipe(
                df_ops.groupby_agg_small_groups,
                by="record_id_raw_input_file",
                agg_func=lambda x: x.pik.nunique(),
            )
            .rename("num_unique_piks")
            .reset_index()
        )

        multiple_piks = df_ops.persist(
            piks_per_input_file[piks_per_input_file.num_unique_piks > 1]
        )
        print(
            f"{len(multiple_piks):,.0f} input records linked to multiple PIKs, dropping links"
        )
        provisional_links = df_ops.persist(
            provisional_links.merge(
                multiple_piks, on="record_id_raw_input_file", how="left"
            )
            .pipe(
                lambda df: df[df.num_unique_piks.isnull() | (df.num_unique_piks <= 1)]
            )
            .pipe(
                df_ops.drop_duplicates,
                subset="record_id_raw_input_file",
                sort_col="match_probability",
                keep="last",
            )
        )

        assert df_ops.compute(
            (
                df_ops.groupby_agg_small_groups(
                    provisional_links,
                    by="record_id_raw_input_file",
                    agg_func=lambda x: x.pik.nunique(),
                )
                == 1
            ).all()
        )

        self.confirmed = True
        self.provisional_links = None

        return provisional_links[
            [
                "record_id_raw_input_file",
                "record_id_census_2030",
                "record_id_reference_file",
                "pik",
                "module_name",
                "pass_name",
                "match_probability",
            ]
        ]


def label_pairs_with_dataset(pairs):
    pairs = df_ops.persist(pairs)
    if len(pairs) == 0:
        return pairs

    # Name the columns according to the datasets, not "r" (right) and "l" (left)
    for suffix in ["r", "l"]:
        # NOTE: In practice, splink always has "r" correspond to one dataset and "l" correspond to the other.
        # It is unclear to me if this is a documented guarantee, so I double check.
        # NOTE: Do not use drop_duplicates here, since the groups should be enormous (the entire dataframe)
        assert df_ops.compute(pairs[f"source_dataset_{suffix}"].nunique()) == 1
        source_dataset_name = df_ops.head(pairs, n=1)[f"source_dataset_{suffix}"].iloc[
            0
        ]
        pairs = pairs.rename(
            columns={
                c: re.sub(f"_{suffix}$", f"_{source_dataset_name}", c)
                for c in pairs.columns
                if c.endswith(f"_{suffix}")
            }
        )

    return pairs

In [None]:
person_linkage_cascade = PersonLinkageCascade()

# Verification Module

Wagner and Layne, p.14:

> If the input file has a SSN data field, it first goes through the verification process.

This module of PVS only runs on input records with an SSN.
No records in the CUF have an SSN (SSN is not collected on the decennial)
so this module is skipped.

# GeoSearch

Brown et al. p. 29:

> the GeoSearch module blocks records at
different levels of geography, starting with the housing unit level, then broadening geography
up to the three-digit ZIP Code level. The typical matching variables are name, DOB, sex, and
various address fields.

Wagner and Layne p. 14:

> The typical GeoSearch blocking strategy starts with blocking records at the
household level, then broadens the geography for each successive pass and ends at
blocking by the first three digits of the zip code. The typical match variables are first,
middle, and last names; generational suffix; date of birth; gender and various address
fields.
> 
> The data for the GeoSearch module are split into 1,000 cuts based on the first three
digits of the zip code (zip3) for record. The GeoSearch program works on one zip3 cut
at a time, with shell scripts submitting multiple streams of cuts to the system. This
allows for parallel processing and restart capability.

Layne, Wagner, and Rothhaas list an exact series of passes with blocking and matching variables
for each (Appendix A on p. 25).
This was the series of passes they used in running PVS on the Medicare Enrollment
Database, the Indian Health Service Patient Registration File, and on two commercial
data sources.
All of these data sources are rather different than a CUF, and this paper is from 2014,
but it is the only place I know of where an entire set of passes is described,
so I have used the exact same passes here.

In [None]:
person_linkage_cascade.start_module(
    name="geosearch",
    reference_file=geobase_reference_file,
    reference_file_name="geobase_reference_file",
    cut_columns=["zip3"],
    matching_columns=[
        "first_name_15",
        "last_name_12",
        "middle_initial",
        "day_of_birth",
        "month_of_birth",
        "year_of_birth",
        "geokey",
    ],
)

## Pass 1: Block on geokey (entire address)

In [None]:
person_linkage_cascade.run_matching_pass(
    pass_name="geokey",
    blocking_columns=["geokey_for_blocking"],
)

## Pass 2: Block on geokey (entire address), switching first and last names

In [None]:
def switch_first_and_last_names(df):
    return (
        df.rename(columns={"first_name": "last_name", "last_name": "first_name"})
        # Re-calculate the truncated versions of first and last.
        # NOTE: It is not necessary to re-calculate the phonetic versions, because
        # those are never used in any pass that has a name switch.
        .pipe(add_truncated_name_cols)
    )

In [None]:
# We don't actually have any swapping of first and last names in pseudopeople
person_linkage_cascade.run_matching_pass(
    pass_name="geokey name switch",
    blocking_columns=["geokey_for_blocking"],
    input_data_transformation=switch_first_and_last_names,
)

## Pass 3: Block on house number and street name Soundex

In [None]:
person_linkage_cascade.run_matching_pass(
    pass_name="house number and street name Soundex",
    blocking_columns=["street_number_for_blocking", "street_name_for_blocking_soundex"],
)

## Pass 4: Block on house number and street name Soundex, switching first and last names

In [None]:
person_linkage_cascade.run_matching_pass(
    pass_name="house number and street name Soundex name switch",
    blocking_columns=["street_number_for_blocking", "street_name_for_blocking_soundex"],
    input_data_transformation=switch_first_and_last_names,
)

## Pass 5: Block on some name and DOB information

In [None]:
person_linkage_cascade.run_matching_pass(
    pass_name="some name and DOB information",
    blocking_columns=["first_name_2", "last_name_2", "year_of_birth"],
)

## Post-process and confirm PIKs

In [None]:
person_linkage_cascade.confirm_piks()

In [None]:
df_ops.compute(
    person_linkage_cascade.confirmed_piks.groupby(["module_name", "pass_name"]).size()
).sort_values(ascending=False)

# ZIP3 Adjacency Search

Layne, Wagner, and Rothhaas (p. 6) refers to ZIP3 adjacency as a module and even provides
a list of two passes within it in Appendix A.

Alexander et al. (p. 6) refers to it the same way, as a module.

However, Wagner and Layne (p. 14) says:

> The GeoSearch module also incorporates the adjacency of neighboring areas with
different zip3 values

which implies this logic is a pass or passes *within* GeoSearch (and due to how cascading works,
this is not just an academic distinction).

There is no mention at all of ZIP3 adjacency in the Brown et al. paper from 2023.

**TODO**: Determine whether this still exists, how it works if so, and implement it.
Doesn't really make sense with the sample data since there is only one ZIP.

# Movers

The Movers module is not mentioned in Brown et al, nor in Wagner and Layne.

In Layne, Wagner, and Rothhaas (p. 7), published in 2014, it is described as "Prototyped only – not implemented in PVS":

> The Movers module is appropriate for input files that combine individuals together into
> households.
> The module seeks multiple members of an input household in one address that
> may have moved together to another address.
> To be eligible for this module the household size must be greater than one.
> This module is being tested and was not used in the analysis for this paper.

Alexander et al. (p. 6) a year later makes no note of it being experimental:

> To be eligible for the Movers Module, no member of the household can have a
> PIK and the household must consist of more than one member.
> This module considers persons living at the same address as a unit and
> searches for matching units living together in the reference file (without regard for address).

Essentially the same text is included in Massey et al., published in 2018.

Though it doesn't provide much information, [this document](https://gunnisonconsulting.com/docs/pdf/Record%20Linkage%20Slicksheet_FINAL.pdf)
([archive](https://web.archive.org/web/20230705010237/https://gunnisonconsulting.com/docs/pdf/Record%20Linkage%20Slicksheet_FINAL.pdf))
from the Gunnison consulting group states that they worked with Census and
"pioneered the transition from a record-by-record matching approach to an
approach that better uses groups of records, such as full households, to make more
and better links."
Specifically, they claim that 16% of previously unlinked records were linked using
the household-level approach.

**TODO**: Haven't figured out if this is still in use, nor how it works.
This module seems of particular interest, because it sounds like it is quite different
from other modules.

# NameSearch

Brown et al., p. 29:

> ... the NameSearch module, which uses only name and DOB fields, comparing all combinations of
alternate names and dates of birth.

Wagner and Layne, p. 15:

> The NameSearch module searches the reference files for records failing the
Verification and GeoSearch Modules. Only name and date of birth data are used in this
search process. NameSearch consists of multiple passes against the Numident Name
Reference file, which contains all possible combinations of alternate names and
alternate dates of birth for each SSN in the Census Numident file, and includes data for
ITINs.
> 
> The typical NameSearch blocking strategy starts with a strict first pass, blocking
records by exact date of birth and parts of names. Successive passes block on parts of
the name and date of birth fields to allow for some name and date of birth variation.
The typical match variables are first, middle and last names, generational suffix, date of
birth, and gender.

As in GeoSearch, the only full listing I could find of passes with blocking and matching variables
was in Layne, Wagner, and Rothhaas.
This is probably somewhat out of date and is not what would be used on a CUF, but I have
copied it exactly here.

In [None]:
person_linkage_cascade.start_module(
    name="namesearch",
    reference_file=name_dob_reference_file,
    reference_file_name="name_dob_reference_file",
    cut_columns=["first_initial_cut", "last_initial_cut"],
    matching_columns=[
        "first_name_15",
        "last_name_12",
        "middle_initial",
        "day_of_birth",
        "month_of_birth",
        "year_of_birth",
    ],
)

## Pass 1: Block on DOB and NYSIIS of name

In [None]:
person_linkage_cascade.run_matching_pass(
    pass_name="DOB and NYSIIS of name",
    blocking_columns=[
        "day_of_birth",
        "month_of_birth",
        "year_of_birth",
        "first_name_nysiis",
        "last_name_nysiis",
    ],
)

## Pass 2: Block on DOB and initials

In [None]:
person_linkage_cascade.run_matching_pass(
    pass_name="DOB and initials",
    blocking_columns=[
        "day_of_birth",
        "month_of_birth",
        "year_of_birth",
        "first_name_1",
        "last_name_1",
    ],
)

## Pass 3: Block on year of birth and first two characters of name

In [None]:
person_linkage_cascade.run_matching_pass(
    pass_name="year of birth and first two characters of name",
    blocking_columns=["year_of_birth", "first_name_2", "last_name_2"],
)

## Pass 4: Block on birthday and first two characters of name

In [None]:
person_linkage_cascade.run_matching_pass(
    pass_name="birthday and first two characters of name",
    blocking_columns=["day_of_birth", "month_of_birth", "first_name_2", "last_name_2"],
)

## Post-process and confirm PIKs

In [None]:
person_linkage_cascade.confirm_piks()

In [None]:
df_ops.compute(
    person_linkage_cascade.confirmed_piks.groupby(["module_name", "pass_name"]).size()
).sort_values(ascending=False)

# DOBSearch

Brown et al., p. 29:

> ... the DOBSearch module, which blocks on
month and day of birth, then compares name, sex, and DOB.

Wagner and Layne, p. 15:

> The DOBSearch module searches the reference files for the records that fail the
> NameSearch, using name and date of birth data. The module matches against a re-split
> version of the Numident Name Reference file, splitting the data based on month and day
> of birth.
> 
> There are typically four blocking passes in the DOBSearch module. The first pass
> blocks records by first name in the incoming file to last name in the DOB Reference file
> and last name in incoming file to first name in the DOB Reference file. This strategy
> accounts for switching of first and last name in the incoming file.

Again, I have used the exact passes listed for this module in Layne, Wagner, and Rothhaas.
This is somewhat corroborated to be "typical" given that it is consistent with Wagner and Layne
above, but both papers are old.

In [None]:
person_linkage_cascade.start_module(
    name="dobsearch",
    reference_file=name_dob_reference_file,
    reference_file_name="name_dob_reference_file",
    cut_columns=["day_of_birth", "month_of_birth"],
    matching_columns=[
        "first_name_15",
        "last_name_12",
        "middle_initial",
        "day_of_birth",
        "month_of_birth",
        "year_of_birth",
    ],
)

## Pass 1: Block on initials, switching first and last names

In [None]:
person_linkage_cascade.run_matching_pass(
    pass_name="initials name switch",
    blocking_columns=["first_name_1", "last_name_1"],
    input_data_transformation=switch_first_and_last_names,
)

## Pass 2: Block on first three characters of name

In [None]:
person_linkage_cascade.run_matching_pass(
    pass_name="first three characters of name",
    blocking_columns=["first_name_3", "last_name_3"],
)

## Pass 3: Block on reverse Soundex of name

In [None]:
person_linkage_cascade.run_matching_pass(
    pass_name="reverse Soundex of name",
    blocking_columns=["first_name_reverse_soundex", "last_name_reverse_soundex"],
)

## Pass 4: Block on first two characters of first name and year of birth

In [None]:
person_linkage_cascade.run_matching_pass(
    pass_name="first two characters of first name and year of birth",
    blocking_columns=["first_name_2", "year_of_birth"],
)

## Post-process and confirm PIKs

In [None]:
person_linkage_cascade.confirm_piks()

In [None]:
df_ops.compute(
    person_linkage_cascade.confirmed_piks.groupby(["module_name", "pass_name"]).size()
).sort_values(ascending=False)

# HHCompSearch

Brown et al., p. 29:

> ... the Household Composition search module, which requires at least one
> person in the household of the unmatched person to have received a PIK.
> The full set of unmatched records with historical name, DOB, sex, and address data from
> households whose members with PIKs were observed in the past is created. The module
> attempts to find a match to this universe based on name and DOB information.

Wagner and Layne, p. 16:

> For persons with a PIK in the eligible household, all of the geokeys from the PVS
> GeoBase are extracted for each of these PIKs. The geokeys are unduplicated and all
> persons are selected from the PVS GeoBase with these geokeys. Next, the program
> removes all household members with a PIK, leaving the unPIKed persons in the
> household. This becomes the reference file to search against. There are typically two
> passes in this module. Records are blocked by MAFID, name, date of birth, and gender.

I don't know of any list of passes in this module.
Blocking on "MAFID, name, date of birth, and gender" seems like a lot for only
two passes, especially given that we are only matching people within each eligible
household.
I'm actually a bit surprised there is any blocking at all, given how restrictive this
module inherently is.
I've split the difference here by creating two passes that are very permissive.

## Create the reference file

In [None]:
# TODO: As of now in pseudopeople, our only indicator in the Census data of household
# is noiseless (ground truth).
# Instead, we use geokey to approximate the household clustering information that would be
# present in the real dataset.
# We should switch to using a (presumably low-noise) household indicator when we have that.
# NOTE: We use geokey_for_blocking instead of geokey to intentionally exclude the very
# large households currently created by GQ in pseudopeople.
pseudo_household_id = df_ops.drop_duplicates(
    census_2030[census_2030.geokey_for_blocking.notnull()][["geokey_for_blocking"]]
).pipe(df_ops.add_unique_id_col, col_name="pseudo_household_id")

In [None]:
census_2030 = df_ops.persist(
    census_2030.merge(pseudo_household_id, on="geokey_for_blocking", how="left")
)

In [None]:
piks_with_household = df_ops.persist(
    census_2030[["pseudo_household_id", "record_id_raw_input_file"]].merge(
        person_linkage_cascade.confirmed_piks, on="record_id_raw_input_file", how="left"
    )
)

In [None]:
piked_by_household = (
    df_ops.groupby_agg_small_groups(
        piks_with_household,
        by="pseudo_household_id",
        agg_func=lambda x: x.pik.agg(["count", "size"]),
    )
    .reset_index()
    .rename(columns={"count": "piked"})
    .assign(unpiked=lambda x: x["size"] - x["piked"])
)

In [None]:
eligible_households = piked_by_household[
    (piked_by_household.piked > 0) & (piked_by_household.unpiked > 0)
][["pseudo_household_id"]]
eligible_households

In [None]:
piks_by_eligible_household = df_ops.drop_duplicates(
    eligible_households.merge(
        piks_with_household[["pseudo_household_id", "pik"]].dropna(subset="pik"),
        on="pseudo_household_id",
        how="inner",
    )
)
piks_by_eligible_household

In [None]:
geokeys_by_eligible_household = (
    piks_by_eligible_household.merge(
        df_ops.drop_duplicates(
            geobase_reference_file[["pik", "geokey_for_blocking"]].dropna(
                subset="geokey_for_blocking"
            )
        ),
        on="pik",
    )
    .drop(columns=["pik"])
    .pipe(df_ops.drop_duplicates)
)
geokeys_by_eligible_household

In [None]:
# Apparently, we exclude from the reference file all *reference file* records with a PIK that
# has already been assigned to an input file row.
# Doing this goes against the normal assumption, which is that reference file records can match
# to multiple input file records.
# This is really surprising to me, but it seems clear from "the program
# removes all household members with a PIK, leaving the unPIKed persons in the
# household. This becomes the reference file to search against." (Wagner and Layne, p. 16)
eligible_geobase_reference_file_records = (
    geobase_reference_file[geobase_reference_file.geokey_for_blocking.notnull()]
    .merge(
        df_ops.drop_duplicates(person_linkage_cascade.confirmed_piks[["pik"]]).assign(
            confirmed_pik_dummy=1
        ),
        on="pik",
        how="left",
    )
    .pipe(lambda df: df[df.confirmed_pik_dummy.isnull()])
    .drop(columns=["confirmed_pik_dummy"])
)

In [None]:
hhcomp_reference_file = df_ops.persist(
    geokeys_by_eligible_household.merge(
        eligible_geobase_reference_file_records,
        on="geokey_for_blocking",
    )
)
hhcomp_reference_file

In [None]:
len(hhcomp_reference_file)

In [None]:
person_linkage_cascade.start_module(
    name="hhcompsearch",
    reference_file=hhcomp_reference_file,
    reference_file_name="hhcomp_reference_file",
    cut_columns=["pseudo_household_id"],
    # HACK: How else do we deal with the fact that pseudo_household_id
    # isn't used for matching?
    bayes_factor_cut_columns=1_000,
    matching_columns=[
        "first_name_15",
        "last_name_12",
        "middle_initial",
        "day_of_birth",
        "month_of_birth",
        "year_of_birth",
    ],
)

## Pass 1: Block on initials

In [None]:
person_linkage_cascade.run_matching_pass(
    pass_name="initials",
    blocking_columns=["first_name_1", "last_name_1"],
)

## Pass 2: Block on year of birth

In [None]:
person_linkage_cascade.run_matching_pass(
    pass_name="year of birth",
    blocking_columns=["year_of_birth"],
)

## Post-process and confirm PIKs

In [None]:
person_linkage_cascade.confirm_piks()

In [None]:
df_ops.compute(
    person_linkage_cascade.confirmed_piks.groupby(["module_name", "pass_name"]).size()
).sort_values(ascending=False)

# Resulting PIKs

In [None]:
pik_values = df_ops.drop_duplicates(
    person_linkage_cascade.confirmed_piks.rename(
        columns={"record_id_raw_input_file": "record_id"}
    )[["record_id", "pik"]]
)

In [None]:
census_2030_piked = census_2030_raw_input.copy()
kwargs = {}
if not compute_engine.startswith("dask"):
    kwargs["validate"] = "1:1"
census_2030_piked = df_ops.persist(
    census_2030_piked.merge(
        pik_values,
        how="left",
        on="record_id",
        **kwargs,
    )
)
census_2030_piked

In [None]:
piked_proportion = census_2030_piked.pik.notnull().mean()
# Compare with 90.28% of input records PIKed in the 2010 CUF,
# as reported in Wagner and Layne, Table 2, p. 18
print(f"{df_ops.compute(piked_proportion):.2%} of the input records were PIKed")

In [None]:
df_ops.to_parquet(census_2030_piked, f"{output_dir}/census_2030_piked.parquet")

In [None]:
df_ops.to_parquet(
    person_linkage_cascade.confirmed_piks, f"{output_dir}/confirmed_piks.parquet"
)

In [None]:
# NOTE: Can't use the magic ! date here for reasons I don't really understand having
# to do with the number of open file handles. This works fine.
print(datetime.datetime.now())

In [None]:
if splink_engine == "spark":
    teardown_spark()