In [5]:
import os
import json
import joblib
import pandas as pd
from notebook_helper import setup_repo_path
setup_repo_path()
from matching.loaders.load_attempts import load_attempts, filter_dataframe_by_columns
from matching.loaders.load_voterfile import load_voterfile_chunk
from matching.candidates.block_candidates import block_candidate_pairs
from matching.features.feature_builder import add_features
from matching.modeling.predict import predict_chunk
from matching.gold.gold_pairs import add_normalized_keys

PROGRESS_FILE = "progress.json"

def load_progress():
    """Return the saved chunk-progress mapping, or an empty dict if none exists.

    Reads the JSON document stored at ``PROGRESS_FILE``. When that file is
    absent, an empty dict is returned instead.
    """
    if not os.path.exists(PROGRESS_FILE):
        return {}
    with open(PROGRESS_FILE, "r") as handle:
        return json.load(handle)

def save_progress(progress):
    """Persist the chunk-progress mapping to ``PROGRESS_FILE`` as JSON.

    The data is first written to a sibling temporary file and then moved over
    the real file with ``os.replace``. Opening ``PROGRESS_FILE`` directly in
    ``"w"`` mode would truncate the previous checkpoint before the new one is
    fully written, so an interrupted kernel or a serialization error would
    leave a corrupt file that breaks the next resume.

    Args:
        progress: JSON-serializable mapping to store.

    Raises:
        TypeError: if ``progress`` contains values ``json.dump`` cannot encode.
            The previously saved checkpoint is left untouched in that case.
    """
    tmp_path = f"{PROGRESS_FILE}.tmp"
    try:
        with open(tmp_path, "w") as f:
            json.dump(progress, f)
        os.replace(tmp_path, PROGRESS_FILE)
    except BaseException:
        # BaseException so a KeyboardInterrupt from the notebook also cleans
        # up the partial temp file before propagating.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

def save_chunk_results(candidates_df, chunk_idx, output_dir="/Users/borismartinez/Documents/GitHub/engage/chunk_folder/chunk_prototype"):
    """Write one chunk's candidate rows to a CSV file inside ``output_dir``.

    The directory is created if it does not exist. The file is named
    ``predicted_matches_chunk_<chunk_idx>.csv`` and is written without the
    DataFrame index. A one-line summary is printed afterwards.

    Args:
        candidates_df: DataFrame to write.
        chunk_idx: Chunk identifier embedded in the file name and the message.
        output_dir: Destination directory.
    """
    os.makedirs(output_dir, exist_ok=True)
    csv_name = f"predicted_matches_chunk_{chunk_idx}.csv"
    destination = os.path.join(output_dir, csv_name)
    candidates_df.to_csv(destination, index=False)
    row_count = len(candidates_df)
    print(f"Chunk {chunk_idx}: Saved {row_count} rows to {destination}")

