In [None]:
import os
import shutil
import time
from typing import Optional

import pandas as pd
import numpy as np
import requests
import zipfile
import gc
import tracemalloc

import json

In [26]:
# --- Configuration -----------------------------------------------------------
# Survey years available to this pipeline (2015 through 2024, inclusive).
YEARS = list(range(2015, 2025))

# Data and config folders are resolved one level above the notebook's working directory.
DATA_DIR = os.path.join(os.getcwd(), "..", "data_raw")                # downloaded zips / extracted XPTs
PROCESSED_DATA_DIR = os.path.join(os.getcwd(), "..", "data_processed")  # CSV outputs
CONFIG_DIR = os.path.join(os.getcwd(), "..", "config")                # JSON mapping files (read, not created)

for _required_dir in (DATA_DIR, PROCESSED_DATA_DIR):
    os.makedirs(_required_dir, exist_ok=True)
del _required_dir

# Download URL template; `{year}` is substituted per survey year.
BASE_URL = "https://www.cdc.gov/brfss/annual_data/{year}/files/LLCP{year}XPT.zip"

In [28]:
def load_and_validate_mappings(config_dir: str, config_file_name: str) -> dict:
    """
    Loads and validates a specific mapping configuration file from JSON.

    The file is read as UTF-8 (a leading UTF-8 byte-order mark is tolerated), parsed,
    checked for the nesting expected for its type, and returned with year/code keys
    converted to integers. Supported file names are 'VAR_MAP.json',
    'VALUE_MAP.json', and 'VALUE_TEXT_MAP.json'.

    Args:
        config_dir (str): The directory path where the configuration file is located.
        config_file_name (str): The specific name of the JSON configuration file
            (e.g., "VAR_MAP.json").

    Returns:
        dict: A dictionary containing the loaded configuration map with integer-converted keys.
            - For VAR_MAP: {canonical_name: {year: column_name_or_candidates}}
            - For VALUE_MAP: {canonical_name: {year: {code: value}}}
            - For VALUE_TEXT_MAP: {canonical_name: {code: value}}

    Raises:
        FileNotFoundError: If the specified configuration file does not exist.
        ValueError: If `config_file_name` is not one of the known types, if the JSON
            does not have the expected object nesting, or if a year/code key is not
            an integer string.
    """
    config_file_path = os.path.join(config_dir, config_file_name)
    if not os.path.exists(config_file_path):
        raise FileNotFoundError(f"Configuration file not found: {config_file_path}")

    print(f"[CONFIG] Loading configuration from: {config_file_path}")
    # JSON is UTF-8 by specification. Without an explicit encoding, open() uses the
    # locale codepage (e.g. cp1252 on Windows) and garbles non-ASCII labels;
    # "utf-8-sig" also accepts files saved with a BOM by Windows editors.
    with open(config_file_path, "r", encoding="utf-8-sig") as f:
        config_data = json.load(f)

    def _require_dict(obj, where: str) -> dict:
        # Fail with a readable message instead of an AttributeError on `.items()`.
        if not isinstance(obj, dict):
            raise ValueError(
                f"{config_file_name}: expected a JSON object at {where}, got {type(obj).__name__}"
            )
        return obj

    def _int_key(key, where: str) -> int:
        # Year and code keys are stored as strings in JSON; they must be integers.
        try:
            return int(key)
        except (TypeError, ValueError):
            raise ValueError(f"{config_file_name}: non-integer key {key!r} at {where}") from None

    _require_dict(config_data, "top level")

    config_map = {}
    if config_file_name == "VAR_MAP.json":
        for canonical, year_fields in config_data.items():
            _require_dict(year_fields, canonical)
            config_map[canonical] = {
                _int_key(year, canonical): col for year, col in year_fields.items()
            }
    elif config_file_name == "VALUE_MAP.json":
        for canonical, year_values in config_data.items():
            _require_dict(year_values, canonical)
            per_year = {}
            for year, code_map in year_values.items():
                where = f"{canonical}.{year}"
                _require_dict(code_map, where)
                per_year[_int_key(year, canonical)] = {
                    _int_key(k, where): v for k, v in code_map.items()
                }
            config_map[canonical] = per_year
    elif config_file_name == "VALUE_TEXT_MAP.json":
        for canonical, code_map in config_data.items():
            _require_dict(code_map, canonical)
            config_map[canonical] = {_int_key(k, canonical): v for k, v in code_map.items()}
    else:
        raise ValueError(f"Unknown configuration file: {config_file_name}")

    print(f"[CONFIG] - Loaded {len(config_map)} canonical fields from {config_file_name}")
    return config_map

