# Extraction de données DHIS2 basée sur un Favori

Ce notebook permet d'extraire les données analytics d'un DHIS2 en utilisant un **favori** (visualization sauvegardée).

Un favori dans DHIS2 contient une configuration prédéfinie :
- Indicateurs ou éléments de données
- Périodes
- Unités d'organisation
- Filtres et dimensions

## 1. Configuration et connexion

In [None]:
from openhexa.sdk import workspace
from openhexa.toolbox.dhis2 import DHIS2
import pandas as pd
import json
import os
from dotenv import load_dotenv
import numpy as np

In [None]:
# Charge les variables du fichier .env
load_dotenv()

url = os.getenv("DHIS2_URL")
username = os.getenv("DHIS2_USER")
password = os.getenv("DHIS2_PASS")

# Connexion sécurisée
dhis = DHIS2(url=url, username=username, password=password)

print(f"✅ Connecté à : {url} en tant que {username}")

## 2. Paramètres

Entrez l'ID du favori à extraire. Vous pouvez trouver cet ID :
- Dans l'URL du favori : `.../#/visualization/FAVORITE_ID`
- Via la recherche dans la cellule suivante

In [None]:
# ID du favori à extraire
# mldsgxAvIIi correspond à SHE Kongo Central
FAVORITE_ID = "ROzCY14OLTE"  # Remplacez par l'ID de votre favori

### 2.1 Rechercher un favori par nom (optionnel)

Si vous ne connaissez pas l'ID, vous pouvez rechercher par nom :

## 3. Récupérer les métadonnées du favori

In [None]:
def get_favorite_metadata(dhis2, favorite_id):
    """Récupère les métadonnées complètes d'un favori."""
    try:
        favorite = dhis2.api.get(
            f"visualizations/{favorite_id}",
            params={
                "fields": "*,dataDimensionItems[*,indicator[id,displayName],dataElement[id,displayName]],"
                          "organisationUnits[id,displayName],periods[*],columns[*],rows[*],filters[*]"
            }
        )
        return favorite
    except Exception as e:
        print(f"Erreur lors de la récupération du favori: {e}")
        return None

In [None]:
# Récupérer les métadonnées
favorite_metadata = get_favorite_metadata(dhis, FAVORITE_ID)

if favorite_metadata:
    print(f"Favori: {favorite_metadata.get('displayName', 'N/A')}")
    print(f"Type: {favorite_metadata.get('type', 'N/A')}")
    print(f"Dernière mise à jour: {favorite_metadata.get('lastUpdated', 'N/A')}")

## 4. Extraire les données analytics du favori

In [None]:
def extract_favorite_data(dhis2, favorite_id):
    """
    Extrait les données analytics d'un favori DHIS2.
    
    Retourne un DataFrame avec les données et les métadonnées.
    """
    try:
        # Méthode 1: Utiliser l'endpoint data du favori
        response = dhis2.api.get(
            f"visualizations/{favorite_id}/data.json"
        )
        
        # Extraire les headers et les rows
        headers = response.get("headers", [])
        rows = response.get("rows", [])
        
        if not rows:
            print("Aucune donnée trouvée pour ce favori.")
            return pd.DataFrame(), response
        
        # Créer le DataFrame
        columns = [h.get("name", h.get("column", f"col_{i}")) for i, h in enumerate(headers)]
        df = pd.DataFrame(rows, columns=columns)
        
        # Extraire les métadonnées pour le mapping des IDs vers les noms
        metadata = response.get("metaData", {})
        
        return df, metadata
        
    except Exception as e:
        print(f"Erreur lors de l'extraction des données: {e}")
        return pd.DataFrame(), {}

In [None]:
# Extraire les données
df_raw, metadata = extract_favorite_data(dhis, FAVORITE_ID)

print(f"Nombre de lignes: {len(df_raw)}")
print(f"Colonnes: {list(df_raw.columns)}")
df_raw.head(10)

## 5. Enrichir les données avec les noms lisibles

