# Importing mobidity data to dhis2 event program
**Workflow:** load monthly CSVs â†’ filter ICD10 J-codes â†’ map to Event program â†’ batch import

> Tip: Run cells top-to-bottom. Update the config in the next cell.

### Step 1: Defining Variables

In [None]:
# ===== Step 1 â€” Defining Variables (CSV version) =====

# --- File layout (CSV in yearly folders) ---
# Example:
#   ...\immr data\
#       eIMMR raw data 2019\Jan 2019.csv
#       eIMMR raw data 2019\Feb 2019.csv
#       eIMMR raw data 2020\Jan 2020.csv
ROOT_DATA_DIR = r"...\immr data" # ðŸ”¹ Give the correct path
INCLUDE_EXTENSIONS = {".csv"}

# If headers are not on the first row, adjust these
HEADER_ROW = 0            # 0-based index of header row
SKIPROWS = []             # e.g., [0,1] to skip top lines before header

# Import ONLY these columns (+ region_name for orgUnit mapping)
USECOLS = [
    "region_name",      # used to map to DHIS2 orgUnit
    "age",
    "sex",
    "discharge_mode",
    "admission_date",
    "discharge_date",
    "days",
    "icd",
]

# --- Column semantics for later steps ---
COLS = {
    "orgunit_source": "region_name",
    "event_date": "admission_date",   # change to "discharge_date" if preferred
    "icd": "icd",
    "sex": "sex",
    "age_years": "age",
    "discharge_mode": "discharge_mode",
    "length_of_stay_days": "days",
    "discharge_date": "discharge_date",
}

# ===== Org unit mapping (CSV) =====
# CSV must have columns: region_name, orgUnitId
ORGUNIT_MAP_CSV = r"....\immr data\orgunit_mapping_region_to_uid.csv" # ðŸ”¹ Give the correct path
STRICT_ORGUNIT_MATCH = True

# --- DHIS2 target (Event program) ---
DHIS2_BASE_URL = "<URL of the instance>"  
DHIS2_USERNAME = "<UN>"
DHIS2_PASSWORD = "<PW>"  # basic auth (Phase 1)

PROGRAM_ID = "<Program UID>"
PROGRAM_STAGE_ID = "<Program stage UID>"

# Map columns â†’ DHIS2 dataElement UIDs (belonging to PROGRAM_STAGE_ID)
DE_MAP = {
    "sex": "add_UID",                   # ðŸ”¹ add correct UID
    "age_years": "add_UID",             # ðŸ”¹ add correct UID
    "discharge_mode": "add_UID",        # ðŸ”¹ add correct UID
    "length_of_stay_days": "add_UID",   # ðŸ”¹ add correct UID
    "icd": "add_UID",                   # ðŸ”¹ add correct UID
    "discharge_date": "add_UID"         # ðŸ”¹ add correct UID
}

# DHIS2 event fields
DEFAULT_EVENT_STATUS = "COMPLETED"
STORED_BY = "climate-tools-notebook"

# --- Import behavior ---
BATCH_SIZE = 500
DRY_RUN_FIRST = True
VERIFY_SSL = True

# --- Helper: case-insensitive orgUnit resolver from region_name (using CSV) ---
import re
import pandas as pd

def _normalize_key(s: str) -> str:
    return re.sub(r"\s+", " ", str(s).strip()).lower()

def load_orgunit_map(csv_path: str) -> dict:
    df = pd.read_csv(csv_path, dtype=str)
    if not {"region_name", "orgUnitId"}.issubset(df.columns):
        raise ValueError("ORGUNIT_MAP_CSV must have columns: region_name, orgUnitId")
    df["region_name_norm"] = df["region_name"].map(_normalize_key)
    df["orgUnitId"] = df["orgUnitId"].str.strip()
    df = df.dropna(subset=["region_name_norm", "orgUnitId"])
    return dict(zip(df["region_name_norm"], df["orgUnitId"]))

_orgunit_map = load_orgunit_map(ORGUNIT_MAP_CSV)

def resolve_orgunit_from_region(region_value: str):
    key = _normalize_key(region_value)
    if key in _orgunit_map:
        return _orgunit_map[key]
    if STRICT_ORGUNIT_MATCH:
        raise KeyError(f"Unknown region_name for orgUnit mapping: '{region_value}'")
    return region_value  # pass-through if non-strict

print(f"Config loaded (CSV mode). OrgUnit map entries: {len(_orgunit_map)}.")


