In [None]:
import logging
import os
from datetime import datetime
from pathlib import Path

import polars as pl
from responses import _recorder

from openhexa.sdk.workspaces.connection import DHIS2Connection
from openhexa.toolbox.dhis2 import DHIS2, dataframe

In [None]:
# The toolbox's DHIS2 logger is the only one lowered to DEBUG.
logging.getLogger("openhexa.toolbox.dhis2").setLevel(level=logging.DEBUG)

# Everything else reports at INFO: configure the root logger and keep a handle
# on it as `logger`.
logger = logging.getLogger()
logging.basicConfig(level=logging.INFO)
logger.setLevel(level=logging.INFO)

In [None]:
# Connection settings for the DHIS2 instance the recordings are captured from.
# Each value can be overridden with an environment variable so that other
# credentials never need to be written into the notebook; the fallbacks are the
# values this notebook has always used, so nothing changes when they are unset.
# NOTE(review): the recorded files presumably embed the request URL - keep the
# default URL when regenerating fixtures that are replayed elsewhere (confirm).
url = os.environ.get("DHIS2_URL", "http://localhost:8080")
username = os.environ.get("DHIS2_USERNAME", "admin")
password = os.environ.get("DHIS2_PASSWORD", "district")

# `sle` is the client handed to every recording function below.
con = DHIS2Connection(url, username, password)
sle = DHIS2(con)

In [None]:
# Display the client's `version` attribute (presumably the DHIS2 server
# version - confirm) so it can be compared with the hand-set `version` that
# names the recordings directory in the next cell.
sle.version

In [None]:
# Recordings are grouped per DHIS2 version: responses/dataframe/<version>/.
version = "2.39"
responses_dir = Path("responses") / "dataframe" / version

# Create the directory tree up front; re-running the cell is harmless.
responses_dir.mkdir(exist_ok=True, parents=True)

In [None]:
@_recorder.record(file_path=responses_dir / "get_datasets.yaml")
def test_get_datasets(dhis2: DHIS2) -> pl.DataFrame:
    """Fetch the datasets dataframe and check its size and schema.

    The function is wrapped by the ``responses`` recorder, configured with
    ``get_datasets.yaml`` inside ``responses_dir``.

    Args:
        dhis2: Client passed to ``dataframe.get_datasets``.

    Returns:
        The dataframe returned by ``dataframe.get_datasets``.

    Raises:
        AssertionError: If the frame has 20 rows or fewer, or its schema
            differs from the expected one.
    """
    datasets = dataframe.get_datasets(dhis2=dhis2)
    assert len(datasets) > 20

    columns = {
        "id": pl.String,
        "name": pl.String,
        "organisation_units": pl.List(pl.String),
        "data_elements": pl.List(pl.String),
        "indicators": pl.List(pl.String),
        "period_type": pl.String,
    }
    assert datasets.schema == pl.Schema(columns)
    return datasets


df = test_get_datasets(sle)
df

In [None]:
@_recorder.record(file_path=responses_dir / "get_data_elements.yaml")
def test_get_data_elements(dhis2: DHIS2) -> pl.DataFrame:
    """Fetch the data elements dataframe and check its size and schema.

    The function is wrapped by the ``responses`` recorder, configured with
    ``get_data_elements.yaml`` inside ``responses_dir``.

    Args:
        dhis2: Client passed to ``dataframe.get_data_elements``.

    Returns:
        The dataframe returned by ``dataframe.get_data_elements``.

    Raises:
        AssertionError: If the frame has 20 rows or fewer, or its schema
            differs from the expected one.
    """
    data_elements = dataframe.get_data_elements(dhis2=dhis2)
    assert len(data_elements) > 20

    columns = {
        "id": pl.String,
        "name": pl.String,
        "value_type": pl.String,
    }
    assert data_elements.schema == pl.Schema(columns)
    return data_elements


df = test_get_data_elements(sle)
df