In [None]:
def enrich_dataframe_with_names(df, metadata):
    """
    Enrichit le DataFrame en remplaçant les IDs par les noms lisibles.
    """
    if df.empty:
        return df
    
    df_enriched = df.copy()
    
    # Récupérer le mapping des items
    items = metadata.get("items", {})
    
    # Colonnes typiques à enrichir
    dimension_columns = ["dx", "pe", "ou", "co"]  # data, period, orgunit, category option
    
    for col in df_enriched.columns:
        if col in dimension_columns or col.lower() in ["dataelement", "indicator", "orgunit", "period"]:
            # Créer une colonne avec les noms
            name_col = f"{col}_name"
            df_enriched[name_col] = df_enriched[col].apply(
                lambda x: items.get(x, {}).get("name", x) if isinstance(items.get(x), dict) else x
            )
    
    # Convertir la colonne value en numérique si présente
    if "value" in df_enriched.columns:
        df_enriched["value"] = pd.to_numeric(df_enriched["value"], errors="coerce")
    
    return df_enriched

In [None]:
# Enrichir les données
df_enriched = enrich_dataframe_with_names(df_raw, metadata)

print(f"Colonnes enrichies: {list(df_enriched.columns)}")
df_enriched.head(10)

In [None]:
# Copier et supprimer les colonnes inutiles
# Utiliser df_enriched si disponible sinon df_final
if 'df_enriched' in globals():
	target_df = df_enriched
elif 'df_final' in globals():
	target_df = df_final
else:
	raise NameError("")

default_cols = ['Organisation unit ID', 'Organisation unit code', 'Organisation unit description',
				'Reporting month', 'Organisation unit parameter', 'Organisation unit is parent']

# Si une variable columns_to_drop est définie, l'ajouter
if 'columns_to_drop' in globals() and isinstance(columns_to_drop, list) and columns_to_drop:
	cols_to_drop = list(dict.fromkeys(default_cols + columns_to_drop))
else:
	cols_to_drop = default_cols

# Ne supprimer que les colonnes existantes
cols_existing = [c for c in cols_to_drop if c in target_df.columns]
if cols_existing:
	target_df.drop(columns=cols_existing, inplace=True)
else:
	print("Aucune colonne à supprimer trouvée dans le DataFrame cible.")

target_df.head(15)

## 6. Rapport Actuels

In [None]:
# 1. Filtre strict : contient 'actual reports' MAIS PAS 'time'
col_actual_strict = [
    c for c in target_df.columns 
    if 'actual reports' in c.lower() and 'time' not in c.lower()
]

if col_actual_strict:
    # 2. Création du DataFrame de travail
    df_actual = target_df[['Organisation unit'] + col_actual_strict].copy()
    
    # 3. Nettoyage numérique (force les nombres, remplace les vides par 0)
    for col in col_actual_strict:
        df_actual[col] = pd.to_numeric(df_actual[col], errors='coerce').fillna(0)
    
    # 4. Calcul de la somme par ligne
    df_actual['Reports_Actual'] = df_actual[col_actual_strict].sum(axis=1)
    
    # 5. Arrondi à 1 chiffre après la virgule pour tout le tableau
    df_actual = df_actual.round(1)
    
    # 6. Affichage du résultat
    print(f"Analyse basée sur {len(col_actual_strict)} indicateurs de type 'Actual Reports'")
    display(df_actual.head(15))
else:
    print("⚠️ Aucune colonne correspondant aux critères n'a été trouvée.")

## 7. Rapport Attendus

In [None]:
# 1. Filtre strict : contient 'expected reports' MAIS PAS 'time'
col_actual_strict = [
    c for c in target_df.columns 
    if 'expected reports' in c.lower() and 'time' not in c.lower()
]

if col_actual_strict:
    # 2. Création du DataFrame de travail
    df_actual = target_df[['Organisation unit'] + col_actual_strict].copy()
    
    # 3. Nettoyage numérique (force les nombres, remplace les vides par 0)
    for col in col_actual_strict:
        df_actual[col] = pd.to_numeric(df_actual[col], errors='coerce').fillna(0)
    
    # 4. Calcul de la somme par ligne
    df_actual['Reports_Attendu'] = df_actual[col_actual_strict].sum(axis=1)
    
    # 5. Arrondi à 1 chiffre après la virgule pour tout le tableau
    df_actual = df_actual.round(1)
    
    # 6. Affichage du résultat
    print(f"Analyse basée sur {len(col_actual_strict)} indicateurs de type 'Expected Reports'")
    display(df_actual.head(15))