Config loaded (CSV mode). OrgUnit map entries: 28.


### Step 2: Setting up environment & libraries

In [None]:
# ===== Step 2 â€” Environment & Libraries (CSV mode) =====

# If needed, uncomment to install:
# %pip install -q pandas python-dateutil requests

import os, sys, glob, re, json, math, warnings
from pathlib import Path
import pandas as pd
from datetime import datetime
from dateutil.parser import parse as dtparse
import requests

def _ver(mod):
    try:
        return mod.__version__
    except Exception:
        return "n/a"

print("Environment versions:")
print("  Python   :", sys.version.split()[0])
print("  pandas   :", _ver(pd))
print("  requests :", _ver(requests))

warnings.filterwarnings("ignore", category=UserWarning)

# Reusable HTTP session
session = requests.Session()
session.verify = VERIFY_SSL
session.auth = (DHIS2_USERNAME, DHIS2_PASSWORD)
session.headers.update({"Content-Type": "application/json"})

def dhis2_ping():
    """Quick connectivity check to DHIS2 system info (optional)."""
    try:
        r = session.get(f"{DHIS2_BASE_URL}/api/system/info", timeout=20)
        print("Ping status:", r.status_code)
        if r.ok:
            info = r.json()
            print("System name  :", info.get("systemName"))
            print("Version      :", info.get("version"))
            print("Context path :", info.get("contextPath"))
        else:
            print("Response text:", r.text[:300])
    except Exception as e:
        print("Ping failed:", e)

print("Modules loaded. Ready for CSV ingestion.")
print("Checking the connection to dhis2...")
def dhis2_ping():
    """Quick connectivity check to DHIS2 system info (optional)."""
    try:
        r = session.get(f"{DHIS2_BASE_URL}/api/system/info", timeout=20)
        print("Ping status:", r.status_code)
        if r.ok:
            info = r.json()
            print("System name  :", info.get("systemName"))
            print("Version      :", info.get("version"))
            print("Context path :", info.get("contextPath"))
        else:
            print("Response text:", r.text[:300])
    except Exception as e:
        print("Ping failed:", e)
dhis2_ping()
print("Task completed.")

### Step 3: Load monthly CSV files

In [None]:
# ===== Step 3 â€” Load monthly CSV files (robust; skip mapping; derive `days`) =====
from pathlib import Path
import pandas as pd, csv, re

# --- Only include data files inside year folders like "eIMMR raw data 2019", etc. ---
DATA_DIR_PREFIX = "eIMMR raw data"  # only folders starting with this are considered data folders

# --- Skip-list: filenames that must never be treated as data files ---
SKIP_FILE_NAMES = {"orgunit_mapping_region_to_uid.csv"}  # mapping file

# --- Header normalization + aliasing (case/space-insensitive) ---
_norm = lambda s: re.sub(r"\s+", " ", str(s).strip()).lower()

HEADER_ALIASES = {
    "region": "region_name",
    "district": "region_name",
    "district_name": "region_name",
    "icd10": "icd",
    "icd_code": "icd",
    "length_of_stay": "days",
    "los": "days",
    "no_of_days": "days",
}

# Treat `days` as optional (weâ€™ll derive it if missing)
REQUIRED_BASE_COLS = [c for c in USECOLS if c != "days"]
OPTIONAL_DERIVED_COLS = {"days"}

def align_headers(df: pd.DataFrame, expected_cols: list[str]) -> pd.DataFrame:
    cur = {_norm(c): c for c in df.columns}
    rename_map = {}
    for exp in expected_cols:
        if exp in df.columns:
            continue
        exp_norm = _norm(exp)
        # canonical or any alias that maps to this canonical name
        alias_norms = {k for k, v in HEADER_ALIASES.items() if _norm(v) == exp_norm}
        candidates = {exp_norm} | alias_norms
        for cand in candidates:
            if cand in cur:
                rename_map[cur[cand]] = exp
                break
    if rename_map:
        df = df.rename(columns=rename_map)
    return df

def sniff_delimiter_text(sample_text: str, default=","):
    try:
        return csv.Sniffer().sniff(sample_text, delimiters=",;\t|").delimiter
    except Exception:
        return default