# Load each JSON mapping once; downstream cells read these module-level globals.
VAR_MAP = load_and_validate_mappings(CONFIG_DIR, "VAR_MAP.json")
VALUE_MAP = load_and_validate_mappings(CONFIG_DIR, "VALUE_MAP.json")
VALUE_TEXT_MAP = load_and_validate_mappings(CONFIG_DIR, "VALUE_TEXT_MAP.json")

print("[CONFIG] Mappings loaded successfully.")
for _map_name, _mapping in (
    ("VAR_MAP", VAR_MAP),
    ("VALUE_MAP", VALUE_MAP),
    ("VALUE_TEXT_MAP", VALUE_TEXT_MAP),
):
    print(f"[CONFIG] - {_map_name}: {len(_mapping)} fields")
del _map_name, _mapping

[CONFIG] Loading configuration from: c:\github\brfss-diabetes-trends\analysis\..\config\VAR_MAP.json
[CONFIG] - Loaded 20 canonical fields from VAR_MAP.json
[CONFIG] Loading configuration from: c:\github\brfss-diabetes-trends\analysis\..\config\VALUE_MAP.json
[CONFIG] - Loaded 20 canonical fields from VALUE_MAP.json
[CONFIG] Loading configuration from: c:\github\brfss-diabetes-trends\analysis\..\config\VALUE_TEXT_MAP.json
[CONFIG] - Loaded 20 canonical fields from VALUE_TEXT_MAP.json
[CONFIG] Mappings loaded successfully.
[CONFIG] - VAR_MAP: 20 fields
[CONFIG] - VALUE_MAP: 20 fields
[CONFIG] - VALUE_TEXT_MAP: 20 fields


In [None]:
def ensure_xpt(year, retries: int = 3, timeout: int = 30):
    """
    Ensures that the LLCP{year}.XPT file exists locally, downloading and extracting it if necessary.

    Files live under ``DATA_DIR/<year>/``:

    1.  **Download:** If neither ``LLCP{year}.zip`` nor ``LLCP{year}.XPT`` is present, the
        ZIP is downloaded from `BASE_URL`, with exponential backoff between failed attempts.
        Data is streamed to a ``.part`` file that is renamed to the final name only once
        the transfer completes. An interrupted download therefore never looks like a cached archive.
    2.  **Extraction:** If the XPT is missing, the first archive member whose name ends
        in ``.xpt`` (case-insensitive, ignoring surrounding whitespace) is streamed to a
        ``.part`` file and then renamed to ``LLCP{year}.XPT``. Writing under a fixed name
        avoids depending on the member's own (possibly whitespace-padded) filename. It
        also guarantees a partially extracted file is never left at the cached path.

    Args:
        year (int): The target year to fetch (e.g., 2019).
        retries (int, optional): The maximum number of download attempts. Defaults to 3.
        timeout (int, optional): The request timeout in seconds for each attempt. Defaults to 30.

    Returns:
        str: The full filesystem path to the local .XPT file.

    Raises:
        RuntimeError: If the ZIP archive contains no .xpt member, or is not a valid ZIP file.
        requests.exceptions.RequestException: If the download fails after all retries are exhausted.
    """
    data_parent_dir = os.path.join(DATA_DIR, f"{year}")
    os.makedirs(data_parent_dir, exist_ok=True)
    zip_path = os.path.join(data_parent_dir, f"LLCP{year}.zip")
    xpt_path = os.path.join(data_parent_dir, f"LLCP{year}.XPT")
    have_xpt = os.path.exists(xpt_path)

    if os.path.exists(zip_path):
        print(f"[CACHED] {year} zip")
    elif not have_xpt:
        # The archive is only needed when the XPT still has to be extracted.
        print(f"[DOWNLOAD] {year}")
        _download_zip(BASE_URL.format(year=year), zip_path, year, retries, timeout)

    if have_xpt:
        print(f"[CACHED] {year} xpt")
    else:
        print(f"[UNZIP] {year}")
        _extract_first_xpt(zip_path, xpt_path, year)

    return xpt_path


