In [3]:
#Necessary packages
import pandas as pd
import numpy as np
from datetime import datetime
import statsmodels.api as sm
import statsmodels.tsa.stattools as ts
from math import sqrt
import datetime 
import calendar
import sys

In [4]:
# Data
file_path = r'C:\Users\PHIRI003\OneDrive - Wageningen University & Research\Documents\WEcR Internship\Work\Dairy\Merged_data.xlsx'
df = pd.read_excel(file_path, header=0 )
df.index = df['Date']

Including seasonal dummy variables to control for the effect of seasonality in agricultural markets

In [5]:
# Function for seasonal dummy variables
def SeasonalDummies(df, frequency='M'):
    nT = len(df)  # Number of observations
    startdate = df.index[0]  # Start date of the time series
    datetime = pd.DataFrame(data=pd.date_range(startdate, periods=nT, freq=frequency), columns=["datetime"])
    monthnumber = datetime["datetime"].dt.month  # Extract month numbers
    monthname = pd.DataFrame()  # DataFrame to store month names
    
    for i in range(nT):
        monthname.at[i, 'D'] = calendar.month_name[monthnumber[i]]  # Assign month names based on month numbers
    
    seasdum = pd.get_dummies(monthname)  # Create dummy variables for each month
    seasdum = seasdum.drop('D_January', axis=1)  # Drop January to avoid multicollinearity
    seasdum = seasdum.set_index(df.index)  # Set the index to match the original DataFrame
    seasdum = seasdum.astype(int)
    return seasdum

# Create seasonal dummies
seasdum = SeasonalDummies(df)

# Combine the data with the seasonal dummy variables
df_with_dummies = pd.concat([df, seasdum], axis=1)


Testing for unit root
Time series in agrifood/macro often undergo structural changes that may affect unit root test outcomes. Unit root tests with with structural breaks 
such as Zivot-Andrews test (identifies 1 break)  and Lumsdaine and Papell (identifies multiple breaks) address this issue. 

These two tests are carried out manually inorder to include seasonal dummies in the regression equations

In [None]:
import itertools
import statsmodels.api as sm

# Define the function to perform the Zivot-Andrews test with seasonal dummies
def za_test(y, seasdum, max_lag=12, num_breaks=1):
    y = pd.Series(y).astype(float)
    nT = len(y)
    m = 4  # Minimum distance between breaks
    h = int(nT * 0.15)  # Minimum distance from the start and end

    #set the test outputs
    best_ur_stat = np.inf # best unit-root t-stat
    best_breaks = None # structural breaks
    best_break_strength = None # # diagnostic: break dummy t-stat
    
    # Generate all possible combinations of break points in the data set
    potential_breaks = range(h, nT - h) # within the minimum and maximum distance
    all_combinations = list(itertools.combinations(potential_breaks, num_breaks))
    
    for breaks in all_combinations:
        if all(abs(breaks[i] - breaks[i-1]) >= m for i in range(1, num_breaks)):
            X = seasdum.copy()
            for i, bp in enumerate(breaks):
                X[f'break_{i}'] = (np.arange(nT) >= bp).astype(int) #break in levels
                X[f'break_trend_{i}'] = X[f'break_{i}'] * np.arange(nT)  #break in trend
            X['const'] = 1 #constat term
            X['trend'] = np.arange(nT)
            
            # unit-root term and ADF augmentation
            X['y_lag1'] = y.shift(1)       # lags, RHS values
            dy = y.diff()                   # lagged y, LHS

            for lag in range(1, max_lag + 1):
            X[f'dy_lag{lag}'] = dy.shift(lag) # add unit root variable to X, differenced lag

            # align and drop missing from lags and diff
            data = pd.concat([dy.rename('dy'), X], axis=1).dropna()
            dy_valid = data['dy']
            X_valid = data.drop(columns=['dy']).apply(pd.to_numeric, errors='coerce')

            # fit Δy_t on RHS
            model = sm.OLS(dy_valid, X_valid).fit()

            # unit-root test statistic: t-stat on y_{t-1}
            ur_stat = model.tvalues['y_lag1']
            
            if ur_stat < best_ur_stat:
            best_ur_stat = ur_stat
            best_breaks = breaks
            best_break_strength = break_strength
                
    return best_breaks, best_ur_stat, best_break_strength

for column in ['API', 'PPI', 'CPI', 'Oil', 'Silage_PI', 'Dairy_Conce', 'CWage_20']:
    print(f"\nZivot-Andrews test for {column}:")
    break_points, ur_stat, break_strength = za_test(df_with_dummies[column], seasdum)

    print("Best break points:", [df_with_dummies.index[bp] for bp in break_points])
    print("Unit root test statistic :", ur_stat)
    print("Break strength:", break_strength)


In [None]:
# Test with more than one strucutural break in the timeframe
import ruptures as rpt
import matplotlib.pyplot as plt # Plot the graph showing breaks points

