In [7]:
import os
import pandas as pd
from tqdm import tqdm
import json
from config import (
    WITHHELD_DEMOGRAPHIC_FEATURES,
    PROCESSED_DATA_DIR,
    VOTER_FILE_DIR,
    VOTER_FILE_COLUMN_MAPPING_FILE,
    ELECTION_DATES_TO_NAMES_MAPPING_FILE,
    FEATURES_FILE,
    VOTER_FILE_STR,
    ZONE_CODES_STR,
    ZONE_TYPES_STR,
    ELECTION_MAP_STR,
    TEN_PCT_SAMPLE_FILE,
    election_date_to_feature_names
)

# Data Processing

The goal of this script is to take the raw Pennsylvania voter file and process it into usable tabular data.

### Pre-processing
The dataset is split along county lines, so let's first simply gather a list of the counties in the state.

In [2]:
# Gather the county names. For every file whose name contains VOTER_FILE_STR,
# the county is taken to be the text before the first space in the file name.
counties: set[str] = {
    filename.split(" ")[0]
    for filename in sorted(os.listdir(VOTER_FILE_DIR))
    if VOTER_FILE_STR in filename
}

print(f"Found {len(counties)} counties")

Found 67 counties


The dataset also has messy election data. Some elections are specific to small areas (like special elections), and some have differing descriptions across counties. We'll deal with all this later, but for now, let's find all the unique election dates and the different descriptions they have across counties.

In [5]:
# Track the dates of elections, and their corresponding descriptions.
# The descriptions are collected as sets first (to de-duplicate across counties)
# and converted to lists afterwards so they can be written as JSON.
names_by_date: dict[str, set[str]] = {}

# Sorted so that the order of the dates (and therefore of the election columns
# derived from them later) does not depend on the file system's listing order.
for file in sorted(os.listdir(VOTER_FILE_DIR)):
    if ELECTION_MAP_STR not in file:
        continue

    election_map = pd.read_csv(
        os.path.join(VOTER_FILE_DIR, file),
        sep="\t",
        encoding="unicode_escape",
        header=None,
    )

    # Store the description (column 2) of each election date (column 3) for this county
    for election_date, description in zip(election_map[3], election_map[2]):
        names_by_date.setdefault(election_date, set()).add(description)

# Sets have no stable order; sort (by string form, in case of mixed types) so
# the JSON file is identical from run to run.
date_to_names: dict[str, list[str]] = {
    election_date: sorted(descriptions, key=str)
    for election_date, descriptions in names_by_date.items()
}

# Use a context manager so the file is flushed and closed deterministically
with open(ELECTION_DATES_TO_NAMES_MAPPING_FILE, "w") as f:
    json.dump(date_to_names, f, indent=4)

# `default=0` keeps the summary from crashing when no election map was found
description_counts = [len(descriptions) for descriptions in date_to_names.values()]
print(f"Found {len(date_to_names)} different election dates, with a minimum of {min(description_counts, default=0)} and a maximum of {max(description_counts, default=0)} descriptions")

Found 83 different election dates, with a minimum of 1 and a maximum of 27 descriptions


Next, we need to define the actual columns we'll use in our final dataset. The columns contain both demographic data and election history for each voter. So, let's define the specific column names and make the edits we need (like converting to one-hot encoding, adding feature presence columns, etc.).

In [6]:
columns_template = pd.read_csv(VOTER_FILE_COLUMN_MAPPING_FILE, header=0)

# Every election date contributes its own group of feature columns
election_column_names = [
    feature_name
    for election_date in date_to_names
    for feature_name in election_date_to_feature_names(election_date)
]

# Keep the raw columns that are neither election columns, district columns,
# nor explicitly withheld demographic features
raw_column_names = columns_template["Field Description"].tolist()
demographic_columns = []
for col in raw_column_names:
    if "Election" in col or "District" in col:
        continue
    if col in WITHHELD_DEMOGRAPHIC_FEATURES:
        continue
    demographic_columns.append(col)

# Swap some raw columns for derived ones: one-hot encodings for the categorical
# columns, and a presence flag next to the (possibly missing) last vote date.
# Each raw column is removed and its replacements are appended at the end.
derived_columns = (
    ("Gender", ["Gender M", "Gender F", "Gender U"]),
    ("Party Code", ["Party D", "Party R", "Party I"]),
    ("Last Vote Date", ["Last Vote Date Presence", "Last Vote Date"]),
)
for raw_column, replacements in derived_columns:
    demographic_columns.remove(raw_column)
    demographic_columns.extend(replacements)

# Persist the two feature groups so they can be looked up outside this notebook
feature_groups = {
    "demographic": demographic_columns,
    "elections": election_column_names,
}
with open(FEATURES_FILE, "w") as f:
    json.dump(feature_groups, f, indent=4)

all_column_names = demographic_columns + election_column_names

### Reading

Now, we're ready to actually read and process the county data. The code is well documented, but at a high level, we read in a county, extract all its files from the raw data, align the demographic and election data columns to the ones we've defined above, and write the county's data to a csv file.