def _download_zip(url: str, zip_path: str, year, retries: int, timeout: int) -> None:
    """Stream `url` to `zip_path` through a `.part` file, retrying with exponential backoff."""
    part_path = zip_path + ".part"
    for attempt in range(1, retries + 1):
        try:
            with requests.get(url, stream=True, timeout=timeout) as r:
                r.raise_for_status()
                with open(part_path, "wb") as f:
                    for chunk in r.iter_content(chunk_size=1024 * 1024):
                        if chunk:
                            f.write(chunk)
            # Publish under the final name only after the body was fully written.
            os.replace(part_path, zip_path)
            return
        except Exception as e:
            print(f"Download attempt {attempt} failed for {year}: {e}")
            try:
                os.remove(part_path)
            except OSError:
                pass  # best-effort cleanup; the next attempt truncates the file anyway
            if attempt < retries:
                time.sleep(2 ** attempt)
            else:
                raise


def _extract_first_xpt(zip_path: str, xpt_path: str, year) -> None:
    """Write the first `.xpt` member of `zip_path` to `xpt_path` via a `.part` file."""
    part_path = xpt_path + ".part"
    try:
        with zipfile.ZipFile(zip_path, "r") as z:
            all_files = z.namelist()
            members = [m for m in all_files if m.lower().strip().endswith(".xpt")]
            if not members:
                print(f"DEBUG: Contents of zip {year}: {all_files}")
                raise RuntimeError(f"No .xpt found in zip for {year}")

            member = members[0]
            print(f"Extracting {member.strip()}...")
            try:
                # ZipFile.open reads by exact member name, so whitespace in the
                # archive name never reaches the filesystem.
                with z.open(member) as src, open(part_path, "wb") as dst:
                    shutil.copyfileobj(src, dst, 1024 * 1024)
                os.replace(part_path, xpt_path)
            except BaseException:
                # Do not leave a truncated file behind (including on KeyboardInterrupt).
                try:
                    os.remove(part_path)
                except OSError:
                    pass
                raise
    except zipfile.BadZipFile as e:
        raise RuntimeError(f"Bad zip file for {year}: {e}") from e

    print(f"File extraction complete. Size: {os.path.getsize(xpt_path)} bytes")

In [7]:
def decode_value(canonical: str, year: int, val) -> Optional[str]:
    """
    Decode a single scalar value to its human-readable label.

    Parameters
    ----------
    canonical : str
        Canonical field name (key in VALUE_MAP / VALUE_TEXT_MAP).
    year : int
        Year to use for per-year mapping.
    val : scalar
        Value to decode (int, float, str convertible with ``int()``, or missing).

    Returns
    -------
    str or pd.NA
        Mapped label string, or pd.NA if the value is missing, not convertible,
        or unmapped.

    Notes
    -----
    - Floats are truncated toward zero by ``int()`` before lookup.
    - Missing values (pd.isna), non-integer-convertible values, and non-finite
      floats (``inf``) return pd.NA.
    - First attempts to use the per-year mapping in VALUE_MAP; if not found,
      falls back to the constant mapping in VALUE_TEXT_MAP.
    """
    if pd.isna(val):
        return pd.NA
    try:
        key = int(val)
    except (ValueError, TypeError, OverflowError):
        # OverflowError: int(float("inf")) cannot be represented as an integer code.
        return pd.NA

    # Try per-year mapping first
    label = VALUE_MAP.get(canonical, {}).get(year, {}).get(key)
    if label is None:
        # Fallback to constant mapping
        label = VALUE_TEXT_MAP.get(canonical, {}).get(key)
    return pd.NA if label is None else label

