# Extract completed statistical reports to refactored CSVs

This notebook reads completed ATI/Privacy/Supplemental XLSX files (EN or FR) and appends their values as rows to refactored-style CSVs.
It uses each workbook's hidden `*_ForConsumption` mapping sheet to map every `Q*` id to a workbook cell, and can optionally skip formula
cells (auto-calculated totals).

During development, output CSVs default to `test_data/`.


In [1]:
from pathlib import Path
import re
import unicodedata

import pandas as pd
import openpyxl
from openpyxl.cell.cell import ArrayFormula
from openpyxl.utils import get_column_letter, column_index_from_string


In [2]:
# Inputs
# Only workbooks in test_data/ whose file name contains `_<REPORT_YEAR>_` are processed,
# in sorted (deterministic) order.
REPORT_YEAR = '2023'
INPUT_FILES = sorted(Path('test_data').glob(f'*_{REPORT_YEAR}_*.xlsx'))
# Outputs (use test_data during development)
# NOTE: running this cell creates OUTPUT_DIR if it does not exist yet.
OUTPUT_DIR = Path('test_data')
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
ATI_OUTPUT_CSV = OUTPUT_DIR / 'ATI-AI_refactored_from_xlsx.csv'
PRIVACY_OUTPUT_CSV = OUTPUT_DIR / 'Privacy-AI_refactored_from_xlsx.csv'
SUPPLEMENTAL_OUTPUT_CSV = OUTPUT_DIR / 'Supplemental-AI_refactored_from_xlsx.csv'
# Reference datasets for metadata and institution lookups
# (paths are relative to the current working directory; a missing file simply
# disables the corresponding lookups).
ATI_REFERENCE_CSV = Path('ATI-AI_refactored.csv')
PRIVACY_REFERENCE_CSV = Path('Privacy-AI_refactored.csv')
SUPPLEMENTAL_REFERENCE_CSV = Path('Supplemental-AI_refactored.csv')
# Behaviour switches read by the extraction and CSV-append steps below.
INCLUDE_FORMULA_CELLS = True  # True to capture calculated totals
DROP_EMPTY_ROWS = False        # True to skip empty cells
DEDUPLICATE = True             # True to drop duplicates on org/period/id


In [3]:
# Column order of every refactored CSV written by this notebook.
EXPECTED_COLUMNS = [
    # Reporting institution
    'gc_orgID', 'institution_en', 'institution_fr',
    # Reporting period
    'ReportingPeriodStart', 'ReportingPeriodEnd',
    # Question identifier with its section / subsection metadata
    'id',
    'section_number', 'section_name_en', 'section_name_fr',
    'subsection_number', 'subsection_name_en', 'subsection_name_fr',
    # Bilingual question title and the extracted cell value
    'title_en', 'title_fr',
    'value',
]

# Per-report extraction settings, keyed by report type.
#   mapping_sheet     - hidden sheet whose column A holds ids and column B the
#                       formulas pointing at the data cells
#   data_sheets       - candidate data sheet names (English first, then French);
#                       the first one present in the workbook is used
#   date_cells        - (start, end) cells of the reporting period on the data sheet
#   inst_cell_default - institution-name cell used when the mapping sheet has no
#                       usable 'Inst' row
#   reference_csv     - existing refactored dataset used for metadata/institution lookups
#   output_csv        - CSV that extracted rows are appended to
REPORT_SPECS = {
    'ATI': {
        'mapping_sheet': 'ATI_ForConsumption',
        'data_sheets': ['ATI', 'LAI'],
        'date_cells': ('D8', 'H8'),
        'inst_cell_default': 'D6',
        'reference_csv': ATI_REFERENCE_CSV,
        'output_csv': ATI_OUTPUT_CSV,
    },
    'Privacy': {
        'mapping_sheet': 'Privacy_ForConsumption',
        'data_sheets': ['Priv', 'LPRP'],
        'date_cells': ('D8', 'H8'),
        'inst_cell_default': 'D6',
        'reference_csv': PRIVACY_REFERENCE_CSV,
        'output_csv': PRIVACY_OUTPUT_CSV,
    },
    'Supplemental': {
        'mapping_sheet': 'ATIP_ForConsumption',
        # The French name was stored as mojibake (UTF-8 e-acute decoded with the
        # wrong codec) and could never match a sheet. The accented letter is
        # written as an escape so a future encoding round-trip cannot corrupt it.
        'data_sheets': ['Supplemental Report 2024-25', 'Rapport suppl\u00e9mentaire 2024-25'],
        'date_cells': ('D9', 'H9'),
        'inst_cell_default': 'D7',
        'reference_csv': SUPPLEMENTAL_REFERENCE_CSV,
        'output_csv': SUPPLEMENTAL_OUTPUT_CSV,
    },
}

