In [1]:
import requests
import pandas as pd
import io
from datetime import datetime, timedelta
from urllib.parse import urlparse, parse_qs, urlencode

In [2]:
# Insert DHIS2 username and password
DHIS2_USERNAME = "kamutis"
DHIS2_PASSWORD = "Laptop2012"

In [3]:
original_url = "https://hiskenya.org/api/analytics.csv?dimension=pe%3A2013%3B2014%3B2015%3B2016%3B2017%3B2018%3B2019%3B2020%3B2021%3B2022%3B2023%3B2024&dimension=ou%3AHfVjCurKxh2%3BUSER_ORGUNIT%3BLEVEL-JwTgQwgnl8h&dimension=dx%3ANLKRV7bYbVy%3Bw5ltvnNihEZ%3BMpv8crmnOzy%3BdRhugDCanvB%3BEaR7DO0Fz9g%3BG5AHuOAotQq%3BbPQDV3mud0v%3Bpct05jgGcLt%3BwLbBr9Vsvwo%3Ba73BxXAmC08&tableLayout=true&rows=pe%3Bou%3Bdx&skipRounding=false&completedOnly=false&showHierarchy=true"

parsed_url = urlparse(original_url)
base_url = parsed_url.scheme + "://" + parsed_url.netloc + parsed_url.path
query_params = parse_qs(parsed_url.query)

# Convert query_params values from lists to single strings for easier manipulation,
# especially for 'dimension' which can have multiple instances.
# We'll re-handle 'dimension' specifically later.
cleaned_params = {k: v[0] if len(v) == 1 else v for k, v in query_params.items()}

# Extract existing 'dimension' parameters, excluding the 'pe' one for now
other_dimensions = [d for d in cleaned_params.get('dimension', []) if not d.startswith('pe:')]
if not isinstance(other_dimensions, list): # handle case where there was only one dimension originally
    other_dimensions = [other_dimensions] if 'dimension' in cleaned_params else []

# Remove 'pe' dimension if it exists
if 'dimension' in cleaned_params:
    if isinstance(cleaned_params['dimension'], list):
        cleaned_params['dimension'] = [d for d in cleaned_params['dimension'] if not d.startswith('pe:')]
    else: # single dimension string
        if cleaned_params['dimension'].startswith('pe:'):
            del cleaned_params['dimension']

# Clean up other parameters that are not 'dimension'
final_params = {k: v for k, v in cleaned_params.items() if k != 'dimension'}

In [4]:
# Add the new period dimension and the other existing dimensions
final_params_list = []
for k, v in final_params.items():
    if isinstance(v, list):
        for item in v:
            final_params_list.append((k, item))
    else:
        final_params_list.append((k, v))

# Define the period dimension value - using 'pe:202501' from the original URL
start_date = datetime(2013, 1, 1) # CHANGED THIS LINE TO START IN JANUARY 2019
end_date = datetime(2024, 12, 31)

periods = []
current_date = start_date
while current_date <= end_date:
    periods.append(current_date.strftime("%Y%m"))
    # Move to the next month
    if current_date.month == 12:
        current_date = datetime(current_date.year + 1, 1, 1)
    else:
        current_date = datetime(current_date.year, current_date.month + 1, 1)

period_dimension_value = "pe:" + ";".join(periods)
print(f"Generated period dimension: {period_dimension_value}")

# Add the specific dimension for periods
final_params_list.append(('dimension', period_dimension_value))

# Add the other original dimensions (dx, ou, etc.)
for dim in other_dimensions:
    final_params_list.append(('dimension', dim))

# Re-encode the query parameters
new_query_string = urlencode(final_params_list, doseq=True) # doseq=True handles multiple 'dimension' keys

full_request_url = f"{base_url}?{new_query_string}"
print(f"\nFull Request URL:\n{full_request_url}\n")