In [None]:
def decode_series(canonical: str, year: int, series: pd.Series) -> pd.Series:
    """
    Decodes a Series of numeric codes into human-readable labels (vectorized).

    Each element is coerced to a number (non-numeric strings become missing). It is then
    truncated toward zero, matching ``int()`` in `decode_value`, and looked up in a
    combined map:
    1.  **Year-Specific:** `VALUE_MAP[canonical][year]` entries take precedence.
    2.  **Global Fallback:** `VALUE_TEXT_MAP[canonical]` supplies the remaining codes.

    Truncation matters for SAS XPT input. Values are read as floats, and a
    non-integral float would otherwise make the integer cast raise.

    Args:
        canonical (str): The canonical field name (the key used in the configuration maps).
        year (int): The survey year (selects the per-year mapping in VALUE_MAP).
        series (pd.Series): Raw codes as numbers or numeric strings; may be None.

    Returns:
        pd.Series: An object-dtype Series with the same index and name as `series`.
            Decoded labels appear where a mapping exists. The value is `pd.NA` where the
            input is missing, non-numeric, non-finite, or has no mapping.
            Returns an empty object Series when `series` is None.
    """
    if series is None:
        return pd.Series(dtype="object")
    if series.empty:
        return pd.Series(index=series.index, dtype="object")

    numeric = pd.to_numeric(series, errors="coerce").to_numpy(dtype="float64", na_value=np.nan)
    with np.errstate(invalid="ignore"):
        truncated = np.trunc(numeric)
    # inf/-inf have no integer code; treat them as missing like decode_value does.
    truncated[~np.isfinite(truncated)] = np.nan
    codes = pd.Series(truncated, index=series.index, name=series.name).astype("Int64")

    per_year_map = VALUE_MAP.get(canonical, {}).get(year, {})
    fallback_map = VALUE_TEXT_MAP.get(canonical, {})
    # fallback_map provides defaults, per_year_map overrides them
    combined_map = {**fallback_map, **per_year_map}

    labels = codes.map(combined_map).astype("object")
    # Missing codes and unmapped codes both surface as NaN after map; normalize to pd.NA.
    return labels.where(labels.notna(), pd.NA)

In [None]:
def normalize_days(series: pd.Series) -> pd.Series:
    """
    Normalizes raw day-count values to a 4-level categorical coding (1/2/3/9).

    The input is assumed to use the raw BRFSS day-count convention: 1-30 is a number of
    days, 88 means "None", 77 means "Don't know / Not sure", and 99 means "Refused".
    NOTE(review): this assumption follows the BRFSS codebook for PHYSHLTH/MENTHLTH;
    confirm that every VAR_MAP column routed here is a raw count and not an already
    bucketed variable.

    Values are coerced to numbers and truncated toward zero before classification.

    Args:
        series (pd.Series): The input Series containing raw day-count data.
            Handles numeric types or strings convertible to numbers.

    Returns:
        pd.Series: A Series of 'Int64' dtype with the same index and name as the input:
            * **1**: Zero days (raw `0` or `88`).
            * **2**: 1-13 days.
            * **3**: 14-30 days.
            * **9**: Don't know / Refused (raw `77` or `99`).
            * **<NA>**: Missing, non-numeric, or any other (out-of-range) value.
            An empty Int64 Series is returned for None or empty input.
    """
    if series is None or series.empty:
        return pd.Series(dtype="Int64")

    numeric = pd.to_numeric(series, errors="coerce").to_numpy(dtype="float64", na_value=np.nan)
    with np.errstate(invalid="ignore"):
        days = np.trunc(numeric)
        conditions = [
            np.isin(days, (77, 99)),          # don't know / refused
            np.isin(days, (0, 88)),           # zero days / "None"
            (days >= 1) & (days <= 13),       # 1-13 days (9 is a genuine count)
            (days >= 14) & (days <= 30),      # 14-30 days; 31+ are not valid counts
        ]
    codes = np.select(conditions, [9, 1, 2, 3], default=np.nan)
    return pd.Series(codes, index=series.index, name=series.name).astype("Int64")

