### Parse MNREGA Social Audit Zipped HTMLs

In [1]:
import os
import gzip
import pandas as pd
import logging
from pathlib import Path
from tqdm import tqdm
import concurrent.futures
from bs4 import BeautifulSoup

In [2]:
def extract_info_from_html(file_path):
    """Extract all relevant information from a gzipped HTML file,
       flattening the qualitative report tables into individual columns.
    """
    # Read the HTML content from a gzipped file or a normal file.
    if file_path.name.endswith('.html.gz'):
        with gzip.open(file_path, 'rb') as f:
            html_bytes = f.read()
        html_content = html_bytes.decode('utf-8', errors='replace')
    else:
        with open(file_path, 'r', encoding='utf-8') as f:
            html_content = f.read()
    
    soup = BeautifulSoup(html_content, 'html.parser')
    data = {}
    
    # -- Basic Information Fields --
    basic_fields = {
        'state': 'ctl00_ContentPlaceHolder1_lblstate',
        'district': 'ctl00_ContentPlaceHolder1_lbldistrict',
        'block': 'ctl00_ContentPlaceHolder1_lblblock',
        'panchayat': 'ctl00_ContentPlaceHolder1_lblpanchayat',
        'sa_start_date': 'ctl00_ContentPlaceHolder1_lblSA_start_dt',
        'sa_end_date': 'ctl00_ContentPlaceHolder1_lblSA_end_dt',
        'gram_sabha_date': 'ctl00_ContentPlaceHolder1_lblGramSabha_dt',
        'public_hearing_date': 'ctl00_ContentPlaceHolder1_lblPublic_Hearing_dt'
    }
    for key, id_value in basic_fields.items():
        element = soup.find('span', id=id_value)
        data[key] = element.text.strip() if element else None

    # -- Financial Information Fields --
    financial_fields = {
        'sa_period_from': 'ctl00_ContentPlaceHolder1_lblSA_Period_From_Date',
        'sa_period_to': 'ctl00_ContentPlaceHolder1_lblSA_Period_To_Date',
        'wage_exp': 'ctl00_ContentPlaceHolder1_lblWage_exp',
        'material_exp': 'ctl00_ContentPlaceHolder1_lblmat_exp',
        'total_exp': 'ctl00_ContentPlaceHolder1_lbltotal_expen',
        'wage_given': 'ctl00_ContentPlaceHolder1_lblwage_given',
        'material_given': 'ctl00_ContentPlaceHolder1_lblmat_given',
        'total_given': 'ctl00_ContentPlaceHolder1_lbltotal_record_given'
    }
    for key, id_value in financial_fields.items():
        element = soup.find('span', id=id_value)
        data[key] = element.text.strip() if element else None

    # -- Work Information Fields --
    work_fields = {
        'total_works': 'ctl00_ContentPlaceHolder1_lbltot_work',
        'total_households': 'ctl00_ContentPlaceHolder1_lbltot_hh',
        'works_verified': 'ctl00_ContentPlaceHolder1_lbltot_work_verified',
        'households_verified': 'ctl00_ContentPlaceHolder1_lbltot_hh_verified',
        'gram_sabha_participants': 'ctl00_ContentPlaceHolder1_lblno_of_ppl_participated_gs'
    }
    for key, id_value in work_fields.items():
        element = soup.find('span', id=id_value)
        data[key] = element.text.strip() if element else None

    # -- Expense Information Fields --
    expense_fields = {
        'printing_expense': 'ctl00_ContentPlaceHolder1_lblprinting_expense',
        'videography_expense': 'ctl00_ContentPlaceHolder1_lblvideography_expense',
        'tea_expense': 'ctl00_ContentPlaceHolder1_lbltea_expense',
        'vrp_training_expense': 'ctl00_ContentPlaceHolder1_lblvrp_training_expense',
        'vrp_travel_expense': 'ctl00_ContentPlaceHolder1_lblvrp_travel_expense',
        'photocopying_expense': 'ctl00_ContentPlaceHolder1_lblphotocopying_expense',
        'other_expense': 'ctl00_ContentPlaceHolder1_lblother_expense',
        'vrp_honorarium_expense': 'ctl00_ContentPlaceHolder1_lblvrp_honorium_expense',
        'stationary_expense': 'ctl00_ContentPlaceHolder1_lblstationary_expense',
        'publicity_expense': 'ctl00_ContentPlaceHolder1_lblpublicity_expense',
        'mic_expense': 'ctl00_ContentPlaceHolder1_lblmic_expense',
        'photography_expense': 'ctl00_ContentPlaceHolder1_lblphotography_expense',
        'shamiana_expense': 'ctl00_ContentPlaceHolder1_lblshamiana_expense',
        'total_expense': 'ctl00_ContentPlaceHolder1_lbltotal_expense'
    }
    for key, id_value in expense_fields.items():
        element = soup.find('span', id=id_value)
        data[key] = element.text.strip() if element else None

    # -- Checklist Information Fields --
    checklist_fields = {
        'job_cards_with_people': 'ctl00_ContentPlaceHolder1_Label1',
        'job_cards_updated': 'ctl00_ContentPlaceHolder1_Label3',
        'job_cards_renewed': 'ctl00_ContentPlaceHolder1_Label4',
        'demand_registration_process': 'ctl00_ContentPlaceHolder1_Label2',
        'unmet_demand': 'ctl00_ContentPlaceHolder1_Label29',
        'payment_agency_problems': 'ctl00_ContentPlaceHolder1_Label30'
    }
    for key, id_value in checklist_fields.items():
        element = soup.find('span', id=id_value)
        data[key] = element.text.strip() if element else None

    # ---------------------------
    # Qualitative Report: Summary Of Reported Issues Table
    # ---------------------------
    rep_div = soup.find("div", id="ctl00_ContentPlaceHolder1_divReportedIssue")
    if rep_div:
        rep_table = rep_div.find("table")
        if rep_table:
            # Get the two header rows:
            thead = rep_table.find("thead")
            if thead:
                header_rows = thead.find_all("tr")
            else:
                header_rows = rep_table.find_all("tr")[:2]
            if len(header_rows) >= 2:
                # Process top header row (expand by colspan)
                top_row = header_rows[0]
                top_headers = []
                for th in top_row.find_all("th"):
                    colspan = int(th.get("colspan", 1))
                    text = th.get_text(strip=True)
                    top_headers.extend([text] * colspan)
                # Process bottom header row
                bottom_row = header_rows[1]
                bottom_headers = [th.get_text(strip=True) for th in bottom_row.find_all("th")]
                # Combine the two layers, skipping the first column (SR#)
                combined_headers = []
                # Assume the first cell in bottom_headers is "SR#" and skip it.
                for i in range(1, len(bottom_headers)):
                    group = top_headers[i]
                    sub = bottom_headers[i]
                    # Combine group and subheader if group is not already contained in sub.
                    if group and (group.lower() not in sub.lower()):
                        combined = f"{group}_{sub}"
                    else:
                        combined = sub
                    combined = combined.lower().replace(" ", "_")
                    combined_headers.append(combined)
                # Extract the first data row (skip the first cell)
                tbody = rep_table.find("tbody")
                if tbody:
                    data_row = tbody.find("tr")
                else:
                    all_rows = rep_table.find_all("tr")
                    data_row = all_rows[2] if len(all_rows) > 2 else None
                if data_row:
                    values = [td.get_text(strip=True) for td in data_row.find_all("td")]
                    # Skip the first value (SR#)
                    values = values[1:]
                else:
                    values = []
                # If the headers and values match, add them as separate keys.
                if len(combined_headers) == len(values):
                    for header, value in zip(combined_headers, values):
                        key = f"rep_{header}"
                        data[key] = value
                else:
                    data["rep_reported_issues_table"] = rep_table.get_text(separator=" | ", strip=True)
            else:
                data["rep_reported_issues"] = rep_table.get_text(separator=" | ", strip=True)
        else:
            data["rep_reported_issues"] = rep_div.get_text(separator=" | ", strip=True)
    else:
        data["rep_reported_issues"] = None

    # ---------------------------
    # Qualitative Report: Summary Of Action Taken Report Table
    # ---------------------------
    atr_div = soup.find("div", id="ctl00_ContentPlaceHolder1_divATR")
    if atr_div:
        atr_table = atr_div.find("table")
        if atr_table:
            header_row = atr_table.find("tr")
            if header_row:
                headers = [th.get_text(strip=True) for th in header_row.find_all(["th", "td"])]
                # Remove the first header if it is "SR#" or similar.
                if headers and headers[0].strip().lower() in ["sr#", "sr"]:
                    headers = headers[1:]
            else:
                headers = []
            tbody = atr_table.find("tbody")
            if tbody:
                data_row = tbody.find("tr")
            else:
                all_rows = atr_table.find_all("tr")
                data_row = all_rows[1] if len(all_rows) >= 2 else None
            if data_row:
                values = [td.get_text(strip=True) for td in data_row.find_all("td")]
                # Remove the first value if it corresponds to SR#
                if len(values) == len(headers) + 1:
                    values = values[1:]
            else:
                values = []
            if len(headers) == len(values):
                for header, value in zip(headers, values):
                    key = f"atr_{header.lower().replace(' ', '_')}"
                    data[key] = value
            else:
                data["atr_report"] = atr_table.get_text(separator=" | ", strip=True)
        else:
            data["atr_report"] = atr_div.get_text(separator=" | ", strip=True)
    else:
        data["atr_report"] = None

    # ---------------------------
    # Gram Panchayat Checklist Section
    # ---------------------------
    checklist_heading = soup.find("div", class_="panel-heading", string=lambda t: t and "Gram Panchayat Checklist" in t)
    gram_checklist = {}
    if checklist_heading:
        checklist_container = checklist_heading.find_next_sibling("div", class_="panel-body")
        if checklist_container:
            inner_panels = checklist_container.find_all("div", class_="panel panel-info")
            for panel in inner_panels:
                section_heading_elem = panel.find("div", class_="panel-heading")
                section_name = section_heading_elem.get_text(strip=True) if section_heading_elem else "unknown_section"
                panel_body = panel.find("div", class_="panel-body")
                section_data = {}
                if panel_body:
                    ul = panel_body.find("ul", class_="man")
                    if ul:
                        li_items = ul.find_all("li")
                        for li in li_items:
                            full_text = li.get_text(" ", strip=True)
                            parts = full_text.split(":", 1)
                            if len(parts) == 2:
                                question = parts[0].strip().lower().replace(" ", "_")
                                answer = parts[1].strip()
                            else:
                                question = full_text.lower().replace(" ", "_")
                                answer = ""
                            section_data[question] = answer
                gram_checklist[section_name.lower().replace(" ", "_")] = section_data
    data["gram_panchayat_checklist"] = gram_checklist if gram_checklist else None

    # ---------------------------
    # Reported Issues Section (if needed separately)
    # ---------------------------
    issues_div = soup.find('div', id="ctl00_ContentPlaceHolder1_divReportedIssue")
    if issues_div:
        data['reported_issues'] = issues_div.get_text(separator=" ", strip=True)
    else:
        data['reported_issues'] = None

    # ---------------------------
    # Source File Reference
    # ---------------------------
    data['source_file'] = file_path.name

    return data

