# **Public Service Productivity - Residual Analysis for Autocorrelation**

## Introduction

The Office for National Statistics partners with government departments, academics and subject matter experts to review public service productivity. This is a measure of public sector input against outputs. Inputs include labour, goods and services used for work and capital, and outputs vary from department to department but include things like operations for the NHS or court hearings for the Ministry of Justice. For some (but currently not all) departments, these outputs are further modified by quality metrics: for the Department of Education for instance, more students means higher output and therefore higher productivity as long as inputs (teachers, educational funding) remain constant. However overcrowded classrooms can lead to lower exam grades which would mean it is counterproductive. Adjusting the productivity index accounts for this and avoids misleading productivity figures. 

This data is produced annually but the work involved in assessing the quality adjustments means there is a significant lag. Alongside this, closely related data is produced quarterly with a lag of only four months. This is not adjusted for quality so it is available quite promptly, but it does not perfectly align to the annual data due to difference in reporting areas.

The ONS's Time Series Analysis team uses the annual figures for inputs and outputs as well as the quarterly data to nowcast public sector productivity figures. The dynamic regression model they use for this is more complex (and therefore more expensive and time consuming to run) than simpler models available. The use of this model is based on an assumption that residuals from this model are not correlated over time which means that there is no information left in the residuals – it is all accounted for by the model.

The main goal of this project is to examine this assumption by analysising the data to see whether using a simpler model, the residuals are correlated over time. The hypothesis is that they will be, and showing this will prove that the current method of forecasting is justified. However if the hypothesis is shown to be false this opens up the opportunity for time and cost savings in preparing the forecasts in the future.

## About the data

We've been provided with the data used to train the Nowcast models and an R Script which uses the annual and annualised quartly PSP data to generate Nowcasts based on each model for each individual PSP variable tracked by the team (Totals, Education, Health and Public Order & Safety). 

These figures are represented as a proportion of the 1997 baseline, which is set to 100 (so a score of 200 for inputs would represent a doubling of 1997 inputs). They are saved in the "data/nowcast" directory.


In [None]:
# Import libraries
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
from sklearn.linear_model import LinearRegression

import statsmodels.api as sm
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.stats.diagnostic import acorr_ljungbox
from statsmodels.tsa.stattools import adfuller

In [None]:
#creating lists
categories = ["Total", "Health", "Education", "POS"]
measures = ["Input", "Output", "Productivity"]
inputs = [x + "_" + measures[0] for x in categories]
outputs = [x + "_" + measures[1] for x in categories]
prods = [x + "_" + measures[2] for x in categories]
models = ['observed', 'cagr', 'ets','dynamic','growth_rate']
cutoff_year = 2007 #data before this cutoff will be ignored

## *Select target below:*

In [None]:
target = inputs # IMPORTANT! Select from inputs, outputs or prods here e.g. target = inputs

In [None]:
# Import data
def data_importer(metric, cutoff = cutoff_year):
    '''
    Takes the metric you are interested in (from one of the lists above) and imports it to a df
    with a datetime index. The metric's nowcast data must be in the data/nowcast directory
    '''
    df = pd.read_csv(f"data/nowcast/PSP_TSC-V_{metric}.csv")
    df.rename(columns = {"Unnamed: 0": "year"}, inplace = True)
    df.year += 1996 
    df.year = pd.to_datetime(df.year, format='%Y')
    df = df.set_index('year')
    df = df[df.index >= f'{cutoff}/01/01']
    df = df.dropna(axis = 1)
    return df


df_all = [] # creating an empty list to catch all the dataframes for each target variable
for i, variable in enumerate(target):
    df_all.append(data_importer(target[i]))

## Data exploration

In [None]:
for i, df in enumerate(df_all):
    print('#' * 30)
    print(target[i])
    print('#' * 30)
    display(df_all[i].describe())
    display(df_all[i].info())

In [None]:
def initial_plot(df, metric, models_list = models):
    ''' Creates a plot of the Nowcasts and obs of the metric of interest.'''
    plt.figure(figsize =(8, 6))
    for model in models_list:
        try:
            plt.plot(df[model], label = model)
        except KeyError: 
            pass
    
    plt.title(f'PSP {metric} Nowcasts by model, baseline 1997 = 100')
    plt.ylabel(metric)
    plt.xlabel('Year')
    plt.legend()
    

for i, df in enumerate(df_all):
        initial_plot(df_all[i], target[i], models) 

**Augmented Dickey-Fuller Test** (statistical method to detect if a time series is stationary).

If p > 0.05 the series is not stationary (ie. it displays a trend or seasonality).

In [None]:
for i,df in enumerate(df_all):
    print("Models,", target[i],':') 
    print('-' * 30)
    for model in models:
        try:
            adf_result = adfuller(df_all[i][model])
            print(model,'p-value:', adf_result[1])
        except KeyError:
            print(f'p-value not available for {target[i]} {model} model.')
    print("\n")    