def read_csv_guess(path: Path, usecols=None, header=HEADER_ROW, skiprows=SKIPROWS):
    last_err = None
    for enc in ("utf-8-sig", "utf-8", "latin-1"):
        try:
            with open(path, "r", encoding=enc, errors="ignore") as fh:
                sample = fh.read(4096)
            sep = sniff_delimiter_text(sample, ",")
            return pd.read_csv(path, usecols=usecols, header=header, skiprows=skiprows, sep=sep, encoding=enc)
        except Exception as e:
            last_err = e
            continue
    return pd.read_csv(path, usecols=usecols, header=header, skiprows=skiprows,
                       sep=",", encoding="latin-1", on_bad_lines="skip")

def is_data_folder(p: Path) -> bool:
    # include file only if any ancestor folder starts with "eIMMR raw data"
    return any(part.startswith(DATA_DIR_PREFIX) for part in p.parts)

def list_csv_files(root_dir: str) -> list[Path]:
    p = Path(root_dir)
    files = []
    for f in p.rglob("*.csv"):
        if not f.is_file():
            continue
        if f.name.lower() in SKIP_FILE_NAMES:
            continue
        if not is_data_folder(f):
            continue
        files.append(f)
    return sorted(files, key=lambda x: (x.parent.as_posix(), x.name))

def read_month_file(path: Path) -> pd.DataFrame:
    # try fast path with just the required base columns (+ optional days if present)
    try:
        fast_usecols = list({*REQUIRED_BASE_COLS, *OPTIONAL_DERIVED_COLS} & set(USECOLS))
        df = read_csv_guess(path, usecols=fast_usecols, header=HEADER_ROW, skiprows=SKIPROWS)
    except Exception:
        # reload full, align headers, enforce only base required cols
        df_all = read_csv_guess(path, usecols=None, header=HEADER_ROW, skiprows=SKIPROWS)
        df_all = align_headers(df_all, REQUIRED_BASE_COLS + list(OPTIONAL_DERIVED_COLS))
        missing_base = [c for c in REQUIRED_BASE_COLS if c not in df_all.columns]
        if missing_base:
            raise ValueError(f"Missing required columns {missing_base}; found: {list(df_all.columns)}")
        keep = REQUIRED_BASE_COLS + [c for c in OPTIONAL_DERIVED_COLS if c in df_all.columns]
        df = df_all[keep]

    if df is None or df.empty:
        return pd.DataFrame()

    # --- IMPORTANT: work on a real copy to avoid SettingWithCopyWarning ---
    df = df.copy()

    # Derive/patch `days`
    have_days_col = "days" in df.columns
    have_dates = {"admission_date", "discharge_date"}.issubset(df.columns)

    if have_dates:
        ad = pd.to_datetime(df["admission_date"], errors="coerce", infer_datetime_format=True)
        dd = pd.to_datetime(df["discharge_date"], errors="coerce", infer_datetime_format=True)
        days_calc = (dd - ad).dt.days

        if have_days_col:
            # only fill where days is missing/NaN
            need_fill = df["days"].isna()
            df.loc[need_fill, "days"] = days_calc[need_fill]
        else:
            df.loc[:, "days"] = days_calc

    # provenance (use .loc)
    df.loc[:, "__source_file"] = path.name
    df.loc[:, "__source_year_folder"] = path.parent.name

    return df


# ---- Run loader ----
files = list_csv_files(ROOT_DATA_DIR)
print(f"Found {len(files)} CSV files under data folders within: {ROOT_DATA_DIR}")
for f in files[:2]:
    print("  -", f)

print(f"\nCombining rows from CSV. please wait...")

frames, errors = [], []
for f in files:
    try:
        df = read_month_file(f)
        if df is not None and len(df) > 0:
            frames.append(df)
        else:
            errors.append((f, "empty or no rows"))
    except Exception as e:
        errors.append((f, str(e)))

if not frames:
    msg = "No usable CSV files were readâ€”check ROOT_DATA_DIR, delimiters, and headers.\n"
    for p, m in errors[:5]:
        msg += f"  - {p}: {m}\n"
    raise SystemExit(msg)



raw = pd.concat(frames, ignore_index=True)
print(f"Combined rows from CSV: {len(raw):,}")
print("Columns:", list(raw.columns))

if errors:
    print(f"\nWarnings while reading ({len(errors)} file(s)):")
    for p, msg in errors[:12]:
        print("  -", p, "â†’", msg)
    if len(errors) > 12:
        print("  ... (truncated)")

print("\nOrg unit mapping started. Please wait...")