else:
    print("⚠️ Aucune colonne correspondant aux critères n'a été trouvée.")

## 8. Taux de déclaration

In [None]:
# 1. Extraction ciblée : On veut 'reporting rate' MAIS PAS 'on time'
col_rate_uniquement = [
    c for c in target_df.columns 
    if 'reporting rate' in c.lower() and 'on time' not in c.lower()
]

if col_rate_uniquement:
    # 2. Création du DataFrame avec l'unité d'organisation
    df_affichage = target_df[['Organisation unit'] + col_rate_uniquement].copy()
    
    # 3. Nettoyage et arrondi à 1 chiffre après la virgule
    for col in col_rate_uniquement:
        # Conversion en numérique (force les erreurs en NaN puis remplace par 0)
        df_affichage[col] = pd.to_numeric(df_affichage[col], errors='coerce').fillna(0)
        # Application de la règle d'arrondi
        df_affichage[col] = df_affichage[col].round(1)
    
    # 4. Affichage simple (sans calcul de somme)
    print(f"Affichage de {len(col_rate_uniquement)} indicateurs de complétude :")
    display(df_affichage.head(15))
    
else:
    print("⚠️ Aucune colonne de 'reporting rate' (hors 'on time') n'a été trouvée.")

## 9. Complétude Globale

In [None]:
# 1. Détection automatique et filtrage strict (Exclut 'on time')
# On cherche les colonnes 'actual' et 'expected' mais on élimine celles qui contiennent 'time'
actual_cols = [c for c in target_df.columns 
               if 'actual reports' in c.lower() and 'time' not in c.lower()]

expected_cols = [c for c in target_df.columns 
                 if 'expected reports' in c.lower() and 'time' not in c.lower()]

# 2. Création du DataFrame de synthèse
df_synthese = target_df[['Organisation unit']].copy()

# 3. Calculs des sommes avec conversion numérique forcée
# On applique l'arrondi à 1 chiffre après la virgule sur les sommes
df_synthese['Reports_Actual'] = target_df[actual_cols].apply(pd.to_numeric, errors='coerce').fillna(0).sum(axis=1).round(1)
df_synthese['Reports_Attendu'] = target_df[expected_cols].apply(pd.to_numeric, errors='coerce').fillna(0).sum(axis=1).round(1)

# 4. Calcul de la Complétude avec sécurité contre la division par zéro
df_synthese['Complétude_Globale (%)'] = (
    df_synthese['Reports_Actual'] / df_synthese['Reports_Attendu'].replace(0, np.nan)
) * 100

# 5. Application de l'arrondi final à 1 chiffre après la virgule
df_synthese['Complétude_Globale (%)'] = df_synthese['Complétude_Globale (%)'].fillna(0).round(1)

# 6. Affichage final propre
cols_finales = ['Organisation unit', 'Reports_Actual', 'Reports_Attendu', 'Complétude_Globale (%)']
display(df_synthese[cols_finales].head(15))

## 10. Taux de déclaration & Complétude globale

In [None]:
# 1. IDENTIFICATION AUTOMATIQUE DES COLONNES (Filtre strict sans "on time")
col_rate_uniquement = [c for c in target_df.columns 
                       if 'reporting rate' in c.lower() and 'on time' not in c.lower()]

actual_cols = [c for c in target_df.columns 
               if 'actual reports' in c.lower() and 'time' not in c.lower()]

expected_cols = [c for c in target_df.columns 
                 if 'expected reports' in c.lower() and 'time' not in c.lower()]

# 2. CRÉATION DU DATAFRAME DE SYNTHÈSE
# On commence par l'unité d'organisation
df_final = target_df[['Organisation unit']].copy()

# 3. TRAITEMENT DES REPORTING RATES INDIVIDUELS
for col in col_rate_uniquement:
    # Conversion numérique + Arrondi à 1 rang
    df_final[col] = pd.to_numeric(target_df[col], errors='coerce').fillna(0).round(1)

