<a href="https://colab.research.google.com/github/drscook/redistricting_wrangler/blob/main/redistricting_2025_10.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#Advances in Data Engineering for Redistricting

Scott Cook

Tarleton State Univ

2025-10-13

#Background

##Geography

Consider [Geographic Hierarchy](https://www2.census.gov/geo/pdfs/reference/geodiagram.pdf). We need data from 4 sources that live at different levels:
- 2020 Decennial Census PL94-171
    - level: block (only dataset at this smallest level)
    - https://www.census.gov/programs-surveys/decennial-census/about/rdo/summary-files.html
    - For convenience, we're actually pulling from TX Legislative Council: https://data.capitol.texas.gov/dataset/2020-census-geography    
- American Community Surveys (ACS)
    - level: some block group, some tract
    - https://www.census.gov/programs-surveys/acs/data.html
    - Like an annual mini-Census
    - Use 5 year estimates (more stable)
- Citizen Voting Age Population (CVAP) special tabulation
    - level: block group
    - https://www.census.gov/programs-surveys/decennial-census/about/voting-rights/cvap.html
    - Not a part of decennial census or ACS
- Elections
    - level: [voting tabulation district (VTD)](https://data.capitol.texas.gov/dataset/vtds)
    - https://data.capitol.texas.gov/dataset/comprehensive-election-datasets-compressed-format

##Problem

While Census-controlled levels are nested, VTDs only respect blocks. In other words, a block lies entirely within exactly one VTD. But a block group or tract might be split across mutliple VTDs (and conversely).

##Solution

Push all data down to common refinement (pieces that do not cross any boundary) then aggregate back up to desired level
1. Fetch raw data from relevant sources
1. Geospatially intersect (overlay) block and VTD geometries to create *pieces* (essentially blocks with slight correction for geospatial misalignments)
1. Apportion data to pieces proportional to voting age population from 2020 Census (recall 2020 Census is the only data at block level)


##Links

- Census
    - API Key: https://api.census.gov/data/key_signup.html
    - Geographic Hierarchy: https://www2.census.gov/geo/pdfs/reference/geodiagram.pdf
    - Geoids: https://www.census.gov/programs-surveys/geography/guidance/geo-identifiers.html
    - American Community Survey (ACS): https://www.census.gov/programs-surveys/acs/data.html
    - ACS variables: https://api.census.gov/data/2023/acs/acs5/variables.html
    - Citizen Voting Age Population (CVAP): https://www.census.gov/programs-surveys/decennial-census/about/voting-rights/cvap.html
    - Block shapefiles: https://www2.census.gov/geo/tiger/TIGER2020/TABBLOCK20/
    - Python: https://pypi.org/project/census/

- Texas Legislative Council
    - Voting Tabulation Districts (VTD): https://data.capitol.texas.gov/dataset/vtds
    - Elections: https://data.capitol.texas.gov/dataset/comprehensive-election-datasets-compressed-format
    - PL94-171: https://data.capitol.texas.gov/dataset/2020-census-geography
    - School Districts (not used here): https://data.capitol.texas.gov/dataset/school-districts

- Other
    - CRS:
        - https://docs.qgis.org/3.40/en/docs/gentle_gis_introduction/coordinate_reference_systems.html
        - https://epsg.io/3085
        - https://epsg.io/4269
    - Tarrant precincts: https://www.tarrantcountytx.gov/en/elections/interactive-maps/commissioner-precinct-maps.html

- Quesion: Did Andrea find block assignment files for on TX Lege data portal? If so, where? Census has BAF for congressional, senate, and house here, but I think she has a source with more (school districts, judicial, etc).



#Setup

- Request Census API key
    - https://api.census.gov/data/key_signup.html
    - path into api_key = '___' below
- Run cells below once at the start of session
    - installs packages
    - ignore pop-up about "session crash for unknown reason"
    - mounts your google drive to save data
        - need to confirm through several pop-ups    
        - click folder icon on left (under key icon)
        - navigate drive > MyDrive to where you want to save data
        - hover over it, click 3 dots on right, click "copy path"
        - paste into root = pathlib.Path('___') below

In [None]:
#run once to begin each session
%pip install -U ipython-autotime Census us
from IPython import get_ipython
get_ipython().kernel.do_shutdown(restart=True)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

#Code

In [None]:
#districts - done
#adjacency matrix - done
#link to ACS variable list - done (in Links section)
#parquet to csv function - done
#universe variable - done
#list of offices - done (optional argument to get_elections)

%reload_ext autotime
import pathlib, requests, zipfile, pickle, warnings, census, us, pyarrow.parquet as pq
import numpy as np, pandas as pd, scipy as sp, geopandas as gpd, networkx as nx
from shapely.strtree import STRtree
root = pathlib.Path('/content/drive/MyDrive/redistricting_2025_10')/'data'
api_key = '5640e76608e24d8d6cc35b96ce35028445957cb5'
session = census.Census(api_key)
pd.set_option('display.max_columns', None)
warnings.filterwarnings("ignore", message="Allowing arbitrary scalar fill_value in SparseDtype is deprecated", category=FutureWarning)

#################### helpers ####################
def prep(df):
    return df.convert_dtypes().rename(columns=lambda x: x.strip().lower().replace(' ','_').replace('-','_'))


def dump(file, obj, **kwargs):
    """write obj to file"""
    file.parent.mkdir(parents=True, exist_ok=True)  #make parent directory
    #write using method associated to file.suffix
    if file.suffix == '.parquet':
        prep(obj).to_parquet(file, **kwargs)
    elif file.suffix in ['.csv', '.txt']:
        prep(obj).to_csv(file, **kwargs)
    elif file.suffix == '.npz':
        numpy.savez_compressed(file, obj, **kwargs)
    else:
        with open(file, 'wb') as f:
            pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL, **kwargs)
    return obj


def load(file, **kwargs):
    """read obj from file"""
    if file.suffix == '.parquet':
        if any(col.type=='binary' for col in pq.ParquetFile(file).schema_arrow):  #if parquet contains geometry columns, use geopandas
            return prep(gpd.read_parquet(file, **kwargs))
        else:
            return prep(pd.read_parquet(file, **kwargs))
    elif file.suffix in ['.csv', '.txt']:
        return prep(pd.read_csv(file, **kwargs))
    elif file.suffix == '.npz':
        return np.load(file, allow_pickle=True, **kwargs)
    else:
        try:
            return prep(gpd.read_file(file, **kwargs))  #try geopandas to test is file is geospatial (like shapefile)
        except:
            with open(file, 'rb') as f:
                return pickle.load(f, **kwargs)


def parquet_to_csv(src):
    assert src.suffix == '.parquet'
    dst = src.with_suffix('.csv')
    df = load(src)
    dump(dst, df)
    return df


def fetch(file, url, unzip=True):
    """fetch data from url and write to file"""
    if not file.exists():
        print(f'fetching {url} to {file}')
        with requests.get(url, stream=True) as response:  #request url:
            response.raise_for_status()
            file.parent.mkdir(parents=True, exist_ok=True)  #make parent directory
            with open(file, 'wb') as f:  #pull data streaming
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        if unzip and zipfile.is_zipfile(file):  #unzip
            with zipfile.ZipFile(file, 'r') as f:
                f.extractall(file.parent)
    return file


def move_col(df, col='geometry', loc=-1):
    """move col to position loc"""
    df.insert(loc+df.shape[1]*(loc<0), col, df.pop(col))
    return df


def simplify(df, tolerance=0):
    return df.assign(geometry=df.geometry.buffer(0))#.simplify(tolerance, preserve_topology=True))


geoid_structure = {'state':2, 'county':3, 'tract':6, 'block_group':1, 'block':4}
def make_geoid(df, level):
    """make geoid from separate columns"""
    df['geoid'] =''
    g = lambda col, width: df.pop(col).astype(str).str.rjust(width, '0')  #prepend leading 0's to ensure correct string length
    for col, width in geoid_structure.items():
        df['geoid'] += g(col, width)  #iteratively append to existing geoid
        if col == level:  #stop at level and rename geoid to level
            df[level] = df.pop('geoid').astype('Int64')
            return df


def get_paths(stem):
    path = root/stem
    dst = path/f'{stem.replace('/', '_')}.parquet'
    src = dst.with_suffix('.zip')
    return path, dst, src
#################### data ####################

def get_block():
    """fetch & process block geometries"""
    path, dst, src = get_paths(f'block/2020/{state.abbr}')
    if not dst.exists():
        fetch(src, f'https://www2.census.gov/geo/tiger/TIGER2020/TABBLOCK20/tl_2020_{state.fips}_tabblock20.zip', unzip=False)  #download raw data
        print(f'creating {dst}')
        gdf = load(src, columns=['GEOID20'])
        gdf['block'] = gdf.pop('geoid20').astype('Int64')
        dump(dst, gdf)
    return load(dst)


def get_vtd():
    """fetch & process voting tabulation district geometries - only works for TX"""
    path, dst, src = get_paths(f'vtd/{tx_lege_year}/{state.abbr}')
    if not dst.exists():
        fetch(src, f'https://data.capitol.texas.gov/dataset/4d8298d0-d176-4c19-b174-42837027b73e/resource/906f47e4-4e39-4156-b1bd-4969be0b2780/download/vtds_pg.zip', unzip=False)  #download raw data
        print(f'creating {dst}')
        gdf = load(src, columns=['VTDKEY'])
        dump(dst, gdf)
    return load(dst)


def get_district(district):
    """fetch & process district block equivalency"""
    path, dst, src = get_paths(f'district/{district}/{state.abbr}')
    if not dst.exists():
        url = 'https://data.capitol.texas.gov/dataset/b806b39a-4bab-4103-a66a-9c99bcaba490/resource/c3f03464-e320-4d7f-b528-1883bd82cfb2/download/planc2193_blk.zip' if district == 'congress' else \
              'https://data.capitol.texas.gov/dataset/70836384-f10c-423d-a36e-748d7e000872/resource/0af4cde1-f651-4e9f-aded-a47a88cb5548/download/plans2168_blk.zip' if district == 'senate' else \
              'https://data.capitol.texas.gov/dataset/71af633c-21bf-42cf-ad48-4fe95593a897/resource/fbba5db7-0b1a-4eee-8d65-375c4d092b62/download/planh2316_blk.zip' if district == 'house' else None
        fetch(src, url)
        print(f'creating {dst}')
        df = load(src.parent/f'{url.split("/")[-1].split("_")[0].upper()}.csv')
        df.columns = ['block', district]
        dump(dst, df)
    return load(dst)


def get_pl94():
    """fetch & process PL94-171 - only works for TX (currently)"""
    path, dst, src = get_paths(f'pl94/2020/{state.abbr}')
    if not dst.exists():
        fetch(src, f'https://data.capitol.texas.gov/dataset/2b59f5ce-5fa4-4040-a550-caffbe8986c4/resource/33ed77c5-951b-4a88-9de7-9c90c4ba50db/download/blocks_pop.zip')  #download raw data
        print(f'creating {dst}')
        repl = {'sctbkey':'block', 'fename':'county'} | {k:f'{k}_2020' for k in ['anglo','asian','hisp','total','vap','black','bh','nanglo','anglovap','hispvap','bhvap','blackvap','asianvap','nanglovap']}
        df = load(src.parent/'Blocks_Pop.txt')[repl.keys()].rename(columns=repl)
        dump(dst, df)
    return load(dst)


def get_piece():
    """intersect geometries into pieces that do not cross any boundary"""
    path, dst, src = get_paths(f'piece/2024/{state.abbr}')
    if not dst.exists():
        P = get_pl94()
        V = simplify(get_vtd())
        B = simplify(get_block().to_crs(V.crs))
        print(f'creating {dst}')
        #according to Texas Legislative Council, vtds are specifically created so each block is wholly contained in exactly 1 vtd
        #however, there can be tiny precision errors, so we assign each block to the vtd with largest intersection by area
        I = gpd.overlay(V, B, keep_geom_type=True)  #intersect
        I = I.assign(area=I.area).sort_values('area').groupby('block')['vtdkey'].last().reset_index()  #for each block, find vtd with largest intersection by area
        gdf = B.merge(I)
        gdf['block_group'] = gdf['block']//1000
        gdf['tract'] = gdf['block']//10000
        for district in ['congress','senate','house']:
            gdf = gdf.merge(get_district(district))  #merge districts
        gdf = move_col(gdf.merge(P))  #merge pl94-171 data
        dump(dst, gdf)
    return load(dst).set_index(['block','block_group','tract','vtdkey',*districts])  #include additional districts here too


def get_multiplier(level):
    """compute multipliers to apportion from level to blocks via universe"""
    path, dst, src = get_paths(f'multiplier/2020/{state.abbr}/{level}/{universe}')
    if not dst.exists():
        P = get_piece()
        print(f'creating {dst}')
        #for each piece, compute its universe / total universe in the unit that contains it
        T = P.groupby(level)[universe].transform('sum').clip(1)  #we will get 0/0 errors below iff ALL pieces in a given unit have universe = 0; clip(1) applies lower bound = 1 to the unit's value to avoid
        df = (P[universe]/T).rename('multiplier').to_frame()
        dump(dst, df)
    return load(dst)


def apportion(data, level):
    """apportion data from level to blocks"""
    data = data.set_index(level)
    df = get_multiplier(level).join(data)  #join multiplier to data
    df *= df.pop('multiplier').to_frame().values  #pop multiplier and use it to multiply all remaining columns
    # check that we recover the original values by aggregating back to original level
    chk = data - df.groupby(level).sum()
    assert np.max(np.abs(chk) < 0.001)
    return df.squeeze()


def get_acs_yr_code(yr, code):
    """fetch & process 1 acs variable for 1 year"""
    path, dst, src = get_paths(f'acs/{yr}/{state.abbr}/{code}')
    if not dst.exists():
        print(f'creating {dst}')
        #detect whether this acs variable is available at block_group level; if not settle for tracts
        test = session.acs5.state_county_blockgroup(code, state.fips, '003', '*', year=yr)  #pull 1 county at block_group level
        level = 'block_group' if any(t[code] is not None for t in test) else 'tract'  #any t[code] is not None <=> available for block_group,
        #fetch from census api using their python package at the level determined above
        if level == 'block_group':
            raw = session.acs5.state_county_blockgroup(code, state.fips, '*', '*', year=yr)
        else:
            raw = session.acs5.state_county_tract(code, state.fips, '*', '*', year=yr)
        df = make_geoid(prep(pd.DataFrame(raw)), level)
        dump(dst, df)
    df = load(dst)
    level = df.columns[-1]
    return apportion(df, level)


def get_acs_yr(yr, acs_variables):
    """fetch & process all acs variables for 1 year"""
    L = [get_acs_yr_code(yr, code).rename(f'{alias}_{yr}') for code, alias in acs_variables]
    return pd.concat(L, axis=1)


def get_cvap_yr(yr):
    """fetch & process cvap for 1 year"""
    path, dst, src = get_paths(f'cvap/{yr}')
    level = 'block_group'
    if not dst.exists():
        fetch(src, f'https://www2.census.gov/programs-surveys/decennial/rdo/datasets/{yr}/{yr}-cvap/CVAP_{yr-4}-{yr}_ACS_csv_files.zip')  #download raw data
        print(f'creating {dst}')
        df = load(src.parent/'BlockGr.csv', encoding='latin1')  #load relevant file
        df[level] = df['geoid'].str[-12:].astype('Int64')  #create block_group identifier columns
        df = df.pivot_table(index=level, columns='lntitle', values=['cit_est','cvap_est'], fill_value=0)  #pivot long to wide so each row is a blkgrp with colunms for each citizen & cvap variable
        df.columns = [f'{k[:-4]}_{v}_{yr}' for k,v in df.columns]  #clean up after pivot
        dump(dst, df.reset_index())
    return apportion(load(dst), level)


def get_election(print_offices=0):
    """get election results - only works for TX"""
    path, dst, src = get_paths(f'election/{tx_lege_year}/{state.abbr}')
    if not dst.exists():
        fetch(src, f'https://data.capitol.texas.gov/dataset/35b16aee-0bb0-4866-b1ec-859f1f044241/resource/e1cd6332-6a7a-4c78-ad2a-852268f6c7a2/download/general-vtds-election-data.zip')  #download raw data
        print(f'creating {dst}')
        L = [load(file, usecols=['vtdkeyvalue','Office','Name','Party','Votes']).assign(year=int(file.stem[:4])) for file in src.parent.iterdir() if 'General_Election_Returns' in file.stem and 'City' not in file.stem]
        df = pd.concat(L).rename(columns={'vtdkeyvalue':level})
        for k in ['office','name']:
            df[k] = df[k].str.replace('_',' ').str.replace('.', '')  #process strings
        dump(dst, df)
    df = load(dst)
    if print_offices:
        display(df['office'].value_counts().head(print_offices))
    return df


def get_vote(years, offices):
    """process and filter election results - only works for TX"""
    level = 'vtdkey'
    df = (
        get_election()
        .query(f'year in @years and office in @offices and party in ["D","R"]', engine='python')  # keep elections for specified offices in specified years for democrats and republicans
        .assign(candidate=lambda X: X['office']+'_'+X['name']+'_'+X['party']+'_'+X['year'].astype('string'))  # concat information as column names for pivot
        .pivot_table(index=level, columns='candidate', values='votes', fill_value=0)  # pivot long to wide so each row is a vtd with colunms for each election
        .rename_axis(columns=None).reset_index()  # clean up after pivot
        )
    return apportion(df, level)


def get_merged(years, offices, acs_variables):
    """merge all data into pieces"""
    path, dst, src = get_paths(f'merged/{state.abbr}/piece')
    if not dst.exists():
        L = [
            *[get_acs_yr(yr, acs_variables) for yr in years],
            *[get_cvap_yr(yr).filter(like='cvap') for yr in years],
            get_vote(years, offices),
            get_piece(),
        ]
        print(f'creating {dst}')
        gdf = gpd.GeoDataFrame(pd.concat(L, axis=1), geometry='geometry')
        dump(dst, gdf)
    return load(dst)


def get_aggegrated(level):
    """aggregate pieces to level"""
    path, dst, src = get_paths(f'merged/{state.abbr}/{level}')
    if not dst.exists():
        global pieces, years, offices, acs_variables
        try:
            pieces
        except:  #create pieces in globals if it does not already exist
            pieces = get_merged(years, offices, acs_variables)
        print(f'creating {dst}')
        if level == 'block':
            gdf = pieces
        else:
            aggfunc = {k:'last' for k in districts} | {k:'sum' for k in pieces.columns.drop('geometry')}
            gdf = move_col(pieces.reset_index(districts).sort_values(universe).dissolve(level, aggfunc, method='coverage'))
        dump(dst, gdf)
    return load(dst)


def get_graph(gdf, rook=False):
    """compute adjacency matrix and dual graph"""
    level = gdf.index.names[0]
    path, dst, src = get_paths(f'merged/{state.abbr}/{level}')
    dst = dst.with_name(f'{dst.stem}_adjacency_{"rook" if rook else "queen"}.npz')
    if not dst.exists():
        print(f'creating {dst}')
        def check_corner(x, y):
            """return True if intersection contain a line segment (not a corner)"""
            if rook:  # using rook adjacency so corners are not okay
                g = x.intersection(y)
                return g.geom_type in ('LineString', 'MultiLineString') or (g.geom_type == 'GeometryCollection' and any(h.geom_type in ('LineString','MultiLineString') for h in g.geoms))
            else:  # using queen adjacency so corners are okay
                return True
        tree = STRtree(gdf.geometry)
        #list of geoms that touch
        adj = [[i,j] for i, x in enumerate(gdf.geometry) for j in tree.query(x, predicate='touches') if i < j and check_corner(x, gdf.geometry.iloc[j])]
        #build adjacency as scipy sparse matrix - standard dense matrix exceeds memory at block level
        rows, cols = np.array(adj).T.tolist()
        adj = sp.sparse.csr_matrix((np.ones(len(rows)*2, dtype=bool), [rows+cols, cols+rows]), shape=(gdf.shape[0], gdf.shape[0]))
        sp.sparse.save_npz(dst, adj)
    adj = sp.sparse.load_npz(dst)
    #create networkx graph
    try:
        G = nx.from_scipy_sparse_array(adj, nodelist=gdf.index)  #should replace 2 lines below for networkx >= 3.6 (future release)
    except:
        G = nx.from_scipy_sparse_array(adj)
        G = nx.relabel_nodes(G, dict(enumerate(gdf.index)))
    nx.set_node_attributes(G, gdf.drop(columns="geometry").to_dict("index"))
    return G, pd.DataFrame.sparse.from_spmatrix(adj, index=gdf.index, columns=gdf.index)


state = us.states.TX
tx_lege_year = 2024
districts = ['county', 'congress', 'senate', 'house']
universe = 'vap_2020'  #variable used to apportion data onto pieces

years = [2020, 2021, 2022, 2023]
acs_variables = [
    ['B01001_001E' , 'pop_total'],
    ['B01001I_001E', 'hisp_total'],
]
offices = [
    'President',
    'US Sen',
    'Governor',
    'Lt Governor',
    'Attorney Gen',
]
# get_election(print_offices=30)  #print_offices > 0 displays names of offices

In [None]:
level = 'tract'
gdf = get_aggegrated(level)
G, adj = get_graph(gdf)#, rook=True)

In [None]:
for level in [
    # 'congress',
    # 'senate',
    # 'house',
    # 'county',
    # 'tract',
    # 'block_group',
    'block',
    ]:
    print(level)
    gdf = get_aggegrated(level)
    for rook in [True, False]:
        G, adj = get_adjacency(gdf, rook)
