In [1]:
import geopandas
import numpy as np
import pandas as pd
from collections import defaultdict

from constants import *

### Configuration
`OVERLAP_IGNORE_THRESHOLD` - If the percentage overlap is less than this value, then the overlap is ignored.

`OVERLAP_CONFIRM_THRESHOLD` - If the percentage overlap is greater than this value, then the districts are considered clearly overlapping.

`geopandas.options.io_engine` - Determines the engine used for reading shapefiles. `fiona` is used by default; `pyogrio` needs to be installed separately and is faster.

In [2]:
OVERLAP_IGNORE_THRESHOLD = 0.1
OVERLAP_CONFIRM_THRESHOLD = 0.8

geopandas.options.io_engine = "pyogrio"

In [3]:
# Temp replacement for file paths while handling stuff
VA_DEMOGRAPHIC_PATH = "../../Data/va_pl2020_vtd.zip"
VA_ELECTION_PATH = "../../Data/va_vest_20.zip"

In [None]:
census_df_format = {
    "P0030001": "pop_total",
    "P0030003": "pop_white",
    "P0030004": "pop_black",
    "P0030005": "pop_native",
    "P0030006": "pop_asian",
    "P0030007": "pop_pacific",
    "P0030009": "pop_two_or_more",
    "P0040002": "pop_hispanic"
}

election_df_format = {
    "G20PREDBID" : "vote_dem",
    "G20PRERTRU": "vote_rep"
}

# Load and format precinct-level census data
va_census_df = geopandas.read_file(VA_DEMOGRAPHIC_PATH)
va_census_df.set_index("GEOID20", inplace=True)
va_census.rename(columns=census_df_format, inplace=True)

# Load and format block-level election data
va_election_df = pd.read_csv(VA_BLOCK_ELECTION_PATH)
va_election_df.set_index("geoid20", inplace=True)
va_election.rename(columns=election_df_format, inplace=True)

# Load and format block boundaries
va_boundary_df = geopandas.read_file(VA_BLOCK_BOUNDARY_PATH)
va_boundary_df = va_boundary_df.set_index("GEOID20")[["geometry"]]

va_precinct_df = get_precinct_data(va_census_df, va_election_df, va_boundary_df)

In [4]:
va_census = geopandas.read_file(VA_DEMOGRAPHIC_PATH)
va_vest = geopandas.read_file(VA_ELECTION_PATH)

# Assign unique IDs to each entries for ease of analysis
'''
va_census_id_header = pd.Series(index=va_census.index, data="VA_CENSUS")
va_census_index_str = pd.Series(index=va_census.index, data=va_census.index).astype(str)
va_census["UNIQUE_ID"] = va_census_id_header.str.cat(others=[va_census_index_str, va_census["COUNTYFP20"], va_census["VTDST20"]], sep="-")

va_vest_id_header = pd.Series(index=va_vest.index, data="VA_VEST")
va_vest_index_str = pd.Series(index=va_vest.index, data=va_vest.index).astype(str)
va_vest["UNIQUE_ID"] = va_vest_id_header.str.cat(others=[va_vest_index_str, va_vest["COUNTYFP"], va_vest["VTDST"]], sep="-")
'''

'\nva_census_id_header = pd.Series(index=va_census.index, data="VA_CENSUS")\nva_census_index_str = pd.Series(index=va_census.index, data=va_census.index).astype(str)\nva_census["UNIQUE_ID"] = va_census_id_header.str.cat(others=[va_census_index_str, va_census["COUNTYFP20"], va_census["VTDST20"]], sep="-")\n\nva_vest_id_header = pd.Series(index=va_vest.index, data="VA_VEST")\nva_vest_index_str = pd.Series(index=va_vest.index, data=va_vest.index).astype(str)\nva_vest["UNIQUE_ID"] = va_vest_id_header.str.cat(others=[va_vest_index_str, va_vest["COUNTYFP"], va_vest["VTDST"]], sep="-")\n'

### Overlap Checking
District data from the census dataset is merged with district data from the VEST dataset.

This will be done by determining how the districts in the census dataset overlap with districts in the VEST dataset. The set of district boundaries should be relatively similar, though a district in the census dataset may be broken up into multiple districts in the VEST dataset (or vice versa).

The boundaries in the census data file will be treated as canonical, since they contain fewer overlapping sections and empty holes. Data from districts in the VEST file will be mapped to the district in the census file which covers that area.