# Formula that starts with a single-cell sheet reference, e.g. `=ATI!$D$6` or
# `='Sheet name'!D6`. Groups: sheet name, column letters, row number.
# Only the start is anchored (it is used with .match), so trailing text is ignored.
DIRECT_REF_RE = re.compile(r"=\s*'?([^'!]+)'?!\$?([A-Z]+)\$?(\d+)")
# Array formula of the form `=TRANSPOSE(Sheet!A1:C1)`.
# Groups: sheet name, start column, start row, end column, end row.
ARRAY_REF_RE = re.compile(r"=TRANSPOSE\('?([^'!]+)'?!\$?([A-Z]+)\$?(\d+):\$?([A-Z]+)\$?(\d+)\)")
# Mapping-sheet id such as `Q1R2C3` or `Q1R2aC3b` (case-insensitive, whole string).
# Groups: question number, row number, row suffix letter, cell number, cell suffix letter.
ID_QRC_RE = re.compile(r"^Q(\d+)R(\d+)([A-Za-z]?)C(\d+)([A-Za-z]?)$", re.IGNORECASE)


def normalize_text(text):
    """Return an accent-free, lower-case, single-spaced form of *text*.

    The value is converted with ``str()``, decomposed (NFKD) and stripped of
    every non-ASCII character, then whitespace runs are collapsed to one
    space. Returns None when *text* is None or nothing is left afterwards.
    """
    if text is None:
        return None
    decomposed = unicodedata.normalize('NFKD', str(text))
    # Dropping non-ASCII bytes removes the combining accents NFKD split off.
    ascii_only = decomposed.encode('ascii', 'ignore').decode('ascii')
    collapsed = re.sub(r'\s+', ' ', ascii_only).strip().lower()
    return collapsed if collapsed else None


def normalize_sub_key(value):
    """Normalise a subsection key read from the mapping sheet.

    Returns None for None, blank text or 'nan' (any case). Otherwise returns
    the stripped text with one trailing '.0' removed (so a numeric 3.0 becomes
    '3') and decimal commas turned into dots.
    """
    if value is None:
        return None
    text = str(value).strip()
    if not text or text.lower() == 'nan':
        return None
    # The '.0' check runs before the comma replacement, so '1,0' stays '1.0'.
    trimmed = text[:-2] if text.endswith('.0') else text
    return trimmed.replace(',', '.')


def build_refactored_id(key_text, sub_key, report_type):
    """Translate a mapping-sheet id into the refactored-CSV id format.

    Supplemental ids, ids that do not look like ``Q<n>R<n>C<n>`` and ids with
    no subsection key are returned unchanged. Otherwise the id becomes
    ``Q<section>_<subsection>Row<r>Cell<c>`` when *sub_key* has the form
    ``section.subsection``, or ``Q<sub_key>Row<r>Cell<c>`` when it does not.
    Row and cell suffix letters are lower-cased; the question number in
    *key_text* is not used.
    """
    if report_type == 'Supplemental':
        return key_text
    parsed = ID_QRC_RE.match(str(key_text).strip())
    if parsed is None or not sub_key:
        return key_text

    _, row_num, row_suffix, cell_num, cell_suffix = parsed.groups()
    tail = f"Row{row_num}{(row_suffix or '').lower()}Cell{cell_num}{(cell_suffix or '').lower()}"

    sub_text = str(sub_key).replace(',', '.').strip()
    section, dot, subsection = sub_text.partition('.')
    section = section.strip()
    subsection = subsection.strip()
    if dot and section and subsection:
        return f"Q{section}_{subsection}{tail}"
    # No usable "section.subsection" split: use the key text as-is.
    return f"Q{sub_text}{tail}" if sub_text else key_text


def expand_range(start_col, start_row, end_col, end_row):
    """List every cell reference in a rectangular range, row by row.

    Columns are given as letters and rows as numbers (or numeric strings).
    Reversed corners are tolerated. The result runs left-to-right within a
    row and top-to-bottom across rows, e.g. ``['A1', 'B1', 'A2', 'B2']``.
    """
    first_col, last_col = sorted(
        (column_index_from_string(start_col), column_index_from_string(end_col))
    )
    first_row, last_row = sorted((int(start_row), int(end_row)))
    return [
        f"{get_column_letter(col)}{row}"
        for row in range(first_row, last_row + 1)
        for col in range(first_col, last_col + 1)
    ]