In [None]:
def validate_year_mappings(df_raw: pd.DataFrame, year: int) -> list:
    """
    Identifies canonical fields defined in VAR_MAP that are missing from the raw DataFrame.

    This function iterates through the global `VAR_MAP` configuration to find expected 
    column names for the specified `year`. It supports mapping definitions where the 
    target can be a single string or a list of candidate strings. A field is considered 
    "found" if at least one of its candidate columns exists in `df_raw`.

    Args:
        df_raw (pd.DataFrame): The raw DataFrame loaded from the BRFSS XPT file for the given year.
        year (int): The specific year being validated (used to look up expected columns in VAR_MAP).

    Returns:
        list: A list of canonical field names (strings) that could not be resolved to any 
        column in `df_raw`.

    Notes:
        - Prints a summary log to stdout indicating how many fields are missing.
        - If the mapping for a year is `None` or invalid, the field is treated as missing.
    """
    missing = []
    for canonical, year_fields in VAR_MAP.items():
        candidates = None
        if isinstance(year_fields, dict):
            candidates = year_fields.get(year)
        if candidates is None:
            candidates = []
        elif isinstance(candidates, str):
            candidates = [candidates]
        elif not isinstance(candidates, (list, tuple, set)):
            try:
                candidates = list(candidates)
            except Exception:
                candidates = []

        found = any(c in df_raw.columns for c in candidates)
        if not found:
            missing.append(canonical)

    print(f"Year {year}: {len(missing)} of {len(VAR_MAP)} canonical fields missing")
    if missing:
        print(f"Missing fields for {year}: {missing}")
    return missing

In [None]:
def load_year(year):
    """
    Loads, maps, and normalizes a single year of BRFSS survey data.

    This function orchestrates the data loading pipeline for a specific year:
    1.  **Fetch:** Ensures the local SAS XPT file exists (downloading if necessary via `ensure_xpt`).
    2.  **Load:** Reads the XPT file into a pandas DataFrame using 'latin1' encoding.
    3.  **Validate:** Logs missing canonical fields using `validate_year_mappings`.
    4.  **Map & Normalize:** Resolves each `VAR_MAP` field to its first candidate column present
        for `year`. That column is stored under the canonical name.
        - Candidates are tried in the order they appear in the configuration.
        - `normalize_days` is applied to 'PHYSICAL_HEALTH_STATUS' and 'MENTAL_HEALTH_STATUS';
          if it raises, the raw column is kept and the error is logged.
        - Fields with no matching column are filled with NaN for every row.

    The output frame is built in one step on the raw frame's index. It therefore always
    has one row per raw record, even when the first (or every) field is missing.

    Args:
        year (int): The 4-digit year to load (e.g., 2019).

    Returns:
        pd.DataFrame: The canonical columns in `VAR_MAP` order, followed by a constant 'YEAR'
        column. Returns an empty DataFrame if the XPT file cannot be read.

    Notes:
        - Logs to stdout when a field has no column or when normalization fails.
        - Errors raised by `ensure_xpt` (download/extraction) propagate to the caller.
    """
    xpt = ensure_xpt(year)
    try:
        df_raw = pd.read_sas(xpt, format="xport", encoding="latin1")
    except Exception as e:
        print(f"Failed to read SAS XPT for year {year}: {e}")
        return pd.DataFrame()

    # validate mappings and log a summary
    validate_year_mappings(df_raw, year)

    columns = {}
    for canonical, year_fields in VAR_MAP.items():
        # normalize candidate(s) to a list
        candidates = None
        if isinstance(year_fields, dict):
            candidates = year_fields.get(year)
        if candidates is None:
            candidates = []
        elif isinstance(candidates, str):
            candidates = [candidates]
        elif not isinstance(candidates, (list, tuple, set)):
            try:
                candidates = list(candidates)
            except Exception:
                candidates = []

        raw = next((df_raw[c] for c in candidates if c in df_raw.columns), None)

        if raw is None:
            print(f"Year {year}: no column found for {canonical} among {candidates}")
            # Full-length NaN column so row count never depends on field order.
            columns[canonical] = pd.Series(np.nan, index=df_raw.index)
        elif canonical in ("PHYSICAL_HEALTH_STATUS", "MENTAL_HEALTH_STATUS"):
            # Normalize day-count health fields to the shared 1/2/3/9 coding
            try:
                columns[canonical] = normalize_days(raw)
            except Exception as e:
                print(f"Normalization failed for {canonical} in {year}: {e}")
                columns[canonical] = raw
        else:
            columns[canonical] = raw

    # Building the frame once avoids repeated column inserts (fragmentation) and
    # the empty-index pitfalls of assigning scalars to an empty DataFrame.
    out = pd.DataFrame(columns, index=df_raw.index)
    out["YEAR"] = year
    return out

