# Set up

## Check configuration
The cell below should print the path to the Python interpreter of the project's virtual environment.

In [17]:
import sys
# Show which interpreter backs this kernel.
print(sys.executable)
# Uncomment to also inspect the first entries of the module search path:
# print('\n'.join(sys.path[:6]))

c:\Users\WB499754\wb-projects\my_packages\tidysdmx\.venv\Scripts\python.exe


## Load libraries

In [28]:
# Automatically reload modules before execution of each cell
# so when you edit src/mypackage/*.py in your editor and rerun cells, 
# changes appear immediately.
%reload_ext autoreload
%autoreload 2

# python
from __future__ import annotations

# Standard library
import os
import pickle as pkl
from datetime import datetime
from pathlib import Path

# Third-party
import pandas as pd
import pysdmx as px
from openpyxl import Workbook, load_workbook
from pysdmx.model import FixedValueMap, ImplicitComponentMap, ValueMap, MultiValueMap, ComponentMap


# Custom
## Functions
from tidysdmx import (
    filter_tidy_raw, 
    validate_dataset_local, 
    map_structures, 
    infer_schema, 
    infer_role_dimension, 
    apply_fixed_value_maps, 
    apply_implicit_component_maps, 
    build_date_pattern_map,
    build_value_map_list,
    build_multi_value_map_list,
    build_representation_map,
    build_single_component_map,
    extract_component_ids,
    write_excel_mapping_template,
    build_structure_map,
    create_schema_from_table
)


## Define globals

In [40]:
# CAUTION! FOR TESTING ONLY. DO NOT USE IN PRODUCTION.
# os.environ["PYTHONHTTPSVERIFY"] = "0"

# Registry (FMR) endpoint the API client connects to
fmr_url = "https://fmrqa.worldbank.org/FMR/sdmx/v2"

# Identifiers of the raw schema
raw_structure_agency = "WB"
raw_structure_id = "IFPRI_ASTI"
raw_structure_version = "1.0"

# Identifiers of the dissemination schema
dis_structure_agency = "WB.DATA360"
dis_structure_id = "DS_DATA360"
dis_structure_version = "1.3"

# Structure map identifier (not used yet)
# raw_structure_map = "SM_IFPRI_ASTI_TO_DATA360"

# Input files: the raw data extract and the Excel mapping workbook
path_to_raw_data = Path("./data/ifpri_asti_raw.csv")
path_to_xlsx_mapping = Path("../../notebooks/tidysdmx/data/pipeline-iterative-development-mapping.xlsx")

## Initiate API client

In [4]:
# Echo the registry URL, create the registry client for it, and display the client object.
print(fmr_url)
client = px.api.fmr.RegistryClient(fmr_url)
client

https://fmrqa.worldbank.org/FMR/sdmx/v2


<pysdmx.api.fmr.RegistryClient at 0x25733c42750>

# STEP 1 - Load raw data

Here we load the raw dataset as provided by the source. In this demonstration notebook, the raw data is simply loaded from a file, but in the final pipeline, the provenance of the file should be fully documented in a configuration file, and the data should be read from the source / DDH if possible.

In [17]:
# Read the raw extract as-is; reshaping happens in STEP 2.
raw_df = pd.read_csv(path_to_raw_data)
raw_df.head()

Unnamed: 0,indicator,data.1992,data.1982,data.2020,data.2021,data.2002,data.1993,data.2008,data.1981,data.1988,...,data.1995,data.1986,data.2006,data.1997,data.2011,data.2010,data.2009,data.2012,note,country
0,EXP.TOT.CONSTLCU.FTE,49.960671,12.158121,93.855539,74.409939,62.597604,51.881694,81.089028,14.461498,50.688552,...,53.792651,44.949361,63.468616,57.057503,95.01979,96.631907,104.522798,107.526537,,GHA
1,EXP.TOT.PPP.FTE,71.660959,17.43897,134.621449,106.729704,89.786711,74.416373,116.309838,20.742812,72.704993,...,77.157349,64.473,91.036045,81.840282,136.291391,138.603725,149.922006,154.23041,,GHA
2,EXP.TOT.USD.FTE,33.04605,8.041883,62.079928,49.217806,41.404639,34.316694,53.635631,9.565432,33.527501,...,35.58068,29.731363,41.980762,37.740188,62.850012,63.916332,69.135694,71.12249,,GHA
3,EXP.TOT.ARI.AGGDP,0.519452,0.148756,0.531584,0.384688,0.547649,0.623805,0.594738,0.177894,0.564879,...,0.572733,0.575812,0.550298,0.60406,0.671292,0.657093,0.693788,0.743191,,GHA
4,EXP.SALARIES.TOT.SHRE,,,,,,,,,,...,,,,,79.408495,76.128,81.465618,75.741593,Shares are based on data for CSIR only.,GHA


# STEP 2: Reshape raw data

