# End-to-End Data Cleaning and Normalization Pipeline

## 1. Setup and Configuration

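This first section imports the required libraries and defines every path the pipeline uses: the project root, the raw input directory, two intermediate directories, and the final output and log directories. `ROOT` is taken as the parent of the current working directory, so the notebook is expected to run from a folder directly under the project root. Existing intermediate and output directories are deleted before being recreated, so the notebook can be rerun from top to bottom with a clean state.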

In [1]:
import pandas as pd
from pathlib import Path
import os
import re
import shutil
import json
from collections import defaultdict
import csv

# --- PATH DEFINITIONS ---

# Get the absolute path of the project root by going up from the notebook's location
ROOT = Path(os.getcwd()).resolve().parent
print(f"Project Root: {ROOT}")

# Source data
DATA_ROOT = ROOT / "data"

# Intermediate directories for each cleaning step
CLEANED_100K_DIR = ROOT / "cleaned-100k"
CLEANED_MAJOR_RULES_DIR = ROOT / "cleaned_major_rules"

# Final output directory
FINAL_OUTPUT_DIR = ROOT / "final_pipeline_output"
LOGS_DIR = FINAL_OUTPUT_DIR / "logs"

# External data files
LGD_JSON_PATH = ROOT / "India-State-District.json"

# --- CLEANUP AND SETUP ---

# For a clean run, remove intermediate and final directories if they exist
if CLEANED_100K_DIR.exists():
    shutil.rmtree(CLEANED_100K_DIR)
    print(f"Removed existing directory: {CLEANED_100K_DIR}")

if CLEANED_MAJOR_RULES_DIR.exists():
    shutil.rmtree(CLEANED_MAJOR_RULES_DIR)
    print(f"Removed existing directory: {CLEANED_MAJOR_RULES_DIR}")

if FINAL_OUTPUT_DIR.exists():
    shutil.rmtree(FINAL_OUTPUT_DIR)
    print(f"Removed existing directory: {FINAL_OUTPUT_DIR}")

# Create fresh directories
CLEANED_100K_DIR.mkdir(parents=True, exist_ok=True)
CLEANED_MAJOR_RULES_DIR.mkdir(parents=True, exist_ok=True)
FINAL_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
LOGS_DIR.mkdir(parents=True, exist_ok=True)

print("\n--- Directories Initialized ---")
print(f"Intermediate (100k cleaned): {CLEANED_100K_DIR}")
print(f"Intermediate (major rules): {CLEANED_MAJOR_RULES_DIR}")
print(f"Final Output: {FINAL_OUTPUT_DIR}")
print(f"Logs: {LOGS_DIR}")

Project Root: C:\Users\INDIAN\OneDrive\Documents\PRO__\uidai-datathon-2026-participation

--- Directories Initialized ---
Intermediate (100k cleaned): C:\Users\INDIAN\OneDrive\Documents\PRO__\uidai-datathon-2026-participation\cleaned-100k
Intermediate (major rules): C:\Users\INDIAN\OneDrive\Documents\PRO__\uidai-datathon-2026-participation\cleaned_major_rules
Final Output: C:\Users\INDIAN\OneDrive\Documents\PRO__\uidai-datathon-2026-participation\final_pipeline_output
Logs: C:\Users\INDIAN\OneDrive\Documents\PRO__\uidai-datathon-2026-participation\final_pipeline_output\logs
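
The `ROOT` definition in this cell only works when the notebook is launched from a folder directly under the project root. As one possible alternative, the sketch below walks upward from a starting directory until it finds a folder containing a marker entry. The helper name `find_project_root` and the choice of `data` as the marker are assumptions made for illustration; they are not part of the original pipeline.

```python
from pathlib import Path


def find_project_root(start: Path, marker: str = "data") -> Path:
    """Return the nearest ancestor of `start` (or `start` itself) containing `marker`.

    Hypothetical helper: the marker name is an assumption, chosen because the
    pipeline expects a `data` directory under the project root.
    """
    start = start.resolve()
    for candidate in (start, *start.parents):
        if (candidate / marker).exists():
            return candidate
    raise FileNotFoundError(f"No ancestor of {start} contains '{marker}'")
```

With this helper, `ROOT = find_project_root(Path.cwd())` would resolve to the same folder whether the notebook is started from the project root or from a nested sub-folder.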


## 2. Step 1: Remove '100000' Pincode Entries

This step takes the raw data from the `data/` directory, iterates through each CSV file, and removes any rows where the `pincode` column has the value '100000'. The resulting cleaned files are saved to the first intermediate directory, `cleaned-100k/`, preserving the original sub-folder structure (`biometric`, `demographic`, `enrolment`).

In [2]:
def remove_100k_values(input_root: Path, output_root: Path):
    """
    Reads all CSVs from input_root, removes rows where 'pincode' is '100000',
    and saves them to output_root, mirroring the directory structure.
    """
    print("--- Step 1: Removing '100000' Pincode Values ---")
    
    csv_files = sorted(list(input_root.rglob("*.csv")))
    if not csv_files:
        print(f"[WARN] No CSV files found in {input_root}")
        return

    for csv_file in csv_files:
        try:
            # Determine output path
            relative_path = csv_file.relative_to(input_root)
            output_path = output_root / relative_path
            output_path.parent.mkdir(parents=True, exist_ok=True)
            
            print(f"Processing: {csv_file.name}")

            # Read, filter, and save
            df = pd.read_csv(csv_file, dtype=str)
            
            # Find pincode column case-insensitively
            pincode_col = next((col for col in df.columns if 'pincode' in col.lower()), None)
            
            if pincode_col:
                initial_rows = len(df)
                df = df[df[pincode_col] != '100000']
                final_rows = len(df)
                print(f"  - Rows removed: {initial_rows - final_rows}")
            else:
                print(f"  - Pincode column not found in {csv_file.name}. Copying as is.")

            df.to_csv(output_path, index=False)
            
        except Exception as e:
            print(f"[ERROR] Failed to process {csv_file.name}: {e}")

    print("\n--- Step 1 Complete ---")

# Execute the function
remove_100k_values(DATA_ROOT, CLEANED_100K_DIR)

--- Step 1: Removing '100000' Pincode Values ---
Processing: biometric_0_500000.csv
  - Rows removed: 0
Processing: biometric_1000000_1500000.csv
  - Rows removed: 0
Processing: biometric_1500000_1861108.csv
  - Rows removed: 0
Processing: biometric_500000_1000000.csv
  - Rows removed: 0
Processing: demographic_0_500000.csv
  - Rows removed: 0
Processing: demographic_1000000_1500000.csv
  - Rows removed: 0
Processing: demographic_1500000_2000000.csv
  - Rows removed: 0
Processing: demographic_2000000_2071700.csv
  - Rows removed: 1
Processing: demographic_500000_1000000.csv
  - Rows removed: 1
Processing: enrolment_0_500000.csv
  - Rows removed: 9
Processing: enrolment_1000000_1006029.csv
  - Rows removed: 0
Processing: enrolment_500000_1000000.csv
  - Rows removed: 13

--- Step 1 Complete ---
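
The filter in Step 1 compares the pincode column against the string `'100000'`, which only works because the files are read with `dtype=str`. The toy example below is a sketch of that dependency on an in-memory CSV. The helper `drop_placeholder_pincodes` is hypothetical, and it also trims surrounding whitespace, which the original filter does not do.

```python
import io

import pandas as pd

# Toy CSV with two placeholder rows (illustrative data, not from the real files)
RAW = "pincode,district\n100000,Alpha\n560001,Beta\n100000,Gamma\n"


def drop_placeholder_pincodes(df: pd.DataFrame, column: str = "pincode") -> pd.DataFrame:
    """Return the rows whose pincode, compared as trimmed text, is not '100000'."""
    as_text = df[column].astype(str).str.strip()
    return df[as_text != "100000"]


as_str = pd.read_csv(io.StringIO(RAW), dtype=str)  # pincode kept as text
as_int = pd.read_csv(io.StringIO(RAW))             # pincode inferred as integer

kept = drop_placeholder_pincodes(as_str)
matches_when_int = int((as_int["pincode"] == "100000").sum())

print(f"Rows kept with dtype=str: {len(kept)}")
print(f"String matches on an integer column: {matches_when_int}")
```

When the column is parsed as integers, the string comparison matches nothing, so a filter written as `df[col] != '100000'` would silently keep every row.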


## 3. Step 2: Apply Major State/District Canonicalization

This step applies the deterministic rule set adapted from the `Final_changer.py` script. It reads the data from the `cleaned-100k/` directory and applies a mapping of state aliases and district-to-state overrides. This is the main step for correcting common, known anomalies: state aliases such as `Orissa` -> `Odisha`, and districts recorded under the wrong state, such as `Rangareddy`, which is assigned to `Telangana`. Rows whose state cannot be resolved to a canonical name are passed through unchanged. The results are written to the `cleaned_major_rules/` directory.

In [3]:
# This cell contains the core logic adapted from Final_changer.py

# --- NORMALIZATION AND RULE DEFINITIONS ---

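# Raw strings need single backslashes: r"\s+" matches runs of whitespace,
# whereas r"\\s+" would match a literal backslash followed by "s".
PUNCT_PATTERN = re.compile(r"[.,;:/\\\"'`\-]+")
SPACE_COLLAPSE = re.compile(r"\s+")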

CANONICAL_STATES = {
	"andhra pradesh", "arunachal pradesh", "assam", "bihar", "chhattisgarh", "goa", "gujarat", "haryana", 
	"himachal pradesh", "jharkhand", "karnataka", "kerala", "madhya pradesh", "maharashtra", "manipur", 
	"meghalaya", "mizoram", "nagaland", "odisha", "punjab", "rajasthan", "sikkim", "tamil nadu", "telangana", 
	"tripura", "uttar pradesh", "uttarakhand", "west bengal", "andaman and nicobar islands", "chandigarh", 
	"dadra and nagar haveli and daman and diu", "delhi", "jammu and kashmir", "ladakh", "lakshadweep", "puducherry",
}

STATE_ALIAS_MAP = {
	"orissa": "odisha", "uttaranchal": "uttarakhand", "tamilnadu": "tamil nadu", "chhatisgarh": "chhattisgarh",
	"chattisgarh": "chhattisgarh", "west bangal": "west bengal", "west bengli": "west bengal", 
	"westbengal": "west bengal", "pondicherry": "puducherry", "daman & diu": "dadra and nagar haveli and daman and diu",
	"daman and diu": "dadra and nagar haveli and daman and diu", "dadra & nagar haveli": "dadra and nagar haveli and daman and diu",
	"jammu & kashmir": "jammu and kashmir", "jammu and kashmir": "jammu and kashmir",
	"the dadra and nagar haveli and daman and diu": "dadra and nagar haveli and daman and diu",
}

LADAKH_DISTRICTS = {"leh", "kargil"}
TELANGANA_DISTRICTS = {
	"hyderabad", "rangareddy", "k v rangareddy", "k.v. rangareddy", "warangal", "nalgonda", "medak", 
	"khammam", "karimnagar", "adilabad", "mahabubnagar", "nizamabad",
}

DISTRICT_FORCE_MAP = {
    "rupnagar": "punjab", "k.v. rangareddy": "telangana", "k v rangareddy": "telangana", "rangareddy": "telangana",
    "dadra and nagar haveli": "dadra and nagar haveli and daman and diu", "bandipore": "jammu and kashmir",
    "bandipora": "jammu and kashmir", "punch": "jammu and kashmir", "poonch": "jammu and kashmir",
    "rajauri": "jammu and kashmir", "rajouri": "jammu and kashmir", "bandipur": "jammu and kashmir",
    "kupwara": "jammu and kashmir", "jammu": "jammu and kashmir", "kathua": "jammu and kashmir",
    "srinagar": "jammu and kashmir", "badgam": "jammu and kashmir", "baramula": "jammu and kashmir",
    "anantnag": "jammu and kashmir", "udhampur": "jammu and kashmir", "doda": "jammu and kashmir",
    "pulwama": "jammu and kashmir", "ganderbal": "jammu and kashmir", "kulgam": "jammu and kashmir",
    "kishtwar": "jammu and kashmir", "kargil": "ladakh", "leh": "ladakh", "leh (ladakh)": "ladakh",
    "kamrup": "assam", "hyderabad": "telangana", "warangal": "telangana", "nizamabad": "telangana",
    "karimnagar": "telangana", "nalgonda": "telangana", "medak": "telangana", "adilabad": "telangana",
    "mahabubnagar": "telangana", "khammam": "telangana", "south andaman": "andaman and nicobar islands",
    "cuddalore": "tamil nadu", "viluppuram": "tamil nadu", "pondicherry": "puducherry", "karikal": "puducherry",
    "karaikal": "puducherry", "yanam": "puducherry", "daman": "dadra and nagar haveli and daman and diu",
    "diu": "dadra and nagar haveli and daman and diu", "nicobar": "andaman and nicobar islands",
    "nicobars": "andaman and nicobar islands", "north and middle andaman": "andaman and nicobar islands",
    "andamans": "andaman and nicobar islands", "ahilyanagar": "maharashtra", "ahmadnagar": "maharashtra",
    "ahmed nagar": "maharashtra", "ahmednagar": "maharashtra", "alipurduar": "west bengal",
    "ambedkar nagar": "uttar pradesh", "ashok nagar": "madhya pradesh", "ashoknagar": "madhya pradesh",
    "barddhaman": "west bengal", "bardez": "goa", "bardhaman": "west bengal", "bhavnagar": "gujarat",
    "burdwan": "west bengal", "chamarajanagar": "karnataka", "chamarajanagar *": "karnataka",
    "chamrajanagar": "karnataka", "chamrajnagar": "karnataka", "chatrapati sambhaji nagar": "maharashtra",
    "chhatrapati sambhajinagar": "maharashtra", "gandhinagar": "gujarat", "ganganagar": "rajasthan",
    "gautam buddha nagar": "uttar pradesh", "gautam buddha nagar *": "uttar pradesh", "gurdaspur": "punjab",
    "harda": "madhya pradesh", "harda *": "madhya pradesh", "hardoi": "uttar pradesh", "hardwar": "uttarakhand",
    "jamnagar": "gujarat", "jyotiba phule nagar": "uttar pradesh", "jyotiba phule nagar *": "uttar pradesh",
    "kabeerdham": "chhattisgarh", "kawardha": "chhattisgarh", "kanpur nagar": "uttar pradesh",
    "khorda": "odisha", "khordha": "odisha", "khordha  *": "odisha", "khorda  *": "odisha",
    "kushi nagar": "uttar pradesh", "kushinagar": "uttar pradesh", "kushinagar *": "uttar pradesh",
    "lohardaga": "jharkhand", "mahabub nagar": "telangana", "mahabubnagar": "telangana",
    "aurangabad": "maharashtra", "bijapur": "karnataka", "raigarh": "chhattisgarh", "balrampur": "uttar pradesh",
    "pratapgarh": "uttar pradesh", "bilaspur": "chhattisgarh"
}

HAMIRPUR_VALID = {"himachal pradesh", "uttar pradesh"}

def normalize_for_match(value: str) -> str:
    if not isinstance(value, str):
        return ""
    s = value.strip().lower()
    s = PUNCT_PATTERN.sub(" ", s)
    s = SPACE_COLLAPSE.sub(" ", s)
    return s.strip()

def title_case_clean(value: str) -> str:
    return " ".join(part.capitalize() for part in value.split())

def _state_district_columns(columns: list) -> tuple:
    state_col = next((c for c in columns if "state" in c.lower()), None)
    dist_col = next((c for c in columns if "district" in c.lower()), None)
    return state_col, dist_col

def resolve_state_district(state_val: str, dist_val: str, logs: dict) -> tuple:
    orig_state, orig_dist = state_val, dist_val
    n_state = normalize_for_match(state_val)
    n_dist = normalize_for_match(dist_val)
    
    is_corrected = False

    # Apply rules
    if n_dist in DISTRICT_FORCE_MAP:
        n_state = DISTRICT_FORCE_MAP[n_dist]
        is_corrected = True
    elif n_dist in LADAKH_DISTRICTS:
        n_state = "ladakh"
        is_corrected = True
    elif n_dist in TELANGANA_DISTRICTS:
        n_state = "telangana"
        is_corrected = True
    
    if n_state in STATE_ALIAS_MAP:
        n_state = STATE_ALIAS_MAP[n_state]
        is_corrected = True

    # Hamirpur collision handling
    if n_dist == "hamirpur" and n_state not in HAMIRPUR_VALID:
        logs['collisions'].append((orig_state, orig_dist))
        return orig_state, orig_dist, False # Return original, not corrected

    if n_state not in CANONICAL_STATES:
        logs['unresolved'].append((orig_state, orig_dist))
        return orig_state, orig_dist, False

    final_state = title_case_clean(n_state)
    final_dist = title_case_clean(n_dist)
    
    if is_corrected:
        logs['corrections'].append((orig_state, orig_dist, final_state, final_dist))

    return final_state, final_dist, is_corrected

# --- FILE PROCESSING FUNCTION ---

def apply_major_rules(input_root: Path, output_root: Path, chunksize: int = 100_000):
    print("\n--- Step 2: Applying Major Canonicalization Rules ---")

    csv_files = sorted(input_root.rglob("*.csv"))
    if not csv_files:
        print(f"[WARN] No CSV files found in {input_root}")
        return

    for csv_file in csv_files:
        rel_path = csv_file.relative_to(input_root)
        output_path = output_root / rel_path
        output_path.parent.mkdir(parents=True, exist_ok=True)

        print(f"Processing: {csv_file.name}")

        # Per-file record of corrections, collisions, and unresolved pairs.
        # resolve_state_district fills it; it is not written to disk here.
        logs = defaultdict(list)

        # Process in chunks to bound memory use; malformed lines are skipped
        reader = pd.read_csv(csv_file, dtype=str, chunksize=chunksize, on_bad_lines='skip', engine='python')

        is_first_chunk = True
        for chunk in reader:
            state_col, dist_col = _state_district_columns(chunk.columns)
            if not state_col or not dist_col:
                # No state/district columns: copy the chunk through unchanged
                chunk.to_csv(output_path, mode='a', index=False, header=is_first_chunk)
                is_first_chunk = False
                continue

            for idx, row in chunk.iterrows():
                state, dist, corrected = resolve_state_district(row[state_col], row[dist_col], logs)
                chunk.at[idx, state_col] = state
                chunk.at[idx, dist_col] = dist

            # Append to the output file; only the first chunk writes the header
            chunk.to_csv(output_path, mode='a', index=False, header=is_first_chunk)
            is_first_chunk = False

    print("\n--- Step 2 Complete ---")

# Execute the function
apply_major_rules(CLEANED_100K_DIR, CLEANED_MAJOR_RULES_DIR)


--- Step 2: Applying Major Canonicalization Rules ---
Processing: biometric_0_500000.csv
Processing: biometric_1000000_1500000.csv
Processing: biometric_1500000_1861108.csv
Processing: biometric_500000_1000000.csv
Processing: demographic_0_500000.csv
Processing: demographic_1000000_1500000.csv
Processing: demographic_1500000_2000000.csv
Processing: demographic_2000000_2071700.csv
Processing: demographic_500000_1000000.csv
Processing: enrolment_0_500000.csv
Processing: enrolment_1000000_1006029.csv
Processing: enrolment_500000_1000000.csv

--- Step 2 Complete ---
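
Step 2 calls the resolver once per row through `iterrows()`. Because the result depends only on the (state, district) pair, one possible optimization is to resolve each distinct pair once and map the results back. The sketch below illustrates the idea with a hypothetical helper, `resolve_unique_pairs`, and a toy stand-in resolver. It assumes the resolver takes two strings and returns a `(state, district)` tuple, so the notebook's `resolve_state_district` would need a thin wrapper, and its logs would then record each pair once rather than once per row.

```python
import pandas as pd


def resolve_unique_pairs(df, state_col, dist_col, resolver):
    """Apply `resolver(state, district) -> (state, district)` once per distinct pair."""
    # Missing values become empty strings so every pair is a well-behaved dict key
    pairs = df[[state_col, dist_col]].fillna("")
    keys = list(zip(pairs[state_col], pairs[dist_col]))
    resolved = {key: resolver(*key) for key in set(keys)}

    out = df.copy()
    out[state_col] = [resolved[key][0] for key in keys]
    out[dist_col] = [resolved[key][1] for key in keys]
    return out


# Hypothetical stand-in resolver, for the demonstration only
TOY_ALIASES = {"orissa": "Odisha"}
calls = []


def toy_resolver(state, district):
    calls.append((state, district))
    return TOY_ALIASES.get(state.strip().lower(), state), district.strip().title()


df = pd.DataFrame({
    "state": ["Orissa", "Orissa", "Odisha", "Orissa"],
    "district": ["khordha", "khordha", "Puri", "khordha"],
})
result = resolve_unique_pairs(df, "state", "district", toy_resolver)
print(result)
print(f"Resolver calls: {len(calls)} for {len(df)} rows")
```

The saving grows with the amount of repetition in the data; for files where most rows share a few hundred state and district combinations, the resolver runs a few hundred times per chunk instead of once per row.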


## 4. Step 3: LGD-Based Normalization and Validation

This is the final and strictest cleaning step. It uses the official Local Government Directory (LGD) as the single source of truth.

The logic, adapted from `Normalizing_districts.py`, will:
1.  Normalize text fields.
2.  Apply known district aliases (e.g., `Bardhaman` -> `Purba Bardhaman`).
3.  Force state assignments for post-bifurcation districts (e.g., `Hyderabad` -> `Telangana`).
4.  **Validate every row against the LGD master data.**
    *   If the state or district is empty after normalization, or the state is not a recognized LGD state, the row is **dropped**.
    *   If a district doesn't exist in the LGD, the row is **dropped**.
    *   If a district exists in exactly one LGD state but the row names a different state, the state is **corrected** to the LGD state and the row is kept.
    *   If a district is ambiguous (belongs to multiple states) and the row's state is not one of them, the row is **dropped**.
5.  The clean data is written to the `final_pipeline_output/` directory.
6.  All dropped rows are logged into **separate files** based on their source category (`biometric`, `demographic`, `enrolment`) in the `final_pipeline_output/logs/` directory.
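
The core of the validation is a lookup from district to the set of states that contain it. The sketch below shows that decision on a toy index, under the assumption that text normalization and alias handling have already run. It follows the behavior of this step's validation code, in which a district found in exactly one state has its state reassigned instead of being dropped. The function name `classify` and the toy index are hypothetical.

```python
# Toy district -> states index; a real run would build this from the LGD master file.
TOY_DISTRICT_TO_STATES = {
    "Hamirpur": {"Himachal Pradesh", "Uttar Pradesh"},
    "Hyderabad": {"Telangana"},
}


def classify(state: str, district: str, district_to_states: dict) -> tuple:
    """Return (keep, state, reason) for one already-normalized row."""
    valid_states = district_to_states.get(district)
    if not valid_states:
        return False, state, "district_not_in_lgd"
    if len(valid_states) == 1:
        # Unambiguous district: adopt the single state listed for it
        (only_state,) = valid_states
        return True, only_state, None
    if state not in valid_states:
        # District exists in several states and the row names none of them
        return False, state, "unresolved_ambiguity"
    return True, state, None


for row in [
    ("Andhra Pradesh", "Hyderabad"),
    ("Uttar Pradesh", "Hamirpur"),
    ("Punjab", "Hamirpur"),
    ("Bihar", "Sector 5"),
]:
    print(row, "->", classify(*row, TOY_DISTRICT_TO_STATES))
```

Only the ambiguous case depends on the state supplied in the row, which is why a district such as `Hamirpur` is kept when the row names one of its valid states and dropped otherwise.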

In [5]:
# This cell contains the core logic adapted from Normalizing_districts.py

# --- LGD NORMALIZATION AND RULE DEFINITIONS ---

LGD_CANONICAL_STATES = {
	"Andaman And Nicobar Islands", "Andhra Pradesh", "Arunachal Pradesh", "Assam", "Bihar", "Chandigarh", 
    "Chhattisgarh", "Dadra And Nagar Haveli And Daman And Diu", "Delhi", "Goa", "Gujarat", "Haryana", 
    "Himachal Pradesh", "Jammu And Kashmir", "Jharkhand", "Karnataka", "Kerala", "Ladakh", "Lakshadweep", 
    "Madhya Pradesh", "Maharashtra", "Manipur", "Meghalaya", "Mizoram", "Nagaland", "Odisha", "Puducherry", 
    "Punjab", "Rajasthan", "Sikkim", "Tamil Nadu", "Telangana", "Tripura", "Uttar Pradesh", "Uttarakhand", "West Bengal",
}

LGD_STATE_ALIAS = {
	"Orissa": "Odisha", "Uttaranchal": "Uttarakhand", "Chattisgarh": "Chhattisgarh", "Pondicherry": "Puducherry",
	"Andaman And Nicobar": "Andaman And Nicobar Islands", "Daman And Diu": "Dadra And Nagar Haveli And Daman And Diu",
	"Dadra And Nagar Haveli": "Dadra And Nagar Haveli And Daman And Diu",
}

LGD_DISTRICT_ALIAS = {
	"Punch": "Poonch", "Davangere": "Davanagere", "Tumkur": "Tumakuru", "Bellary": "Ballari", "Sundergarh": "Sundargarh",
	"Baleswar": "Balasore", "Anugul": "Angul", "Sabarkantha": "Sabar Kantha", "Banaskantha": "Banas Kantha",
	"Panchmahals": "Panch Mahals", "Maldah": "Malda", "Bardhaman": "Purba Bardhaman", "Ahmednagar": "Ahilyanagar",
	"Aurangabad": "Chhatrapati Sambhajinagar", "Allahabad": "Prayagraj", "Faizabad": "Ayodhya",
	"Villupuram": "Viluppuram", "Tirupattur": "Tirupathur", "Tuticorin": "Thoothukkudi", "West Nimar": "Khargone",
	"East Nimar": "Khandwa", "Hoshangabad": "Narmadapuram", "Y S R": "Y.S.R. Kadapa", "K V Rangareddy": "Ranga Reddy",
	"Karim Nagar": "Karimnagar",
}

LGD_FORCE_STATE_BY_DISTRICT = {
	"Hyderabad": "Telangana", "Nizamabad": "Telangana", "Warangal": "Telangana", "Adilabad": "Telangana",
	"Nalgonda": "Telangana", "Bhadradri Kothagudem": "Telangana", "Karimnagar": "Telangana", "Mahabubnagar": "Telangana",
	"Ranchi": "Jharkhand", "Hazaribagh": "Jharkhand", "Dibrugarh": "Assam", "Leh": "Ladakh", "Kargil": "Ladakh",
}

DROP_KEYWORDS = {
	"Near", "Colony", "Sector", "Phase", "Road", "Cross", "Thana", "Hospital", "University", "Dist ", "Sub Urban", "Suburban",
}

def lgd_normalize_text(value) -> str:
    if value is None or pd.isna(value): return ""
    if not isinstance(value, str): value = str(value)
    s = value.strip()
    if not s: return ""
    s = s.replace(".", "")
    s = s.replace("&", "And")
    s = " ".join(s.split())
    return s.title()

def load_lgd_master(path: Path) -> tuple:
    if not path.exists():
        raise FileNotFoundError(f"LGD master JSON not found: {path}")
    with path.open("r", encoding="utf-8") as f:
        data = json.load(f)
    
    state_to_districts = defaultdict(set)
    district_to_states = defaultdict(set)
    
    for entry in data:
        raw_state = entry.get("StateName", "")
        raw_district = entry.get("DistrictName(InEnglish)", "")
        if not raw_state or not raw_district: continue
        
        state = lgd_normalize_text(raw_state)
        district = lgd_normalize_text(raw_district)
        district = LGD_DISTRICT_ALIAS.get(district, district)
        
        if state and district:
            state_to_districts[state].add(district)
            district_to_states[district].add(state)
            
    return dict(state_to_districts), dict(district_to_states)

def lgd_validate_row(raw_state: str, raw_district: str, state_to_districts: dict, district_to_states: dict) -> tuple:
    norm_state = lgd_normalize_text(raw_state)
    norm_district = lgd_normalize_text(raw_district)

    if not norm_state or not norm_district:
        return False, norm_state, norm_district, "invalid_after_normalization"

    canonical_state = LGD_STATE_ALIAS.get(norm_state, norm_state)
    if canonical_state not in LGD_CANONICAL_STATES:
        return False, canonical_state, norm_district, "invalid_after_normalization"
    norm_state = canonical_state

    norm_district = LGD_DISTRICT_ALIAS.get(norm_district, norm_district)
    norm_state = LGD_FORCE_STATE_BY_DISTRICT.get(norm_district, norm_state)

    valid_states = district_to_states.get(norm_district)
    if not valid_states:
        return False, norm_state, norm_district, "district_not_in_lgd"

    if len(valid_states) == 1 and norm_state not in valid_states:
        (norm_state,) = tuple(valid_states)

    if norm_state not in valid_states:
        reason = "unresolved_ambiguity" if len(valid_states) > 1 else "district_belongs_to_other_state"
        return False, norm_state, norm_district, reason

    if any(kw.lower() in norm_district.lower() for kw in DROP_KEYWORDS):
        return False, norm_state, norm_district, "invalid_after_normalization"

    return True, norm_state, norm_district, None

# --- FILE PROCESSING FUNCTION ---

def apply_lgd_normalization(input_root: Path, output_root: Path, logs_root: Path, chunksize: int = 100_000):
    print("\n--- Step 3: Applying LGD-Based Normalization and Validation ---")

    state_to_districts, district_to_states = load_lgd_master(LGD_JSON_PATH)
    print(f"LGD Master Loaded: {len(district_to_states)} unique districts.")

    csv_files = sorted(input_root.rglob("*.csv"))
    if not csv_files:
        print(f"[WARN] No CSV files found in {input_root}")
        return

    # One log file and csv.writer per source category, opened on first use
    log_writers = {}
    log_files = {}

    try:
        for csv_file in csv_files:
            rel_path = csv_file.relative_to(input_root)
            output_path = output_root / rel_path
            output_path.parent.mkdir(parents=True, exist_ok=True)

            # Get the category (biometric, demographic, etc.) for segregated logging
            category = rel_path.parts[0]
            if category not in log_writers:
                log_path = logs_root / f"{category}_unresolved.csv"
                log_files[category] = log_path.open("w", newline="", encoding="utf-8")
                log_writers[category] = csv.writer(log_files[category])
                log_writers[category].writerow(["source_file", "raw_state", "raw_district", "normalized_state", "normalized_district", "drop_reason"])

            print(f"Processing: {csv_file.name} (logging to {category}_unresolved.csv)")

            reader = pd.read_csv(csv_file, dtype=str, chunksize=chunksize, on_bad_lines='skip', engine='python')

            is_first_chunk = True
            for chunk in reader:
                state_col, dist_col = _state_district_columns(chunk.columns)
                if not state_col or not dist_col:
                    # No state/district columns: copy the chunk through unchanged
                    chunk.to_csv(output_path, mode='a', index=False, header=is_first_chunk)
                    is_first_chunk = False
                    continue

                # keep_mask[i] is True when row i passed validation
                keep_mask = []
                for idx, row in chunk.iterrows():
                    raw_state = row[state_col]
                    raw_dist = row[dist_col]
                    keep, norm_state, norm_dist, reason = lgd_validate_row(raw_state, raw_dist, state_to_districts, district_to_states)

                    if keep:
                        chunk.at[idx, state_col] = norm_state
                        chunk.at[idx, dist_col] = norm_dist
                    else:
                        log_writers[category].writerow([csv_file.name, raw_state, raw_dist, norm_state, norm_dist, reason])

                    keep_mask.append(keep)

                cleaned_chunk = chunk[keep_mask]
                if not cleaned_chunk.empty:
                    cleaned_chunk.to_csv(output_path, mode='a', index=False, header=is_first_chunk)
                    is_first_chunk = False
    finally:
        # Close all log files, even if processing raised an exception
        for f in log_files.values():
            f.close()

    print("\n--- Step 3 Complete ---")

# Execute the function
apply_lgd_normalization(CLEANED_MAJOR_RULES_DIR, FINAL_OUTPUT_DIR, LOGS_DIR)


--- Step 3: Applying LGD-Based Normalization and Validation ---
LGD Master Loaded: 773 unique districts.
Processing: biometric_0_500000.csv (logging to biometric_unresolved.csv)
Processing: biometric_1000000_1500000.csv (logging to biometric_unresolved.csv)
Processing: biometric_1500000_1861108.csv (logging to biometric_unresolved.csv)
Processing: biometric_500000_1000000.csv (logging to biometric_unresolved.csv)
Processing: demographic_0_500000.csv (logging to demographic_unresolved.csv)
Processing: demographic_1000000_1500000.csv (logging to demographic_unresolved.csv)
Processing: demographic_1500000_2000000.csv (logging to demographic_unresolved.csv)
Processing: demographic_2000000_2071700.csv (logging to demographic_unresolved.csv)
Processing: demographic_500000_1000000.csv (logging to demographic_unresolved.csv)
Processing: enrolment_0_500000.csv (logging to enrolment_unresolved.csv)
Processing: enrolment_1000000_1006029.csv (logging to enrolment_unresolved.csv)
Processing: enro

## 5. Step 4: Final Cleanup

This final step removes the intermediate directories (`cleaned-100k` and `cleaned_major_rules`) that were created during the pipeline's execution. This keeps the project workspace clean, leaving only the raw `data` and the `final_pipeline_output`.

In [7]:
print("\n--- Step 4: Cleaning up intermediate directories ---")

try:
    if CLEANED_100K_DIR.exists():
        shutil.rmtree(CLEANED_100K_DIR)
        print(f"Successfully removed: {CLEANED_100K_DIR}")

    if CLEANED_MAJOR_RULES_DIR.exists():
        shutil.rmtree(CLEANED_MAJOR_RULES_DIR)
        print(f"Successfully removed: {CLEANED_MAJOR_RULES_DIR}")

    print("\n--- Pipeline Finished ---")
    print(f"Final cleaned data is located in: {FINAL_OUTPUT_DIR}")
    print(f"Segregated logs for dropped rows are in: {LOGS_DIR}")

except Exception as e:
    print(f"[ERROR] Could not perform cleanup: {e}")


--- Step 4: Cleaning up intermediate directories ---

--- Pipeline Finished ---
Final cleaned data is located in: C:\Users\INDIAN\OneDrive\Documents\PRO__\uidai-datathon-2026-participation\final_pipeline_output
Segregated logs for dropped rows are in: C:\Users\INDIAN\OneDrive\Documents\PRO__\uidai-datathon-2026-participation\final_pipeline_output\logs
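
Once the pipeline has finished, the per-category logs can be summarized to see why rows were dropped. The sketch below counts rows per `drop_reason` in a single log file, using the column names that Step 3 writes in the log header. The helper `summarize_drop_reasons` is hypothetical, and the demonstration runs on a small toy log written to a temporary directory, not on real pipeline output.

```python
import csv
import tempfile
from collections import Counter
from pathlib import Path

# Column names as written by Step 3 in each `<category>_unresolved.csv`
LOG_HEADER = ["source_file", "raw_state", "raw_district",
              "normalized_state", "normalized_district", "drop_reason"]


def summarize_drop_reasons(log_path: Path) -> Counter:
    """Count logged rows per `drop_reason` in one dropped-row log file."""
    with log_path.open("r", newline="", encoding="utf-8") as f:
        return Counter(row["drop_reason"] for row in csv.DictReader(f))


# Toy log with made-up rows, used only to exercise the helper
with tempfile.TemporaryDirectory() as tmp:
    toy_log = Path(tmp) / "toy_unresolved.csv"
    with toy_log.open("w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(LOG_HEADER)
        writer.writerow(["toy.csv", "Bihar", "Sector 5", "Bihar", "Sector 5", "district_not_in_lgd"])
        writer.writerow(["toy.csv", "Punjab", "Hamirpur", "Punjab", "Hamirpur", "unresolved_ambiguity"])
        writer.writerow(["toy.csv", "Bihar", "Near Station", "Bihar", "Near Station", "district_not_in_lgd"])
    toy_summary = summarize_drop_reasons(toy_log)

print(toy_summary.most_common())
```

On real output, the same helper could be applied to each file returned by `LOGS_DIR.glob("*_unresolved.csv")` to compare drop reasons across the `biometric`, `demographic`, and `enrolment` categories.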