In [None]:
def _get_memory_bytes() -> int:
    """
    Returns the current memory usage of the process in bytes.

    This function prioritizes using `psutil` to retrieve the Resident Set Size (RSS), 
    which represents the portion of memory occupied by the process that is held in 
    main memory (RAM). If `psutil` is unavailable or fails, it falls back to 
    `tracemalloc` to report the current size of memory blocks traced by Python.

    Returns:
        int: The memory usage in bytes.
    """
    try:
        import psutil
        proc = psutil.Process(os.getpid())
        return int(proc.memory_info().rss)
    except Exception:
        # Ensure tracemalloc is started
        if not tracemalloc.is_tracing():
            tracemalloc.start()
        current, peak = tracemalloc.get_traced_memory()
        return int(current)

In [None]:
def _format_bytes(b: int) -> str:
    """
    Formats a byte count into a human-readable string with appropriate units (B, KB, MB, etc.).

    This function calculates the most suitable unit based on a binary prefix (1024 base).
    It handles negative values by preserving the sign and treats `None` as 0 bytes.
    The result is formatted to two decimal places.

    Args:
        b (int): The number of bytes to format. Can be negative or None.

    Returns:
        str: A string representing the size (e.g., "1.23 MB", "-500.00 B").
             Returns "0 B" if the input is None.
    """
    if b is None:
        return "0 B"
    sign = "" if b >= 0 else "-"
    b = abs(int(b))
    for unit in ["B", "KB", "MB", "GB", "TB"]:
        if b < 1024:
            return f"{sign}{b:.2f} {unit}"
        b /= 1024
    return f"{sign}{b:.2f} PB"