In [None]:
@_recorder.record(file_path=responses_dir / "get_data_element_groups.yaml")
def test_get_data_element_groups(dhis2: DHIS2) -> pl.DataFrame:
    """Fetch the data element groups dataframe and check its size and schema.

    The function is wrapped by the ``responses`` recorder, configured with
    ``get_data_element_groups.yaml`` inside ``responses_dir``.

    Args:
        dhis2: Client passed to ``dataframe.get_data_element_groups``.

    Returns:
        The dataframe returned by ``dataframe.get_data_element_groups``.

    Raises:
        AssertionError: If the frame has 20 rows or fewer, or its schema
            differs from the expected one.
    """
    groups = dataframe.get_data_element_groups(dhis2=dhis2)
    assert len(groups) > 20

    columns = {
        "id": pl.String,
        "name": pl.String,
        "data_elements": pl.List(pl.String),
    }
    assert groups.schema == pl.Schema(columns)
    return groups


df = test_get_data_element_groups(sle)
df

In [None]:
# Plain-text rendering of the frame assigned to `df` by the previous cell.
print(df)

In [None]:
@_recorder.record(file_path=responses_dir / "get_category_option_combos.yaml")
def test_get_category_option_combos(dhis2: DHIS2) -> pl.DataFrame:
    """Fetch the category option combos dataframe and check size and schema.

    The function is wrapped by the ``responses`` recorder, configured with
    ``get_category_option_combos.yaml`` inside ``responses_dir``.

    Args:
        dhis2: Client passed to ``dataframe.get_category_option_combos``.

    Returns:
        The dataframe returned by ``dataframe.get_category_option_combos``.

    Raises:
        AssertionError: If the frame has 20 rows or fewer, or its schema
            differs from the expected one.
    """
    combos = dataframe.get_category_option_combos(dhis2=dhis2)
    assert len(combos) > 20

    columns = {"id": pl.String, "name": pl.String}
    assert combos.schema == pl.Schema(columns)
    return combos


df = test_get_category_option_combos(sle)
df

In [None]:
@_recorder.record(file_path=responses_dir / "get_organisation_units.yaml")
def test_get_organisation_units(dhis2: DHIS2) -> pl.DataFrame:
    """Fetch organisation units up to level 2 and check size and schema.

    The function is wrapped by the ``responses`` recorder, configured with
    ``get_organisation_units.yaml`` inside ``responses_dir``.

    Args:
        dhis2: Client passed to ``dataframe.get_organisation_units``.

    Returns:
        The dataframe returned by ``dataframe.get_organisation_units`` when
        called with ``max_level=2``.

    Raises:
        AssertionError: If the frame has 2 rows or fewer, or its schema
            differs from the expected one.
    """
    org_units = dataframe.get_organisation_units(dhis2=dhis2, max_level=2)
    assert len(org_units) > 2

    # One id/name column pair per hierarchy level requested (levels 1 and 2).
    columns = {
        "id": pl.String,
        "name": pl.String,
        "level": int,
        "level_1_id": pl.String,
        "level_1_name": pl.String,
        "level_2_id": pl.String,
        "level_2_name": pl.String,
        "geometry": pl.String,
    }
    assert org_units.schema == pl.Schema(columns)
    return org_units


df = test_get_organisation_units(sle)
df

In [None]:
@_recorder.record(file_path=responses_dir / "get_organisation_unit_groups.yaml")
def test_get_organisation_unit_groups(dhis2: DHIS2) -> pl.DataFrame:
    """Fetch the organisation unit groups dataframe and check size and schema.

    The function is wrapped by the ``responses`` recorder, configured with
    ``get_organisation_unit_groups.yaml`` inside ``responses_dir``.

    Args:
        dhis2: Client passed to ``dataframe.get_organisation_unit_groups``.

    Returns:
        The dataframe returned by ``dataframe.get_organisation_unit_groups``.

    Raises:
        AssertionError: If the frame has 10 rows or fewer, or its schema
            differs from the expected one.
    """
    groups = dataframe.get_organisation_unit_groups(dhis2=dhis2)
    assert len(groups) > 10

    columns = {
        "id": pl.String,
        "name": pl.String,
        "organisation_units": pl.List(pl.String),
    }
    assert groups.schema == pl.Schema(columns)
    return groups


df = test_get_organisation_unit_groups(sle)
df