def delete_invalid_gz_files(folder_paths):
    """
    Scans multiple folders for invalid .gz files and deletes them.
    
    Args:
        folder_paths (list): A list of directory paths to scan.
    """
    for folder_path in folder_paths:
        folder = Path(folder_path)

        if not folder.exists():
            logging.warning(f"⚠️ Warning: Folder {folder} does not exist. Skipping.")
            continue

        gz_files = list(folder.glob('*.gz'))

        logging.info(f"📂 Checking {len(gz_files)} .gz files in {folder}...")

        for file_path in gz_files:
            try:
                # Try to open and read a small portion of the file
                with gzip.open(file_path, 'rb') as f:
                    f.read(1)  # Read a small part to check validity
            except (OSError, gzip.BadGzipFile):
                logging.warning(f"🗑️ Deleting invalid .gz file: {file_path}")
                os.remove(file_path)

    logging.info("✅ Finished scanning and deleting invalid .gz files.")

def process_html_folders(folder_paths, max_workers=8):
    """
    Process all gzipped HTML files (.html.gz) in the given list of folders concurrently
    and return a combined DataFrame.
    
    :param folder_paths: List of directories to process.
    :param max_workers: Number of worker threads to use.
    :return: Combined DataFrame with extracted HTML data.
    """
    all_results = []
    futures = []
    
    # Create a thread pool executor
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for folder_path in folder_paths:
            folder = Path(folder_path)
            html_files = list(folder.glob('*.html.gz'))
            print(f"📂 Found {len(html_files)} .html.gz files in {folder}...")
            
            for file_path in html_files:
                futures.append(executor.submit(extract_info_from_html, file_path))
        
        # Iterate through the completed futures (using tqdm for progress)
        for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Processing files"):
            try:
                data = future.result()
                all_results.append(data)
            except Exception as e:
                print(f"\n❌ Error processing file: {e}")

    if all_results:
        df = pd.DataFrame(all_results)
        output_file = '../data/final_audit_results.csv'
        df.to_csv(output_file, index=False)
        print(f"\n✅ Saved final results to {output_file}")
        print(f"📊 Total records processed: {len(df)}")
        return df