# Define the function to perform the Lumsdaine and Papell test with seasonal dummies
def lumsdaine_papell_with_seasonal_dummies(y, seasdum, max_lag=12, num_breaks=3):
    y = pd.Series(y).astype(float)
    nT = len(y)
    m = 4  # Minimum distance between breaks
    h = int(nT * 0.15)  # Minimum distance from the start and end
    
    best_ur_stat = np.inf    
    best_breaks = None
    best_break_strength = None
    
# break iterations
    for breaks in itertools.combinations(potential_breaks, num_breaks):
        if any(abs(breaks[i] - breaks[i-1]) < m for i in range(1, num_breaks)):
            continue

        # design matrix
        X = seasdum.copy()
        # level & trend-break dummies
        for i, bp in enumerate(breaks):
            step = (time >= bp).astype(int)
            X[f'break_{i}']       = step
            X[f'break_trend_{i}'] = step * time

        # deterministic terms
        X['const'] = 1.0
        X['trend'] = time

        # ADF-style terms
        X['y_lag1'] = y.shift(1)               # <- key for unit-root test
        dy = y.diff()                           # LHS = Δy_t
        for lag in range(1, max_lag + 1):
            X[f'dy_lag{lag}'] = dy.shift(lag)

        # align
        data = pd.concat([dy.rename('dy'), X], axis=1).dropna()
        if len(data) < 10:    # guard against too few obs after lags
            continue

        dy_valid = data['dy']
        X_valid  = data.drop(columns=['dy']).apply(pd.to_numeric, errors='coerce')

        # OLS of Δy_t on RHS
        model = sm.OLS(dy_valid, X_valid).fit()

        # unit-root t-statistic on y_{t-1}
        if 'y_lag1' not in model.tvalues.index:
            continue
        ur_stat = model.tvalues['y_lag1']

        # diagnostic: strength of level-shift break(s)
        brk_t = []
        for i in range(num_breaks):
            nm = f'break_{i}'
            if nm in model.tvalues.index:
                brk_t.append(abs(model.tvalues[nm]))
        break_strength = max(brk_t) if brk_t else np.nan

        # LP/ZA convention: choose the configuration with the MOST NEGATIVE ur-stat
        if ur_stat < best_ur_stat:
            best_ur_stat = ur_stat
            best_breaks = breaks
            best_break_strength = break_strength

    return best_breaks, best_ur_stat, best_break_strength
    
# Function to apply Bai-Perron test and find breakpoints
def bai_perron_breakpoints(y, n_bkps=3):
    ts = y.dropna()
    adjusted_index = ts.index
    ts_values = ts.values.reshape(-1, 1)
    
    # Apply the Bai-Perron test
    model = "l2"  # least squares
    algo = rpt.Binseg(model=model).fit(ts_values)
    
    result = algo.predict(n_bkps=n_bkps)
    
    #breakpoint indices to dates
    break_dates = [adjusted_index[i] for i in result if i < len(adjusted_index)]
    
    return break_dates

# Function to identify similar breaks between the two methods
def find_similar_breaks(lumsdaine_breaks, bai_perron_breaks, tolerance=5):
    similar_breaks = []
    for lb in lumsdaine_breaks:
        for bp in bai_perron_breaks:
            if abs((lb - bp).days) <= tolerance:  # Compare the difference in days between dates
                similar_breaks.append((lb, bp))
    return similar_breaks


# Example usage
for column in ['API', 'CPI', 'PPI', 'Dairy_Conce', 'CWage_20']:
    print(f"\nAnalysis for {column}:")
    
    # Lumsdaine and Papell test
    lumsdaine_breaks, t_stat, unit_root_stat = lumsdaine_papell_with_seasonal_dummies(df[column], seasdum)
    lumsdaine_breaks_dates = [df.index[bp] for bp in lumsdaine_breaks]
    
    print("Lumsdaine-Papell best break points:", lumsdaine_breaks_dates)
    print("T-statistic at best break points:", t_stat)
    print("Unit root test statistic:", unit_root_stat)
    
    # Bai-Perron test
    bai_perron_breaks = bai_perron_breakpoints(df[column])
    
    print("Bai-Perron breakpoints:", bai_perron_breaks)
    
    # Find similar breaks
    similar_breaks = find_similar_breaks(lumsdaine_breaks_dates, bai_perron_breaks)
    print("Similar breaks between methods:", similar_breaks)
    
    # Plot Bai-Perron breakpoints
    ts = df[column].dropna()
    adjusted_index = ts.index
    ts_values = ts.values.reshape(-1, 1)
    rpt.display(ts_values, [adjusted_index.get_loc(d) for d in bai_perron_breaks if d in adjusted_index])
    plt.title(f'Breakpoints in {column}')
    plt.xlabel('Date')
    plt.ylabel(column)
    plt.xticks(ticks=[adjusted_index.get_loc(d) for d in bai_perron_breaks if d in adjusted_index], 
               labels=[d.strftime('%Y-%m-%d') for d in bai_perron_breaks], 
               rotation=45)
    plt.savefig(f'{column}_breakpoints.png', bbox_inches='tight')
    plt.show()