def stream_inference(attempts_path, db_engine, sql, model_path, chunksize=5000,
                     sample_size=100, output_dir=None):
    """Score a sample of registration attempts against the voter file, chunk by chunk.

    Steps:
      1. Load attempts with ``load_attempts``, rename the name/DOB/zip columns
         to their ``*_att`` forms, and apply the required-columns and
         ``upload_time`` filters.
      2. Keep only rows whose ``registration_form_id`` falls in a fixed-seed
         random sample of distinct IDs.
      3. Load the model with ``joblib.load``.
      4. For every chunk yielded by ``load_voterfile_chunk``: rename columns,
         normalize keys, block candidate pairs, build features, predict, and
         save the scored candidates with ``save_chunk_results``.

    Chunks whose index is already recorded in the progress file are skipped.
    Progress is keyed only by chunk index, so a saved progress file is only
    meaningful when ``sql`` and ``chunksize`` are unchanged between runs.
    NOTE(review): resuming also presumes the query returns rows in a stable
    order across runs -- confirm.

    Args:
        attempts_path: Path handed to ``load_attempts``.
        db_engine: Database handle passed through to ``load_voterfile_chunk``.
        sql: Query passed through to ``load_voterfile_chunk``.
        model_path: Path of the serialized model read by ``joblib.load``.
        chunksize: Chunk size passed through to ``load_voterfile_chunk``.
        sample_size: Maximum number of distinct ``registration_form_id`` values
            to sample. Clamped to the number of distinct IDs available.
        output_dir: Directory for the per-chunk CSVs. ``None`` keeps the
            default of ``save_chunk_results``.

    Raises:
        ValueError: if no attempts remain after filtering.
    """
    # Load full attempts dataframe first
    attempts = load_attempts(attempts_path)

    # Rename columns consistently
    attempts = attempts.rename(columns={
        "first_name": "first_name_att",
        "last_name": "last_name_att",
        "date_of_birth": "dob_raw_att",
        "voting_zipcode": "zip_raw_att",
    })

    # Apply required columns filter
    required_columns = ["first_name_att", "last_name_att"]
    attempts = filter_dataframe_by_columns(attempts, required_columns)
    print(f"Attempts after required columns filter: {len(attempts)}")

    # Apply upload_time filter.
    # NOTE(review): this is a substring match, so "2024" anywhere in the value
    # (not only in the year position) is accepted -- confirm the column format.
    attempts = attempts[attempts["upload_time"].astype(str).str.contains("2024", na=False)]
    print(f"Attempts after 'upload_time' filter: {len(attempts)}")

    # Sample up to `sample_size` distinct registration_form_id values.
    unique_ids = attempts['registration_form_id'].unique()
    if len(unique_ids) == 0:
        raise ValueError("No attempts remain after filtering; nothing to match.")
    # Series.sample raises when n exceeds the population (replace=False), so
    # clamp the request to what is actually available.
    n_sampled = min(sample_size, len(unique_ids))
    sampled_ids = pd.Series(unique_ids).sample(n=n_sampled, random_state=42).values

    # Filter attempts to keep only those sampled IDs
    attempts = attempts[attempts['registration_form_id'].isin(sampled_ids)]
    print(f"Sampled attempts rows after filtering by distinct registration_form_id (count may be >{sample_size} due to duplicates): {len(attempts)}")
    print(f"Distinct registration_form_id in sampled attempts: {attempts['registration_form_id'].nunique()}")
    print(f"Sample registration_form_id: {attempts['registration_form_id'].unique()}")

    # joblib.load unpickles the file, which can execute arbitrary code:
    # only point model_path at files you trust.
    model = joblib.load(model_path)

    # Load or initialize progress tracking
    progress = load_progress()

    chunks_processed = 0

    # Iterate over voter file chunks from DB
    for idx, vf_chunk in enumerate(load_voterfile_chunk(db_engine, sql, chunksize=chunksize)):
        if str(idx) in progress:
            print(f"Chunk {idx}: already processed, skipping")
            continue

        print(f"Chunk {idx}: processing ...")

        # Rename columns in voterfile chunk
        vf_chunk = vf_chunk.rename(columns={
            "first_name": "first_name_vf",
            "last_name": "last_name_vf",
            "residence_zipcode": "zip_raw_vf",
            "birth_date": "dob_raw_vf",
        })

        # Normalize keys in both attempts and voterfile chunks
        attempts_norm, vf_chunk_norm = add_normalized_keys(attempts, vf_chunk)

        # Generate candidate pairs by blocking
        candidates = block_candidate_pairs(attempts_norm, vf_chunk_norm)
        print(f"Chunk {idx}: Generated {len(candidates)} candidate pairs")
        print(f"Chunk {idx}: Unique registration_form_id in candidates: {candidates['registration_form_id'].nunique() if not candidates.empty else 0}")

        # Skip empty candidate chunks, but still record them as done so a
        # resumed run does not revisit them.
        if candidates.empty:
            print(f"Chunk {idx}: No candidate pairs generated, skipping feature build and prediction")
            progress[str(idx)] = True
            save_progress(progress)
            continue

        # Compute features for candidate pairs
        X, _ = add_features(candidates)
        print(f"Chunk {idx}: Features computed for {X.shape[0]} candidates")

        # Predict match probabilities using the loaded model
        probs = predict_chunk(model, X)
        candidates['match_prob'] = probs
        print(f"Chunk {idx}: Predictions made")

        # Save chunk results first, then mark the chunk done, so a crash in
        # between re-processes the chunk instead of losing it.
        if output_dir is None:
            save_chunk_results(candidates, idx)
        else:
            save_chunk_results(candidates, idx, output_dir=output_dir)
        progress[str(idx)] = True
        save_progress(progress)
        chunks_processed += 1

    # chunks_processed counts only chunks scored in this call; skipped and
    # empty-candidate chunks are not included.
    print(f"Processing complete. Total chunks processed: {chunks_processed}")

    # The progress file is intentionally left in place: a later call with the
    # same progress file will skip every chunk recorded here. Delete the file
    # to start a fresh run.