# ---- Resolve orgUnit from region_name via CSV map ----
org_col = COLS["orgunit_source"]
if org_col not in raw.columns:
    raise ValueError(f"Expected column '{org_col}' not found. Present columns: {list(raw.columns)}")

_raw_norm = raw[org_col].astype(str).map(_normalize_key)
raw["orgUnit_resolved"] = _raw_norm.map(_orgunit_map)

missing_mask = raw["orgUnit_resolved"].isna()
if missing_mask.any():
    missing_regions = sorted(set(raw.loc[missing_mask, org_col].astype(str).str.strip()))
    if STRICT_ORGUNIT_MATCH:
        raise KeyError(
            f"{len(missing_regions)} region_name value(s) not found in {ORGUNIT_MAP_CSV}: {missing_regions}"
        )
    else:
        raw.loc[missing_mask, "orgUnit_resolved"] = raw.loc[missing_mask, org_col]

print("OrgUnit mapping complete.")
print("  Resolved:", (~missing_mask).sum(), "rows")
print("  Unresolved:", missing_mask.sum(), "rows")

print("\nPreparing display...")
display(raw.head(5)[ [c for c in (REQUIRED_BASE_COLS + ["days"]) if c in raw.columns] + ["orgUnit_resolved", "__source_file", "__source_year_folder"] ])

print("\nTask completed successfull.")

### Step 4: Filter to ICD-10 J-codes

In [7]:
##  --- Step 4: Filter to ICD-10 J-codes

print(f"Filtering J-codes from {len(raw):,} records. Please wait...")
icd_col = COLS["icd"]
if icd_col not in raw.columns:
    raise ValueError(f"ICD column '{icd_col}' not in CSV! Columns available: {list(raw.columns)}")

# Normalize ICD to uppercase and strip spaces
raw[icd_col] = raw[icd_col].astype(str).str.upper().str.strip()

# Keep only J-codes (J00â€“J99.* etc.)
j_mask = raw[icd_col].str.match(r"^J\d{2}(\.\d+)?$")  # adjust if your data uses different patterns
filtered = raw.loc[j_mask].copy()

print(f"Filtered J-codes: {len(filtered):,} rows (of {len(raw):,})")
filtered[[icd_col]].sample(min(5, len(filtered)))


Filtering J-codes from 351,019 records. Please wait...
Filtered J-codes: 17,655 rows (of 351,019)


Unnamed: 0,icd
12652,J45
269224,J44.9
312455,J44
255236,J45.9
309759,J03.9


### Step 5 â€” Parse dates & resolve Organization Units

In [8]:
# ===== Step 5 â€” Parse dates to ISO (eventDate) =====

print("Parsing dates & resolving organization units. Please wait...")
date_col = COLS["event_date"]            # 'admission_date' per Step 1
org_col  = COLS["orgunit_source"]        # 'region_name'

# Work on a copy
filtered = filtered.copy()

# Parse admission/discharge dates (keep originals)
filtered.loc[:, "eventDate_iso"] = pd.to_datetime(
    filtered[date_col], errors="coerce", infer_datetime_format=True
).dt.date.astype("string")

# Optional: also normalize discharge_date if you're storing it as a DE
if "discharge_date" in filtered.columns:
    filtered.loc[:, "discharge_date_iso"] = pd.to_datetime(
        filtered["discharge_date"], errors="coerce", infer_datetime_format=True
    ).dt.date.astype("string")

# Final orgUnit is already in raw (Step 3) as orgUnit_resolved; keep it here too
if "orgUnit_resolved" not in filtered.columns:
    raise ValueError("Expected 'orgUnit_resolved' from Step 3 is missing.")

print("Parsing dates & resolving organization units completed.")

Parsing dates & resolving organization units. Please wait...
Parsing dates & resolving organization units completed.


### Step 6 â€” Build DHIS2 Event JSON payload

In [10]:
# ===== Step 6 â€” Build Event JSON =====

print("Preparing JSON payload. Please wait...")