def parse_direct_ref(formula):
    """Split a ``=Sheet!A1`` style formula into ``(sheet, cell)``.

    ``$`` markers are dropped from the cell reference. Returns
    ``(None, None)`` when the text does not start with such a reference.
    """
    found = DIRECT_REF_RE.match(str(formula).strip())
    if found is None:
        return None, None
    sheet_name, column, row = found.groups()
    return sheet_name.strip("'"), f"{column}{row}"


def parse_array_formula(text):
    """Split a ``=TRANSPOSE(Sheet!A1:C1)`` formula into ``(sheet, cells)``.

    *cells* is the expanded list of references in the range, as produced by
    expand_range. Returns ``(None, [])`` when the text is not such a formula.
    """
    found = ARRAY_REF_RE.match(str(text).strip())
    if found is None:
        return None, []
    sheet_name, *corners = found.groups()
    # corners = start column, start row, end column, end row
    return sheet_name.strip("'"), expand_range(*corners)


def build_row_targets(ws):
    """Map mapping-sheet row numbers to the ``(sheet, cell)`` their column-B formula targets.

    Array formulas (``=TRANSPOSE(...)`` spanning ``B<m>:B<n>``) are resolved
    first: the i-th row of the span gets the i-th cell of the referenced
    range. Rows still unresolved are then filled from plain ``=Sheet!Cell``
    formulas. Rows whose formula cannot be parsed are left out.
    """
    targets = {}
    handled_refs = set()

    # Pass 1: array formulas, each handled once per distinct span.
    for row in range(1, ws.max_row + 1):
        formula = ws.cell(row=row, column=2).value
        if not isinstance(formula, ArrayFormula) or formula.ref in handled_refs:
            continue
        handled_refs.add(formula.ref)
        span = re.match(r'B(\d+):B(\d+)', formula.ref)
        if span is None:
            continue
        sheet, cells = parse_array_formula(formula.text)
        if not sheet or not cells:
            continue
        first_row, last_row = int(span.group(1)), int(span.group(2))
        # zip stops at the shorter side, so surplus rows or cells are ignored.
        for target_row, cell in zip(range(first_row, last_row + 1), cells):
            targets[target_row] = (sheet, cell)

    # Pass 2: direct single-cell references for rows not covered above.
    for row in range(1, ws.max_row + 1):
        if row in targets:
            continue
        formula = ws.cell(row=row, column=2).value
        if not (isinstance(formula, str) and formula.startswith('=')):
            continue
        sheet, cell = parse_direct_ref(formula)
        if sheet and cell:
            targets[row] = (sheet, cell)

    return targets


def find_inst_target(ws, row_to_target):
    """Return the target of the first row labelled 'Inst' in column A.

    The label comparison ignores case and surrounding whitespace. Returns
    None when there is no such row, or when that row has no entry in
    *row_to_target*.
    """
    for row in range(1, ws.max_row + 1):
        label = ws.cell(row=row, column=1).value
        if label is not None and str(label).strip().lower() == 'inst':
            return row_to_target.get(row)
    return None


def is_formula_cell(cell):
    """Tell whether *cell* holds a formula.

    True when the cell's data type is 'f', or when its value is a string
    starting with '='.
    """
    return cell.data_type == 'f' or (
        isinstance(cell.value, str) and cell.value.startswith('=')
    )


def get_merged_cell_value(ws, cell_ref):
    """Read *cell_ref* from *ws*, following merged ranges to their start cell.

    When the reference lies inside a merged range, the value of that range's
    start cell is returned; otherwise the cell's own value.
    """
    containing = next(
        (merged for merged in ws.merged_cells.ranges if cell_ref in merged),
        None,
    )
    if containing is None:
        return ws[cell_ref].value

    anchor = containing.start_cell
    # The start cell may expose its address under either attribute name;
    # fall back to its string form when it has neither.
    for attr in ('coordinate', 'coord'):
        if hasattr(anchor, attr):
            return ws[getattr(anchor, attr)].value
    return ws[str(anchor)].value