In [None]:
@_recorder.record(file_path=responses_dir / "get_organisation_unit_levels.yaml")
def test_get_organisation_unit_levels(dhis2: DHIS2) -> pl.DataFrame:
    """Fetch the organisation unit levels dataframe and check size and schema.

    The function is wrapped by the ``responses`` recorder, configured with
    ``get_organisation_unit_levels.yaml`` inside ``responses_dir``.

    Args:
        dhis2: Client passed to ``dataframe.get_organisation_unit_levels``.

    Returns:
        The dataframe returned by ``dataframe.get_organisation_unit_levels``.

    Raises:
        AssertionError: If the frame does not have exactly 4 rows, or its
            schema differs from the expected one.
    """
    levels = dataframe.get_organisation_unit_levels(dhis2=dhis2)
    assert len(levels) == 4

    columns = {
        "id": pl.String,
        "name": pl.String,
        "level": int,
    }
    assert levels.schema == pl.Schema(columns)
    return levels


df = test_get_organisation_unit_levels(sle)
df

In [None]:
@_recorder.record(file_path=Path(responses_dir, "extract_dataset.yaml"))
def test_extract_dataset(dhis2: DHIS2) -> pl.DataFrame:
    """Extract one dataset for a single org unit and check size and schema.

    The function is wrapped by the ``responses`` recorder, configured with
    ``extract_dataset.yaml`` inside ``responses_dir``.

    Args:
        dhis2: Client passed to ``dataframe.extract_dataset``.

    Returns:
        The dataframe returned by ``dataframe.extract_dataset``.

    Raises:
        AssertionError: If the frame has 20 rows or fewer, or its schema
            differs from the expected one.
    """
    # Query through the client that was passed in, so the function honours its
    # argument instead of depending on the notebook-level `sle` variable.
    df = dataframe.extract_dataset(
        dhis2=dhis2,
        dataset="BfMAe6Itzgt",
        start_date=datetime(2022, 7, 1),
        end_date=datetime(2022, 8, 1),
        org_units=["DiszpKrYNg8"],
        include_children=False,
    )
    assert len(df) > 20
    expected_schema = pl.Schema(
        {
            "data_element_id": pl.String,
            "period": pl.String,
            "organisation_unit_id": pl.String,
            "category_option_combo_id": pl.String,
            "attribute_option_combo_id": pl.String,
            "value": pl.String,
            "created": pl.Datetime("ms", "UTC"),
            "last_updated": pl.Datetime("ms", "UTC"),
        }
    )
    assert df.schema == expected_schema
    return df


df = test_extract_dataset(sle)
df

In [None]:
# Set the polars table width to 175 characters before the print() call in the
# next cell.
pl.Config.set_tbl_width_chars(175)

In [None]:
# Plain-text rendering of the frame currently bound to `df` (assigned by the
# dataset extraction cell above when the notebook is run top to bottom).
print(df)

In [None]:
@_recorder.record(file_path=Path(responses_dir, "extract_data_element_group.yaml"))
def test_extract_data_element_group(dhis2: DHIS2) -> pl.DataFrame:
    """Extract a data element group's values and check size and schema.

    The function is wrapped by the ``responses`` recorder, configured with
    ``extract_data_element_group.yaml`` inside ``responses_dir``.

    Args:
        dhis2: Client passed to ``dataframe.extract_data_element_group``.

    Returns:
        The dataframe returned by ``dataframe.extract_data_element_group``.

    Raises:
        AssertionError: If the frame has 10 rows or fewer, or its schema
            differs from the expected one.
    """
    # Query through the client that was passed in, so the function honours its
    # argument instead of depending on the notebook-level `sle` variable.
    df = dataframe.extract_data_element_group(
        dhis2,
        "h9cuJOkOwY2",
        start_date=datetime(2020, 11, 1),
        end_date=datetime(2021, 2, 5),
        org_units="jPidqyo7cpF",
        include_children=True,
    )
    assert len(df) > 10
    expected_schema = pl.Schema(
        {
            "data_element_id": pl.String,
            "period": pl.String,
            "organisation_unit_id": pl.String,
            "category_option_combo_id": pl.String,
            "attribute_option_combo_id": pl.String,
            "value": pl.String,
            "created": pl.Datetime("ms", "UTC"),
            "last_updated": pl.Datetime("ms", "UTC"),
        }
    )
    assert df.schema == expected_schema
    return df