def row_to_event(row):
    dvs = []
    for csv_key, de_uid in DE_MAP.items():
        val = row.get(csv_key, row.get(COLS.get(csv_key, csv_key)))
        if csv_key == "sex":
            val = row.get("sex_code", val)  # normalized option code
        if csv_key == "discharge_date" and "discharge_date_iso" in row:
            val = row.get("discharge_date_iso", val)
        if pd.isna(val):
            continue
        dvs.append({"dataElement": de_uid, "value": str(val)})

    ev = {
        "event": row["event_client_uid"],                 # <<â€”â€” stable client UID
        "program": PROGRAM_ID,
        "programStage": PROGRAM_STAGE_ID,
        "orgUnit": row["orgUnit_resolved"],
        "eventDate": row["eventDate_iso"],
        "status": DEFAULT_EVENT_STATUS,
        "storedBy": STORED_BY,
        "dataValues": dvs,
    }
    if DEFAULT_EVENT_STATUS == "COMPLETED" and pd.notna(row.get("eventDate_iso")):
        ev["completedDate"] = str(row["eventDate_iso"])
    return ev


events = []
bad_rows = 0
for _, r in filtered.iterrows():
    if pd.isna(r.get("orgUnit_resolved")) or pd.isna(r.get("eventDate_iso")):
        bad_rows += 1
        continue
    events.append(row_to_event(r))

print(f"Built {len(events):,} event(s). Skipped {bad_rows:,} row(s) with missing orgUnit/date.")
print(json.dumps(events[:1], indent=2)[:800])


Preparing JSON payload. Please wait...
Built 17,654 event(s). Skipped 1 row(s) with missing orgUnit/date.
[
  {
    "event": "K1XxGyOtTGM",
    "program": "FG1oe4dXuRQ",
    "programStage": "fhBQ2bdH9Vi",
    "orgUnit": "tmYbVNQz6uy",
    "eventDate": "2020-03-18",
    "status": "COMPLETED",
    "storedBy": "climate-tools-notebook",
    "dataValues": [
      {
        "dataElement": "p7x5goMW59L",
        "value": "Male"
      },
      {
        "dataElement": "IJwdAM6XTaL",
        "value": "0.4167"
      },
      {
        "dataElement": "mv9BXIKbqyB",
        "value": "live"
      },
      {
        "dataElement": "lmwfGS3XTD9",
        "value": "14.0"
      },
      {
        "dataElement": "VZrKVPTEwyI",
        "value": "J21.9"
      },
      {
        "dataElement": "FvHnVblNelg",
        "value": "2020-04-01"
      }
    ],
    "completedDate": "2020-03-18"
  }
]


### Step 7: Import events in clusters

In [11]:
# --- Step 7 â€” Import in clusters (request_timeout=120 + timing/ETA) ---

import json, os, time, requests
import pandas as pd
from datetime import timedelta

# ---------- Inspect a batch response & show conflicts; optionally save failed rows ----------
def summarize_import_response(code, out):
    """Return (totals, conflicts_df) from a DHIS2 /api/events response."""
    totals = {"imported": 0, "updated": 0, "deleted": 0, "ignored": 0}
    rows = []

    resp = (out or {}).get("response") or {}
    summaries = resp.get("importSummaries") or []

    for idx, s in enumerate(summaries):
        ic = s.get("importCount", {}) or {}
        for k in totals:
            totals[k] += int(ic.get(k, 0))
        for c in (s.get("conflicts") or []):
            rows.append({
                "event_idx_in_batch": idx,
                "object": c.get("object"),
                "message": c.get("value")
            })

    df = pd.DataFrame(rows)
    return totals, df

def show_batch_details(batch_index, start_idx, end_idx, batch, code, out, save_failed=True, base_dir="/mnt/data/dhis2_batch_diagnostics"):
    """Pretty-print batch result, top conflicts, and save failed rows for re-run."""
    status = (out or {}).get("status", out.get("httpStatusCode", code) if isinstance(out, dict) else code)
    totals, conflicts_df = summarize_import_response(code, out)

    print(f"\nBatch {batch_index} ({start_idx}-{end_idx}) HTTP {code} ({status})")
    print("Import counts:", totals)

    if not conflicts_df.empty:
        print("\nTop conflict messages:")
        display(conflicts_df["message"].value_counts().head(15))
        print("\nFirst conflicts:")
        display(conflicts_df.head(20))

    if save_failed:
        os.makedirs(base_dir, exist_ok=True)
        diag_path = os.path.join(base_dir, f"batch_{batch_index:04d}_response.json")
        with open(diag_path, "w", encoding="utf-8") as f:
            json.dump(out, f, ensure_ascii=False, indent=2)

        # Identify failed rows
        failed_indices = []
        resp = (out or {}).get("response") or {}
        summaries = resp.get("importSummaries") or []
        for i, s in enumerate(summaries):
            if (s.get("status") or "").upper() in {"ERROR", "WARNING"}:
                failed_indices.append(i)

        if failed_indices:
            failed_events = [batch[i] for i in failed_indices if i < len(batch)]
            failed_path = os.path.join(base_dir, f"batch_{batch_index:04d}_failed_events.json")
            with open(failed_path, "w", encoding="utf-8") as f:
                json.dump({"events": failed_events}, f, ensure_ascii=False, indent=2)
            print(f"\nSaved diagnostics to:\n  - {diag_path}\n  - {failed_path}  (failed {len(failed_events)} / {len(batch)})")
        else:
            print(f"\nSaved diagnostics to:\n  - {diag_path}  (no per-event failures listed)")

