# Famine Alert Price Model Walkthrough


## Method
We illustrate the process of identifying price anomolies as a means of 
This is based on Baquedano:
"[Developing an indicator of price anomalies as an early warning tool: A compound growth approach](https://www.fao.org/3/i7550e/i7550e.pdf)"
Felix G. Baquedano. FAO, Rome, 2015.

## Data
As an illustration, we use food price data on Sudan from the FAO Food price monitoring tool:
1. [FAO data](https://fpma.fao.org/giews/fpmat4/#/dashboard/tool/domestic) (FAO. 2025. Food Price Monitoring and Analysis Tool. Accessed on February 3, 2025. Licence: CC-BY-4.0.)
2. We use [IMF CPI inflation data](https://www.imf.org/external/datamapper/PCPIPCH@WEO/WEOWORLD/SDN?year=2025) (Source: International Monetary Fund, "Inflation rate, average consumer prices")


In [None]:

import numpy as np
import pandas as pd
from pathlib import Path
import os

import json


try: # identify if in colab to fix paths
    import google.colab
    IN_COLAB = True
    print("colab identified.")
except ImportError:
    IN_COLAB = False

In [None]:
# clone the repository to access the data

!git clone https://github.com/aristotle-tek/famine-prediction.git


In [None]:
if IN_COLAB: # fix for paths in colab
    base_path = Path('/content/famine-prediction')
else:
    try:
        base_path = Path(__file__).resolve().parent.parent
    except NameError:
        base_path = Path.cwd().parent.parent
print("Base path: ", base_path)

In [None]:
os.chdir(base_path)
from src.price_models import (
    compute_cgr, compute_volatility, adjust_cgr_for_volatility,
    weighted_mean, weighted_std, compute_anomaly_score, classify_anomaly,
    compute_gamma, combine_signals, handle_missing_data
)

In [None]:

# example - wheat - Sudan
sudan_wheat_file = base_path / 'data'/ 'raw'/ 'price_data' / 'Sudan_Wheat_Mon_Feb_03_2025.xlsx'

df = pd.read_excel(sudan_wheat_file, parse_dates=['Date'])
df.sort_values(by='Date', inplace=True)
df.set_index('Date', inplace=True)

# just rename to the region, since they are all for Wheat, Retail, SDG/Kg
df.columns =  df.columns.str.extract(r'RETAIL, ([^,]+), Wheat Grain')[0].str.replace(" ", "_")


In [None]:
# For now we will just use a single region & food as an example: Wheat prices in Al-Fashir

curr_var = 'Al-Fashir'
price_series = df[curr_var]

missing = df[curr_var].isna().sum()
total = len(df)
print(f"Missing values for {curr_var}: {missing}/{total} ({missing/total:.2%})")


In [None]:
# Some price data is missing, so we will interpolate it

# print dates for which the price is missing
print(df[df[curr_var].isna()].index)

# Impute missing data using time-based interpolation (options include 'ffill', 'bfill', 'drop')
price_series = handle_missing_data(price_series, method='interpolate')

In [None]:
# Now let's adjust for inflation

inflation_index = pd.read_excel(base_path / 'price_data' / 'Sudan_IMF-inflation-20250203.xlsx', index_col=0)
inflation_index.drop(columns=['Inflation rate, average consumer prices (Annual percent change)'], inplace=True)

# The data is in rows, so transpose to get a column and set date as the index
inflation_index = inflation_index.T

inflation_index['Date'] = pd.to_datetime(inflation_index.index.astype(str) + '-01-01')

# Set Date as index for easier merging
inflation_index.set_index('Date', inplace=True)

def adjust_for_inflation(price_series, inflation_series):
    """Adjusts price data for inflation using CPI index."""
    return price_series / inflation_series.reindex(price_series.index, method='ffill')


price_series = adjust_for_inflation(price_series, inflation_index['Sudan'])

print(price_series.tail())

In [None]:
# Compute compound growth rates

# Compute CGR over a 3-month window (quarterly) and 12-month window (annual)
df['CGR_3m'] = compute_cgr(price_series, window=3)
df['CGR_12m'] = compute_cgr(price_series, window=12)

# Compute volatility (rolling std of log differences) over the same windows
df['vol_3m'] = compute_volatility(price_series, window=3)
df['vol_12m'] = compute_volatility(price_series, window=12)

# Adjust the CGRs for volatility
df['vCGR_3m'] = adjust_cgr_for_volatility(df['CGR_3m'], df['vol_3m'])
df['vCGR_12m'] = adjust_cgr_for_volatility(df['CGR_12m'], df['vol_12m'])

# To put slightly more emphasis on more recent data, we create a simple 
# linear increasing weight varying from 1 to 2. 
weights = np.linspace(1, 2, len(df))

# Compute weighted mean and standard deviation for the quarterly vCGR
valid_mask_3m = df['vCGR_3m'].notna()
valid_vCGR_3m = df.loc[valid_mask_3m, 'vCGR_3m'].values
weights_3m = weights[valid_mask_3m]
w_mean_3m = weighted_mean(valid_vCGR_3m, weights_3m)
w_std_3m = weighted_std(valid_vCGR_3m, weights_3m)

# Similarly for the annual vCGR
valid_mask_12m = df['vCGR_12m'].notna()
valid_vCGR_12m = df.loc[valid_mask_12m, 'vCGR_12m'].values
weights_12m = weights[valid_mask_12m]
w_mean_12m = weighted_mean(valid_vCGR_12m, weights_12m)
w_std_12m = weighted_std(valid_vCGR_12m, weights_12m)

# Compute anomaly scores for each period where vCGR is available
df['IPA_score_3m'] = df['vCGR_3m'].apply(
    lambda x: compute_anomaly_score(x, w_mean_3m, w_std_3m) if pd.notna(x) else np.nan
)
df['IPA_score_12m'] = df['vCGR_12m'].apply(
    lambda x: compute_anomaly_score(x, w_mean_12m, w_std_12m) if pd.notna(x) else np.nan
)

# Classify each anomaly based on its score
df['Alert_3m'] = df['IPA_score_3m'].apply(
    lambda x: classify_anomaly(x) if pd.notna(x) else "Insufficient data"
)
df['Alert_12m'] = df['IPA_score_12m'].apply(
    lambda x: classify_anomaly(x) if pd.notna(x) else "Insufficient data"
)

# Compute gamma from the available quarterly and annual vCGR values
gamma = compute_gamma(df['vCGR_3m'].dropna(), df['vCGR_12m'].dropna())

# Combine the quarterly and annual anomaly scores where both are available
def combined_IPA(row):
    if pd.notna(row['IPA_score_3m']) and pd.notna(row['IPA_score_12m']):
        return combine_signals(row['IPA_score_3m'], row['IPA_score_12m'], gamma)
    return np.nan

df['IPA_combined'] = df.apply(combined_IPA, axis=1)
df['Alert_combined'] = df['IPA_combined'].apply(
    lambda x: classify_anomaly(x) if pd.notna(x) else "Insufficient data"
)

print(df[[
    curr_var, 'CGR_3m', 'CGR_12m', 'vCGR_3m', 'vCGR_12m',
    'IPA_score_3m', 'IPA_score_12m', 'IPA_combined',
    'Alert_3m', 'Alert_12m', 'Alert_combined'
]])


In [None]:
# test stationarity

from src.stationarity_tests import test_adf, test_pp, test_dfgls, test_kpss


adf_result = test_adf(price_series)
pp_result = test_pp(price_series)
dfgls_result = test_dfgls(price_series)
kpss_result = test_kpss(price_series)

print("ADF Test:", adf_result)
print("\n\nPhillips-Perron Test:", pp_result)
print("\n\nDF-GLS Test:", dfgls_result)
print("\n\nKPSS Test:", kpss_result)

In [None]:
# To simplify the interpretation of the tests, we can re-organize the output

def summarize_stationarity_tests(adf_result, pp_result, dfgls_result, kpss_result):
    summary = {}

    adf_stat = adf_result['Test Statistic']
    adf_pval = adf_result['p-value']
    adf_critical = adf_result['Critical Values']
    if adf_stat < adf_critical['10%'] and adf_pval < 0.05:
        summary['ADF Test'] = "Suggests Stationarity"
    else:
        summary['ADF Test'] = "Suggests Non-Stationarity"

    pp_stat = pp_result['Test Statistic']
    pp_pval = pp_result['p-value']
    pp_critical = pp_result['Critical Values']
    if pp_stat < pp_critical['10%'] and pp_pval < 0.05:
        summary['Phillips-Perron Test'] = "Suggests Stationarity"
    else:
        summary['Phillips-Perron Test'] = "Suggests Non-Stationarity"

    dfgls_stat = dfgls_result['Test Statistic']
    dfgls_pval = dfgls_result['p-value']
    dfgls_critical = dfgls_result['Critical Values']
    if dfgls_stat < dfgls_critical['10%'] and dfgls_pval < 0.05:
        summary['DF-GLS Test'] = "Suggests Stationarity"
    else:
        summary['DF-GLS Test'] = "Suggests Non-Stationarity"

    # KPSS (opposite null hypothesis)
    kpss_stat = kpss_result['Test Statistic']
    kpss_critical = kpss_result['Critical Values']
    if kpss_stat < kpss_critical['1%']:
        summary['KPSS Test'] = "Suggests Stationarity"
    else:
        summary['KPSS Test'] = "Suggests Non-Stationarity"

    return summary

summary_results = summarize_stationarity_tests(adf_result, pp_result, dfgls_result, kpss_result)
for test, result in summary_results.items():
    print(f"{test}: {result}")