df = test_extract_data_element_group(sle)
df

In [None]:
@_recorder.record(file_path=Path(responses_dir, "extract_data_elements.yaml"))
def test_extract_data_elements(dhis2: DHIS2) -> pl.DataFrame:
    """Extract values for two data elements and check size, schema and nulls.

    The function is wrapped by the ``responses`` recorder, configured with
    ``extract_data_elements.yaml`` inside ``responses_dir``.

    Args:
        dhis2: Client passed to ``dataframe.extract_data_elements``.

    Returns:
        The dataframe returned by ``dataframe.extract_data_elements``.

    Raises:
        AssertionError: If the frame has 5 rows or fewer, its schema differs
            from the expected one, or any column contains a null.
    """
    # Query through the client that was passed in, so the function honours its
    # argument instead of depending on the notebook-level `sle` variable.
    df = dataframe.extract_data_elements(
        dhis2,
        ["pikOziyCXbM", "x3Do5e7g4Qo"],
        start_date=datetime(2020, 11, 1),
        end_date=datetime(2021, 2, 5),
        org_units=["vELbGdEphPd", "UugO8xDeLQD"],
    )
    assert len(df) > 5
    expected_schema = pl.Schema(
        {
            "data_element_id": pl.String,
            "period": pl.String,
            "organisation_unit_id": pl.String,
            "category_option_combo_id": pl.String,
            "attribute_option_combo_id": pl.String,
            "value": pl.String,
            "created": pl.Datetime("ms", "UTC"),
            "last_updated": pl.Datetime("ms", "UTC"),
        }
    )
    assert df.schema == expected_schema
    # Sum of the per-column null counts: zero means no nulls anywhere.
    assert df.null_count().sum_horizontal().item() == 0
    return df


df = test_extract_data_elements(sle)
df

In [None]:
@_recorder.record(file_path=Path(responses_dir, "extract_analytics.yaml"))
def test_extract_analytics(dhis2: DHIS2) -> pl.DataFrame:
    """Extract analytics for two data elements and check size, schema, nulls.

    The function is wrapped by the ``responses`` recorder, configured with
    ``extract_analytics.yaml`` inside ``responses_dir``.

    Args:
        dhis2: Client passed to ``dataframe.extract_analytics``.

    Returns:
        The dataframe returned by ``dataframe.extract_analytics``.

    Raises:
        AssertionError: If the frame has 50 rows or fewer, its schema differs
            from the expected one, or any column contains a null.
    """
    # Query through the client that was passed in, so the function honours its
    # argument instead of depending on the notebook-level `sle` variable.
    df = dataframe.extract_analytics(
        dhis2, periods=["2021"], data_elements=["pikOziyCXbM", "x3Do5e7g4Qo"], org_unit_levels=[2]
    )
    assert len(df) > 50
    expected_schema = pl.Schema(
        {
            "data_element_id": pl.String,
            "category_option_combo_id": pl.String,
            "organisation_unit_id": pl.String,
            "period": pl.String,
            "value": pl.String,
        }
    )
    assert df.schema == expected_schema
    # Sum of the per-column null counts: zero means no nulls anywhere.
    assert df.null_count().sum_horizontal().item() == 0
    return df


df = test_extract_analytics(sle)
df