In [3]:
html_folders = ["../data/html"]
delete_invalid_gz_files(html_folders)

In [4]:
df = process_html_folders(html_folders)

📂 Found 296656 .html.gz files in ../data/html...


Processing files:  99%|██████████████████████████████████████████████▍| 293051/296656 [5:39:25<1:39:16,  1.65s/it]IOStream.flush timed out
Processing files: 100%|████████████████████████████████████████████████▊| 295612/296656 [5:43:52<36:24,  2.09s/it]IOStream.flush timed out
Processing files: 100%|████████████████████████████████████████████████▉| 296034/296656 [5:44:34<00:41, 15.08it/s]IOStream.flush timed out
Processing files: 100%|█████████████████████████████████████████████████| 296656/296656 [5:45:43<00:00, 14.30it/s]



✅ Saved final results to ../data/final_audit_results.csv
📊 Total records processed: 296656


In [5]:
df.head()

Unnamed: 0,state,district,block,panchayat,sa_start_date,sa_end_date,gram_sabha_date,public_hearing_date,sa_period_from,sa_period_to,...,atr_fd_amount,atr_amount_of_fine/penalty_paid,atr_number_of_firs_filled,atr_number_of_employees_suspended,atr_number_of_employees_terminated,gram_panchayat_checklist,reported_issues,source_file,rep_reported_issues,atr_report
0,RAJASTHAN,NAGAUR,DEGANA,आंतरोली कलां,19/06/2024,23/06/2024,24/06/2024,24/06/2024,01/04/2023,31/03/2024,...,0,0,0,0,0,{'job_cards': {'are_job_cards_with_people': 'B...,Summary Of Reported Issues Financial Misapprop...,27_2714_2714007_2714007275_2023-2024_6_24_2024...,,
1,MEGHALAYA,EAST KHASI HILLS,MAWPHLANG,Lawkhla Mawlong,31/07/2023,03/08/2023,04/08/2023,29/02/2024,01/10/2022,31/03/2023,...,0,0,0,0,0,{'job_cards': {'are_job_cards_with_people': 'B...,Summary Of Reported Issues Financial Misapprop...,21_2102_2102005_2102009052_2022-2023_8_4_2023_...,,
2,MEGHALAYA,EAST KHASI HILLS,KHADARSHNONG-LAITKROH,Myiong,26/06/2023,28/06/2023,28/06/2023,26/07/2023,01/10/2022,31/03/2023,...,0,0,0,0,0,{'job_cards': {'are_job_cards_with_people': 'G...,Summary Of Reported Issues Financial Misapprop...,21_2102_2102002_2102002026_2022-2023_6_28_2023...,,
3,UTTAR PRADESH,KANNAUJ,TALGRAM,Terajaket,22/07/2019,24/07/2019,24/07/2019,01/08/2019,01/04/2018,31/03/2019,...,0,0,0,0,0,{'job_cards': {'are_job_cards_with_people': 'G...,Summary Of Reported Issues Financial Misapprop...,31_3168_3168006_3168006046_2018-2019_7_24_2019...,,
4,BIHAR,AURANAGABAD,GOH,MIRPUR,02/11/2022,06/11/2022,07/11/2022,07/11/2022,01/04/2021,31/03/2022,...,0,0,0,0,0,{'job_cards': {'are_job_cards_with_people': 'G...,Summary Of Reported Issues Financial Misapprop...,05_0505_0505004_0505004006_2021-2022_11_7_2022...,,


In [6]:
df.shape

(296656, 63)

In [7]:
def flatten_dict(d, parent_key='', sep='_'):
    """
    Recursively flattens a nested dictionary.
    For example:
      {'job_cards': {'are_job_cards_with_people': 'Greater than 75%', 'are_job_cards_updated': 'Yes'}}
    becomes:
      {'job_cards_are_job_cards_with_people': 'Greater than 75%', 'job_cards_are_job_cards_updated': 'Yes'}
    """
    items = {}
    if not isinstance(d, dict):
        return {}
    for k, v in d.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.update(flatten_dict(v, new_key, sep=sep))
        else:
            items[new_key] = v
    return items

In [8]:
flat_checklist_df = pd.json_normalize(df['gram_panchayat_checklist'])

flat_checklist_df = flat_checklist_df.add_prefix("gpc_")

df = df.drop(columns=['gram_panchayat_checklist']).join(flat_checklist_df)

In [9]:
df.shape

(296656, 91)

In [10]:
df.to_csv("../data/final_audit_results.csv", index = False)