# Task 1 — Data Exploration & Schema Validation (Ethiopia Financial Inclusion)

## Objective
This notebook documents **Task 1** of the pipeline:
1. Load the unified dataset (observations + events + impact links + targets)
2. Validate the minimum schema and key integrity rules
3. Produce EDA artifacts (counts, temporal coverage, indicator coverage)
4. Confirm **temporal isolation** so future targets do not leak into historical summaries
5. Confirm **referential integrity** for the causal layer (impact links correctly reference events)

## Key Deliverables
- `counts__record_type.csv`, `counts__pillar.csv` (and optional `counts__category.csv`)
- `temporal_range.csv` and subset summaries:
  - `temporal_range__observations.csv`
  - `temporal_range__events.csv`
  - `temporal_range__targets.csv`
- `indicator_coverage.csv`
- `events.csv`, `impact_links.csv`
- `data_summary.csv`
- diagnostics (only if issues found):
  - `invalid_events_with_pillar.csv`
  - `invalid_impact_links_missing_parent.csv`
  - `invalid_impact_links_unresolved_parent.csv`


## Reproducibility Notes
- The project follows a modular structure (`src/fi/...`) and scripts/notebooks should import from `src`.
- Outputs are written to a dedicated directory to keep raw vs processed vs diagnostics separated.
- We enforce schema and referential integrity **before** any modeling to avoid silent failures later.

## Assumptions
- `record_type` includes: `observation`, `event`, `impact_link`, `target`
- `record_id` is unique per row
- `impact_link.parent_id` must reference an existing `event.record_id`
- Events should have empty/NA pillar fields (pillar belongs to measured indicators, not events)
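The first two assumptions can be spot-checked with plain pandas. The sketch below covers the allowed `record_type` values and `record_id` uniqueness. It illustrates those assumptions and is not the implementation in `src.fi.validation`. The toy rows are made up.

```python
import pandas as pd

# Allowed record types, taken from the Assumptions list above.
ALLOWED_RECORD_TYPES = {"observation", "event", "impact_link", "target"}


def check_record_basics(df: pd.DataFrame) -> dict:
    """Count rows that violate the record_type and record_id assumptions."""
    # Normalise record_type (trim + lowercase) before comparing.
    rt = df["record_type"].fillna("").astype(str).str.strip().str.lower()
    unknown_type = ~rt.isin(ALLOWED_RECORD_TYPES)
    # keep=False flags every copy of a duplicated ID, not only the later ones.
    duplicated_id = df["record_id"].duplicated(keep=False)
    return {
        "unknown_record_type": int(unknown_type.sum()),
        "duplicated_record_id": int(duplicated_id.sum()),
    }


# Illustrative toy rows (not project data): one unknown type, one repeated ID.
toy = pd.DataFrame({
    "record_id": ["A1", "A2", "A2", "A3"],
    "record_type": ["observation", "Event ", "target", "forecast"],
})
print(check_record_basics(toy))
```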


In [5]:
from pathlib import Path
import sys
import pandas as pd

# Find repo root by walking upward until we see a "src" folder
CWD = Path.cwd().resolve()
ROOT = next((p for p in [CWD, *CWD.parents] if (p / "src").is_dir()), None)

if ROOT is None:
    raise RuntimeError(f"Could not find project root containing 'src' starting from: {CWD}")

if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

print("Notebook CWD :", CWD)
print("Project ROOT :", ROOT)

from src.fi.io import load_csv, save_csv
from src.fi.validation import (
    assert_min_schema,
    invalid_events_with_pillar,
    invalid_impact_links_missing_parent,
    invalid_impact_links_unresolved_parent,
)
from src.fi.explore import (
    counts,
    temporal_range,
    indicator_coverage,
    list_events,
    list_impact_links,
)


Notebook CWD : D:\Python\Week10\Forecasting-Financial-Inclusion-in-Ethiopia\notebooks
Project ROOT : D:\Python\Week10\Forecasting-Financial-Inclusion-in-Ethiopia


## Configuration
We resolve the path to the unified dataset and create the output directory (plus a `diagnostics/` subfolder) where all Task 1 artifacts will be written.

In [6]:
# Resolve all paths from the repository root (not from notebook CWD)

# Guard against running this cell before ROOT is set (or if ROOT ended up as None)
if "ROOT" not in globals() or ROOT is None:
    CWD = Path.cwd().resolve()
    ROOT = next((p for p in [CWD, *CWD.parents] if (p / "src").is_dir()), None)
    if ROOT is None:
        raise RuntimeError(
            f"Could not find project root containing 'src' starting from: {CWD}. "
            "Run the setup cell that defines ROOT, or open the notebook inside the repo."
        )

IN_PATH = ROOT / "data" / "processed" / "ethiopia_fi_unified_data__enriched.csv"
OUT_DIR = ROOT / "data" / "processed" / "task1_notebook_outputs"
DIAG_DIR = OUT_DIR / "diagnostics"

OUT_DIR.mkdir(parents=True, exist_ok=True)
DIAG_DIR.mkdir(parents=True, exist_ok=True)

print("ROOT   :", ROOT)
print("IN_PATH:", IN_PATH)
print("Exists?:", IN_PATH.exists())
print("OUT_DIR:", OUT_DIR)
print("DIAG_DIR:", DIAG_DIR)

ROOT   : D:\Python\Week10\Forecasting-Financial-Inclusion-in-Ethiopia
IN_PATH: D:\Python\Week10\Forecasting-Financial-Inclusion-in-Ethiopia\data\processed\ethiopia_fi_unified_data__enriched.csv
Exists?: True
OUT_DIR: D:\Python\Week10\Forecasting-Financial-Inclusion-in-Ethiopia\data\processed\task1_notebook_outputs
DIAG_DIR: D:\Python\Week10\Forecasting-Financial-Inclusion-in-Ethiopia\data\processed\task1_notebook_outputs\diagnostics


## Step 1 — Load the Unified Dataset
The unified dataset contains multiple record types in a single table:
- Observations: historical indicator values
- Events: discrete changes expected to influence indicators
- Impact links: causal mappings from events → indicators (+ magnitude/lag)
- Targets: future goal-state anchors (e.g., 2030 targets)

In [7]:
df = load_csv(str(IN_PATH))
df.shape, df.columns.tolist()[:20]

((43, 35),
 ['record_id',
  'record_type',
  'category',
  'pillar',
  'indicator',
  'indicator_code',
  'indicator_direction',
  'value_numeric',
  'value_text',
  'value_type',
  'unit',
  'observation_date',
  'period_start',
  'period_end',
  'fiscal_year',
  'gender',
  'location',
  'region',
  'source_name',
  'source_type'])

## Step 2 — Validate Minimum Schema
Before any EDA, we validate the presence of required columns and basic structural assumptions.
This prevents downstream errors (e.g., missing `record_type`, missing IDs).
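A minimal sketch of what a minimum-schema check can look like. `REQUIRED_COLUMNS` is a hypothetical list chosen for illustration. The columns that `assert_min_schema` actually enforces are defined in `src.fi.validation` and may differ.

```python
import pandas as pd

# Hypothetical required columns for illustration; src.fi.validation defines the real set.
REQUIRED_COLUMNS = ("record_id", "record_type", "indicator_code", "observation_date")


def check_min_schema(df: pd.DataFrame, required=REQUIRED_COLUMNS) -> None:
    """Raise ValueError if a required column is absent or contains no values at all."""
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
    # A column that exists but is entirely empty is just as unusable downstream.
    empty = [c for c in required if df[c].isna().all()]
    if empty:
        raise ValueError(f"Required columns with no values: {empty}")


ok = pd.DataFrame({c: ["x"] for c in REQUIRED_COLUMNS})
check_min_schema(ok)  # passes silently
```

Reporting every missing column at once, rather than failing on the first, saves a round trip when a source file changes shape.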

In [8]:
assert_min_schema(df)
"OK: minimum schema present"

'OK: minimum schema present'

## Step 3 — Dataset Composition
We examine how many rows belong to each record type and pillar.
This is used to confirm expected proportions (e.g., many observations, fewer events/targets).
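The `(empty)` label in the pillar counts indicates that blank values are grouped and shown rather than dropped. Below is a sketch of such a frequency helper. It assumes NaN and blank strings should be counted together. `count_values` is a hypothetical name, not the project's `counts`.

```python
import pandas as pd


def count_values(df: pd.DataFrame, col: str, empty_label: str = "(empty)") -> pd.DataFrame:
    """Frequency table for one column, with NaN and blank strings grouped under one label."""
    s = df[col].fillna("").astype(str).str.strip()
    s = s.where(s != "", empty_label)  # replace blanks with a visible label
    return s.value_counts().rename_axis(col).reset_index(name="n")


# Illustrative toy rows (not project data).
toy = pd.DataFrame({"pillar": ["ACCESS", "ACCESS", "ACCESS", None, "", "USAGE"]})
print(count_values(toy, "pillar"))
```

In the Step 3 output, the 10 `(empty)` pillar rows match the 10 event rows. This is consistent with the assumption that events carry no pillar.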

In [9]:
ct_record_type = counts(df, "record_type")
ct_pillar = counts(df, "pillar")

save_csv(ct_record_type, str(OUT_DIR / "counts__record_type.csv"))
save_csv(ct_pillar, str(OUT_DIR / "counts__pillar.csv"))

ct_record_type, ct_pillar

(   record_type   n
 0  observation  30
 1        event  10
 2       target   3,
           pillar   n
 0         ACCESS  16
 1          USAGE  11
 2        (empty)  10
 3         GENDER   5
 4  AFFORDABILITY   1)

In [None]:
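from IPython.display import display

if "category" in df.columns:
    ct_category = counts(df, "category")
    save_csv(ct_category, str(OUT_DIR / "counts__category.csv"))
    # Only a cell's last expression is rendered automatically, so display explicitly.
    display(ct_category.head(30))
else:
    print("No category column found")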
cov = indicator_coverage(df)
save_csv(cov, str(OUT_DIR / "indicator_coverage.csv"))
cov.head(30)




KeyError: 'event_end_date'

## Step 4 — Temporal Coverage (Overall)
We compute the minimum and maximum dates across the whole dataset.

⚠️ Note: The unified dataset includes **future targets**, so the overall range extends past the last observation (the overall maximum below is 2030-12-31). We therefore also compute **record_type-specific** temporal ranges to prevent temporal leakage.
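The dictionaries printed by `temporal_range` have the keys `min_date`, `max_date`, `n_parsed` and `n_total`. The sketch below returns the same shape of result. It assumes unparseable dates are coerced to missing rather than raising an error. `temporal_range_sketch` is a hypothetical name.

```python
import pandas as pd


def temporal_range_sketch(df: pd.DataFrame, col: str) -> dict:
    """Earliest/latest date in `col`, plus how many values parsed as dates."""
    dates = pd.to_datetime(df[col], errors="coerce")  # bad values become NaT
    parsed = dates.dropna()
    if parsed.empty:
        return {"min_date": None, "max_date": None, "n_parsed": 0, "n_total": int(len(df))}
    return {
        "min_date": parsed.min().strftime("%Y-%m-%d"),
        "max_date": parsed.max().strftime("%Y-%m-%d"),
        "n_parsed": int(parsed.size),
        "n_total": int(len(df)),
    }


# Illustrative toy rows (not project data).
toy = pd.DataFrame({"observation_date": ["2014-12-31", "not a date", None, "2030-12-31"]})
print(temporal_range_sketch(toy, "observation_date"))
```

Comparing `n_parsed` with `n_total` shows how many dates failed to parse. In this notebook's outputs the two are equal for every subset.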

In [None]:
tr_all = temporal_range(df, "observation_date")
pd.DataFrame([tr_all]).to_csv(OUT_DIR / "temporal_range.csv", index=False)
tr_all

{'min_date': '2014-12-31',
 'max_date': '2030-12-31',
 'n_parsed': 43,
 'n_total': 43}

## Step 4b — Temporal Isolation by Record Type
We compute temporal ranges separately for:
- Observations (historical / measured)
- Events (policy/product changes)
- Targets (future goal anchors)

This ensures **future targets** do not get mistaken for historical observations during EDA.
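The three per-type ranges can also be computed in one `groupby` pass, and temporal isolation can be turned into an explicit assertion. Both helpers and the `cutoff` parameter are hypothetical, and the toy data is illustrative.

```python
import pandas as pd


def ranges_by_type(df: pd.DataFrame, date_col: str = "observation_date") -> pd.DataFrame:
    """One row per normalised record_type: earliest/latest parsed date and count."""
    tmp = df.assign(
        _rt=df["record_type"].fillna("").astype(str).str.strip().str.lower(),
        _dt=pd.to_datetime(df[date_col], errors="coerce"),
    )
    # "count" skips unparseable dates (NaT).
    return tmp.groupby("_rt")["_dt"].agg(["min", "max", "count"]).rename_axis("record_type")


def assert_observations_before(df: pd.DataFrame, cutoff: str) -> None:
    """Raise ValueError if any observation is dated after the given cutoff."""
    ranges = ranges_by_type(df)
    if "observation" not in ranges.index:
        return
    last_obs = ranges.loc["observation", "max"]
    if pd.notna(last_obs) and last_obs > pd.Timestamp(cutoff):
        raise ValueError(f"observation dated {last_obs.date()} is after cutoff {cutoff}")


# Illustrative toy rows (not project data).
toy = pd.DataFrame({
    "record_type": ["observation", "observation", "target"],
    "observation_date": ["2014-12-31", "2025-12-31", "2030-12-31"],
})
print(ranges_by_type(toy))
assert_observations_before(toy, "2025-12-31")  # passes: last observation is on the cutoff
```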

In [None]:
def subset(df: pd.DataFrame, record_type: str) -> pd.DataFrame:
    rt = df["record_type"].fillna("").astype(str).str.strip().str.lower()
    return df.loc[rt.eq(record_type)].copy()

df_obs = subset(df, "observation")
df_evt = subset(df, "event")
df_tgt = subset(df, "target")

def save_temporal(name: str, dfx: pd.DataFrame):
    # Fallback keys mirror those returned by temporal_range (see the Step 4 output).
    tr = temporal_range(dfx, "observation_date") if len(dfx) else {"min_date": None, "max_date": None, "n_parsed": 0, "n_total": 0}
    pd.DataFrame([tr]).to_csv(OUT_DIR / f"{name}.csv", index=False)
    return tr

tr_obs = save_temporal("temporal_range__observations", df_obs)
tr_evt = save_temporal("temporal_range__events", df_evt)
tr_tgt = save_temporal("temporal_range__targets", df_tgt)

tr_obs, tr_evt, tr_tgt


({'min_date': '2014-12-31',
  'max_date': '2025-12-31',
  'n_parsed': 30,
  'n_total': 30},
 {'min_date': '2021-05-17',
  'max_date': '2025-12-18',
  'n_parsed': 10,
  'n_total': 10},
 {'min_date': '2025-12-31',
  'max_date': '2030-12-31',
  'n_parsed': 3,
  'n_total': 3})

## Step 5 — Indicator Coverage (Observations Only)
We summarize the number of observed data points per indicator and the date span for each.
This helps identify:
- sparse indicators
- indicators with gaps
- indicators with only anchor points

This table is a key input to Task 2 (S-curve fitting and baseline trend modeling).
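The cell below passes the full `df` to `indicator_coverage`, and this notebook does not show whether that helper restricts itself to observation rows. The following sketch makes the observation-only filter explicit and flags anchor-only indicators. The column names `n_points`, `first_date`, `last_date` and `anchor_only` are hypothetical and need not match `indicator_coverage.csv`.

```python
import pandas as pd


def coverage_sketch(df: pd.DataFrame) -> pd.DataFrame:
    """Per-indicator point count and date span, using observation rows only."""
    rt = df["record_type"].fillna("").astype(str).str.strip().str.lower()
    obs = df.loc[rt.eq("observation")].copy()
    obs["_dt"] = pd.to_datetime(obs["observation_date"], errors="coerce")
    cov = (
        obs.groupby("indicator_code")
        .agg(n_points=("_dt", "count"), first_date=("_dt", "min"), last_date=("_dt", "max"))
        .reset_index()
    )
    # Fewer than two dated points: usable as an anchor, not for trend fitting.
    cov["anchor_only"] = cov["n_points"] < 2
    return cov.sort_values("n_points", ascending=False, ignore_index=True)


# Illustrative toy rows (hypothetical codes, not project data).
toy = pd.DataFrame({
    "record_type": ["observation"] * 4 + ["event"],
    "indicator_code": ["IND_A", "IND_A", "IND_A", "IND_B", "EVT_X"],
    "observation_date": ["2014-12-31", "2017-12-31", "2021-12-31", "2024-06-30", "2022-01-01"],
})
print(coverage_sketch(toy))
```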

In [None]:
from IPython.display import display

cov = indicator_coverage(df)
save_csv(cov, str(OUT_DIR / "indicator_coverage.csv"))
display(cov.head(30))  # only the last expression is rendered automatically
cov.shape

(19, 5)

## Step 6 — Events and Impact Links
We extract and save:
- the event table (`record_id`, name, date and metadata for each event)
- the impact_link table (which event affects which indicator, with parameters such as lag/magnitude)

These tables feed the causal impact logic in Task 3, so they must be validated carefully before use.
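Impact links are typically consumed together with their parent event's date, for example to work out when an effect should start. Below is a sketch of that join. It assumes each link stores its lag in a hypothetical `lag_months` column. This notebook does not show the dataset's actual lag and magnitude column names.

```python
import pandas as pd


def attach_event_dates(impact_links: pd.DataFrame, events: pd.DataFrame,
                       lag_col: str = "lag_months") -> pd.DataFrame:
    """Join each impact link to its parent event and derive when the effect starts."""
    ev = events[["record_id", "observation_date"]].rename(
        columns={"record_id": "parent_id", "observation_date": "event_date"}
    )
    # many_to_one: several links may share an event, but each event ID must be unique.
    out = impact_links.merge(ev, on="parent_id", how="left", validate="many_to_one")
    out["event_date"] = pd.to_datetime(out["event_date"], errors="coerce")
    # Shift by whole calendar months; a missing date or lag yields NaT.
    out["effect_start"] = [
        d + pd.DateOffset(months=int(m)) if pd.notna(d) and pd.notna(m) else pd.NaT
        for d, m in zip(out["event_date"], out[lag_col])
    ]
    return out


# Illustrative toy rows (hypothetical IDs and lags, not project data).
events_toy = pd.DataFrame({"record_id": ["EVT_A"], "observation_date": ["2021-01-15"]})
links_toy = pd.DataFrame({
    "record_id": ["LNK_1", "LNK_2"],
    "parent_id": ["EVT_A", "EVT_A"],
    "lag_months": [12, 24],
})
print(attach_event_dates(links_toy, events_toy)[["record_id", "event_date", "effect_start"]])
```

With `validate="many_to_one"`, pandas raises a `MergeError` if an event `record_id` appears twice. Without it, such a duplicate would silently duplicate the links.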

In [None]:
events = list_events(df)
impact_links = list_impact_links(df)

save_csv(events, str(OUT_DIR / "events.csv"))
save_csv(impact_links, str(OUT_DIR / "impact_links.csv"))

events.head(20), impact_links.head(20)


(   record_id        category                               indicator  \
 33  EVT_0001  product_launch                         Telebirr Launch   
 41  EVT_0009          policy                 NFIS-II Strategy Launch   
 34  EVT_0002    market_entry    Safaricom Ethiopia Commercial Launch   
 35  EVT_0003  product_launch                  M-Pesa Ethiopia Launch   
 36  EVT_0004  infrastructure        Fayda Digital ID Program Rollout   
 37  EVT_0005          policy         Foreign Exchange Liberalization   
 38  EVT_0006       milestone     P2P Transaction Count Surpasses ATM   
 39  EVT_0007     partnership            M-Pesa EthSwitch Integration   
 42  EVT_0010         pricing       Safaricom Ethiopia Price Increase   
 40  EVT_0008  infrastructure  EthioPay Instant Payment System Launch   
 
        indicator_code observation_date    source_name confidence  
 33       EVT_TELEBIRR       2021-05-17  Ethio Telecom       high  
 41          EVT_NFIS2       2021-09-01            NBE     

## Step 7 — Referential Integrity (Causal Layer Readiness)

We run three checks:
1) **Events must not carry indicator pillar fields**  
   Events are exogenous drivers; pillars belong to measured indicators.
2) **Impact links must have a `parent_id`**  
   Every impact link row must identify which event it references.
3) **Impact links must resolve to an existing event record**  
   Every `impact_link.parent_id` must match an `event.record_id`.

Any failures are written to `diagnostics/` for debugging.
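The three checks can be expressed directly in pandas. This sketch follows the logic described above and is not the code of `src.fi.validation`. It assumes blank pillar and `parent_id` values may appear as either NaN or empty strings.

```python
import pandas as pd


def _norm(s: pd.Series) -> pd.Series:
    """Blank-safe string view: NaN becomes '' and whitespace is trimmed."""
    return s.fillna("").astype(str).str.strip()


def integrity_failures(df: pd.DataFrame) -> dict:
    """Return the failing rows for each of the three causal-layer checks."""
    rt = _norm(df["record_type"]).str.lower()
    events = df[rt.eq("event")]
    links = df[rt.eq("impact_link")]
    pillar = _norm(events["pillar"])
    parent = _norm(links["parent_id"])
    event_ids = set(_norm(events["record_id"]))
    return {
        # 1) events must not carry a pillar
        "events_with_pillar": events[pillar.ne("")],
        # 2) every impact link needs a parent_id
        "links_missing_parent": links[parent.eq("")],
        # 3) a non-empty parent_id must match an event record_id
        "links_unresolved_parent": links[parent.ne("") & ~parent.isin(event_ids)],
    }


# Illustrative toy rows: one failure per check.
toy = pd.DataFrame({
    "record_id": ["E1", "E2", "L1", "L2", "L3"],
    "record_type": ["event", "event", "impact_link", "impact_link", "impact_link"],
    "pillar": [None, "ACCESS", "ACCESS", None, None],
    "parent_id": [None, None, "E1", None, "E9"],
})
print({name: len(rows) for name, rows in integrity_failures(toy).items()})
```

These filters return empty frames when no `impact_link` rows exist. The number of impact links should therefore be reported alongside the failure counts.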


In [None]:
bad_events = invalid_events_with_pillar(df)
bad_missing_parent = invalid_impact_links_missing_parent(df)
bad_unresolved_parent = invalid_impact_links_unresolved_parent(df)

len(bad_events), len(bad_missing_parent), len(bad_unresolved_parent)


(0, 0, 0)

In [None]:
if len(bad_events) > 0:
    save_csv(bad_events, str(DIAG_DIR / "invalid_events_with_pillar.csv"))

if len(bad_missing_parent) > 0:
    save_csv(bad_missing_parent, str(DIAG_DIR / "invalid_impact_links_missing_parent.csv"))

if len(bad_unresolved_parent) > 0:
    save_csv(bad_unresolved_parent, str(DIAG_DIR / "invalid_impact_links_unresolved_parent.csv"))

"Diagnostics written (only for non-empty failures)."


'Diagnostics written (only for non-empty failures).'

## Step 8 — Interpretation Summary (Report-Ready Notes)

Use the following bullets directly in the report:

- **Dataset size & composition:** confirm the number of observations/events/targets.
- **Temporal coverage:** report observation-only range (prevents target leakage).
- **Indicator coverage:** identify which indicators have sufficient density for trend fitting vs only anchors.
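- **Causal layer readiness:** confirm zero unresolved parent IDs and no invalid event pillar fields. Zero failures only demonstrate integrity if impact-link rows are present. The record-type counts in Step 3 list none (30 observations, 10 events, 3 targets), so the impact-link checks pass trivially on this file.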


In [None]:
rt_counts = dict(zip(ct_record_type["record_type"], ct_record_type["n"]))

summary = {
    "total_records": len(df),
    "record_type_counts": rt_counts,
    "overall_min_date": tr_all.get("min_date"),
    "overall_max_date": tr_all.get("max_date"),
    "obs_min_date": tr_obs.get("min_date"),
    "obs_max_date": tr_obs.get("max_date"),
    "tgt_min_date": tr_tgt.get("min_date"),
    "tgt_max_date": tr_tgt.get("max_date"),
    "evt_min_date": tr_evt.get("min_date"),
    "evt_max_date": tr_evt.get("max_date"),
    "invalid_events_with_pillar": len(bad_events),
    "impact_links_missing_parent_id": len(bad_missing_parent),
    "impact_links_unresolved_parent_id": len(bad_unresolved_parent),
}

save_csv(pd.DataFrame([summary]), str(OUT_DIR / "data_summary.csv"))
print(summary)

{'total_records': 43, 'record_type_counts': {'observation': 30, 'event': 10, 'target': 3}, 'overall_min_date': '2014-12-31', 'overall_max_date': '2030-12-31', 'obs_min_date': '2014-12-31', 'obs_max_date': '2025-12-31', 'tgt_min_date': '2025-12-31', 'tgt_max_date': '2030-12-31', 'evt_min_date': '2021-05-17', 'evt_max_date': '2025-12-18', 'invalid_events_with_pillar': 0, 'impact_links_missing_parent_id': 0, 'impact_links_unresolved_parent_id': 0}

## Outputs Checklist (what should exist on disk)

In `data/processed/task1_notebook_outputs/`:
- `counts__record_type.csv`
- `counts__pillar.csv`
- (optional) `counts__category.csv`
- `temporal_range.csv`
- `temporal_range__observations.csv`
- `temporal_range__events.csv`
- `temporal_range__targets.csv`
- `indicator_coverage.csv`
- `events.csv`
- `impact_links.csv`
- `data_summary.csv`

In `data/processed/task1_notebook_outputs/diagnostics/` (only if errors detected):
- `invalid_events_with_pillar.csv`
- `invalid_impact_links_missing_parent.csv`
- `invalid_impact_links_unresolved_parent.csv`
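The checklist above can be verified against the disk. The expected list below is transcribed from the checklist's required files, plus `data_summary.csv`, which the Step 8 cell also writes. Optional and diagnostic files are left out. `missing_outputs` is a hypothetical helper.

```python
import tempfile
from pathlib import Path

# Required artifacts transcribed from the checklist; optional/diagnostic files excluded.
EXPECTED_OUTPUTS = [
    "counts__record_type.csv",
    "counts__pillar.csv",
    "temporal_range.csv",
    "temporal_range__observations.csv",
    "temporal_range__events.csv",
    "temporal_range__targets.csv",
    "indicator_coverage.csv",
    "events.csv",
    "impact_links.csv",
    "data_summary.csv",
]


def missing_outputs(out_dir: Path, expected=EXPECTED_OUTPUTS) -> list:
    """Return the expected file names that are not present as files in out_dir."""
    return [name for name in expected if not (Path(out_dir) / name).is_file()]


# Demo on a throwaway directory that contains only one of the expected files.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "events.csv").write_text("record_id\n")
    demo_missing = missing_outputs(Path(tmp))
print(demo_missing)
```

In the notebook this would be called as `missing_outputs(OUT_DIR)`. An empty list means every required artifact is present.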