def coerce_date(value):
    if value is None or str(value).strip() == '':
        return None
    try:
        return pd.to_datetime(value, errors='coerce').date()
    except Exception:
        return None


def normalize_org_id(value):
    """Normalise an organisation id to an int where possible.

    None, blank text and 'nan' (any case) give None. Numeric text such as
    '123' or '123.0' gives the int 123. Anything that cannot be read as a
    finite number is returned as its stripped text.
    """
    if value is None:
        return None
    text = str(value).strip()
    if not text or text.lower() == 'nan':
        return None
    try:
        number = float(text)
        return int(number)
    except (ValueError, OverflowError):
        # Non-numeric text (ValueError) or infinity (OverflowError).
        return text


In [4]:
def load_reference_maps(csv_path):
    """Build metadata and institution lookups from a refactored reference CSV.

    Args:
        csv_path: Path of the reference CSV; may be falsy or point to a file
            that does not exist.

    Returns:
        ``(id_meta, inst_lookup)``. *id_meta* maps each stripped, non-empty
        ``id`` to the first CSV row (a Series of strings) carrying it.
        *inst_lookup* maps the normalised English and French institution
        names to ``(gc_orgID, institution_en, institution_fr)``, first
        occurrence winning. Returns ``(None, {})`` when the CSV is not
        available.

    Raises:
        KeyError: if the CSV lacks an ``id`` or ``gc_orgID`` column.
    """
    if not csv_path or not Path(csv_path).exists():
        return None, {}
    df = pd.read_csv(csv_path, dtype=str, low_memory=False)

    # Scan only the id column and materialise a row for the first occurrence
    # of each id, instead of building a Series for every row of the file.
    id_meta = {}
    for index, raw_id in df['id'].dropna().items():
        id_value = str(raw_id).strip()
        if id_value and id_value not in id_meta:
            id_meta[id_value] = df.loc[index]

    # Every institution repeats on many rows; identical (id, name, name)
    # combinations add nothing after their first occurrence, so drop them
    # before iterating. Order, and therefore "first wins", is preserved.
    name_columns = [col for col in ('institution_en', 'institution_fr') if col in df.columns]
    institutions = df.dropna(subset=['gc_orgID']).drop_duplicates(
        subset=['gc_orgID', *name_columns]
    )

    inst_lookup = {}
    for _, row in institutions.iterrows():
        gc_org_id = normalize_org_id(row.get('gc_orgID'))
        inst_en = row.get('institution_en')
        inst_fr = row.get('institution_fr')
        for name in (inst_en, inst_fr):
            # A missing name is NaN here; normalising it would register the
            # bogus key 'nan', so skip it.
            if name is None or pd.isna(name):
                continue
            norm = normalize_text(name)
            if norm and norm not in inst_lookup:
                inst_lookup[norm] = (gc_org_id, inst_en, inst_fr)

    return id_meta, inst_lookup


def resolve_institution(inst_name, inst_lookup):
    """Resolve a workbook institution name to ``(gc_orgID, name_en, name_fr)``.

    The normalised name is looked up in *inst_lookup*. When it is unknown,
    the id is None and the stripped name is used for both languages; a None
    name gives ``(None, None, None)``.
    """
    lookup_key = normalize_text(inst_name)
    if lookup_key and lookup_key in inst_lookup:
        return inst_lookup[lookup_key]
    if inst_name is None:
        return None, None, None
    fallback_name = str(inst_name).strip()
    return None, fallback_name, fallback_name


def read_cell_value(wb, wb_values, sheet_name, cell_ref, include_formula=False):
    """Read one data cell, honouring merged ranges and the formula policy.

    Args:
        wb: Workbook loaded with formulas preserved.
        wb_values: The same workbook loaded with cached values, or None.
        sheet_name: Name of the sheet holding the cell.
        cell_ref: Cell reference such as ``'D12'``.
        include_formula: When False, formula cells are skipped.

    Returns:
        ``(value, skipped_as_formula)``. A skipped formula cell gives
        ``(None, True)``. An included formula cell gives its cached value from
        *wb_values*, or the raw formula text when *wb_values* is None. The
        flag is False whenever a value is returned.
    """
    ws = wb[sheet_name]
    raw_value = get_merged_cell_value(ws, cell_ref)
    # A non-anchor cell of a merged range carries no value or formula type of
    # its own, so checking only the addressed cell misses a formula stored in
    # the range's start cell. Inspect the resolved content as well.
    resolved_is_formula = isinstance(raw_value, str) and raw_value.startswith('=')
    if is_formula_cell(ws[cell_ref]) or resolved_is_formula:
        if not include_formula:
            return None, True
        if wb_values is not None:
            return get_merged_cell_value(wb_values[sheet_name], cell_ref), False
    return raw_value, False