try:
    response = requests.get(full_request_url, auth=(DHIS2_USERNAME, DHIS2_PASSWORD))
    response.raise_for_status() # Raise an exception for HTTP errors (4xx or 5xx)

    # Read the content into a pandas DataFrame
    data_df = pd.read_csv(io.StringIO(response.text))

    print("Data fetched successfully!")
    print("\nFirst 5 rows of the DataFrame:")
    print(data_df.head())

    print(f"\nDataFrame shape: {data_df.shape}")

except requests.exceptions.HTTPError as err:
    print(f"HTTP error occurred: {err}")
    print(f"Response Content:\n{response.text}")
except requests.exceptions.ConnectionError as err:
    print(f"Connection error occurred: {err}")
except requests.exceptions.Timeout as err:
    print(f"Timeout error occurred: {err}")
except requests.exceptions.RequestException as err:
    print(f"An unexpected error occurred: {err}")

Generated period dimension: pe:201301;201302;201303;201304;201305;201306;201307;201308;201309;201310;201311;201312;201401;201402;201403;201404;201405;201406;201407;201408;201409;201410;201411;201412;201501;201502;201503;201504;201505;201506;201507;201508;201509;201510;201511;201512;201601;201602;201603;201604;201605;201606;201607;201608;201609;201610;201611;201612;201701;201702;201703;201704;201705;201706;201707;201708;201709;201710;201711;201712;201801;201802;201803;201804;201805;201806;201807;201808;201809;201810;201811;201812;201901;201902;201903;201904;201905;201906;201907;201908;201909;201910;201911;201912;202001;202002;202003;202004;202005;202006;202007;202008;202009;202010;202011;202012;202101;202102;202103;202104;202105;202106;202107;202108;202109;202110;202111;202112;202201;202202;202203;202204;202205;202206;202207;202208;202209;202210;202211;202212;202301;202302;202303;202304;202305;202306;202307;202308;202309;202310;202311;202312;202401;202402;202403;202404;202405;202406;202

In [5]:
import requests
import pandas as pd
import io
from datetime import datetime, timedelta
from urllib.parse import urlparse, parse_qs, urlencode
from google.colab import files # Import the files module


# --- Section to save and download the data ---
if 'data_df' in locals() and not data_df.empty:
    # Define the filename to be saved in Colab's temporary storage
    # This file will then be downloaded to your local machine
    output_filename = "ke_population_data.csv"

    # Save the DataFrame to a CSV file within Colab's temporary environment
    data_df.to_csv(output_filename, index=False)
    print(f"\nData saved to Colab's temporary storage: /{output_filename}")

    # Trigger the download to your local machine
    files.download(output_filename)
    print(f"'{output_filename}' has been downloaded to your local machine's default downloads folder.")
    print("Please move it manually from there to <\\your local working directory>")


    # To display info about the DataFrame columns and types
    print("\nDataFrame Info:")
    data_df.info()
else:
    print("\nNo data to save or download (DataFrame is empty or not created).")

ModuleNotFoundError: No module named 'google.colab'

In [None]:
data_df.head()


In [None]:
if 'data_df' in locals() and isinstance(data_df, pd.DataFrame):
    num_rows = data_df.shape[0]
    num_cols = data_df.shape[1]
    print(f"\nThe DataFrame has {num_rows} rows and {num_cols} columns.")
else:
    print("\n'data_df' is not available or is not a pandas DataFrame. Please ensure the data fetching step was successful.")

In [None]:
# import pandas as pd

# class FPDataAnalyzer:
#     """
#     A class to analyze family planning stock data from a single row (or dictionary).

#     This class computes various stock metrics (losses, dispensed, at hand, requested, received)
#     for different family planning methods based on predefined column name patterns.
#     """

#     def __init__(self, data_row):
#         """
#         Initializes the FPDataAnalyzer with a single row of data.

#         Args:
#             data_row (dict or pandas.Series): A dictionary or pandas Series
#                                               containing the column data for a single record.
#         """
#         self.data = data_row

#     def _get_metric(self, method_prefix, metric_suffix):
#         """
#         Helper method to safely retrieve a metric value.

