# MSF Data Engineering Technical Test - Development Notebook

This notebook serves as a development and testing environment for building the data pipeline as required by the Ennonce Technical Test documentation.

It will include:
* Iterative code development and testing for each layer of the Medallion Architecture (Bronze, Silver, Gold).
* Helper functions and snippets used during development.
* Validation steps to ensure data quality and transformations are correct.

Once stable, the production-ready code will be refactored written to `main.py`.

Refer to the "Ennonce_Technical Test – Data Engineer.docx" for full requirements and context.

In [2]:
# Bronze layer
# get the bronze dataframes (budget dataframe and the expenses)


In [None]:

# This dictionary contains the TRUTH for project country and currency,
# overriding any inconsistencies found in source data.
PROJECT_CURRENCY_MAP = {
    "BE01": {"country": "Belgium", "currency": "EUR"},
    "BE55": {"country": "Belgium", "currency": "EUR"},
    "KE01": {"country": "Kenya", "currency": "KES"},   # Corrected: Force KES for KE01 expenses, overriding DB's XOF
    "KE02": {"country": "Kenya", "currency": "KES"},
    "SN01": {"country": "Senegal", "currency": "XOF"}, # Corrected: Force Senegal for SN01 country, overriding DB's Kenya
    "SN02": {"country": "Senegal", "currency": "XOF"},
    "BF01": {"country": "Burkina Faso", "currency": "XOF"},
    "BF02": {"country": "Burkina Faso", "currency": "XOF"},
}

In [20]:
DATA_DIRECTORY = "/Users/hajirufai/test/MSF-test/OneDrive_1_7-4-2025"
OUTPUT_DIRECTORY = "/Users/hajirufai/test/MSF-test/processed_data"

In [21]:
# Read csvs (budget data)
import pandas as pd
import os

# --- Function to extract budget from a single CSV ---
def extract_budget_from_csv(csv_path: str) -> pd.DataFrame:
    """
    Reads budget data from a CSV file.
    """
    try:
        df = pd.read_csv(csv_path)
        return df
    except Exception as e:
        print(f"Error reading budget from {csv_path}: {e}")
        return pd.DataFrame()


In [236]:
# --- Main logic to iterate and combine CSVs ---
all_budget_df = pd.DataFrame()
csv_files_found = 0

print(f"\n--- Bronze Layer: Starting CSV Data Ingestion from {DATA_DIRECTORY} ---")

if not os.path.exists(DATA_DIRECTORY):
    print(f"Error: Data directory '{DATA_DIRECTORY}' not found. Please verify the path.")
else:
    for filename in os.listdir(DATA_DIRECTORY):
        file_path = os.path.join(DATA_DIRECTORY, filename)
        
        # Process only files ending with '_budget.csv' and skip directories
        if os.path.isdir(file_path) or not filename.endswith("_budget.csv"):
            continue

        # Derive project_id from filename (e.g., "BE01" from "BE01_budget.csv")
        project_name_from_file = os.path.splitext(filename)[0]
        print(f"project_name_from_file: {project_name_from_file}")
        base_project_id = project_name_from_file.replace('_budget', '')

        # Get country from our PROJECT_CURRENCY_MAP
        project_info = PROJECT_CURRENCY_MAP.get(base_project_id)
        if not project_info:
            print(f"Warning: Project '{base_project_id}' (from '{filename}') not found in PROJECT_CURRENCY_MAP. Skipping.")
            continue
        
        country_from_map = project_info['country']

        print(f"  Ingesting budget from CSV: {filename} (Project: {base_project_id}, Country: {country_from_map})...")
        df = extract_budget_from_csv(file_path)
        print(f"df: \n{df.head()}")
        
        if not df.empty:
            df['project_id'] = base_project_id
            df['country'] = country_from_map
            # Also add the original_currency_map for consistency, even if budget is already in EUR
            all_budget_df = pd.concat([all_budget_df, df], ignore_index=True)
            csv_files_found += 1
        else:
            print(f" No data extracted or error occurred for {filename}.")



--- Bronze Layer: Starting CSV Data Ingestion from /Users/hajirufai/test/MSF-test/OneDrive_1_7-4-2025 ---
project_name_from_file: BE55_budget
  Ingesting budget from CSV: BE55_budget.csv (Project: BE55, Country: Belgium)...
df: 
   id  year  month department           category  budget_eur version
0   1  2023      1         HR           Salaries     2293.00      v1
1   2  2023      1         HR           Training     4768.76      v1
2   3  2023      1         HR        Recruitment     1642.21      v1
3   4  2023      1    Medical        Medications     1377.75      v1
4   5  2023      1    Medical  Medical Equipment     2062.46      v1
project_name_from_file: SN02_budget
  Ingesting budget from CSV: SN02_budget.csv (Project: SN02, Country: Senegal)...
df: 
   id  year  month department           category  budget_eur version
0   1  2023      1         HR           Salaries     4046.14      v1
1   2  2023      1         HR           Training     2020.66      v1
2   3  2023      1        

In [23]:
print(PROJECT_CURRENCY_MAP.keys())

dict_keys(['BE01', 'BE55', 'KE01', 'KEO2', 'SN01', 'SN02', 'BF01', 'BF02'])


In [24]:
all_budget_df.head()

Unnamed: 0,id,year,month,department,category,budget_eur,version,project_id,country
0,1,2023,1,HR,Salaries,2293.0,v1,BE55,Belgium
1,2,2023,1,HR,Training,4768.76,v1,BE55,Belgium
2,3,2023,1,HR,Recruitment,1642.21,v1,BE55,Belgium
3,4,2023,1,Medical,Medications,1377.75,v1,BE55,Belgium
4,5,2023,1,Medical,Medical Equipment,2062.46,v1,BE55,Belgium


In [26]:
# Filter the DataFrame
df_filtered = all_budget_df[all_budget_df['project_id'] == 'KEO2']


In [28]:
df_filtered.head()

Unnamed: 0,id,year,month,department,category,budget_eur,version,project_id,country
3024,1,2023,1,HR,Salaries,1331.85,v1,KEO2,Kenya
3025,2,2023,1,HR,Training,4093.98,v1,KEO2,Kenya
3026,3,2023,1,HR,Recruitment,3510.71,v1,KEO2,Kenya
3027,4,2023,1,Medical,Medications,3764.67,v1,KEO2,Kenya
3028,5,2023,1,Medical,Medical Equipment,2855.33,v1,KEO2,Kenya


In [25]:
len(all_budget_df)

3456

In [11]:
temp_df = pd.read_csv('/Users/hajirufai/test/MSF-test/OneDrive_1_7-4-2025/BE01_budget.csv')
temp_df.head()


Unnamed: 0,id,year,month,department,category,budget_eur,version
0,1,2023,1,HR,Salaries,3059.1,v1
1,2,2023,1,HR,Training,3080.06,v1
2,3,2023,1,HR,Recruitment,2695.93,v1
3,4,2023,1,Medical,Medications,3000.73,v1
4,5,2023,1,Medical,Medical Equipment,3353.35,v1


In [12]:
len(temp_df)

432

In [30]:
# save the bronze budget
# Define the output path for the bronze budget data
bronze_budget_output_path = os.path.join(OUTPUT_DIRECTORY, "bronze_budget_data.parquet")

if not all_budget_df.empty:
    all_budget_df.to_parquet(bronze_budget_output_path, index=False)
    print(f"\nBronze Budget DataFrame saved to: {bronze_budget_output_path}")
else:
    print("\nBronze Budget DataFrame is empty, skipping save to Parquet.")


Bronze Budget DataFrame saved to: /Users/hajirufai/test/MSF-test/processed_data/bronze_budget_data.parquet


In [32]:
all_budget_df.tail()

Unnamed: 0,id,year,month,department,category,budget_eur,version,project_id,country
3451,428,2025,12,Logistics,Transport,4017.93,v1,KEO2,Kenya
3452,429,2025,12,Logistics,Vehicle Maintenance,2099.59,v1,KEO2,Kenya
3453,430,2025,12,Supply,Supplies,4371.15,v1,KEO2,Kenya
3454,431,2025,12,Supply,Warehousing,2842.26,v1,KEO2,Kenya
3455,432,2025,12,Supply,Cold Chain,4898.71,v1,KEO2,Kenya


In [38]:
# reading the bronze budget parquet file
bronze_budget_df = pd.read_parquet('/Users/hajirufai/test/MSF-test/processed_data/bronze_budget_data.parquet')
bronze_budget_df.head()

Unnamed: 0,id,year,month,department,category,budget_eur,version,project_id,country
0,1,2023,1,HR,Salaries,2293.0,v1,BE55,Belgium
1,2,2023,1,HR,Training,4768.76,v1,BE55,Belgium
2,3,2023,1,HR,Recruitment,1642.21,v1,BE55,Belgium
3,4,2023,1,Medical,Medications,1377.75,v1,BE55,Belgium
4,5,2023,1,Medical,Medical Equipment,2062.46,v1,BE55,Belgium