# 4. CALCULS POUR LA COMPLÉTUDE GLOBALE
# Somme des rapports réels (Actual)
df_final['Reports_Actual'] = target_df[actual_cols].apply(pd.to_numeric, errors='coerce').fillna(0).sum(axis=1).round(1)

# Somme des rapports attendus (Attendu)
df_final['Reports_Attendu'] = target_df[expected_cols].apply(pd.to_numeric, errors='coerce').fillna(0).sum(axis=1).round(1)

# Calcul du pourcentage global
df_final['Complétude_Globale (%)'] = (
    df_final['Reports_Actual'] / df_final['Reports_Attendu'].replace(0, np.nan)
) * 100

# Arrondi final du pourcentage global
df_final['Complétude_Globale (%)'] = df_final['Complétude_Globale (%)'].fillna(0).round(1)

# 5. ORGANISATION ET AFFICHAGE DES COLONNES
# On affiche : Unité d'organisation + les taux par programme + la complétude globale
cols_a_afficher = ['Organisation unit'] + col_rate_uniquement + ['Reports_Actual', 'Reports_Attendu', 'Complétude_Globale (%)']

display(df_final[cols_a_afficher].head(15))

## 11. Mise en forme conditionnelle

In [None]:
# --- 1. IDENTIFICATION ET CALCULS ---
col_rate_uniquement = [c for c in target_df.columns if 'reporting rate' in c.lower() and 'on time' not in c.lower()]
actual_cols = [c for c in target_df.columns if 'actual reports' in c.lower() and 'time' not in c.lower()]
expected_cols = [c for c in target_df.columns if 'expected reports' in c.lower() and 'time' not in c.lower()]

df_final = target_df[['Organisation unit']].copy()

# Traitement des taux individuels
for col in col_rate_uniquement:
    df_final[col] = pd.to_numeric(target_df[col], errors='coerce').fillna(0).round(1)

# Somme des rapports (Arrondi à 1 décimale)
df_final['Reports_Actual'] = target_df[actual_cols].apply(pd.to_numeric, errors='coerce').fillna(0).sum(axis=1).round(1)
df_final['Reports_Attendu'] = target_df[expected_cols].apply(pd.to_numeric, errors='coerce').fillna(0).sum(axis=1).round(1)

# Score NB.SI (Programmes >= 95%)
df_final['Nombre des data set complétude >/=95%'] = (df_final[col_rate_uniquement] >= 95).sum(axis=1)

# Complétude Globale
df_final['Complétude_Globale (%)'] = (df_final['Reports_Actual'] / df_final['Reports_Attendu'].replace(0, np.nan)) * 100
df_final['Complétude_Globale (%)'] = df_final['Complétude_Globale (%)'].fillna(0).round(1)

# --- 2. FONCTIONS DE COLORATION ---

def style_taux(val):
    try: val = float(val)
    except: return ''
    if val < 50: return 'color: white; background-color: #FF0000'
    elif 50 <= val <= 69: return 'color: white; background-color: #800000'
    elif 70 <= val <= 79: return 'color: black; background-color: #FFFF00'
    elif 80 <= val <= 95: return 'color: black; background-color: #32CD32'
    elif 96 <= val <= 100: return 'color: white; background-color: #008000'
    return ''

def style_score(val):
    try: val = int(val)
    except: return ''
    if val < 5: return 'color: white; background-color: #800000'
    elif val == 5: return 'color: black; background-color: #FFC0CB'
    elif 6 <= val <= 9: return 'color: black; background-color: #32CD32'
    elif val == 10: return 'color: white; background-color: #008000'
    elif val > 10: return 'color: white; background-color: #004d00'
    return ''

# --- 3. AFFICHAGE FINAL ---

cols_finales = (['Organisation unit'] + col_rate_uniquement + 
                 ['Reports_Actual', 'Reports_Attendu', 'Complétude_Globale (%)', 'Nombre des data set complétude >/=95%'])

# Colonnes à afficher avec une décimale
cols_format_1d = col_rate_uniquement + ['Reports_Actual', 'Reports_Attendu', 'Complétude_Globale (%)']