In [5]:
def extract_report_rows(wb, wb_values, report_type, spec, id_meta, inst_lookup):
    """Extract one report's mapped cells from a workbook as refactored-CSV rows.

    Args:
        wb: Workbook loaded with formulas preserved; used for the mapping
            sheet, the institution name and the reporting-period dates.
        wb_values: The same workbook loaded with cached values, or None;
            forwarded to read_cell_value for formula cells.
        report_type: Report key (e.g. ``'ATI'``); forwarded to
            build_refactored_id and used in skip messages.
        spec: Settings dict for this report (``mapping_sheet``,
            ``data_sheets``, ``date_cells``, ``inst_cell_default``).
        id_meta: Mapping of refactored id to a reference row offering
            ``.get(column)``; may be empty or None.
        inst_lookup: Mapping used by resolve_institution.

    Returns:
        ``(rows, stats)``. *rows* is a list of dicts keyed by the refactored
        CSV columns. *stats* holds the counters ``rows``, ``skipped_formula``,
        ``missing_target`` and ``missing_meta`` plus ``data_sheet``. When the
        mapping sheet or data sheet is absent, or no mapped cell holds a
        value, *rows* is empty and *stats* carries a ``'skipped'`` reason.
    """
    if spec['mapping_sheet'] not in wb.sheetnames:
        return [], {'skipped': f"missing mapping sheet {spec['mapping_sheet']}"}
    # First candidate data sheet present in the workbook.
    data_sheet = next(
        (candidate for candidate in spec['data_sheets'] if candidate in wb.sheetnames),
        None,
    )
    if not data_sheet:
        return [], {'skipped': f"missing data sheet for {report_type}"}

    mapping_ws = wb[spec['mapping_sheet']]
    row_to_target = build_row_targets(mapping_ws)

    # Institution name: prefer the mapping sheet's 'Inst' row, but fall back
    # to the default cell when that row is absent or names a sheet the
    # workbook does not have (data rows get the same guard below).
    inst_target = find_inst_target(mapping_ws, row_to_target)
    if inst_target and inst_target[0] in wb.sheetnames:
        inst_sheet, inst_cell = inst_target
    else:
        inst_sheet, inst_cell = data_sheet, spec['inst_cell_default']
    inst_value = get_merged_cell_value(wb[inst_sheet], inst_cell)
    gc_org_id, inst_en, inst_fr = resolve_institution(inst_value, inst_lookup)

    # Reporting period, shared by every row of this report.
    start_cell, end_cell = spec['date_cells']
    report_start = coerce_date(get_merged_cell_value(wb[data_sheet], start_cell))
    report_end = coerce_date(get_merged_cell_value(wb[data_sheet], end_cell))
    period_start = report_start.isoformat() if report_start else None
    period_end = report_end.isoformat() if report_end else None

    # Columns copied from the reference row when the id is known.
    meta_columns = (
        'section_number', 'section_name_en', 'section_name_fr',
        'subsection_number', 'subsection_name_en', 'subsection_name_fr',
        'title_en', 'title_fr',
    )

    rows = []
    skipped_formula = 0
    missing_target = 0
    missing_meta = 0
    any_value = False
    for row in range(1, mapping_ws.max_row + 1):
        key = mapping_ws.cell(row=row, column=1).value
        if key is None:
            continue
        key_text = str(key).strip()
        if key_text == '' or key_text.lower() == 'inst':
            continue

        target = row_to_target.get(row)
        if not target or target[0] not in wb.sheetnames:
            missing_target += 1
            continue
        sheet_name, cell_ref = target

        value, is_formula = read_cell_value(wb, wb_values, sheet_name, cell_ref, INCLUDE_FORMULA_CELLS)
        if is_formula and not INCLUDE_FORMULA_CELLS:
            skipped_formula += 1
            continue

        value_is_empty = value is None or str(value).strip() == ''
        if not value_is_empty:
            any_value = True
        if DROP_EMPTY_ROWS and value_is_empty:
            continue

        sub_key = normalize_sub_key(mapping_ws.cell(row=row, column=3).value)
        mapped_id = build_refactored_id(key_text, sub_key, report_type)
        meta = id_meta.get(mapped_id) if id_meta else None
        if meta is None:
            missing_meta += 1
            # Unknown id: only the subsection number can be filled in, from
            # the mapping sheet itself.
            meta_values = dict.fromkeys(meta_columns)
            meta_values['subsection_number'] = sub_key
        else:
            meta_values = {column: meta.get(column) for column in meta_columns}

        rows.append({
            'gc_orgID': gc_org_id,
            'institution_en': inst_en,
            'institution_fr': inst_fr,
            'ReportingPeriodStart': period_start,
            'ReportingPeriodEnd': period_end,
            'id': mapped_id,
            **meta_values,
            'value': value,
        })

    # A report in which every mapped cell is blank is treated as not filled in.
    if not any_value:
        return [], {'skipped': f"no values found for {report_type}", 'data_sheet': data_sheet}
    stats = {
        'rows': len(rows),
        'skipped_formula': skipped_formula,
        'missing_target': missing_target,
        'missing_meta': missing_meta,
        'data_sheet': data_sheet,
    }
    return rows, stats