We don't aggregate all the counties together, since it would use too much RAM. With about 10,000,000 voters, the dataset would take 6 - 12 GB of RAM. Instead, we'll ensure alignment of the columns and then save them individually, so we can easily combine them later if needed.

In [None]:
# All date features are scaled linearly onto 0-1 over this range. Both ends are
# computed once, up front, so every column of every county shares the same scale.
DATE_RANGE_START = pd.to_datetime("1900-01-01")
DATE_RANGE_END = pd.to_datetime("today")

# Date columns with very few missing values: they get no presence column, and
# missing values simply end up at the start of the range.
ALWAYS_PRESENT_DATE_COLUMNS = [
    "DOB",
    "Registration Date",
    "Status Change Date",
    "Date Last Changed",
]

# Vote methods counted as "voted", and the subset counted as "by mail"
VOTED_METHODS = ["AP", "MB", "AB", "P"]
MAIL_METHODS = ["MB", "AB"]


def _scale_dates(raw_dates: pd.Series) -> tuple[pd.Series, pd.Series]:
    """Scale ``%m/%d/%Y`` date strings to 0-1 over the shared date range.

    Args:
        raw_dates: Date strings; unparseable or missing entries are allowed.

    Returns:
        A pair ``(scaled, presence)``. ``scaled`` is the position of each date in
        the range, rounded to 3 decimals, with out-of-range dates clamped to the
        nearest end and missing dates placed at the start. ``presence`` is 1 where
        the raw value parsed to a date and 0 where it did not.
    """
    parsed = pd.to_datetime(raw_dates, errors="coerce", format="%m/%d/%Y")

    # This must be captured BEFORE clamping: NaT compares False against anything,
    # so the `where` calls below replace missing dates with the range start.
    presence = parsed.notna().astype(int)

    # If date is before start or after end, set to start or end
    clamped = parsed.where(parsed > DATE_RANGE_START, DATE_RANGE_START)
    clamped = clamped.where(clamped < DATE_RANGE_END, DATE_RANGE_END)

    scaled = (
        (clamped - DATE_RANGE_START) / (DATE_RANGE_END - DATE_RANGE_START)
    ).round(3)
    return scaled, presence


def _election_features(voter_data: pd.DataFrame, index) -> dict[str, pd.Series]:
    """Build the 0/1 features of one raw election slot.

    Args:
        voter_data: The county's raw voter data, with named columns.
        index: The election number used in the raw column names
            (``Election {index} Party`` / ``Election {index} Vote Method``).

    Returns:
        A mapping from feature suffix ("Party D", "Party R", "Party I", "Voted",
        "By Mail") to an integer 0/1 Series aligned with ``voter_data``.
    """
    party = voter_data[f"Election {index} Party"]
    vote_method = voter_data[f"Election {index} Vote Method"]
    return {
        "Party D": (party == "D").astype(int),
        "Party R": (party == "R").astype(int),
        # Anything that is not D or R, including a missing value
        "Party I": (~party.isin(["D", "R"])).astype(int),
        "Voted": vote_method.isin(VOTED_METHODS).astype(int),
        "By Mail": vote_method.isin(MAIL_METHODS).astype(int),
    }


# Make sure the output directory exists before writing the first county
os.makedirs(PROCESSED_DATA_DIR, exist_ok=True)