A critical step of this opinionated pipeline framework is to systematically reshape raw data into a tidy format (one observation per row). For more information about tidy data, please refer to [Hadley Wickham's original paper](https://vita.had.co.nz/papers/tidy-data.pdf). 

This step is critical because once data has been reshaped into a tidy format, the rest of the pipeline can be fully standardized, bringing immediate maintenance, scalability, and institutional knowledge benefits. 

This is also a good place to implement minimal data cleaning if necessary.

In [18]:
def reshape_raw_data(df: pd.DataFrame) -> pd.DataFrame:
    """Reshape the wide raw data into a tidy (long) format with basic cleaning.

    Every column whose name starts with 'data.' is unpivoted into two columns:
    'TIME_PERIOD' (the original column name without the 'data.' prefix) and
    'OBS_VALUE' (the cell value). All other columns are kept as identifier
    columns, and 'indicator', 'country' and 'note' are renamed to 'INDICATOR',
    'AREA' and 'NOTE'.

    Args:
        df: The input pandas DataFrame containing columns like 'data.1992',
            'data.1993', etc. It is not modified.

    Returns:
        A new DataFrame in the longer format, one row per identifier
        combination and 'data.' column.
    """
    prefix = 'data.'

    # 1. Equivalent of R's pivot_longer (using melt).
    # Match on the prefix only: a substring match would also unpivot unrelated
    # columns such as 'metadata.source'.
    data_cols = [col for col in df.columns if str(col).startswith(prefix)]
    id_cols = [col for col in df.columns if col not in data_cols]

    # With every non-data column used as identifier, melt unpivots exactly data_cols.
    df_lg = df.melt(
        id_vars=id_cols,
        var_name='TIME_PERIOD',  # New column for the original column names
        value_name='OBS_VALUE',  # New column for the values
    )

    # 2. Equivalent of R's stringr::str_remove on the prefix.
    # Every melted name starts with the prefix, so slicing drops it without
    # touching a later 'data.' occurrence in the same name.
    df_lg['TIME_PERIOD'] = df_lg['TIME_PERIOD'].str[len(prefix):]

    # 3. Rename columns
    df_lg = df_lg.rename(columns={
        'indicator': "INDICATOR",
        'country': "AREA",
        'note': "NOTE"
    })

    return df_lg

# Apply the reshape to the raw frame and preview the tidy result.
tidy_raw_df = reshape_raw_data(raw_df)
tidy_raw_df.head()

Unnamed: 0,INDICATOR,NOTE,AREA,TIME_PERIOD,OBS_VALUE
0,EXP.TOT.CONSTLCU.FTE,,GHA,1992,49.960671
1,EXP.TOT.PPP.FTE,,GHA,1992,71.660959
2,EXP.TOT.USD.FTE,,GHA,1992,33.04605
3,EXP.TOT.ARI.AGGDP,,GHA,1992,0.519452
4,EXP.SALARIES.TOT.SHRE,Shares are based on data for CSIR only.,GHA,1992,


# STEP 3: Describe the tidy raw data input

We will describe the tidy raw data input using an SDMX schema. This description will allow for early validation of our input data during subsequent runs of the pipeline for data updates. 

The `create_schema_from_table()` helper function allows pipeline developers to create a pysdmx schema object automatically with minimal input.

In [29]:
# Build a schema from the tidy frame, declaring which columns are dimensions,
# which one is the time dimension, and which one is the measure.
# NOTE(review): the NOTE column is not assigned a role here — confirm how
# create_schema_from_table treats columns that are not listed.
tidy_raw_schema=create_schema_from_table(
    tidy_raw_df, 
    dimensions=["INDICATOR", "AREA"],
    time_dimension="TIME_PERIOD", 
    measure="OBS_VALUE")

# Inspect the component generated for the INDICATOR column.
tidy_raw_schema.components["INDICATOR"]

Component(id='INDICATOR', required=True, role=Role.DIMENSION, concept=Concept(id='INDICATOR', name='INDICATOR', description='Concept inferred from column INDICATOR', dtype=DataType.STRING), local_dtype=DataType.STRING, name='INDICATOR', description='Concept inferred from column INDICATOR', local_codes=Codelist(id='CL_INDICATOR', name='Codelist for INDICATOR', version='1.0', agency='SDMX', items=[Code(id='EXP_CAP_TOT_SHRE', name='EXP.CAP.TOT.SHRE'), Code(id='EXP_OPERAT_TOT_SHRE', name='EXP.OPERAT.TOT.SHRE'), Code(id='EXP_SALARIES_TOT_SHRE', name='EXP.SALARIES.TOT.SHRE'), Code(id='EXP_TOT_ARI_AGGDP', name='EXP.TOT.ARI.AGGDP'), Code(id='EXP_TOT_CONSTLCU_FTE', name='EXP.TOT.CONSTLCU.FTE'), Code(id='EXP_TOT_PPP_FTE', name='EXP.TOT.PPP.FTE'), Code(id='EXP_TOT_USD_FTE', name='EXP.TOT.USD.FTE'), Code(id='RES_31_40_TOT_SHRE', name='RES.31_40.TOT.SHRE'), Code(id='RES_41_50_TOT_SHRE', name='RES.41_50.TOT.SHRE'), Code(id='RES_51_60_TOT_SHRE', name='RES.51_60.TOT.SHRE'), Code(id='RES_BSC_TOT_FTE', 

# STEP 4: Filter out unnecessary rows

In [38]:
def apply_constraints(df: pd.DataFrame, constraints: dict[str, list]) -> pd.DataFrame:
    """Filter a DataFrame down to the rows allowed by per-column value lists.

    Args:
        df (pd.DataFrame): The source dataframe. It is not modified.
        constraints (dict): A dict where keys are column names and values are
                        lists of valid entries to keep (e.g., {'col': ['val1', 'val2']}).
                        Constraints on several columns are applied one after the
                        other, so a row must satisfy all of them.

    Returns:
        pd.DataFrame: A filtered copy of the original dataframe. A constraint
        naming a column that is absent from ``df`` is skipped and a warning is
        printed.
    """
    filtered = df
    for column, valid_values in constraints.items():
        # Lenient by design: an unknown column is reported and skipped rather
        # than raising a KeyError.
        if column not in filtered.columns:
            print(f"Warning: Column '{column}' not in DataFrame. Skipping.")
            continue
        filtered = filtered[filtered[column].isin(valid_values)]

    # Boolean indexing already produced a new object whenever a constraint was
    # applied; copy explicitly otherwise so the caller never receives `df` itself.
    return filtered.copy() if filtered is df else filtered


# Values to keep per column; only INDICATOR is constrained for now.
constraints = {
    "INDICATOR": ["RES.FEMALE.TOT.FTE", "RES.MALE.TOT.FTE", "RES.TOT.FTE"]  # add a trailing comma here when enabling the entries below
    # "TIME_PERIOD": ["1992"],
    # "AREA": ["GHA"]
}


# NOTE: this rebinds tidy_raw_df to the filtered frame; re-run the reshape cell
# (STEP 2) to get the unfiltered data back.
tidy_raw_df=apply_constraints(tidy_raw_df, constraints)

# STEP 5: Create structure map (Iterative process)

## Fetch dissemination schema

In [None]:
# Fetch the dissemination schema from the registry, using the agency, id and
# version defined in the globals cell, then display it.
dis_schema = client.get_schema("datastructure", agency=dis_structure_agency, id=dis_structure_id, version=dis_structure_version)
dis_schema

## Create map template from dissemination schema

In [None]:
# Collect the component ids of the dissemination schema and write the Excel
# mapping template to path_to_xlsx_mapping.
# NOTE(review): re-running this cell presumably rewrites the workbook — confirm
# it does not discard manual edits already made to the mapping file.
comp_ids=extract_component_ids(dis_schema)

write_excel_mapping_template(
    components=comp_ids, 
    rep_maps=["SEX", 'URBANISATION'], 
    output_path=path_to_xlsx_mapping)

WindowsPath('../../notebooks/tidysdmx/data/pipeline-iterative-development-mapping.xlsx')

# Open mapping template and start editing it

This is where the bulk of the iterative process takes place: the mapping file will need to be edited until the mapping and validations are fully successful.

In [42]:
# Open the mapping workbook in its default application so it can be edited by hand.
# os.startfile exists only on Windows; on other platforms print the location instead
# of failing with an AttributeError.
if hasattr(os, "startfile"):
    os.startfile(path_to_xlsx_mapping)
else:
    print(f"Open this file manually to edit the mapping: {path_to_xlsx_mapping.resolve()}")

# Map data to dissemination schema

## Load edited mapping template as pysdmx schema

In [None]:
# Load the edited mapping workbook, build the structure map from it, and
# display the maps it contains.
wb=load_workbook(path_to_xlsx_mapping)
sm=build_structure_map(wb)
sm.maps

## Implement mapping

In [45]:
# Apply the structure map to the filtered tidy data.
out = map_structures(df = tidy_raw_df, structure_map = sm)

# STEP 6: Final validation

In [46]:
# Validate the mapped data against the dissemination schema and display the
# resulting table of validation errors.
# NOTE(review): sdmx_cols=[] is passed explicitly — confirm what an empty list
# means for validate_dataset_local.
dis_errors = validate_dataset_local(df = out, schema = dis_schema, sdmx_cols=[])
dis_errors

Unnamed: 0,Validation,Error
0,columns,Found unexpected column: NOTE
1,codelist_ids,Invalid values found in column 'INDICATOR': ['...
2,missing_values,Missing values found in mandatory columns:\n ...