# ---------- Ensure session exists ----------
if "session" not in globals():
    session = requests.Session()
    session.verify = VERIFY_SSL
    session.auth = (DHIS2_USERNAME, DHIS2_PASSWORD)
    session.headers.update({"Content-Type": "application/json"})

# HTTP timeout (seconds) for each POST
request_timeout = 120

def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i+n]

def import_batch(batch, dry_run=True):
    """POST /api/events with optional dryRun and request timeout."""
    url = f"{DHIS2_BASE_URL}/api/events"
    params = {"skipFirst": "false", "dryRun": str(dry_run).lower()}
    payload = {"events": batch}
    r = session.post(url, params=params, data=json.dumps(payload), timeout=request_timeout)
    try:
        out = r.json()
    except Exception:
        out = {"status_code": r.status_code, "text": r.text[:500]}
    return r.status_code, out

# Optional: tiny conflict probe
def dry_run_debug(n=25):
    sample = events[:n]
    code, out = import_batch(sample, dry_run=True)
    print("HTTP:", code, "| status:", out.get("status"), "| msg:", out.get("message"))
    summaries = (out.get("response") or {}).get("importSummaries", []) or []
    rows = []
    for i, s in enumerate(summaries):
        for c in s.get("conflicts", []) or []:
            rows.append({"event_idx": i, "object": c.get("object"), "value": c.get("value")})
    if rows:
        df = pd.DataFrame(rows)
        display(df.head(50))
        print("\nTop conflict messages:")
        display(df["value"].value_counts().head(20))
    else:
        print("No conflicts returned.")
    return code, out

def fmt_hms(seconds: float) -> str:
    return str(timedelta(seconds=int(seconds)))

# ---------- 7a) Dry-run a representative sample (up to 1000) ----------
print("Attempting dry-run first. Please wait...")
if DRY_RUN_FIRST and events:
    sample = list(chunks(events, min(BATCH_SIZE, 1000)))[0]
    t0 = time.perf_counter()
    code, out = import_batch(sample, dry_run=True)
    dt = time.perf_counter() - t0
    print("Dry-run HTTP:", code, "| time:", fmt_hms(dt))
    print(json.dumps(out, indent=2)[:1500])

# _ = dry_run_debug(30)  # uncomment if you want a tiny conflict probe

# ---------- 7b) Actual import with per-batch timing, throughput, ETA, total time ----------
import_results = []
if events and (not DRY_RUN_FIRST or input("Proceed with real import? (y/N) ").strip().lower() == "y"):
    total = len(events)
    overall_start = time.perf_counter()
    processed = 0

    for i, batch in enumerate(chunks(events, BATCH_SIZE), start=1):
        start_idx = (i - 1) * BATCH_SIZE + 1
        end_idx   = min(i * BATCH_SIZE, total)

        batch_start = time.perf_counter()
        code, out = import_batch(batch, dry_run=False)
        batch_dt = time.perf_counter() - batch_start

        status = out.get("status", out.get("httpStatusCode", code))
        processed += len(batch)

        # Throughput + ETA
        overall_dt = time.perf_counter() - overall_start
        eps = processed / overall_dt if overall_dt > 0 else 0.0
        remaining = total - processed
        eta_sec = remaining / eps if eps > 0 else 0.0

        print(
            f"Batch {i}: {start_idx}-{end_idx} of {total} events â†’ "
            f"HTTP {code} ({status}) | batch {fmt_hms(batch_dt)} | "
            f"avg {eps:.1f} ev/s | ETA {fmt_hms(eta_sec)}"
        )

        # inside your Step 7 loop after a successful batch:
        with open("/mnt/data/import_checkpoint.txt", "w") as f:
            f.write(str(end_idx))  # last 1-based event index processed


        # On error/conflict, show details and save failed rows
        if code >= 400 or str(status).upper() in {"ERROR", "CONFLICT"}:
            show_batch_details(i, start_idx, end_idx, batch, code, out, save_failed=True)

        import_results.append((i, start_idx, end_idx, len(batch), code, status, batch_dt))

    overall_dt = time.perf_counter() - overall_start
    print(f"\nAll done. Total time: {fmt_hms(overall_dt)} "
          f"({total} events, avg {(total/overall_dt):.1f} ev/s).")