## Bronze expenses

In [40]:
# --- Bronze Layer: Ingesting Expenses from SQLite DBs ---

import pandas as pd
import sqlite3
import os

def extract_expenses_from_db(db_path: str) -> pd.DataFrame:
    """
    Connects to a SQLite database and extracts all data from the 'expenses' table.
    """
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        query = "SELECT * FROM expenses" # Assumes 'expenses' is the table name
        df = pd.read_sql_query(query, conn)
        return df
    except sqlite3.Error as e:
        print(f"Error reading expenses from {db_path}: {e}")
        return pd.DataFrame() # Return empty DataFrame on error
    finally:
        if conn:
            conn.close()

In [44]:
# --- Main logic to iterate and combine DBs ---
all_expenses_df = pd.DataFrame()
db_files_found = 0

# Define project currency map for expenses
PROJECT_CURRENCY_MAP = {
    "BE01": {"country": "Belgium", "currency": "EUR"},
    "BE55": {"country": "Belgium", "currency": "EUR"},
    "KE01": {"country": "Kenya", "currency": "KES"},
    "KE02": {"country": "Kenya", "currency": "KES"},   
    "SN01": {"country": "Senegal", "currency": "XOF"},
    "SN02": {"country": "Senegal", "currency": "XOF"},
    "BF01": {"country": "Burkina Faso", "currency": "XOF"},
    "BF02": {"country": "Burkina Faso", "currency": "XOF"},
}

print(f"\n--- Bronze Layer: Starting DB Data Ingestion from {DATA_DIRECTORY} ---")

if not os.path.exists(DATA_DIRECTORY):
    print(f"Error: Data directory '{DATA_DIRECTORY}' not found. Please verify the path.")
else:
    for filename in os.listdir(DATA_DIRECTORY):
        file_path = os.path.join(DATA_DIRECTORY, filename)
        
        # Process only files ending with '.db' and skip directories
        if os.path.isdir(file_path) or not filename.endswith(".db"):
            continue

        # Derive project_id from filename (e.g., "BE01" from "BE01.db")
        project_name_from_file = os.path.splitext(filename)[0]
        base_project_id = project_name_from_file # For DBs, the filename directly is the project_id

        # Get country and currency from our PROJECT_CURRENCY_MAP
        project_info = PROJECT_CURRENCY_MAP.get(base_project_id)
        if not project_info:
            print(f"Warning: Project '{base_project_id}' (from '{filename}') not found in PROJECT_CURRENCY_MAP. Skipping.")
            continue
        
        country_from_map = project_info['country']
        currency_from_map = project_info['currency'] # This will be our source of truth for currency

        print(f"  Ingesting expenses from DB: {filename} (Project: {base_project_id}, Country: {country_from_map})...")
        df = extract_expenses_from_db(file_path)
        
        if not df.empty:
            df['project_id'] = base_project_id
            df['country'] = country_from_map
            df['original_currency'] = currency_from_map # Add the true currency from our map
            all_expenses_df = pd.concat([all_expenses_df, df], ignore_index=True)
            db_files_found += 1
        else:
            print(f"    No data extracted or error occurred for {filename}.")



--- Bronze Layer: Starting DB Data Ingestion from /Users/hajirufai/test/MSF-test/OneDrive_1_7-4-2025 ---
  Ingesting expenses from DB: BE55.db (Project: BE55, Country: Belgium)...
  Ingesting expenses from DB: BE01.db (Project: BE01, Country: Belgium)...
  Ingesting expenses from DB: BF02.db (Project: BF02, Country: Burkina Faso)...
  Ingesting expenses from DB: KE02.db (Project: KE02, Country: Kenya)...
  Ingesting expenses from DB: SN02.db (Project: SN02, Country: Senegal)...
  Ingesting expenses from DB: SN01.db (Project: SN01, Country: Senegal)...
  Ingesting expenses from DB: KE01.db (Project: KE01, Country: Kenya)...
  Ingesting expenses from DB: BF01.db (Project: BF01, Country: Burkina Faso)...


In [45]:
all_expenses_df.head()

Unnamed: 0,id,year,month,department,category,amount_local,currency,project_id,country,original_currency,amount_eur
0,1,2023,1,HR,Salaries,2278.71,EUR,BE55,Belgium,EUR,
1,2,2023,1,HR,Training,4053.43,EUR,BE55,Belgium,EUR,
2,3,2023,1,HR,Recruitment,1450.1,EUR,BE55,Belgium,EUR,
3,4,2023,1,Medical,Medications,1275.95,EUR,BE55,Belgium,EUR,
4,5,2023,1,Medical,Medical Equipment,2182.81,EUR,BE55,Belgium,EUR,


In [47]:
print(len(all_expenses_df))
all_expenses_df.info()