def _dedupe_key(value):
    """Return a type-insensitive text form of one de-duplication key cell."""
    if pd.isna(value):
        return ''
    # 123.0 and 123 must compare equal to the '123' read back from the CSV.
    if isinstance(value, float) and value.is_integer():
        value = int(value)
    return str(value).strip()


def append_rows_to_csv(output_path, rows):
    """Append extracted rows to a refactored CSV, creating it if needed.

    Args:
        output_path: ``pathlib.Path`` of the CSV to write.
        rows: List of row dicts; keys outside EXPECTED_COLUMNS are dropped and
            missing columns are left empty.

    Returns:
        Number of rows in *rows* (0 when it is empty, in which case nothing is
        written). With DEDUPLICATE on, the file may grow by fewer rows.

    When DEDUPLICATE is set, rows sharing org id, reporting period and id are
    collapsed, keeping the most recent one. Keys are compared as text because
    existing rows are read back as strings while new rows carry an integer
    org id; comparing the raw values never matched, so re-running the
    notebook appended duplicates. The same rule applies on the first write.
    """
    if not rows:
        return 0
    new_df = pd.DataFrame(rows).reindex(columns=EXPECTED_COLUMNS)
    if output_path.exists():
        existing = pd.read_csv(output_path, dtype=str, low_memory=False)
        existing = existing.reindex(columns=EXPECTED_COLUMNS)
        combined = pd.concat([existing, new_df], ignore_index=True)
    else:
        combined = new_df
    if DEDUPLICATE:
        key_columns = ['gc_orgID', 'ReportingPeriodStart', 'ReportingPeriodEnd', 'id']
        keys = pd.DataFrame({col: combined[col].map(_dedupe_key) for col in key_columns})
        combined = combined.loc[~keys.duplicated(keep='last')]
    combined.to_csv(output_path, index=False)
    return len(new_df)


In [None]:
# Load reference maps once per report type
REFERENCE_MAPS = {}
for report_type, spec in REPORT_SPECS.items():
    id_meta, inst_lookup = load_reference_maps(spec['reference_csv'])
    # A missing reference CSV yields (None, {}); store empty dicts instead.
    REFERENCE_MAPS[report_type] = (id_meta or {}, inst_lookup or {})

# One (file name, report type, rows appended, stats) tuple per report that produced rows.
summary = []

for path in INPUT_FILES:
    # Formula-preserving load: the mapping sheets are read as formulas.
    wb = openpyxl.load_workbook(path, data_only=False)
    # Second load with data_only=True, used to read values of formula cells;
    # skipped when formula cells are excluded anyway.
    wb_values = openpyxl.load_workbook(path, data_only=True) if INCLUDE_FORMULA_CELLS else None

    # Try every report type on every workbook; extract_report_rows returns no
    # rows for a type the workbook does not contain.
    for report_type, spec in REPORT_SPECS.items():
        id_meta, inst_lookup = REFERENCE_MAPS[report_type]
        rows, stats = extract_report_rows(wb, wb_values, report_type, spec, id_meta, inst_lookup)
        # Skipped or empty reports are not written and not listed in the summary.
        if not rows:
            continue
        appended = append_rows_to_csv(spec['output_csv'], rows)
        summary.append((path.name, report_type, appended, stats))

# Bare expression: shown as the cell's output when run in a notebook.
summary