#         Args:
#             method_prefix (str): The prefix for the FP method (e.g., "Combined oral contraceptive Pills").
#             metric_suffix (str): The suffix for the specific metric (e.g., "Losses").

#         Returns:
#             float or int: The value of the metric, or 0 if the column is missing or value is not numeric.
#         """
#         column_name = f"{method_prefix} {metric_suffix}"
#         try:
#             # Attempt to convert to numeric, coercing errors to NaN, then fill with 0
#             value = pd.to_numeric(self.data.get(column_name, 0), errors='coerce')
#             return value if pd.notna(value) else 0
#         except Exception as e:
#             print(f"Warning: Could not retrieve or convert '{column_name}' for {method_prefix}. Error: {e}. Defaulting to 0.")
#             return 0

#     def get_pills_metrics(self):
#         """
#         Computes stock metrics for all types of Pills (Combined, Emergency, Progestin only).

#         Returns:
#             dict: A dictionary containing metrics for each pill type.
#         """
#         pills_metrics = {}
#         pill_types = {
#             "combined_oral_contraceptive": "Combined oral contraceptive Pills",
#             "emergency_pill": "Emergency Pill",
#             "progestin_only_pills": "Progestin only Pills"
#         }
#         metric_suffixes = {
#             "stock_losses": "Losses",
#             "stock_dispensed": "Issued/Dispensed",
#             "stock_at_hand": "Ending Balanc",
#             "stock_requested": "Quantity Needed/Requested",
#             "stock_received": "Stock Received"
#         }

#         for key, prefix in pill_types.items():
#             pills_metrics[key] = {
#                 metric_name: self._get_metric(prefix, suffix)
#                 for metric_name, suffix in metric_suffixes.items()
#             }
#         return pills_metrics

#     def get_condoms_metrics(self):
#         """
#         Computes stock metrics for all types of Condoms (Female, Male).

#         Returns:
#             dict: A dictionary containing metrics for each condom type.
#         """
#         condoms_metrics = {}
#         condom_types = {
#             "female_condom": "Female Condom",
#             "male_condom": "Male Condom"
#         }
#         metric_suffixes = {
#             "stock_losses": "Losses",
#             "stock_dispensed": "Issued/Dispensed",
#             "stock_at_hand": "Ending Balanc",
#             "stock_requested": "Quantity Needed/Requested",
#             "stock_received": "Stock Received"
#         }

#         for key, prefix in condom_types.items():
#             condoms_metrics[key] = {
#                 metric_name: self._get_metric(prefix, suffix)
#                 for metric_name, suffix in metric_suffixes.items()
#             }
#         return condoms_metrics

#     def get_injectables_metrics(self):
#         """
#         Computes stock metrics for Injectables.

#         Returns:
#             dict: A dictionary containing metrics for Injectables.
#         """
#         injectables_prefix = "Injectables"
#         metric_suffixes = {
#             "stock_losses": "Losses",
#             "stock_dispensed": "Issued/Dispensed",
#             "stock_at_hand": "Ending Balanc",
#             "stock_requested": "Quantity Needed/Requested",
#             "stock_received": "Stock Received"
#         }
#         return {
#             metric_name: self._get_metric(injectables_prefix, suffix)
#             for metric_name, suffix in metric_suffixes.items()
#         }

#     def get_implants_metrics(self):
#         """
#         Computes stock metrics for Implants.

#         Returns:
#             dict: A dictionary containing metrics for Implants.
#         """
#         implants_prefix = "Implants (1-Rod)"
#         metric_suffixes = {
#             "stock_losses": "Losses",
#             "stock_dispensed": "Issued/Dispensed",
#             "stock_at_hand": "Ending Balanc",
#             "stock_requested": "Quantity Needed/Requested",
#             "stock_received": "Stock Received"
#         }
#         return {
#             metric_name: self._get_metric(implants_prefix, suffix)
#             for metric_name, suffix in metric_suffixes.items()
#         }