In [None]:
@_recorder.record(file_path=Path(responses_dir, "import_data_values.yaml"))
def test_import_data_values(dhis2: DHIS2):
    """Dry-run an import of two data values and check the report counters.

    The function is wrapped by the ``responses`` recorder, configured with
    ``import_data_values.yaml`` inside ``responses_dir``.

    Args:
        dhis2: Client passed to ``dataframe.import_data_values``.

    Returns:
        The report returned by ``dataframe.import_data_values``.

    Raises:
        AssertionError: If any of the "imported", "updated", "ignored" or
            "deleted" entries of the report is negative.
    """
    df = pl.DataFrame(
        [
            {
                "data_element_id": "pikOziyCXbM",
                "period": "202401",
                "organisation_unit_id": "O6uvpzGd5pu",
                "category_option_combo_id": "psbwp3CQEhs",
                "attribute_option_combo_id": "HllvX50cXC0",
                "value": "100",
            },
            {
                "data_element_id": "pikOziyCXbM",
                "period": "202402",
                "organisation_unit_id": "O6uvpzGd5pu",
                "category_option_combo_id": "psbwp3CQEhs",
                "attribute_option_combo_id": "HllvX50cXC0",
                "value": "150",
            },
        ]
    )

    # Import through the client that was passed in, so the function honours its
    # argument instead of depending on the notebook-level `sle` variable.
    report = dataframe.import_data_values(dhis2, data=df, import_strategy="CREATE_AND_UPDATE", dry_run=True)

    for status in ("imported", "updated", "ignored", "deleted"):
        assert report[status] >= 0

    return report


r = test_import_data_values(sle)
r

In [None]:
@_recorder.record(file_path=Path(responses_dir, "import_data_values_with_mapping.yaml"))
def test_import_data_values_with_mapping(dhis2: DHIS2):
    """Dry-run an import that relies on org unit and data element mappings.

    The input rows use the placeholder ids "xxx" and "yyy"; the mappings passed
    to ``dataframe.import_data_values`` associate them with "O6uvpzGd5pu" and
    "pikOziyCXbM". The function is wrapped by the ``responses`` recorder,
    configured with ``import_data_values_with_mapping.yaml`` inside
    ``responses_dir``.

    Args:
        dhis2: Client passed to ``dataframe.import_data_values``.

    Returns:
        The report returned by ``dataframe.import_data_values``.

    Raises:
        AssertionError: If any of the "imported", "updated", "ignored" or
            "deleted" entries of the report is negative.
    """
    df = pl.DataFrame(
        [
            {
                "data_element_id": "yyy",
                "period": "202401",
                "organisation_unit_id": "xxx",
                "category_option_combo_id": "psbwp3CQEhs",
                "attribute_option_combo_id": "HllvX50cXC0",
                "value": "100",
            },
            {
                "data_element_id": "yyy",
                "period": "202402",
                "organisation_unit_id": "xxx",
                "category_option_combo_id": "psbwp3CQEhs",
                "attribute_option_combo_id": "HllvX50cXC0",
                "value": "150",
            },
        ]
    )

    org_units_mapping = {"xxx": "O6uvpzGd5pu"}

    data_elements_mapping = {"yyy": "pikOziyCXbM"}

    # Import through the client that was passed in, so the function honours its
    # argument instead of depending on the notebook-level `sle` variable.
    report = dataframe.import_data_values(
        dhis2,
        data=df,
        org_units_mapping=org_units_mapping,
        data_elements_mapping=data_elements_mapping,
        import_strategy="CREATE_AND_UPDATE",
        dry_run=True,
    )

    for status in ("imported", "updated", "ignored", "deleted"):
        assert report[status] >= 0

    return report


r = test_import_data_values_with_mapping(sle)
r

In [None]:
@_recorder.record(file_path=responses_dir / "extract_events.yaml")
def test_extract_events(dhis2: DHIS2) -> pl.DataFrame:
    """Extract events for one program and check the frame is non-empty and null-free.

    The function is wrapped by the ``responses`` recorder, configured with
    ``extract_events.yaml`` inside ``responses_dir``.

    Args:
        dhis2: Client passed to ``dataframe.extract_events``.

    Returns:
        The dataframe returned by ``dataframe.extract_events``.

    Raises:
        AssertionError: If the frame is empty or any column contains a null.
    """
    events = dataframe.extract_events(
        dhis2=dhis2,
        program_id="lxAQ7Zs9VYR",
        org_unit_parents=["ImspTQPwCqd"],
    )
    assert len(events) > 0

    # Sum of the per-column null counts: zero means no nulls anywhere.
    total_nulls = events.null_count().sum_horizontal().item()
    assert total_nulls == 0
    return events


r = test_extract_events(sle)
r