Repo root added to sys.path: /Users/borismartinez/Documents/GitHub/engage


In [7]:
import os
import matching.utils.db as db

db_engine = db.get_engine()
attempts_path = "/Users/borismartinez/Documents/GitHub/engage/data/vr_blocks_export_no_na.csv"
sql = "SELECT * FROM voterfile.election_detail_2024 WHERE county = 'DAD'"

# model_path must point to the trained model file, NOT the metrics JSON.
model_path = "/Users/borismartinez/Documents/GitHub/engage/models/xgboost_model.pkl"

chunksize = 5000
output_dir = "/Users/borismartinez/Documents/GitHub/engage/chunk_folder/chunk_prototype"

# True  -> discard previous chunk CSVs *and* the progress checkpoint, then
#          recompute everything.
# False -> keep both, so stream_inference resumes where the last run stopped.
# The two must be reset together: deleting the CSVs while the checkpoint still
# marks those chunks as done would make stream_inference skip them.
fresh_run = True

os.makedirs(output_dir, exist_ok=True)

if fresh_run:
    for f in os.listdir(output_dir):
        # Must match the names written by save_chunk_results:
        # predicted_matches_chunk_<idx>.csv
        if f.startswith("predicted_matches_chunk_") and f.endswith(".csv"):
            os.remove(os.path.join(output_dir, f))
    # PROGRESS_FILE is defined in the cell that defines stream_inference.
    if os.path.exists(PROGRESS_FILE):
        os.remove(PROGRESS_FILE)

stream_inference(
    attempts_path=attempts_path,
    db_engine=db_engine,
    sql=sql,
    model_path=model_path,
    chunksize=chunksize
)

  return pd.read_csv(path)


Attempts after required columns filter: 51008
Attempts after 'upload_time' filter: 25570
Sampled attempts rows after filtering by distinct registration_form_id (count may be >100 due to duplicates): 100
Distinct registration_form_id in sampled attempts: 100
Sample registration_form_id: [   6001287364    6001287712    6001285689 1010000008304 1010000005734
 1010000005010 1010000010099 1010000010671 1010000012411 1010000012272
 1010000016738 1010000019164 1010000019201 1010000019216 1010000028123
 1010000026446 1010000029903 1010000029892 1010000030854 1010000027402
 1010000035189 1010000033670 1010000054438 1010000060861 1010000072639
 1010000003612 1010000003575 1010000116656 1010000163458 1010000457595
 1010000224980 1010000241914 1010000262561 1010000262988 1010000337809
 1010000383112 1010000388172 1010000394362 1010000394365 1010000410531
 1010000415562 1010000422151 1010000421770 1010000444592 1010000445000
 1010000454972 1010000478258 1010000486694 1010000499812 1010000519156
 10