#     def get_iud_metrics(self):
#         """
#         Computes stock metrics for IUD.

#         Returns:
#             dict: A dictionary containing metrics for IUD.
#         """
#         iud_prefix = "IUCD Copper T"
#         metric_suffixes = {
#             "stock_losses": "Losses",
#             "stock_dispensed": "Issued/Dispensed",
#             "stock_at_hand": "Ending Balanc",
#             "stock_requested": "Quantity Needed/Requested",
#             "stock_received": "Stock Received"
#         }
#         return {
#             metric_name: self._get_metric(iud_prefix, suffix)
#             for metric_name, suffix in metric_suffixes.items()
#         }

#     def get_all_fp_metrics(self):
#         """
#         Computes all available family planning metrics.

#         Returns:
#             dict: A nested dictionary containing metrics for all FP methods.
#         """
#         all_metrics = {
#             "pills": self.get_pills_metrics(),
#             "condoms": self.get_condoms_metrics(),
#             "injectables": self.get_injectables_metrics(),
#             "implants": self.get_implants_metrics(),
#             "iud": self.get_iud_metrics()
#         }
#         return all_metrics

# def flatten_dict(d, parent_key='', sep='_'):
#     """
#     Flattens a nested dictionary.

#     Args:
#         d (dict): The dictionary to flatten.
#         parent_key (str): The base key for the current level of recursion.
#         sep (str): The separator to use for concatenating keys.

#     Returns:
#         dict: A flattened dictionary.
#     """
#     items = []
#     for k, v in d.items():
#         new_key = parent_key + sep + k if parent_key else k
#         if isinstance(v, dict):
#             items.extend(flatten_dict(v, new_key, sep=sep).items())
#         else:
#             items.append((new_key, v))
#     return dict(items)


# # --- Example Usage with a Pandas DataFrame ---
# if __name__ == "__main__":
#     # Load your data into 'data_df'
#     # Make sure 'ke_fp_commodity_data.csv' is in the same directory or provide the full path.
#     try:
#         data_df = pd.read_csv('ke_fp_commodity_data.csv')
#         print("Successfully loaded 'ke_fp_commodity_data.csv' into data_df.")
#     except FileNotFoundError:
#         print("Error: 'ke_fp_commodity_data.csv' not found.")
#         print("Please ensure the CSV file is in the correct directory or provide the full path.")
#         # Create an empty DataFrame to prevent further errors in the example usage
#         data_df = pd.DataFrame()
#     except Exception as e:
#         print(f"An error occurred while loading the CSV: {e}")
#         data_df = pd.DataFrame() # Ensure data_df is defined even on other errors

#     if not data_df.empty:
#         print("--- Original DataFrame Head ---")
#         print(data_df.head())

#         # List to hold flattened metrics for each row
#         all_flattened_metrics = []

#         # Iterate through rows, compute metrics, and flatten them
#         print("\n--- Computing and flattening FP metrics for each row ---")
#         for index, row in data_df.iterrows():
#             analyzer = FPDataAnalyzer(row)
#             metrics = analyzer.get_all_fp_metrics()
#             flattened = flatten_dict(metrics)
#             all_flattened_metrics.append(flattened)

#         # Create a new DataFrame from the list of flattened metrics
#         metrics_df = pd.DataFrame(all_flattened_metrics)

#         # Concatenate the original DataFrame with the new metrics DataFrame
#         # This adds the flattened metrics as new columns to data_df
#         data_df = pd.concat([data_df, metrics_df], axis=1)

#         print("\n--- DataFrame with new FP metrics columns (Head) ---")
#         # Display the head of the modified DataFrame, showing the new columns
#         print(data_df.head())

#         # You can now access the new columns directly, for example:
#         # print(data_df['pills_combined_oral_contraceptive_stock_losses'].head())
#         # print(data_df['injectables_stock_at_hand'].head())

#     else:
#         print("\nDataFrame is empty. Cannot proceed with analysis.")


In [None]:
data_df.head()


In [None]:
data_df.columns