In [25]:
def merge_precinct_data(census_df, vest_df, verbose=False, output_format=["merged_precincts"]):
    '''
    Merges a state's census dataset and VEST dataset.

    Parameters
    ----------
    census_df : GeoDataFrame
        GeoDataFrame containing the state's precinct-level census data.
    vest_df : GeoDataFrame
        GeoDataFrame containing the state's precinct-level election data.
    verbose : bool
        Indicates whether or not descriptive data for manual double-checking should be printed.
    output_mappings: list[str]
        Gives a list of items to be returned.
        Allowed values include:
        'merged_precincts' - GeoDataFrame consisting of merged precinct data
        'mappings' - 
    
    Returns
    -------
    GeoDataFrame containing merged data or (GeoDataFrame, dict) containing merged data and dict of combined precincts
    '''

    census_df["INDEX_ID"] = pd.Series(index=census_df.index, data=census_df.index)
    vest_df["INDEX_ID"] = pd.Series(index=vest_df.index, data=vest_df.index)

    # Determine all overlaps between precincts in the census dataset and VEST dataset
    # If the overlap has area zero (consists only of points and lines), then it is discarded
    overlap_check = geopandas.overlay(census_df, vest_df, keep_geom_type=True)
    overlap_check.rename_geometry("overlap_geometry", inplace=True)

    # Calculate area of overlap vs areea of census/VEST precincts and determine percentage overlap
    # Area (overlap_geometry_area, census_geometry_area, vest_geometry_area) is calculated in square meters
    overlap_crs = overlap_check.geometry.crs
    overlap_check["census_geometry"] = geopandas.GeoSeries(overlap_check["INDEX_ID_1"].apply(lambda x : census_df.loc[x]["geometry"]))
    overlap_check["census_geometry"].set_crs(crs=overlap_crs, inplace=True)
    overlap_check["vest_geometry"] = geopandas.GeoSeries(overlap_check["INDEX_ID_2"].apply(lambda x : vest_df.loc[x]["geometry"]))
    overlap_check["vest_geometry"].set_crs(crs=overlap_crs, inplace=True)

    overlap_check["overlap_geometry_area"] = overlap_check["overlap_geometry"].to_crs({'proj':'cea'}).area
    overlap_check["census_geometry_area"] = overlap_check["census_geometry"].to_crs({'proj':'cea'}).area
    overlap_check["vest_geometry_area"] = overlap_check["vest_geometry"].to_crs({'proj':'cea'}).area

    overlap_check["census_overlap"] = overlap_check["overlap_geometry_area"] / overlap_check["census_geometry_area"]
    overlap_check["vest_overlap"] = overlap_check["overlap_geometry_area"] / overlap_check["vest_geometry_area"]

    # Split overlaps into groups:
    # matched_precincts_overlaps contains precincts which more or less match one-to-one
    # subset_overlaps contains precincts which are likely a subset of a precinct in the other dataset
    # manual_check_overlaps contains edge cases which may need to be checked manually
    # minor_overlaps contains overlaps which make up a small portion of both precincts, which can be ignored
    matched_precinct_overlap_indices = (overlap_check["census_overlap"] > OVERLAP_CONFIRM_THRESHOLD) & (overlap_check["vest_overlap"] > OVERLAP_CONFIRM_THRESHOLD)
    matched_precinct_overlaps = overlap_check[matched_precinct_overlap_indices]

    subset_overlap_indices = (overlap_check["census_overlap"] > OVERLAP_CONFIRM_THRESHOLD) ^ (overlap_check["vest_overlap"] > OVERLAP_CONFIRM_THRESHOLD)
    subset_overlaps = overlap_check[subset_overlap_indices]

    manual_check_overlap_indices = ((overlap_check["census_overlap"] <= OVERLAP_CONFIRM_THRESHOLD) & (overlap_check["vest_overlap"] <= OVERLAP_CONFIRM_THRESHOLD) &
                                    (overlap_check["census_overlap"] >= OVERLAP_IGNORE_THRESHOLD) & (overlap_check["vest_overlap"] >= OVERLAP_IGNORE_THRESHOLD))
    manual_check_overlaps = overlap_check[manual_check_overlap_indices]

    minor_overlap_indices = (overlap_check["census_overlap"] < OVERLAP_IGNORE_THRESHOLD) & (overlap_check["vest_overlap"] < OVERLAP_IGNORE_THRESHOLD)
    minor_overlaps = overlap_check[minor_overlap_indices]

    if verbose:
        print(f"matched_precinct_overlaps: {len(matched_precinct_overlaps)}")
        print(f"subset_overlaps: {len(subset_overlaps)}")
        print(f"manual_check_overlaps: {len(manual_check_overlaps)}")
        print(f"minor_overlaps: {len(minor_overlaps)}")

    # Combine precincts which have overlapping areas
    vest_to_census_mapping = defaultdict(lambda : list())
    census_to_vest_mapping = defaultdict(lambda : list())

    for _, row in matched_precinct_overlaps.iterrows():
        census_id = row["INDEX_ID_1"]
        vest_id = row["INDEX_ID_2"]
        vest_to_census_mapping[census_id].append(vest_id)
        census_to_vest_mapping[vest_id].append(census_id)

    # Merging VEST districts into census districts
    for _, row in subset_overlaps.sort_values("census_overlap")[subset_overlaps["census_overlap"] <= 0.8].iterrows():
        census_id = row["INDEX_ID_1"]
        vest_id = row["INDEX_ID_2"]
        vest_to_census_mapping[census_id].append(vest_id)

    # Merging census districts into VEST districts
    for _, row in subset_overlaps.sort_values("vest_overlap")[subset_overlaps["vest_overlap"] <= 0.8].iterrows():
        census_id = row["INDEX_ID_1"]
        vest_id = row["INDEX_ID_2"]
        census_to_vest_mapping[vest_id].append(census_id)

    # Drop mappings in census_to_vest_mapping consisting of only one item
    for k in list(census_to_vest_mapping.keys()):
        if len(census_to_vest_mapping[k]) == 1:
            census_to_vest_mapping.pop(k)

    counts = defaultdict(lambda : 0)
    for m in vest_to_census_mapping.values():
        counts[len(m)] += 1
    print(counts)

    counts = defaultdict(lambda : 0)
    for m in census_to_vest_mapping.values():
        counts[len(m)] += 1
    print(counts)