## Residuals plots

As the data shows very little sign of seasonality it was not possible to extract residuals from the trend using the seasonal decompose method. Instead we decided to do the following for each variable:

* Fit a linear line of best fit to that data
* Plot the residuals based on their distance from this line

In [None]:
# Function to perform linear regression and plot results
def plot_regression(df, ax_data, ax_residuals, X, y, label):
    # Fitting the linear regression model
    model = LinearRegression()
    model.fit(X, y)
    y_pred = model.predict(X)
    
    # Calculating residuals
    residuals = y - y_pred
    
    # Plotting the data and the line of best fit
    ax_data.plot(df.index, y, label='Nowcast')
    ax_data.plot(df.index, y_pred, label='Line of Best Fit', color='red')
    ax_data.set_xlabel('Year')
    ax_data.set_ylabel(label)
    ax_data.set_title(f'{label} vs Year with Line of Best Fit')
    ax_data.legend()
    
    # Plotting residuals
    ax_residuals.scatter(df.index, residuals, label='Residuals')
    ax_residuals.axhline(0, color='red', linestyle='--', label='Zero Line')
    ax_residuals.set_xlabel('Date')
    ax_residuals.set_ylabel('Residuals')
    ax_residuals.set_title(f'Residuals of {label} vs Year')
    ax_residuals.legend()

for i, entry in enumerate(df_all):
    print('#' * 30)
    print("Variable :", target[i])
    print('#' * 30)
    fig, axs = plt.subplots(5, 2, figsize=(12, 16))
    fig.tight_layout(pad=6.0)

    # Loop through model columns
    for j, model in enumerate(models):
        X = np.arange(len(df_all[i])).reshape(-1, 1)  # Reshape to fit the model

        # Plot for model data
        try:
            y_input = df_all[i][model].values
            plot_regression(df_all[i],axs[j, 0], axs[j, 1], X, y_input, model)
        except KeyError:
            print(f'Data not available for {target[i]} {model}.')
    
    plt.show()


## ADF Test on the residuals

The Augmented Dickey-Fuller (ADF) test is performed on the residuals of regression models fitted to the various time series data. The ADF test helps determine whether these residuals are stationary, which is essential for validating the adequacy of the time series models.

In [None]:
# Function to perform ADF test
def perform_adf_test(series):
    """Perform the ADF test on a time series and return results."""
    result = adfuller(series)
    adf_statistic = result[0]
    p_value = result[1]
    critical_values = result[4]
    return adf_statistic, p_value, critical_values

# Initialize an empty list to collect dataframes
adf_results = []
for i, entry in enumerate(df_all):
    # Initialize an empty list to collect results
    results = []

    # Loop through each column to fit a regression model and get residuals
    for column in models:
        # Prepare the data for regression
        try:
            y = df_all[i][column]
            X = sm.add_constant(range(len(y)))  # Use time index as the independent variable
        except KeyError:
            print(f'ADF Test not available for {target[i]} residuals.')
        else:
            # Fit the model
            model = sm.OLS(y, X).fit()

            # Get the residuals
            residuals = model.resid

            # Perform ADF test on the residuals
            adf_statistic, p_value, critical_values = perform_adf_test(residuals)
    
            # Store results in the list
            results.append({
                'Model': column,
                'ADF Statistic': adf_statistic,
                'ADF p-value': p_value,
                'ADF Critical Value (1%)': critical_values['1%'],
                'ADF Critical Value (5%)': critical_values['5%'],
                'ADF Critical Value (10%)': critical_values['10%']
        })

    # Create a DataFrame from the results
    adf_results.append(pd.DataFrame(results))

    # Print the results DataFrame
    print(target[i])
    display(adf_results[i])
    print("\n")

### Key Statistics:
- ADF Statistic: This is the test statistic from the ADF test. The more negative this value, the stronger the evidence against the null hypothesis of a unit root (i.e., non-stationarity).
- p-value: The probability that the test statistic is as extreme as observed under the null hypothesis. A low p-value (typically below 0.05) indicates that the null hypothesis can be rejected, suggesting that the residuals are stationary.
- Critical Values (1%, 5%, 10%): These are the threshold values for the ADF Statistic at different significance levels. If the ADF Statistic is more negative than the critical value at a particular significance level, the null hypothesis is rejected at that level.


## Ljung-Box Test
This tests the overall randomness of autocorrelations in a time series. It checks for the absence of serial autocorrelations.
The null hypothesis is that the model does not show a lack of fit - that any residuals are white noise. On the other hand if the null hyptothesis is rejected the model shows a lack of fit : there is data left on the table that should be incorporated into the model. a low P value (< 0.05) would be grounds to reject the null hypothesis.

In [None]:
# Function to perform Ljung-Box test and return results
def perform_ljung_box_test(residuals, lags=[10]):
    """Perform the Ljung-Box test on residuals and return results as DataFrame."""
    ljung_box_result = acorr_ljungbox(residuals, lags=lags, return_df=True)
    return ljung_box_result