df_stylise = df_final[cols_finales].style.format({
    col: "{:.1f}" for col in cols_format_1d
}).applymap(style_taux, subset=col_rate_uniquement + ['Complétude_Globale (%)'])\
  .applymap(style_score, subset=['Nombre des data set complétude >/=95%'])

display(df_stylise)

## 12. Export des données de complétude

In [None]:
# 1. On définit le chemin
excel_path = "Rapport_Performance_Synthese.xlsx"

# 2. On exporte directement l'objet stylisé (celui qui a les couleurs)
# On utilise 'openpyxl' comme moteur alternatif
df_stylise.to_excel(excel_path, engine='openpyxl', index=False)

print(f"✅ Données exportées dans : {excel_path}")

In [None]:
# 1. On définit le chemin
csv_path = "Rapport_Performance_Synthese.csv"

# 2. On exporte les données brutes du DataFrame (sans les couleurs, car CSV ne les supporte pas)
# On utilise la propriété .data pour accéder au DataFrame sous-jacent
df_stylise.data.to_csv(csv_path, index=False)

print(f"✅ Données exportées dans : {csv_path}")

## 13. Actual Reports on Time

In [None]:
# 1. Détection automatique et filtrage strict
# On veut uniquement les colonnes qui contiennent 'actual reports on time'
actual_cols = [c for c in target_df.columns 
               if 'actual reports on time' in c.lower()]

# On garde les rapports attendus (qui n'ont généralement pas de mention 'time')
expected_cols = [c for c in target_df.columns 
                 if 'expected reports' in c.lower() and 'time' not in c.lower()]

# 2. Création du DataFrame de synthèse
df_synthese = target_df[['Organisation unit']].copy()

# 3. Calculs des sommes avec conversion numérique forcée
# On calcule le total des rapports reçus à temps
df_synthese['Reports_Actual_on_time'] = target_df[actual_cols].apply(pd.to_numeric, errors='coerce').fillna(0).sum(axis=1).round(1)

# On calcule le total des rapports attendus
df_synthese['Reports_Attendu'] = target_df[expected_cols].apply(pd.to_numeric, errors='coerce').fillna(0).sum(axis=1).round(1)

# 4. Calcul de la Complétude (Promptitude) avec sécurité
df_synthese['Complétude_Globale (%)'] = (
    df_synthese['Reports_Actual_on_time'] / df_synthese['Reports_Attendu'].replace(0, np.nan)
) * 100

# 5. Application de l'arrondi final
df_synthese['Complétude_Globale (%)'] = df_synthese['Complétude_Globale (%)'].fillna(0).round(1)

# 6. Affichage final
cols_finales = ['Organisation unit', 'Reports_Actual_on_time', 'Reports_Attendu', 'Complétude_Globale (%)']
display(df_synthese[cols_finales].head(15))

## 14. Promptitude Actual Report on Time

In [None]:
# 1. Filtre strict : contient 'actual reports' MAIS PAS 'time'
col_actual_strict = [
    c for c in target_df.columns 
    if 'actual reports on time' in c.lower()
]

if col_actual_strict:
    # 2. Création du DataFrame de travail
    df_actual = target_df[['Organisation unit'] + col_actual_strict].copy()
    
    # 3. Nettoyage numérique (force les nombres, remplace les vides par 0)
    for col in col_actual_strict:
        df_actual[col] = pd.to_numeric(df_actual[col], errors='coerce').fillna(0)
    
    # 4. Calcul de la somme par ligne
    df_actual['Reports_Actual_on_time'] = df_actual[col_actual_strict].sum(axis=1)
    
    # 5. Arrondi à 1 chiffre après la virgule pour tout le tableau
    df_actual = df_actual.round(1)
    
    # 6. Affichage du résultat
    print(f"Analyse basée sur {len(col_actual_strict)} indicateurs de type 'Actual Reports on Time'")
    display(df_actual.head(15))
else:
    print("⚠️ Aucune colonne correspondant aux critères n'a été trouvée.")

## 15. Mise en forme conditionnelle de promptitude

In [None]:
# --- 1. IDENTIFICATION ET CALCULS (ON TIME UNIQUEMENT) ---