for county in tqdm(
    sorted(counties), desc="Processing counties", unit="county", colour="green"
):
    # region Load the data
    # Everything is reset per county (including county_elections) so that a county
    # with a missing file is skipped instead of reusing the previous county's data.
    voter_data = election_map = zone_codes = zone_types = None
    county_elections = None
    for file in os.listdir(VOTER_FILE_DIR):
        if county in file and VOTER_FILE_STR in file:
            voter_data = pd.read_csv(
                os.path.join(VOTER_FILE_DIR, file),
                sep="\t",
                encoding="unicode_escape",
                header=None,
                dtype=str,
            )
        if county in file and ELECTION_MAP_STR in file:
            election_map = pd.read_csv(
                os.path.join(VOTER_FILE_DIR, file),
                sep="\t",
                encoding="unicode_escape",
                header=None,
            )

            # Map the election number (column 1) to its date (column 3)
            county_elections = {}
            for election_number, date in zip(election_map[1], election_map[3]):
                if date in date_to_names:
                    county_elections[election_number] = date
                else:
                    print(
                        f"Skipping election {election_number} in county {county} due to missing data"
                    )

        # NOTE(review): the zone files are loaded but not used anywhere in this cell
        if county in file and ZONE_CODES_STR in file:
            zone_codes = pd.read_csv(
                os.path.join(VOTER_FILE_DIR, file),
                sep="\t",
                encoding="unicode_escape",
                header=None,
            )
        if county in file and ZONE_TYPES_STR in file:
            zone_types = pd.read_csv(
                os.path.join(VOTER_FILE_DIR, file),
                sep="\t",
                encoding="unicode_escape",
                header=None,
            )

    if voter_data is None or county_elections is None:
        print(f"Skipping county {county} due to missing data")
        continue
    # endregion

    voter_data.columns = columns_template["Field Description"].tolist()

    # Start from the shared column layout so every county's csv is aligned
    aligned_voter_data = pd.DataFrame(columns=all_column_names)

    # region Migrate the demographic columns
    for col in voter_data.columns:
        # Ignore the election columns (handled separately below)
        if "Election" in col:
            continue

        elif col == "Gender":
            aligned_voter_data["Gender M"] = (voter_data[col] == "M").astype(int)
            aligned_voter_data["Gender F"] = (voter_data[col] == "F").astype(int)
            aligned_voter_data["Gender U"] = (
                ~voter_data[col].isin(["M", "F"])
            ).astype(int)

        elif col == "Party Code":
            aligned_voter_data["Party D"] = (voter_data[col] == "D").astype(int)
            aligned_voter_data["Party R"] = (voter_data[col] == "R").astype(int)
            aligned_voter_data["Party I"] = (
                ~voter_data[col].isin(["D", "R"])
            ).astype(int)

        # Scale all dates to 0-1 (using 1900 to today as range)
        elif col in ALWAYS_PRESENT_DATE_COLUMNS:
            aligned_voter_data[col], _ = _scale_dates(voter_data[col])

        # Scale all dates (which may or may not exist) to 0-1, and record
        # whether each voter actually had a date
        elif col == "Last Vote Date":
            scaled, presence = _scale_dates(voter_data[col])
            aligned_voter_data["Last Vote Date Presence"] = presence
            aligned_voter_data["Last Vote Date"] = scaled

        elif col == "Voter Status":
            aligned_voter_data[col] = (voter_data[col] == "A").astype(int)

        # Migrate the non-custom columns
        elif col in all_column_names:
            aligned_voter_data[col] = voter_data[col]

        elif col in WITHHELD_DEMOGRAPHIC_FEATURES or "District" in col:
            continue

        else:
            print(f"Found a missing column {col} in county {county}")
            aligned_voter_data[col] = None
    # endregion

    added_dates: set[str] = set()

    # region Migrate the election columns
    for index, election_date in county_elections.items():
        features = _election_features(voter_data, index)
        already_added = election_date in added_dates

        for feature, values in features.items():
            column = f"Election {election_date} {feature}"
            if not already_added:
                aligned_voter_data[column] = values
            elif feature == "Party I":
                # Several elections share this date: "neither D nor R" only
                # holds if it holds in every one of them
                aligned_voter_data[column] = (
                    aligned_voter_data[column].astype(int) & values
                )
            else:
                # Several elections share this date: the flag is set if it is
                # set in any of them. (combine_first cannot be used here, since
                # it only fills nulls and the existing column has none.)
                aligned_voter_data[column] = (
                    aligned_voter_data[column].astype(int) | values
                )

        if not already_added:
            aligned_voter_data[f"Election {election_date} Presence"] = 1
            added_dates.add(election_date)
    # endregion

    # For all the elections that are not present, set the presence to 0
    for election_date in date_to_names.keys():
        if election_date not in added_dates:
            aligned_voter_data[f"Election {election_date} Presence"] = 0

    # Save the csv
    aligned_voter_data.to_csv(
        os.path.join(PROCESSED_DATA_DIR, f"{county}_voter_data.csv"), index=False
    )


### Sampling

Finally (and this part can be run independently of the rest of the script if needed), let's sample all the data to get a smaller dataset to work with. Edit the `SAMPLE_RATE` constant at the top of the code to change the proportion. This is how we'll do our EDA, and if needed, training.

Note that this will take a while to run, since it involves reading in all the data and combining it. Also be careful to limit the sample rate: if it were 1.0, the combined data would take up 6 - 12 GB of RAM as mentioned above (since it would be the full dataset).

In [None]:
# Fraction of each county's rows to keep in the combined sample
SAMPLE_RATE = 0.1

# TEN_PCT_SAMPLE_FILE is used as a path when writing, while os.listdir yields
# bare file names; compare on the base name so a sample left in
# PROCESSED_DATA_DIR by a previous run is skipped in either case.
sample_file_name = os.path.basename(TEN_PCT_SAMPLE_FILE)

dataframes = []

# Sorted so the row order of the combined sample is the same on every run
for file in tqdm(
    sorted(os.listdir(PROCESSED_DATA_DIR)),
    desc="Reading files",
    unit="file",
    colour="green",
):
    if not file.endswith(".csv"):
        continue
    if file == sample_file_name:
        # If this script was already run, skip the file
        continue

    df = pd.read_csv(os.path.join(PROCESSED_DATA_DIR, file))

    # Take a random sample of SAMPLE_RATE of the rows (fixed seed for reproducibility)
    dataframes.append(df.sample(frac=SAMPLE_RATE, random_state=42))

# pd.concat fails with an unhelpful message on an empty list; fail clearly instead
if not dataframes:
    raise FileNotFoundError(
        f"No processed county csv files found in {PROCESSED_DATA_DIR}"
    )

print("Combining dataframes (this could take a while)...")
df = pd.concat(dataframes, axis=0, ignore_index=True, sort=False)
df.to_csv(TEN_PCT_SAMPLE_FILE, index=False)