3456
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3456 entries, 0 to 3455
Data columns (total 11 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   id                 3456 non-null   int64  
 1   year               3456 non-null   int64  
 2   month              3456 non-null   object 
 3   department         3456 non-null   object 
 4   category           3456 non-null   object 
 5   amount_local       2880 non-null   float64
 6   currency           3456 non-null   object 
 7   project_id         3456 non-null   object 
 8   country            3456 non-null   object 
 9   original_currency  3456 non-null   object 
 10  amount_eur         432 non-null    float64
dtypes: float64(2), int64(2), object(7)
memory usage: 297.1+ KB


In [53]:
mask = (all_expenses_df['year'] == 2025) & (all_expenses_df['month'] == "07")
all_expenses_df[mask].head()

Unnamed: 0,id,year,month,department,category,amount_local,currency,project_id,country,original_currency,amount_eur
360,361,2025,7,HR,Salaries,,EUR,BE55,Belgium,EUR,
361,362,2025,7,HR,Training,,EUR,BE55,Belgium,EUR,
362,363,2025,7,HR,Recruitment,,EUR,BE55,Belgium,EUR,
363,364,2025,7,Medical,Medications,,EUR,BE55,Belgium,EUR,
364,365,2025,7,Medical,Medical Equipment,,EUR,BE55,Belgium,EUR,


In [54]:
# save the bronze expenses to parquet:
import pandas as pd
import os


# Ensure the output directory exists
os.makedirs(OUTPUT_DIRECTORY, exist_ok=True)

# Define the output path for the bronze expenses data
bronze_expenses_output_path = os.path.join(OUTPUT_DIRECTORY, "bronze_expenses_data.parquet")

if not all_expenses_df.empty:
    all_expenses_df.to_parquet(bronze_expenses_output_path, index=False)
    print(f"\nDataFrame successfully saved to: {bronze_expenses_output_path}")
else:
    print("\nDataFrame is empty, skipping save to Parquet.")



DataFrame successfully saved to: /Users/hajirufai/test/MSF-test/processed_data/bronze_expenses_data.parquet


## Silver layer

In [106]:
### Dealing with expenses dataframe


In [107]:
expenses_df = pd.read_parquet("/Users/hajirufai/test/MSF-test/processed_data/bronze_expenses_data.parquet")

In [108]:
expenses_df.head()

Unnamed: 0,id,year,month,department,category,amount_local,currency,project_id,country,original_currency,amount_eur
0,1,2023,1,HR,Salaries,2278.71,EUR,BE55,Belgium,EUR,
1,2,2023,1,HR,Training,4053.43,EUR,BE55,Belgium,EUR,
2,3,2023,1,HR,Recruitment,1450.1,EUR,BE55,Belgium,EUR,
3,4,2023,1,Medical,Medications,1275.95,EUR,BE55,Belgium,EUR,
4,5,2023,1,Medical,Medical Equipment,2182.81,EUR,BE55,Belgium,EUR,


In [109]:
expenses_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3456 entries, 0 to 3455
Data columns (total 11 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   id                 3456 non-null   int64  
 1   year               3456 non-null   int64  
 2   month              3456 non-null   object 
 3   department         3456 non-null   object 
 4   category           3456 non-null   object 
 5   amount_local       2880 non-null   float64
 6   currency           3456 non-null   object 
 7   project_id         3456 non-null   object 
 8   country            3456 non-null   object 
 9   original_currency  3456 non-null   object 
 10  amount_eur         432 non-null    float64
dtypes: float64(2), int64(2), object(7)
memory usage: 297.1+ KB


In [110]:
# Drop the id and currency column
expenses_df.drop(columns=['id', 'currency'], inplace=True)

In [None]:
### Combine year and month to have a date like column and convert to datetime and add the last day of the month


In [114]:
import pandas as pd
from pandas.tseries.offsets import MonthEnd # Import MonthEnd for easy last-day-of-month calculation


print("--- Processing Expenses DataFrame Dates ---")

# Step 1: Convert year and month to string format
expenses_df['year_str'] = expenses_df['year'].astype(str)
expenses_df['month_str'] = expenses_df['month'].astype(str)

# Step 2: Create a temporary date string with the 1st day of the month
# This is a common intermediate step before converting to datetime and then adjusting to month end.
expenses_df['temp_date_str'] = expenses_df['year_str'] + '-' + expenses_df['month_str'].str.zfill(2) + '-01'
# .str.zfill(2) ensures single-digit months (e.g., '1') become '01' for correct parsing

# Step 3: Convert the temporary date string to datetime objects
expenses_df['date'] = pd.to_datetime(expenses_df['temp_date_str'], errors='coerce')

# Step 4: Convert to the last day of the month using MonthEnd
expenses_df['date'] = expenses_df['date'] + MonthEnd(1)

# Handle any rows where date conversion might have failed (e.g., if 'month' was not a valid month)
# You might want to inspect these rows if any NaT (Not a Time) values appear.
if expenses_df['date'].isnull().any():
    print("Warning: Some budget date values are NaT (Not a Time) after conversion. Check original year/month data.")

print("Expenses DataFrame 'date' column created and set to the last day of the month.")

# Step 5: Drop the intermediate columns if no longer needed
expenses_df = expenses_df.drop(columns=['year_str', 'month_str', 'temp_date_str'])

# Display updated info and head to verify
print("\n--- Updated Expenses DataFrame Info ---")
expenses_df.info()
print("\n--- Updated Expenses DataFrame Head (with budget_date) ---")
print(expenses_df.head())

--- Processing Expenses DataFrame Dates ---
Expenses DataFrame 'date' column created and set to the last day of the month.

--- Updated Expenses DataFrame Info ---
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3456 entries, 0 to 3455
Data columns (total 10 columns):
 #   Column             Non-Null Count  Dtype         
---  ------             --------------  -----         
 0   year               3456 non-null   int64         
 1   month              3456 non-null   object        
 2   department         3456 non-null   object        
 3   category           3456 non-null   object        
 4   amount_local       2880 non-null   float64       
 5   project_id         3456 non-null   object        
 6   country            3456 non-null   object        
 7   original_currency  3456 non-null   object        
 8   amount_eur         432 non-null    float64       
 9   date               3456 non-null   datetime64[ns]
dtypes: datetime64[ns](1), float64(2), int64(1), object(6)
memory usa

In [115]:
expenses_df.head()

Unnamed: 0,year,month,department,category,amount_local,project_id,country,original_currency,amount_eur,date
0,2023,1,HR,Salaries,2278.71,BE55,Belgium,EUR,,2023-01-31
1,2023,1,HR,Training,4053.43,BE55,Belgium,EUR,,2023-01-31
2,2023,1,HR,Recruitment,1450.1,BE55,Belgium,EUR,,2023-01-31
3,2023,1,Medical,Medications,1275.95,BE55,Belgium,EUR,,2023-01-31
4,2023,1,Medical,Medical Equipment,2182.81,BE55,Belgium,EUR,,2023-01-31


In [116]:
# Drop year and month columns
expenses_df.drop(columns=['year', 'month'], inplace=True)

expenses_df.head()

Unnamed: 0,department,category,amount_local,project_id,country,original_currency,amount_eur,date
0,HR,Salaries,2278.71,BE55,Belgium,EUR,,2023-01-31
1,HR,Training,4053.43,BE55,Belgium,EUR,,2023-01-31
2,HR,Recruitment,1450.1,BE55,Belgium,EUR,,2023-01-31
3,Medical,Medications,1275.95,BE55,Belgium,EUR,,2023-01-31
4,Medical,Medical Equipment,2182.81,BE55,Belgium,EUR,,2023-01-31


In [118]:
# Rearrange so date is the first column
expenses_df = expenses_df[['date'] + [col for col in expenses_df.columns if col != 'date']]
expenses_df.head()

Unnamed: 0,date,department,category,amount_local,project_id,country,original_currency,amount_eur
0,2023-01-31,HR,Salaries,2278.71,BE55,Belgium,EUR,
1,2023-01-31,HR,Training,4053.43,BE55,Belgium,EUR,
2,2023-01-31,HR,Recruitment,1450.1,BE55,Belgium,EUR,
3,2023-01-31,Medical,Medications,1275.95,BE55,Belgium,EUR,
4,2023-01-31,Medical,Medical Equipment,2182.81,BE55,Belgium,EUR,


In [119]:
len(expenses_df)

3456

In [120]:
expenses_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3456 entries, 0 to 3455
Data columns (total 8 columns):
 #   Column             Non-Null Count  Dtype         
---  ------             --------------  -----         
 0   date               3456 non-null   datetime64[ns]
 1   department         3456 non-null   object        
 2   category           3456 non-null   object        
 3   amount_local       2880 non-null   float64       
 4   project_id         3456 non-null   object        
 5   country            3456 non-null   object        
 6   original_currency  3456 non-null   object        
 7   amount_eur         432 non-null    float64       
dtypes: datetime64[ns](1), float64(2), object(5)
memory usage: 216.1+ KB


In [123]:
import numpy as np

In [124]:
mask = expenses_df['amount_eur'] == np.nan
expenses_df[mask].head()

Unnamed: 0,date,department,category,amount_local,project_id,country,original_currency,amount_eur


In [125]:
expenses_df [expenses_df['amount_eur'].isna()]

Unnamed: 0,date,department,category,amount_local,project_id,country,original_currency,amount_eur
0,2023-01-31,HR,Salaries,2278.71,BE55,Belgium,EUR,
1,2023-01-31,HR,Training,4053.43,BE55,Belgium,EUR,
2,2023-01-31,HR,Recruitment,1450.10,BE55,Belgium,EUR,
3,2023-01-31,Medical,Medications,1275.95,BE55,Belgium,EUR,
4,2023-01-31,Medical,Medical Equipment,2182.81,BE55,Belgium,EUR,
...,...,...,...,...,...,...,...,...
3451,2025-12-31,Logistics,Transport,,BF01,Burkina Faso,XOF,
3452,2025-12-31,Logistics,Vehicle Maintenance,,BF01,Burkina Faso,XOF,
3453,2025-12-31,Supply,Supplies,,BF01,Burkina Faso,XOF,
3454,2025-12-31,Supply,Warehousing,,BF01,Burkina Faso,XOF,


In [126]:
len(expenses_df [expenses_df['amount_eur'].isna()])

3024

In [129]:
expenses_df [~ expenses_df['amount_eur'].isna()]

Unnamed: 0,date,department,category,amount_local,project_id,country,original_currency,amount_eur
864,2023-01-31,HR,Salaries,9.340303e+05,BF02,Burkina Faso,XOF,1423.92
865,2023-01-31,HR,Training,2.122362e+06,BF02,Burkina Faso,XOF,3235.52
866,2023-01-31,HR,Recruitment,1.894469e+06,BF02,Burkina Faso,XOF,2888.10
867,2023-01-31,Medical,Medications,1.876122e+06,BF02,Burkina Faso,XOF,2860.13
868,2023-01-31,Medical,Medical Equipment,1.760707e+06,BF02,Burkina Faso,XOF,2684.18
...,...,...,...,...,...,...,...,...
1291,2025-12-31,Logistics,Transport,,BF02,Burkina Faso,XOF,3994.75
1292,2025-12-31,Logistics,Vehicle Maintenance,,BF02,Burkina Faso,XOF,1757.12
1293,2025-12-31,Supply,Supplies,,BF02,Burkina Faso,XOF,2288.36
1294,2025-12-31,Supply,Warehousing,,BF02,Burkina Faso,XOF,1364.32


In [130]:
len(expenses_df [~ expenses_df['amount_eur'].isna()])

432

In [131]:
expenses_df.isna().sum()

date                    0
department              0
category                0
amount_local          576
project_id              0
country                 0
original_currency       0
amount_eur           3024
dtype: int64

In [132]:
# check the rows in which amount_local is NaN
expenses_df[expenses_df['amount_local'].isna()]

Unnamed: 0,date,department,category,amount_local,project_id,country,original_currency,amount_eur
360,2025-07-31,HR,Salaries,,BE55,Belgium,EUR,
361,2025-07-31,HR,Training,,BE55,Belgium,EUR,
362,2025-07-31,HR,Recruitment,,BE55,Belgium,EUR,
363,2025-07-31,Medical,Medications,,BE55,Belgium,EUR,
364,2025-07-31,Medical,Medical Equipment,,BE55,Belgium,EUR,
...,...,...,...,...,...,...,...,...
3451,2025-12-31,Logistics,Transport,,BF01,Burkina Faso,XOF,
3452,2025-12-31,Logistics,Vehicle Maintenance,,BF01,Burkina Faso,XOF,
3453,2025-12-31,Supply,Supplies,,BF01,Burkina Faso,XOF,
3454,2025-12-31,Supply,Warehousing,,BF01,Burkina Faso,XOF,


In [133]:
len(expenses_df[expenses_df['amount_local'].isna()])

576

## silver layer budget df

In [136]:
# read the bronze budget data
budget_df = pd.read_parquet('/Users/hajirufai/test/MSF-test/processed_data/bronze_budget_data.parquet')
budget_df.head()

Unnamed: 0,id,year,month,department,category,budget_eur,version,project_id,country
0,1,2023,1,HR,Salaries,2293.0,v1,BE55,Belgium
1,2,2023,1,HR,Training,4768.76,v1,BE55,Belgium
2,3,2023,1,HR,Recruitment,1642.21,v1,BE55,Belgium
3,4,2023,1,Medical,Medications,1377.75,v1,BE55,Belgium
4,5,2023,1,Medical,Medical Equipment,2062.46,v1,BE55,Belgium


In [137]:
# drop version column, id column
budget_df.drop(columns=['version', 'id'], inplace=True)

# convert the 'year' and 'month' columns to string format
budget_df['year'] = budget_df['year'].astype(str)
budget_df['month'] = budget_df['month'].astype(str)

# create a temporary date string with the 1st day of the month
budget_df['temp_date_str'] = budget_df['year'] + '-' + budget_df['month'].str.zfill(2) + '-01'

# convert the temporary date string to datetime objects
budget_df['date'] = pd.to_datetime(budget_df['temp_date_str'], errors='coerce')

# convert to the last day of the month using MonthEnd
budget_df['date'] = budget_df['date'] + MonthEnd(1)

# drop the intermediate columns if no longer needed
budget_df = budget_df.drop(columns=['year', 'month', 'temp_date_str'])

# display updated info and head to verify
budget_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3456 entries, 0 to 3455
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype         
---  ------      --------------  -----         
 0   department  3456 non-null   object        
 1   category    3456 non-null   object        
 2   budget_eur  3456 non-null   float64       
 3   project_id  3456 non-null   object        
 4   country     3456 non-null   object        
 5   date        3456 non-null   datetime64[ns]
dtypes: datetime64[ns](1), float64(1), object(4)
memory usage: 162.1+ KB


In [139]:
# move date column to the first position
budget_df = budget_df[['date'] + [col for col in budget_df.columns if col != 'date']]

budget_df.head()

Unnamed: 0,date,department,category,budget_eur,project_id,country
0,2023-01-31,HR,Salaries,2293.0,BE55,Belgium
1,2023-01-31,HR,Training,4768.76,BE55,Belgium
2,2023-01-31,HR,Recruitment,1642.21,BE55,Belgium
3,2023-01-31,Medical,Medications,1377.75,BE55,Belgium
4,2023-01-31,Medical,Medical Equipment,2062.46,BE55,Belgium


In [142]:
# save the budget dataframe to silver parquet
budget_df.to_parquet('/Users/hajirufai/test/MSF-test/processed_data/silver_budget_data.parquet', index=False)


In [143]:
df = pd.read_parquet('/Users/hajirufai/test/MSF-test/processed_data/silver_budget_data.parquet')
df.head()

Unnamed: 0,date,department,category,budget_eur,project_id,country
0,2023-01-31,HR,Salaries,2293.0,BE55,Belgium
1,2023-01-31,HR,Training,4768.76,BE55,Belgium
2,2023-01-31,HR,Recruitment,1642.21,BE55,Belgium
3,2023-01-31,Medical,Medications,1377.75,BE55,Belgium
4,2023-01-31,Medical,Medical Equipment,2062.46,BE55,Belgium


In [144]:
df.isnull().sum()

date          0
department    0
category      0
budget_eur    0
project_id    0
country       0
dtype: int64

In [145]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3456 entries, 0 to 3455
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype         
---  ------      --------------  -----         
 0   date        3456 non-null   datetime64[ns]
 1   department  3456 non-null   object        
 2   category    3456 non-null   object        
 3   budget_eur  3456 non-null   float64       
 4   project_id  3456 non-null   object        
 5   country     3456 non-null   object        
dtypes: datetime64[ns](1), float64(1), object(4)
memory usage: 162.1+ KB


In [146]:
len(df)

3456

In [147]:
expenses_df.head()

Unnamed: 0,date,department,category,amount_local,project_id,country,original_currency,amount_eur
0,2023-01-31,HR,Salaries,2278.71,BE55,Belgium,EUR,
1,2023-01-31,HR,Training,4053.43,BE55,Belgium,EUR,
2,2023-01-31,HR,Recruitment,1450.1,BE55,Belgium,EUR,
3,2023-01-31,Medical,Medications,1275.95,BE55,Belgium,EUR,
4,2023-01-31,Medical,Medical Equipment,2182.81,BE55,Belgium,EUR,


In [148]:
expenses_df.isnull().sum()

date                    0
department              0
category                0
amount_local          576
project_id              0
country                 0
original_currency       0
amount_eur           3024
dtype: int64

In [149]:
# display rows where amount_local is null
expenses_df[expenses_df['amount_local'].isnull()]

Unnamed: 0,date,department,category,amount_local,project_id,country,original_currency,amount_eur
360,2025-07-31,HR,Salaries,,BE55,Belgium,EUR,
361,2025-07-31,HR,Training,,BE55,Belgium,EUR,
362,2025-07-31,HR,Recruitment,,BE55,Belgium,EUR,
363,2025-07-31,Medical,Medications,,BE55,Belgium,EUR,
364,2025-07-31,Medical,Medical Equipment,,BE55,Belgium,EUR,
...,...,...,...,...,...,...,...,...
3451,2025-12-31,Logistics,Transport,,BF01,Burkina Faso,XOF,
3452,2025-12-31,Logistics,Vehicle Maintenance,,BF01,Burkina Faso,XOF,
3453,2025-12-31,Supply,Supplies,,BF01,Burkina Faso,XOF,
3454,2025-12-31,Supply,Warehousing,,BF01,Burkina Faso,XOF,


In [151]:
len(expenses_df[expenses_df['amount_local'].isnull()])

576

In [154]:
# find there are how many rows that have dates from 2025-07-01 to 2025-12-31
mask = (expenses_df['date'] >= '2025-07-01') & (expenses_df['date'] <= '2025-12-31')
expenses_df[mask].head()

Unnamed: 0,date,department,category,amount_local,project_id,country,original_currency,amount_eur
360,2025-07-31,HR,Salaries,,BE55,Belgium,EUR,
361,2025-07-31,HR,Training,,BE55,Belgium,EUR,
362,2025-07-31,HR,Recruitment,,BE55,Belgium,EUR,
363,2025-07-31,Medical,Medications,,BE55,Belgium,EUR,
364,2025-07-31,Medical,Medical Equipment,,BE55,Belgium,EUR,


In [156]:
len(expenses_df[mask])

# ave the expenses dataframe to silver parquet
expenses_df.to_parquet('/Users/hajirufai/test/MSF-test/processed_data/silver_expenses_data.parquet', index=False)

In [159]:
df = pd.read_parquet('/Users/hajirufai/test/MSF-test/processed_data/silver_expenses_data.parquet')
df.head()

Unnamed: 0,date,department,category,amount_local,project_id,country,original_currency,amount_eur
0,2023-01-31,HR,Salaries,2278.71,BE55,Belgium,EUR,
1,2023-01-31,HR,Training,4053.43,BE55,Belgium,EUR,
2,2023-01-31,HR,Recruitment,1450.1,BE55,Belgium,EUR,
3,2023-01-31,Medical,Medications,1275.95,BE55,Belgium,EUR,
4,2023-01-31,Medical,Medical Equipment,2182.81,BE55,Belgium,EUR,


In [160]:
expenses_df.head()

Unnamed: 0,date,department,category,amount_local,project_id,country,original_currency,amount_eur
0,2023-01-31,HR,Salaries,2278.71,BE55,Belgium,EUR,
1,2023-01-31,HR,Training,4053.43,BE55,Belgium,EUR,
2,2023-01-31,HR,Recruitment,1450.1,BE55,Belgium,EUR,
3,2023-01-31,Medical,Medications,1275.95,BE55,Belgium,EUR,
4,2023-01-31,Medical,Medical Equipment,2182.81,BE55,Belgium,EUR,


In [161]:
# One thing remaining for our expenses_df is amount_eur column. We need to convert amount_local to amount_eur based on the exchange rate.


In [163]:
# --- Silver Layer: Currency Conversion for Expenses DataFrame ---

import pandas as pd
# Import the get_latest_exchange_rate function from a separate utility file (e.g., api_utils.py)
# This assumes you have a file named api_utils.py in your project structure
# and that it correctly imports API_KEY and BASE_URL from config.py.
from get_latest_exchange_rate import get_latest_exchange_rate 

# Import PROJECT_CURRENCY_MAP from your central config file
# from config import PROJECT_CURRENCY_MAP

# Assuming expenses_df is already loaded from bronze_expenses_data.parquet

print("\n--- Silver Layer: Starting Currency Conversion for Expenses ---")

# Step 1: Fetch all necessary exchange rates and store them in a dictionary
# We need to convert KES to EUR and XOF to EUR.
# Also, we might need EUR to EUR (rate 1.0) for consistency.

# Initialize a dictionary to hold our rates (this cache is specific to this run,
# the one in api_utils.py handles caching across calls to that function)
currency_conversion_rates = {}

# Fetch KES to EUR
kes_to_eur_rate = get_latest_exchange_rate("KES", "EUR")
if kes_to_eur_rate is not None:
    currency_conversion_rates['KES_to_EUR'] = kes_to_eur_rate
    print(f"  Fetched KES to EUR rate: {kes_to_eur_rate}")
else:
    print("  Warning: Failed to fetch KES to EUR rate. KES amounts will not be converted.")

# Fetch XOF to EUR
xof_to_eur_rate = get_latest_exchange_rate("XOF", "EUR")
if xof_to_eur_rate is not None:
    currency_conversion_rates['XOF_to_EUR'] = xof_to_eur_rate
    print(f"  Fetched XOF to EUR rate: {xof_to_eur_rate}")
else:
    print("  Warning: Failed to fetch XOF to EUR rate. XOF amounts will not be converted.")

# EUR to EUR rate (always 1.0) for consistency in logic
currency_conversion_rates['EUR_to_EUR'] = 1.0
print(f"  Set EUR to EUR rate: {currency_conversion_rates['EUR_to_EUR']}")


# Step 2: Define a function to apply the conversion to each row
def convert_amount_to_eur(row):
    original_amount = row['amount_local']
    # Use 'original_currency_map' as the source of truth for currency
    original_currency = row['original_currency_map'] 
    
    # Handle cases where original_amount might be NaN or not numeric
    if pd.isna(original_amount):
        return None # Or 0, depending on how you want to treat missing amounts

    if original_currency == 'EUR':
        return original_amount # Already in EUR
    elif original_currency == 'KES':
        rate = currency_conversion_rates.get('KES_to_EUR')
        if rate is not None:
            return original_amount * rate
        else:
            print(f"  Warning: Missing KES to EUR rate for Project {row['project_id']}, Date {row['date']}. Amount not converted.")
            return None # Or original_amount if you want to keep it in local currency
    elif original_currency == 'XOF':
        rate = currency_conversion_rates.get('XOF_to_EUR')
        if rate is not None:
            return original_amount * rate
        else:
            print(f"  Warning: Missing XOF to EUR rate for Project {row['project_id']}, Date {row['date']}. Amount not converted.")
            return None # Or original_amount
    else:
        print(f"  Warning: Unrecognized currency '{original_currency}' for Project {row['project_id']}, Date {row['date']}. Amount not converted.")
        return None # Or original_amount

# Step 3: Apply the conversion function to create/update 'amount_eur' column
# Ensure 'amount_local' is numeric before applying
expenses_df['amount_local'] = pd.to_numeric(expenses_df['amount_local'], errors='coerce')

# Apply the conversion function row-wise
expenses_df['amount_eur'] = expenses_df.apply(convert_amount_to_eur, axis=1)

# Fill any remaining NaN in amount_eur (e.g., from conversion failures or original NaNs) with 0
expenses_df['amount_eur'] = expenses_df['amount_eur'].fillna(0)

print("  'amount_eur' column updated with converted values.")

# --- Step 4: Drop the original 'original_currency' from DB if not needed ---
# Assuming 'original_currency' is the DB's currency column that might be incorrect.
# We are using 'original_currency_map' as the source of truth for currency.
if 'original_currency' in expenses_df.columns:
    expenses_df = expenses_df.drop(columns=['original_currency'])
    print("  Dropped 'original_currency' column (from DB).")

# Also, you might want to drop the 'amount_local' if 'amount_eur' is the final currency needed.
# However, keeping it might be useful for auditing or if local currency reporting is ever needed.
# For this test, let's keep it for now unless explicitly told to remove.

# --- Step 5: Handle the date column ---
# Your head() output shows the first column is a date, but it's unnamed.
# Let's assume its name is 'date' or similar from the DB.
# You MUST replace 'date_column_name_from_db' with the actual column name from your all_expenses_df.info()
# If your first column is truly unnamed, you might need to rename it first.
# For now, I'll assume the date column is named 'date' based on common patterns.
# If it's unnamed, you might need to do: expenses_df.rename(columns={expenses_df.columns[0]: 'date'}, inplace=True)
# Then proceed with conversion.

# Convert the date column to datetime objects
# Replace 'date' with the actual column name if different
expenses_df['date'] = pd.to_datetime(expenses_df['date'], errors='coerce')
print("  Date column converted to datetime.")

# Drop rows where date conversion failed (NaT) or amount_eur is 0 (if that's desired for cleanup)
expenses_df = expenses_df.dropna(subset=['date', 'amount_eur'])
print(f"  Dropped rows with missing essential 'date' or 'amount_eur'. Remaining rows: {len(expenses_df)}")


# Display updated info and head to verify
print("\n--- Updated Expenses DataFrame Info (after currency conversion) ---")
expenses_df.info()
print("\n--- Updated Expenses DataFrame Head (after currency conversion) ---")
print(expenses_df.head())


--- Silver Layer: Starting Currency Conversion for Expenses ---
  Fetched KES to EUR rate: 0.00657
  Fetched XOF to EUR rate: 0.001524
  Set EUR to EUR rate: 1.0


KeyError: 'original_currency_map'

# ROUGH WORK
## conversion of amount_local to amount_eur

In [165]:
df = pd.read_parquet('/Users/hajirufai/test/MSF-test/processed_data/silver_expenses_data.parquet')
df.info()
print(df.isnull().sum())
df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3456 entries, 0 to 3455
Data columns (total 8 columns):
 #   Column             Non-Null Count  Dtype         
---  ------             --------------  -----         
 0   date               3456 non-null   datetime64[ns]
 1   department         3456 non-null   object        
 2   category           3456 non-null   object        
 3   amount_local       2880 non-null   float64       
 4   project_id         3456 non-null   object        
 5   country            3456 non-null   object        
 6   original_currency  3456 non-null   object        
 7   amount_eur         432 non-null    float64       
dtypes: datetime64[ns](1), float64(2), object(5)
memory usage: 216.1+ KB
date                    0
department              0
category                0
amount_local          576
project_id              0
country                 0
original_currency       0
amount_eur           3024
dtype: int64


Unnamed: 0,date,department,category,amount_local,project_id,country,original_currency,amount_eur
0,2023-01-31,HR,Salaries,2278.71,BE55,Belgium,EUR,
1,2023-01-31,HR,Training,4053.43,BE55,Belgium,EUR,
2,2023-01-31,HR,Recruitment,1450.1,BE55,Belgium,EUR,
3,2023-01-31,Medical,Medications,1275.95,BE55,Belgium,EUR,
4,2023-01-31,Medical,Medical Equipment,2182.81,BE55,Belgium,EUR,


In [None]:
# define a function to convert amount_local to amount_eur based on the exchange rate
def convert_amount_to_eur(row):
    original_amount = row['amount_local']
    original_currency = row['original_currency_map']
    if pd.isna(original_amount):
        return None
    if original_currency == 'EUR':
        return original_amount
    elif original_currency == 'KES':
        rate = get_latest_exchange_rate("KES", "EUR")
        if rate is not None:
            return original_amount * rate
        else:

In [None]:
# when amount local is not NaN, convert amount_local to amount_eur based on the exchange rate
# and update the amoount_eur column
expenses_df['amount_eur'] = expenses_df.apply(convert_amount_to_eur, axis=1)
expenses_df['amount_eur'] = expenses_df['amount_eur'].fillna(0)
expenses_df.head()

In [166]:
# RESTART

In [167]:
import pandas as pd
import sys
import os
import numpy as np # Often useful for conditional operations

# Add the directory containing get_latest_exchange_rate.py to the Python path
# This assumes get_latest_exchange_rate.py is in the same directory as the parquet file.
# script_dir = os.path.dirname('/Users/hajirufai/test/MSF-test/processed_data/')
# if script_dir not in sys.path:
#     sys.path.insert(0, script_dir)

from get_latest_exchange_rate import get_latest_exchange_rate

# --- Load the DataFrame ---
df = pd.read_parquet('/Users/hajirufai/test/MSF-test/processed_data/silver_expenses_data.parquet')
df.head()
print(df.isnull().sum())
df.info()

date                    0
department              0
category                0
amount_local          576
project_id              0
country                 0
original_currency       0
amount_eur           3024
dtype: int64
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3456 entries, 0 to 3455
Data columns (total 8 columns):
 #   Column             Non-Null Count  Dtype         
---  ------             --------------  -----         
 0   date               3456 non-null   datetime64[ns]
 1   department         3456 non-null   object        
 2   category           3456 non-null   object        
 3   amount_local       2880 non-null   float64       
 4   project_id         3456 non-null   object        
 5   country            3456 non-null   object        
 6   original_currency  3456 non-null   object        
 7   amount_eur         432 non-null    float64       
dtypes: datetime64[ns](1), float64(2), object(5)
memory usage: 216.1+ KB


In [168]:

# Display initial info
# print("Initial DataFrame Info:")
# df.info()
# print("\nInitial Null Counts:")
# print(df.isnull().sum())
# print("\nInitial DataFrame Head:")
# print(df.head())

# --- Step 1: Fetch the required exchange rates once ---


# Define rates
print("\nFetching exchange rates...")
eur_to_eur_rate = 1.0
kes_to_eur_rate = get_latest_exchange_rate("KES", "EUR")
xof_to_eur_rate = get_latest_exchange_rate("XOF", "EUR")


# --- Step 2: Create a rate mapping dictionary ---
rate_mapping = {
    'EUR': eur_to_eur_rate,
    'KES': kes_to_eur_rate,
    'XOF': xof_to_eur_rate
}




Fetching exchange rates...


In [169]:

# --- Step 3: Drop the existing 'amount_eur' column if it exists ---
if 'amount_eur' in df.columns:
    df = df.drop(columns=['amount_eur'])
    print("\n'amount_eur' column dropped (if it existed).")

# --- Step 4: Create the 'rate' column based on 'original_currency' ---
# Using .map() is efficient for this kind of lookup
df['rate'] = df['original_currency'].map(rate_mapping)



'amount_eur' column dropped (if it existed).


In [171]:
df

Unnamed: 0,date,department,category,amount_local,project_id,country,original_currency,rate
0,2023-01-31,HR,Salaries,2278.71,BE55,Belgium,EUR,1.000000
1,2023-01-31,HR,Training,4053.43,BE55,Belgium,EUR,1.000000
2,2023-01-31,HR,Recruitment,1450.10,BE55,Belgium,EUR,1.000000
3,2023-01-31,Medical,Medications,1275.95,BE55,Belgium,EUR,1.000000
4,2023-01-31,Medical,Medical Equipment,2182.81,BE55,Belgium,EUR,1.000000
...,...,...,...,...,...,...,...,...
3451,2025-12-31,Logistics,Transport,,BF01,Burkina Faso,XOF,0.001524
3452,2025-12-31,Logistics,Vehicle Maintenance,,BF01,Burkina Faso,XOF,0.001524
3453,2025-12-31,Supply,Supplies,,BF01,Burkina Faso,XOF,0.001524
3454,2025-12-31,Supply,Warehousing,,BF01,Burkina Faso,XOF,0.001524


In [172]:

# # Handle cases where original_currency might not be in our predefined map (optional, but good for robustness)
# # If there are other currencies, their 'rate' will be NaN.
# # You could add a check here or decide on a default action (e.g., set to 0 or raise error).
# unmapped_currencies = df[df['rate'].isna() & df['original_currency'].notna()]['original_currency'].unique()
# if len(unmapped_currencies) > 0:
#     print(f"\nWarning: The following currencies were found but no rate was defined for them: {unmapped_currencies}")
#     print("Their 'rate' and consequently 'amount_eur' will be NaN.")

# print("\n'rate' column created.")

# --- Step 5: Calculate 'amount_eur' ---
# Multiplying by amount_local will correctly propagate NaNs from amount_local
df['amount_eur'] = df['amount_local'] * df['rate']

# print("\n'amount_eur' column calculated.")

# # --- Step 6: Display updated DataFrame info and head ---
# print("\nUpdated DataFrame Info:")
# df.info()
# print("\nUpdated Null Counts:")
# print(df.isnull().sum())
# print("\nUpdated DataFrame Head with 'rate' and 'amount_eur':")
# print(df.head())

# # To verify nulls for 2025-07-31 to 2025-12-31
# print("\nChecking nulls for dates 2025-07-31 to 2025-12-31:")
# future_dates_df = df[(df['date'] >= '2025-07-31') & (df['date'] <= '2025-12-31')]
# print(future_dates_df[['date', 'amount_local', 'rate', 'amount_eur']].isnull().sum())
# print(future_dates_df[['date', 'amount_local', 'rate', 'amount_eur']].head())

# # Optional: Display some rows where conversion happened
# print("\nSample rows with 'amount_eur' calculated (first 10 non-null values):")
# print(df[df['amount_eur'].notna()].head(10))

In [173]:
df

Unnamed: 0,date,department,category,amount_local,project_id,country,original_currency,rate,amount_eur
0,2023-01-31,HR,Salaries,2278.71,BE55,Belgium,EUR,1.000000,2278.71
1,2023-01-31,HR,Training,4053.43,BE55,Belgium,EUR,1.000000,4053.43
2,2023-01-31,HR,Recruitment,1450.10,BE55,Belgium,EUR,1.000000,1450.10
3,2023-01-31,Medical,Medications,1275.95,BE55,Belgium,EUR,1.000000,1275.95
4,2023-01-31,Medical,Medical Equipment,2182.81,BE55,Belgium,EUR,1.000000,2182.81
...,...,...,...,...,...,...,...,...,...
3451,2025-12-31,Logistics,Transport,,BF01,Burkina Faso,XOF,0.001524,
3452,2025-12-31,Logistics,Vehicle Maintenance,,BF01,Burkina Faso,XOF,0.001524,
3453,2025-12-31,Supply,Supplies,,BF01,Burkina Faso,XOF,0.001524,
3454,2025-12-31,Supply,Warehousing,,BF01,Burkina Faso,XOF,0.001524,


In [175]:
df.isnull().sum()

date                   0
department             0
category               0
amount_local         576
project_id             0
country                0
original_currency      0
rate                   0
amount_eur           576
dtype: int64

In [177]:
# save the dataframe to a parquet file
df.to_parquet('/Users/hajirufai/test/MSF-test/processed_data/silver_expenses_data.parquet')

In [178]:
# GOLD LAYER

In [180]:
budget_df = pd.read_parquet('/Users/hajirufai/test/MSF-test/processed_data/silver_budget_data.parquet')
print(budget_df.isnull().sum())
budget_df.info()
budget_df.head()

date          0
department    0
category      0
budget_eur    0
project_id    0
country       0
dtype: int64
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3456 entries, 0 to 3455
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype         
---  ------      --------------  -----         
 0   date        3456 non-null   datetime64[ns]
 1   department  3456 non-null   object        
 2   category    3456 non-null   object        
 3   budget_eur  3456 non-null   float64       
 4   project_id  3456 non-null   object        
 5   country     3456 non-null   object        
dtypes: datetime64[ns](1), float64(1), object(4)
memory usage: 162.1+ KB


Unnamed: 0,date,department,category,budget_eur,project_id,country
0,2023-01-31,HR,Salaries,2293.0,BE55,Belgium
1,2023-01-31,HR,Training,4768.76,BE55,Belgium
2,2023-01-31,HR,Recruitment,1642.21,BE55,Belgium
3,2023-01-31,Medical,Medications,1377.75,BE55,Belgium
4,2023-01-31,Medical,Medical Equipment,2062.46,BE55,Belgium


In [181]:
# read the expenses df
expenses_df = pd.read_parquet('/Users/hajirufai/test/MSF-test/processed_data/silver_expenses_data.parquet')
print(expenses_df.isnull().sum())
expenses_df.info()
expenses_df.head()

date                   0
department             0
category               0
amount_local         576
project_id             0
country                0
original_currency      0
rate                   0
amount_eur           576
dtype: int64
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3456 entries, 0 to 3455
Data columns (total 9 columns):
 #   Column             Non-Null Count  Dtype         
---  ------             --------------  -----         
 0   date               3456 non-null   datetime64[ns]
 1   department         3456 non-null   object        
 2   category           3456 non-null   object        
 3   amount_local       2880 non-null   float64       
 4   project_id         3456 non-null   object        
 5   country            3456 non-null   object        
 6   original_currency  3456 non-null   object        
 7   rate               3456 non-null   float64       
 8   amount_eur         2880 non-null   float64       
dtypes: datetime64[ns](1), float64(3), object(5)
me

Unnamed: 0,date,department,category,amount_local,project_id,country,original_currency,rate,amount_eur
0,2023-01-31,HR,Salaries,2278.71,BE55,Belgium,EUR,1.0,2278.71
1,2023-01-31,HR,Training,4053.43,BE55,Belgium,EUR,1.0,4053.43
2,2023-01-31,HR,Recruitment,1450.1,BE55,Belgium,EUR,1.0,1450.1
3,2023-01-31,Medical,Medications,1275.95,BE55,Belgium,EUR,1.0,1275.95
4,2023-01-31,Medical,Medical Equipment,2182.81,BE55,Belgium,EUR,1.0,2182.81


In [183]:
len(expenses_df)


3456

In [188]:
budget_df['project_id'].unique()

array(['BE55', 'SN02', 'BF01', 'BE01', 'BF02', 'SN01', 'KE01', 'KEO2'],
      dtype=object)

In [190]:
expenses_df['project_id'].unique() 

array(['BE55', 'BE01', 'BF02', 'KE02', 'SN02', 'SN01', 'KE01', 'BF01'],
      dtype=object)

In [196]:
a = set(budget_df['project_id'].unique())

In [197]:
b = set(expenses_df['project_id'].unique())

In [198]:
print(a)
print(b)

{'BE01', 'BE55', 'SN01', 'KE01', 'BF02', 'SN02', 'KEO2', 'BF01'}
{'BE01', 'BE55', 'SN01', 'KE01', 'BF02', 'SN02', 'BF01', 'KE02'}


In [194]:
# find uniqye project ids in the expenses df
set(budget_df['project_id'].unique()) == set(expenses_df['project_id'].unique())

False

In [199]:
# convert the 'KEO2' to 'KE02' in the budget df project id column
budget_df['project_id'] = budget_df['project_id'].replace('KEO2', 'KE02')

In [201]:
a =set(budget_df['project_id'].unique())
b =set(expenses_df['project_id'].unique())
a == b

True

In [204]:
expenses_df.head()

Unnamed: 0,date,department,category,amount_local,project_id,country,original_currency,rate,amount_eur
0,2023-01-31,HR,Salaries,2278.71,BE55,Belgium,EUR,1.0,2278.71
1,2023-01-31,HR,Training,4053.43,BE55,Belgium,EUR,1.0,4053.43
2,2023-01-31,HR,Recruitment,1450.1,BE55,Belgium,EUR,1.0,1450.1
3,2023-01-31,Medical,Medications,1275.95,BE55,Belgium,EUR,1.0,1275.95
4,2023-01-31,Medical,Medical Equipment,2182.81,BE55,Belgium,EUR,1.0,2182.81


In [202]:
budget_df.head()

Unnamed: 0,date,department,category,budget_eur,project_id,country
0,2023-01-31,HR,Salaries,2293.0,BE55,Belgium
1,2023-01-31,HR,Training,4768.76,BE55,Belgium
2,2023-01-31,HR,Recruitment,1642.21,BE55,Belgium
3,2023-01-31,Medical,Medications,1377.75,BE55,Belgium
4,2023-01-31,Medical,Medical Equipment,2062.46,BE55,Belgium


In [206]:
silver_expenses_df = expenses_df
silver_budget_df = budget_df
display(silver_expenses_df.head())
silver_budget_df.head()

Unnamed: 0,date,department,category,amount_local,project_id,country,original_currency,rate,amount_eur
0,2023-01-31,HR,Salaries,2278.71,BE55,Belgium,EUR,1.0,2278.71
1,2023-01-31,HR,Training,4053.43,BE55,Belgium,EUR,1.0,4053.43
2,2023-01-31,HR,Recruitment,1450.1,BE55,Belgium,EUR,1.0,1450.1
3,2023-01-31,Medical,Medications,1275.95,BE55,Belgium,EUR,1.0,1275.95
4,2023-01-31,Medical,Medical Equipment,2182.81,BE55,Belgium,EUR,1.0,2182.81


Unnamed: 0,date,department,category,budget_eur,project_id,country
0,2023-01-31,HR,Salaries,2293.0,BE55,Belgium
1,2023-01-31,HR,Training,4768.76,BE55,Belgium
2,2023-01-31,HR,Recruitment,1642.21,BE55,Belgium
3,2023-01-31,Medical,Medications,1377.75,BE55,Belgium
4,2023-01-31,Medical,Medical Equipment,2062.46,BE55,Belgium


In [210]:
print(len(silver_expenses_df))
print(len(silver_budget_df))

3456
3456


In [207]:
# --- Section 3: Gold Layer - Curated & Optimized Data for Reporting ---

import pandas as pd
from datetime import datetime, timedelta
import calendar # For getting days in month if needed for date dimension (though pd.date_range is easier)

# Assuming silver_expenses_dfa and silver_budget_df are available from previous cells
# Assuming OUTPUT_DIRECTORY is defined from global configurations

print("\n--- Gold Layer: Starting Data Modeling for Power BI ---")



--- Gold Layer: Starting Data Modeling for Power BI ---


In [215]:
# --- Section 3: Gold Layer - Curated & Optimized Data for Reporting ---

import pandas as pd
from datetime import datetime
from pandas.tseries.offsets import MonthEnd # Import MonthEnd for monthly frequency

# Assuming silver_expenses_df and silver_budget_df are available from previous cells
# Assuming OUTPUT_DIRECTORY is defined from global configurations

print("\n--- Gold Layer: Starting Data Modeling for Power BI ---")

# --- Step 1: Create the Date Dimension Table (dim_date_df) - CORRECTED FOR MONTHLY GRANULARITY ---
# Determine the overall date range from both expenses and budget data.
# We need to cover all months from the earliest record to the latest budget record (Dec 2025).
min_date = min(silver_expenses_df['date'].min(), silver_budget_df['date'].min())
max_date = max(silver_expenses_df['date'].max(), silver_budget_df['date'].max())

# Generate a date range specifically for the last day of each month
# Start from the 1st day of the min month, and end at the last day of the max month.
# Then, use 'M' frequency to get month-end dates.
print(f"  Generating Date Dimension for month-ends from {min_date.strftime('%Y-%m')} to {max_date.strftime('%Y-%m')}")

# Create a range of month-end dates
# The 'M' frequency generates month-end dates.
# Corrected: The 'end' date should be the max_date itself, as 'freq='M'' will correctly find its month end.
date_range = pd.date_range(start=min_date.replace(day=1), end=max_date, freq='M')
dim_date_df = pd.DataFrame({'date': date_range})

dim_date_df['date_key'] = dim_date_df['date'].dt.strftime('%Y%m%d').astype(int)
dim_date_df['year'] = dim_date_df['date'].dt.year
dim_date_df['month_num'] = dim_date_df['date'].dt.month
dim_date_df['month_name'] = dim_date_df['date'].dt.strftime('%b') # Abbreviated month name (Jan, Feb)
dim_date_df['month_name_full'] = dim_date_df['date'].dt.strftime('%B') # Full month name (January, February)
dim_date_df['quarter'] = dim_date_df['date'].dt.quarter
# For monthly granularity, day_of_week_num, day_of_week_name, day_of_month, week_of_year
# might be less relevant or refer to the last day of the month.
# Let's include them as they refer to the 'date' column which is month-end.
dim_date_df['day_of_week_num'] = dim_date_df['date'].dt.dayofweek # Monday=0, Sunday=6
dim_date_df['day_of_week_name'] = dim_date_df['date'].dt.day_name()
dim_date_df['day_of_month'] = dim_date_df['date'].dt.day # This will be the last day of the month
dim_date_df['week_of_year'] = dim_date_df['date'].dt.isocalendar().week.astype(int)
dim_date_df['year_month_key'] = dim_date_df['date'].dt.strftime('%Y%m').astype(int) #MM for monthly joins

# Select and reorder columns for the final Date Dimension
dim_date_df = dim_date_df[[
    'date_key', 'date', 'year', 'month_num', 'month_name', 'month_name_full',
    'quarter', 'day_of_week_num', 'day_of_week_name', 'day_of_month', 'week_of_year', 'year_month_key'
]]

print(f"  Date Dimension created with {len(dim_date_df)} rows (expected 36 for 3 years * 12 months).")
print("\n--- Date Dimension DataFrame Info ---")
dim_date_df.info()
print("\n--- Date Dimension DataFrame Head ---")
print(dim_date_df.head())
print("\n--- Date Dimension DataFrame Tail ---")
print(dim_date_df.tail())


# --- Step 2: Create the Fact Analysis Table (fact_analysis_df) ---
print("\n--- Gold Layer: Creating Fact Analysis Table ---")

# Prepare expenses data for concatenation
# We only need the date, dimensions, and the converted amount_eur
fact_expenses = silver_expenses_df[[
    'date', 'department', 'category', 'project_id', 'country', 'amount_eur'
]].copy()
fact_expenses['record_type'] = 'Expense'
fact_expenses = fact_expenses.rename(columns={'amount_eur': 'amount_eur_final'})

# Prepare budget data for concatenation
# We only need the date, dimensions, and the budget_eur
fact_budget = silver_budget_df[[
    'date', 'department', 'category', 'project_id', 'country', 'budget_eur'
]].copy()
fact_budget['record_type'] = 'Budget'
fact_budget = fact_budget.rename(columns={'budget_eur': 'amount_eur_final'})

# Concatenate the two DataFrames vertically
fact_analysis_df = pd.concat([fact_expenses, fact_budget], ignore_index=True)
print(f"  Combined expenses ({len(fact_expenses)} rows) and budgets ({len(fact_budget)} rows).")

# Fill any NaN values in 'amount_eur_final' with 0.
# This is crucial for the 2025 Jul-Dec expenses where actuals are blank,
# ensuring they appear as 0 in Power BI sums.
fact_analysis_df['amount_eur_final'] = fact_analysis_df['amount_eur_final'].fillna(0)
print("  Filled NaN values in 'amount_eur_final' with 0.")

# Add date_key for linking to Date Dimension (using the date column from fact table)
fact_analysis_df['date_key'] = fact_analysis_df['date'].dt.strftime('%Y%m%d').astype(int)
fact_analysis_df['year_month_key'] = fact_analysis_df['date'].dt.strftime('%Y%m').astype(int) # For monthly aggregation/filtering

# Reorder columns for the final Fact Table (optional, but good for consistency)
fact_analysis_df = fact_analysis_df[[
    'date_key', 'year_month_key', 'date', 'record_type', 'amount_eur_final',
    'project_id', 'country', 'department', 'category'
]]

print(f"  Fact Analysis Table created with {len(fact_analysis_df)} rows.")
print("\n--- Fact Analysis DataFrame Info ---")
fact_analysis_df.info()
print("\n--- Fact Analysis DataFrame Head ---")
print(fact_analysis_df.head())
print("\n--- Fact Analysis DataFrame Tail (to check 2025-Jul-Dec for expenses) ---")
print(fact_analysis_df.tail())


# --- Step 3: Save Gold Layer Data to Parquet Files ---
print("\n--- Gold Layer: Saving Output to Parquet Files ---")

# Ensure the output directory exists (already handled in config, but good for standalone test)
os.makedirs(OUTPUT_DIRECTORY, exist_ok=True)

fact_output_path = os.path.join(OUTPUT_DIRECTORY, "fact_analysis_table.parquet")
dim_date_output_path = os.path.join(OUTPUT_DIRECTORY, "dim_date_table.parquet")

if not fact_analysis_df.empty:
    fact_analysis_df.to_parquet(fact_output_path, index=False)
    print(f"  Fact Analysis Table saved to: {fact_output_path}")
else:
    print("  Fact Analysis DataFrame is empty, skipping save.")

if not dim_date_df.empty:
    dim_date_df.to_parquet(dim_date_output_path, index=False)
    print(f"  Date Dimension Table saved to: {dim_date_output_path}")
else:
    print("  Date Dimension DataFrame is empty, skipping save.")

print("\n--- Gold Layer: Completed ---")


--- Gold Layer: Starting Data Modeling for Power BI ---
  Generating Date Dimension for month-ends from 2023-01 to 2025-12
  Date Dimension created with 36 rows (expected 36 for 3 years * 12 months).

--- Date Dimension DataFrame Info ---
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 36 entries, 0 to 35
Data columns (total 12 columns):
 #   Column            Non-Null Count  Dtype         
---  ------            --------------  -----         
 0   date_key          36 non-null     int64         
 1   date              36 non-null     datetime64[ns]
 2   year              36 non-null     int32         
 3   month_num         36 non-null     int32         
 4   month_name        36 non-null     object        
 5   month_name_full   36 non-null     object        
 6   quarter           36 non-null     int32         
 7   day_of_week_num   36 non-null     int32         
 8   day_of_week_name  36 non-null     object        
 9   day_of_month      36 non-null     int32         
 10  week

  date_range = pd.date_range(start=min_date.replace(day=1), end=max_date, freq='M')


In [219]:
date_df = pd.read_parquet("/Users/hajirufai/test/MSF-test/processed_data/dim_date_table.parquet")
date_df.head()

Unnamed: 0,date_key,date,year,month_num,month_name,month_name_full,quarter,day_of_week_num,day_of_week_name,day_of_month,week_of_year,year_month_key
0,20230131,2023-01-31,2023,1,Jan,January,1,1,Tuesday,31,5,202301
1,20230228,2023-02-28,2023,2,Feb,February,1,1,Tuesday,28,9,202302
2,20230331,2023-03-31,2023,3,Mar,March,1,4,Friday,31,13,202303
3,20230430,2023-04-30,2023,4,Apr,April,2,6,Sunday,30,17,202304
4,20230531,2023-05-31,2023,5,May,May,2,2,Wednesday,31,22,202305


In [220]:
fact_df = pd.read_parquet("/Users/hajirufai/test/MSF-test/processed_data/fact_analysis_table.parquet")
fact_df.head()

Unnamed: 0,date_key,year_month_key,date,record_type,amount_eur_final,project_id,country,department,category
0,20230131,202301,2023-01-31,Expense,2278.71,BE55,Belgium,HR,Salaries
1,20230131,202301,2023-01-31,Expense,4053.43,BE55,Belgium,HR,Training
2,20230131,202301,2023-01-31,Expense,1450.1,BE55,Belgium,HR,Recruitment
3,20230131,202301,2023-01-31,Expense,1275.95,BE55,Belgium,Medical,Medications
4,20230131,202301,2023-01-31,Expense,2182.81,BE55,Belgium,Medical,Medical Equipment


In [221]:
len(fact_df)

6912

In [223]:
## Get my silver dfs
silver_expenses_df = pd.read_parquet("/Users/hajirufai/test/MSF-test/processed_data/silver_expenses_data.parquet")
silver_budget_df = pd.read_parquet("/Users/hajirufai/test/MSF-test/processed_data/silver_budget_data.parquet")

In [227]:
silver_budget_df.isnull().sum()

date          0
department    0
category      0
budget_eur    0
project_id    0
country       0
dtype: int64

In [228]:
expenses_df.isnull().sum()

date                   0
department             0
category               0
amount_local         576
project_id             0
country                0
original_currency      0
rate                   0
amount_eur           576
dtype: int64

In [229]:
expenses_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3456 entries, 0 to 3455
Data columns (total 9 columns):
 #   Column             Non-Null Count  Dtype         
---  ------             --------------  -----         
 0   date               3456 non-null   datetime64[ns]
 1   department         3456 non-null   object        
 2   category           3456 non-null   object        
 3   amount_local       2880 non-null   float64       
 4   project_id         3456 non-null   object        
 5   country            3456 non-null   object        
 6   original_currency  3456 non-null   object        
 7   rate               3456 non-null   float64       
 8   amount_eur         2880 non-null   float64       
dtypes: datetime64[ns](1), float64(3), object(5)
memory usage: 243.1+ KB


In [230]:
# RESTART


In [231]:
bronze_expenses_df = pd.read_parquet("/Users/hajirufai/test/MSF-test/processed_data/bronze_expenses_data.parquet")
bronze_expenses_df.head()

Unnamed: 0,id,year,month,department,category,amount_local,currency,project_id,country,original_currency,amount_eur
0,1,2023,1,HR,Salaries,2278.71,EUR,BE55,Belgium,EUR,
1,2,2023,1,HR,Training,4053.43,EUR,BE55,Belgium,EUR,
2,3,2023,1,HR,Recruitment,1450.1,EUR,BE55,Belgium,EUR,
3,4,2023,1,Medical,Medications,1275.95,EUR,BE55,Belgium,EUR,
4,5,2023,1,Medical,Medical Equipment,2182.81,EUR,BE55,Belgium,EUR,


In [234]:
PROJECT_CURRENCY_MAP

{'BE01': {'country': 'Belgium', 'currency': 'EUR'},
 'BE55': {'country': 'Belgium', 'currency': 'EUR'},
 'KE01': {'country': 'Kenya', 'currency': 'KES'},
 'KE02': {'country': 'Kenya', 'currency': 'KES'},
 'SN01': {'country': 'Senegal', 'currency': 'XOF'},
 'SN02': {'country': 'Senegal', 'currency': 'XOF'},
 'BF01': {'country': 'Burkina Faso', 'currency': 'XOF'},
 'BF02': {'country': 'Burkina Faso', 'currency': 'XOF'}}

In [235]:
bronze_budget_df = pd.read_parquet("/Users/hajirufai/test/MSF-test/processed_data/bronze_budget_data.parquet")
bronze_budget_df.head()

Unnamed: 0,id,year,month,department,category,budget_eur,version,project_id,country
0,1,2023,1,HR,Salaries,2293.0,v1,BE55,Belgium
1,2,2023,1,HR,Training,4768.76,v1,BE55,Belgium
2,3,2023,1,HR,Recruitment,1642.21,v1,BE55,Belgium
3,4,2023,1,Medical,Medications,1377.75,v1,BE55,Belgium
4,5,2023,1,Medical,Medical Equipment,2062.46,v1,BE55,Belgium