In [26]:
merge_precinct_data(va_census, va_vest, verbose=True)

  merged_geom = block.unary_union


matched_precinct_overlaps: 2443
subset_overlaps: 44
manual_check_overlaps: 0
minor_overlaps: 1343
defaultdict(<function merge_precinct_data.<locals>.<lambda> at 0x000001AEDDF81E10>, {1: 2427, 2: 23})
defaultdict(<function merge_precinct_data.<locals>.<lambda> at 0x000001AEB9AC9C60>, {2: 8, 3: 1})


  result = super().__getitem__(key)


In [32]:
def get_precinct_data(census_df, election_df, boundary_df, verbose=False, output_format=["merged_data"]):
    '''
    Merges a state's precinct-level census dataset and census block level voting dataset.

    Parameters
    ----------
    census_df : GeoDataFrame
        GeoDataFrame containing the state's precinct-level census data. Must use GeoID column as index.
    election_df : DataFrame
        DataFrame containing the state's block-level election data. Must use GeoID column as index.
    boundary_df : GeoDataFrame
        GeoDataFrame containing the state's census block boundaries. Must use GeoID column as index.
    verbose : bool
        Indicates whether or not descriptive data for manual double-checking should be printed.
    output_format: list[str]
        Gives a list of items to be returned.
        Allowed values include:
        'merged_data' - GeoDataFrame consisting of data merged to precinct-level
        'mappings' - dict mapping each census block to a precinct
        'unused_blocks' - GeoDataFrame containing blocks which could not be mapped to a precinct (if any)
    
    Returns
    -------
    List of selected output items (or a single object if only one output is selected)
    '''

    # Merge block-level election data and block boundary data
    if not boundary_df.index.equals(election_df.index):
        raise Exception("boundary_df and election_df do not have the same index")
    merged_block_df = boundary_df.join(election_df)
    merged_block_df.rename_axis(index=["BLOCK_GEOID20"], inplace=True)

    # Join blocks onto the precinct-level DataFrame
    census_df["VTD_GEOID20"] = census_df.index
    merged_df = geopandas.sjoin(merged_block_df, census_df, how="left", predicate="covered_by")
    precinct_df.groupby("VTD_GEOID20").sum()

    # Return output
    output = list()
    for entry in output_format:
        if entry == "merged_data":
            output.append(precinct_df)
        elif entry == "mappings":
            # Create dict indicating the blocks that correspond to each district
            d = defaultdict(lambda : list())
            for index, row in precinct_df.iterrows():
                d[row["VTD_GEOID20"]].append(index)
            output.append(dict(d))
        elif entry == "unused_blocks":
            output.append(merged_df.loc[merged_df["VTD_GEOID20"].isna()])

    if len(output) == 1:
        return output[0]
    return output