In [None]:
def load_multi_year(years: list, output_dir: str = "data_processed", merged_output_filename: str = "BRFSS.csv") -> str:
    """
    Iteratively loads, normalizes, and exports BRFSS data for multiple years.

    Years are processed one at a time so only one year's frame is held in memory. For each year:
    1.  **Load & Normalize:** Calls `load_year` to fetch and clean the data.
    2.  **Align Columns:** Reindexes to 'YEAR' followed by the `VAR_MAP` fields in order.
        Missing fields are added as all-NA, and extra columns are dropped.
    3.  **Export:**
        - Writes a standalone CSV for the year (e.g., `brfss_2019.csv`), always with a header row.
        - Appends the rows to the cumulative merged CSV (`merged_output_filename`). The
          header is written only with the first successfully loaded year.
    4.  **Cleanup:** Deletes the frame and runs garbage collection before the next year.

    Years whose `load_year` result is empty are skipped without writing anything.

    Args:
        years (list): A list of integer years to process (e.g., `[2019, 2020]`).
        output_dir (str, optional): Directory for output files; created if it does not
            exist. Defaults to "data_processed".
        merged_output_filename (str, optional): The filename for the cumulative CSV.
            Defaults to "BRFSS.csv".

    Returns:
        str: The filename (not the full path) of the merged CSV file.

    Notes:
        - Logs memory usage before and after each year.
        - The merged CSV is opened in 'write' mode for the first written year (replacing any
          previous file) and in 'append' mode afterwards.
    """
    os.makedirs(output_dir, exist_ok=True)
    # YEAR first, then the canonical fields in VAR_MAP order.
    column_order = ["YEAR"] + [c for c in VAR_MAP.keys() if c != "YEAR"]
    merged_path = os.path.join(output_dir, merged_output_filename)
    first = True
    total_rows = 0
    for year in years:
        mem_before = _get_memory_bytes()
        print(f"Year {year}: memory before processing: {_format_bytes(mem_before)}")

        df_year = load_year(year)
        if df_year.empty:
            print(f"Year {year}: no data loaded; skipping write")
            mem_after = _get_memory_bytes()
            print(f"Year {year}: memory after processing (skipped): {_format_bytes(mem_after)} delta: {_format_bytes(mem_after - mem_before)}")
            continue

        # ensure columns are in canonical order and any missing columns are added as NA
        df_year = df_year.reindex(columns=column_order)
        n_rows = df_year.shape[0]

        year_path = os.path.join(output_dir, f"brfss_{year}.csv")
        print(f"Year {year}: writing {n_rows} rows to CSV {year_path}")
        # Each per-year file is standalone, so it always needs its own header row.
        df_year.to_csv(year_path, mode="w", header=True, index=False)

        print(f"Year {year}: appending {n_rows} rows to merged CSV {merged_path}")
        df_year.to_csv(merged_path, mode="w" if first else "a", header=first, index=False)

        total_rows += n_rows
        print(f"[WROTE] Year {year}: {n_rows} rows")

        # attempt to free memory and measure after write
        del df_year
        gc.collect()
        mem_after = _get_memory_bytes()
        print(f"Year {year}: memory after processing: {_format_bytes(mem_after)} delta: {_format_bytes(mem_after - mem_before)}")

        first = False

    print(f"[DONE] {total_rows} rows written to {merged_output_filename}")
    return merged_output_filename

In [21]:
# Export every survey year in [start_year, end_year] (both ends inclusive).
start_year = 2015
end_year = 2024
merged_output_file_name = f"BRFSS_{start_year}_{end_year}.csv"
# range() excludes its stop value; +1 keeps end_year in the export, matching the filename.
years = list(range(start_year, end_year + 1))
output_dir = PROCESSED_DATA_DIR
outputfile = os.path.join(output_dir, merged_output_file_name)

load_multi_year(years, output_dir, merged_output_file_name)

Year 2015: memory before processing: 162.05 MB
[CACHED] 2015 zip
[CACHED] 2015 xpt
Year 2015: 0 of 20 canonical fields missing
Year 2015: writing 441456 rows to CSV c:\github\brfss-diabetes-trends\analysis\..\test_data_processed\brfss_2015.csv
Year 2015: appending 441456 rows to merged CSV c:\github\brfss-diabetes-trends\analysis\..\test_data_processed\BRFSS_2015_2024.csv
[WROTE] Year 2015: 441456 rows
Year 2015: memory after processing: 159.59 MB delta: -2.46 MB
Year 2016: memory before processing: 159.59 MB
[CACHED] 2016 zip
[CACHED] 2016 xpt
Year 2016: 0 of 20 canonical fields missing
Year 2016: writing 486303 rows to CSV c:\github\brfss-diabetes-trends\analysis\..\test_data_processed\brfss_2016.csv
Year 2016: appending 486303 rows to merged CSV c:\github\brfss-diabetes-trends\analysis\..\test_data_processed\BRFSS_2015_2024.csv
[WROTE] Year 2016: 486303 rows
Year 2016: memory after processing: 161.41 MB delta: 1.81 MB
Year 2017: memory before processing: 161.41 MB
[CACHED] 2017 zip


'BRFSS_2015_2024.csv'