#collecting overall results
ljung_box_results = [] 
for i, entry in enumerate(df_all):
    # Collect results
    results = []

    for model in models:
        try:
            y = df_all[i][model]
            
        except KeyError:
            print(f'Ljung-Box Test results not available for {target[i]} {model} model.')
        
        else:
            X = sm.add_constant(range(len(y)))  # Time index as independent variable
            lbmodel = sm.OLS(y, X).fit()
            residuals = lbmodel.resid
            ljung_box_result = perform_ljung_box_test(residuals, lags=[10])
            results.append({
                'Model': model,
                'Ljung-Box Statistic': ljung_box_result['lb_stat'].values[0],
                'Ljung-Box p-value': ljung_box_result['lb_pvalue'].values[0]
        })

    # Create DataFrame from results
    ljung_box_results.append(pd.DataFrame(results))

    # Print the results DataFrame
    print(target[i])
    display(ljung_box_results[i])
    print('\n')

## Ljung-Box Results Explained
The results presented are from the Ljung-Box test, which is a statistical test used to check if there are significant autocorrelations (dependencies) in the residuals of a time series model. The test essentially checks whether the residuals are randomly distributed (i.e., if they are "white noise"). If the residuals exhibit significant autocorrelation, it suggests that the model may not have fully captured the underlying structure of the data.

### Key Statistics:
- Ljung-Box Statistic: This is the test statistic calculated by the Ljung-Box test for each variable. A higher value of this statistic indicates more significant autocorrelation in the residuals.
- p-value: The p-value associated with the Ljung-Box statistic. It represents the probability of observing the data if the null hypothesis (that there is no autocorrelation) is true. A low p-value (typically below 0.05) suggests that we should reject the null hypothesis, indicating the presence of significant autocorrelation.


### Output (save ADF and Ljung-Box data by variable to multi-tab .xlsx)

In [None]:
#assigning a name variable here
if target == inputs:
    name = "inputs"
elif target == outputs:
    name = "outputs"
elif target == prods:
    name = "productivity"


#creating an excel file with tabs for each variable analysed
outcome = []
for i, entry in enumerate(df_all):
    outcome.append(adf_results[i].merge(ljung_box_results[i], how='left', on='Model'))

try:
    with pd.ExcelWriter(f"output/{name}_results.xlsx") as writer:
        for i, entry in enumerate(outcome):
            outcome[i].to_excel(writer, sheet_name=target[i])   
    print(f"Results output to 'output/{name}_results.xlsx'")
except:        
    print(f"Error: The file 'output/{name}_results.xlsx' is currently open. Please close it and try again.")

### Summaries of ADF and Ljung-Box Test Results

In [None]:
file_path = f'output/{name}_results.xlsx' 
excel_file = pd.ExcelFile(file_path)

# Output .txt file path
output_file_path = f'output/{name}_summaries.txt'

# Function to interpret ADF results
def interpret_adf(row):
    if row['ADF p-value'] < 0.01:
        return 'strong evidence of stationarity'
    elif row['ADF p-value'] < 0.05:
        return 'evidence of stationarity'
    elif row['ADF p-value'] < 0.10:
        return 'borderline stationarity'
    else:
        return 'non-stationary residuals'

# Function to interpret Ljung-Box results
def interpret_ljung_box(row):
    if row['Ljung-Box p-value'] < 0.01:
        return 'significant autocorrelation'
    elif row['Ljung-Box p-value'] < 0.05:
        return 'moderate autocorrelation'
    else:
        return 'weak autocorrelation'

# Open the text file
with open(output_file_path, 'w') as output_file:
    # Go through each sheet in the Excel file
    for sheet_name in excel_file.sheet_names:
        df = excel_file.parse(sheet_name)
        
        # Summaries
        adf_summary = []
        ljung_box_summary = []
        
        for index, row in df.iterrows():
            adf_result = interpret_adf(row)
            ljung_box_result = interpret_ljung_box(row)
            
            adf_summary.append(f"{row['Model']}: {adf_result}")
            ljung_box_summary.append(f"{row['Model']}: {ljung_box_result}")
        
        # Summary strings
        adf_summary_str = f"### ADF Test Summary for {sheet_name}:\n" + "\n".join([f"- {summary}" for summary in adf_summary])
        ljung_box_summary_str = f"\n### Ljung-Box Test Summary for {sheet_name}:\n" + "\n".join([f"- {summary}" for summary in ljung_box_summary])
        
        # Print the summary for the current sheet
        print(adf_summary_str)
        print(ljung_box_summary_str)
        print("\n" + "="*50 + "\n")
        
        # Write the summary .txt file
        output_file.write(adf_summary_str + "\n")
        output_file.write(ljung_box_summary_str + "\n")
        output_file.write("\n" + "="*50 + "\n\n")