else:
    print("Skipped actual import. You can inspect 'events' or re-run without DRY_RUN_FIRST.")


Attempting dry-run first. Please wait...
Dry-run HTTP: 200 | time: 0:00:02
{
  "httpStatus": "OK",
  "httpStatusCode": 200,
  "status": "OK",
  "message": "Import was successful.",
  "response": {
    "responseType": "ImportSummaries",
    "status": "SUCCESS",
    "imported": 0,
    "updated": 0,
    "deleted": 0,
    "ignored": 0,
    "importOptions": {
      "idSchemes": {},
      "dryRun": true,
      "async": false,
      "importStrategy": "CREATE_AND_UPDATE",
      "mergeMode": "REPLACE",
      "reportMode": "FULL",
      "skipExistingCheck": false,
      "sharing": false,
      "skipNotifications": false,
      "skipAudit": false,
      "datasetAllowsPeriods": false,
      "strictPeriods": false,
      "strictDataElements": false,
      "strictCategoryOptionCombos": false,
      "strictAttributeOptionCombos": false,
      "strictOrganisationUnits": false,
      "strictDataSetApproval": false,
      "strictDataSetLocking": false,
      "strictDataSetInputPeriods": false,
      "re

Proceed with real import? (y/N)  N


Skipped actual import. You can inspect 'events' or re-run without DRY_RUN_FIRST.


### Step 8: Post-import sanity checks

In [13]:

# You can GET a small sample back to verify
def get_recent_events(limit=5):
    url = f"{DHIS2_BASE_URL}/api/events.json"
    params = {
        "program": PROGRAM_ID,
        "pageSize": limit,
        "order": "created:desc",
        "paging": "true"
    }
    r = session.get(url, params=params)
    return r.json()

try:
    sample_events = get_recent_events(5)
    print(json.dumps(sample_events, indent=2)[:1200])
except Exception as e:
    print("Skipping fetch:", e)


{
  "pager": {
    "page": 1,
    "pageSize": 5,
    "isLastPage": false
  },
  "events": [
    {
      "programStage": "fhBQ2bdH9Vi",
      "programType": "WITHOUT_REGISTRATION",
      "orgUnit": "rrPMoH1VER7",
      "program": "FG1oe4dXuRQ",
      "event": "jWERIzfIcEQ",
      "status": "COMPLETED",
      "orgUnitName": "Gampaha RDHS",
      "eventDate": "2020-04-01T00:00:00.000",
      "created": "2025-10-17T03:24:14.143",
      "lastUpdated": "2025-10-17T03:24:14.143",
      "deleted": false,
      "attributeOptionCombo": "HllvX50cXC0",
      "dataValues": [
        {
          "lastUpdated": "2025-10-17T03:24:09.297",
          "created": "2025-10-17T03:24:09.297",
          "dataElement": "IJwdAM6XTaL",
          "value": "1.1667",
          "providedElsewhere": false,
          "createdByUserInfo": {
            "uid": "qtojH98PIy9",
            "firstName": "HISP",
            "surname": "Sri Lanka",
            "username": "hispsl"
          },
          "lastUpdatedByUserInfo

## Notes & Tips
- Ensure your **program** is an *Event* program (not Tracker) and the **program stage** allows eventDate.
- DataElements in `DE_MAP` must belong to the program stage.
- If your org units are by CODE, set `ORGUNIT_ID_SCHEME = 'CODE'` and supply codes in `orgUnit_resolved`.
- If you see `conflicts` in the import summary, check date formats, missing org units, or wrong DE UIDs.
- For performance, keep `BATCH_SIZE` between 200â€“1000 depending on your server.
- If your server has a self-signed certificate, set `VERIFY_SSL=False` (only if you understand the risk).
- To transform columns before mapping (e.g., normalize sex M/F), create new columns and reference them in `DE_MAP`.