# Filtrage pour ne garder que le "On Time"
col_rate_uniquement = [c for c in target_df.columns if 'reporting rate' in c.lower() and 'on time' in c.lower()]
actual_cols = [c for c in target_df.columns if 'actual reports' in c.lower() and 'on time' in c.lower()]
expected_cols = [c for c in target_df.columns if 'expected reports' in c.lower() and 'time' not in c.lower()]

df_final = target_df[['Organisation unit']].copy()

# Traitement des taux individuels (Promptitude)
for col in col_rate_uniquement:
    df_final[col] = pd.to_numeric(target_df[col], errors='coerce').fillna(0).round(1)

# Somme des rapports On Time et Attendus (Arrondi à 1 décimale)
df_final['Reports_Actual_On_Time'] = target_df[actual_cols].apply(pd.to_numeric, errors='coerce').fillna(0).sum(axis=1).round(1)
df_final['Reports_Attendu'] = target_df[expected_cols].apply(pd.to_numeric, errors='coerce').fillna(0).sum(axis=1).round(1)

# Score NB.SI (Programmes On Time >= 95%)
nom_col_score = 'Nombre des data set promptitude >/=95%'
df_final[nom_col_score] = (df_final[col_rate_uniquement] >= 95).sum(axis=1)

# Promptitude Globale
df_final['Promptitude_Globale (%)'] = (df_final['Reports_Actual_On_Time'] / df_final['Reports_Attendu'].replace(0, np.nan)) * 100
df_final['Promptitude_Globale (%)'] = df_final['Promptitude_Globale (%)'].fillna(0).round(1)

# --- 2. FONCTIONS DE COLORATION ---

def style_taux(val):
    try: val = float(val)
    except: return ''
    if val < 50: return 'color: white; background-color: #FF0000' # Rouge
    elif 50 <= val <= 69: return 'color: white; background-color: #800000' # Bordeaux
    elif 70 <= val <= 79: return 'color: black; background-color: #FFFF00' # Jaune
    elif 80 <= val <= 95: return 'color: black; background-color: #32CD32' # Vert citron
    elif val > 95: return 'color: white; background-color: #008000' # Vert foncé
    return ''

def style_score(val):
    try: val = int(val)
    except: return ''
    if val < 5: return 'color: white; background-color: #800000'
    elif val == 5: return 'color: black; background-color: #FFC0CB'
    elif 6 <= val <= 9: return 'color: black; background-color: #32CD32'
    elif val == 10: return 'color: white; background-color: #008000'
    elif val > 10: return 'color: white; background-color: #004d00'
    return ''

# --- 3. AFFICHAGE FINAL ---

cols_finales = (['Organisation unit'] + col_rate_uniquement + 
                 ['Reports_Actual_On_Time', 'Reports_Attendu', 'Promptitude_Globale (%)', nom_col_score])

# Colonnes à formater avec une décimale
cols_format_1d = col_rate_uniquement + ['Reports_Actual_On_Time', 'Reports_Attendu', 'Promptitude_Globale (%)']

df_stylise = df_final[cols_finales].style.format({
    col: "{:.1f}" for col in cols_format_1d
}).map(style_taux, subset=col_rate_uniquement + ['Promptitude_Globale (%)'])\
  .map(style_score, subset=[nom_col_score])

display(df_stylise)

## 15. Exportation des données de promptitude

In [None]:
# 1. On définit le chemin
excel_path = "Rapport_Promptitude_Synthese.xlsx"

# 2. On exporte directement l'objet stylisé (celui qui a les couleurs)
# On utilise 'openpyxl' comme moteur alternatif
df_stylise.to_excel(excel_path, engine='openpyxl', index=False)

print(f"✅ Données exportées dans : {excel_path}")

In [None]:
# 1. On définit le chemin
csv_path = "Rapport_Promptitude_Synthese.csv"

# 2. On exporte les données brutes du DataFrame (sans les couleurs, car CSV ne les supporte pas)
# On utilise la propriété .data pour accéder au DataFrame sous-jacent
df_stylise.data.to_csv(csv_path, index=False)

print(f"✅ Données exportées dans : {csv_path}")