# Index Creation, Charting, Simple Statistical Analysis and Connectedness

## Part 1: Loading the required libraries

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates # For better date formatting
import openpyxl
import numpy as np

from statsmodels.tsa.stattools import adfuller, kpss


## Part 2: Functions for loading each stock's prices from Refinitiv and calculating our index

### Function 2.A: Load and prepare Refinitiv-obtained company price data for each group

In [None]:
def load_and_prepare_data(filepath, sheet_name=0):
    """
    Load one Excel sheet of (Exchange Date, Price) column pairs into a price table.

    Columns are consumed left to right in pairs: the first column of each pair
    is parsed as dates and the second as prices. The header of the price
    column is used as the company name. Rows whose date or price cannot be
    parsed are dropped for that company only.

    Args:
        filepath: Path of the Excel workbook to read.
        sheet_name: Sheet to load (name or position). Defaults to the first sheet.

    Returns:
        A tuple ``(merged_df, company_names)``:
        ``merged_df`` is a DataFrame indexed by date (sorted ascending) with one
        price column per company, outer-joined so dates missing for a company
        hold NaN; ``company_names`` lists the kept column names in read order.
        ``(pd.DataFrame(), [])`` is returned when the file does not exist or no
        column pair yields any usable row.
    """
    try:
        # The context manager releases the workbook handle even if parsing fails.
        with pd.ExcelFile(filepath) as xls:
            df_raw = pd.read_excel(xls, sheet_name=sheet_name, header=0)
    except FileNotFoundError:
        print(f"Error: File not found at {filepath}")
        return pd.DataFrame(), []

    n_columns = df_raw.shape[1]
    if n_columns % 2 != 0:
        # The trailing unpaired column is ignored by the loop bounds below.
        print(f"Warning: The Excel file {filepath} has an odd number of columns. Expecting Date/Price pairs.")

    all_company_data = []
    company_names = []

    for i in range(0, n_columns - (n_columns % 2), 2):
        # The price column header doubles as the company name.
        company_name = df_raw.columns[i + 1]

        company_df = df_raw.iloc[:, [i, i + 1]].copy()
        company_df.columns = ['Date', 'Price']

        # Unparseable dates/prices become NaT/NaN and are removed together.
        company_df['Date'] = pd.to_datetime(company_df['Date'], errors='coerce')
        company_df['Price'] = pd.to_numeric(company_df['Price'], errors='coerce')
        company_df = company_df.dropna(subset=['Date', 'Price'])

        if company_df.empty:
            continue

        company_df = company_df.set_index('Date').rename(columns={'Price': company_name})

        # A repeated date would make the outer-join concat below fail, so keep
        # only the last price reported for each date.
        duplicated_dates = company_df.index.duplicated(keep='last')
        if duplicated_dates.any():
            print(
                f"Warning: {company_name} has {int(duplicated_dates.sum())} duplicate date(s) "
                f"in {filepath}; keeping the last price for each."
            )
            company_df = company_df[~duplicated_dates]

        all_company_data.append(company_df)
        company_names.append(company_name)

    if not all_company_data:
        return pd.DataFrame(), []

    merged_df = pd.concat(all_company_data, axis=1, join='outer')
    merged_df.sort_index(inplace=True)
    return merged_df, company_names

### Function 2.B: Calculating the Equally Weighted Indexes

In [None]:
def calculate_equal_weighted_index(price_data_df, base_value=100):
    """
    Calculate an equal-weighted index from a table of company prices.

    Prices are forward-filled, converted to row-over-row percentage returns,
    and averaged across the companies that have a return on each row. The
    index starts at ``base_value`` on the first row where at least one company
    has a price and compounds the average return from there. Rows before that
    start are NaN.

    Args:
        price_data_df: DataFrame with one price column per company, ordered in
            time along the index.
        base_value: Index level assigned to the first row that has any price.

    Returns:
        A tuple ``(index_values, daily_returns, filled_price_data,
        num_companies_active)``. For an empty input all four are empty objects.
        If no row holds any price, ``index_values`` is an empty Series and the
        other three are the computed intermediates.
    """
    if price_data_df.empty:
        return pd.Series(dtype=float), pd.DataFrame(dtype=float), pd.DataFrame(dtype=float), pd.Series(dtype=int)

    filled_price_data = price_data_df.ffill()
    # Gaps are already forward-filled above, so no further filling is wanted here.
    daily_returns = filled_price_data.pct_change(fill_method=None)
    num_companies_active = filled_price_data.notna().sum(axis=1)
    # Row mean skips NaN, so companies without a return on a row are excluded.
    index_daily_returns = daily_returns.mean(axis=1)

    has_prices = (num_companies_active > 0).to_numpy()
    if not has_prices.any():
        return pd.Series(dtype=float), daily_returns, filled_price_data, num_companies_active

    # Work by position rather than by label: a label-truthiness test would
    # treat a first label of 0 as "no start date", and label arithmetic would
    # only work for datetime indexes.
    first_valid_pos = int(has_prices.argmax())

    # The starting row has no prior price, so its return is defined as 0.
    index_daily_returns.iloc[first_valid_pos] = 0

    index_values = base_value * (1 + index_daily_returns).cumprod()

    # Make the pre-start gap and the base level explicit.
    index_values.iloc[:first_valid_pos] = float('nan')
    index_values.iloc[first_valid_pos] = base_value

    return index_values, daily_returns, filled_price_data, num_companies_active

### Function 2.C: Processing

In [None]:
def process_and_return_index(file_path_to_process, index_name_prefix):
    """
    Build the equal-weighted index for one workbook and hand back the series.

    The workbook is loaded with ``load_and_prepare_data`` and the index is
    computed with ``calculate_equal_weighted_index`` using a base value of 100.

    Args:
        file_path_to_process: Path of the workbook holding the price data.
        index_name_prefix: Label used in progress messages and in the name of
            the returned series (``"<prefix> Index"``).

    Returns:
        The index series renamed to ``"<prefix> Index"``, or ``None`` when no
        data could be loaded or the index contains no valid values.
    """
    print(f"\n--- Processing {index_name_prefix}: {file_path_to_process} ---")
    prices, _company_names = load_and_prepare_data(file_path_to_process)

    # Guard: nothing usable came out of the workbook.
    if prices.empty:
        print(f"No data loaded from {file_path_to_process}.")
        return None

    index_values, _returns, _filled, _active = calculate_equal_weighted_index(prices, base_value=100)

    # Guard: the calculation produced only missing values.
    if index_values.dropna().empty:
        print(f"Index calculation for {index_name_prefix} resulted in no valid data.")
        return None

    print(f"Successfully calculated {index_name_prefix} Index.")
    return index_values.rename(f"{index_name_prefix} Index")

### Function 2.D: Plotting all the indexes in a single chart

In [None]:
def plot_multiple_indices(indices_df, title='Comparative Index Performance'):
    """
    Plot several index series from one DataFrame on a shared monthly chart.

    Each column is reduced to its last value per calendar month and drawn as
    its own line. Months in which every column is missing are dropped.

    Args:
        indices_df: DataFrame with one column per index; the index must support
            time-based resampling.
        title: Chart title.

    Returns:
        None. A message is printed instead of a chart when there is nothing to
        plot.
    """
    if indices_df.empty:
        print("DataFrame is empty. Cannot plot.")
        return

    # 'ME' is the current month-end alias; pandas versions that predate it
    # reject it with ValueError, in which case the legacy 'M' alias is used.
    try:
        monthly_df = indices_df.resample('ME').last().dropna(how='all')
    except ValueError:
        monthly_df = indices_df.resample('M').last().dropna(how='all')

    if monthly_df.empty:
        print("No data available for monthly plotting after resampling.")
        return

    fig, ax = plt.subplots(figsize=(14, 7))
    for column in monthly_df.columns:
        ax.plot(monthly_df.index, monthly_df[column], marker='o', linestyle='-', label=column)

    ax.set_title(title)
    ax.set_xlabel('Date')
    ax.set_ylabel('Index Value')
    ax.grid(True)
    ax.legend()  # Distinguishes the lines by column name

    # Aim for roughly a dozen date ticks regardless of how many months are shown.
    locator_interval = max(1, len(monthly_df) // 12)
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
    ax.xaxis.set_major_locator(mdates.MonthLocator(interval=locator_interval))
    fig.autofmt_xdate()

    plt.show()

### Function 2.E: Plotting Individual Indexes

In [None]:
def plot_single_index(index_series, title='Index Performance', y_limits=None):
    """
    Plot a single index on its own monthly chart.

    The series is reduced to its last value per calendar month before plotting.

    Args:
        index_series: Series of index values; the index must support
            time-based resampling.
        title: Chart title, also used as the legend label and in messages.
        y_limits: Optional ``(low, high)`` tuple. When provided, the y-axis is
            fixed to this range so that separate charts are comparable.

    Returns:
        None. A message is printed instead of a chart when there is nothing to
        plot.
    """
    if index_series.empty or index_series.dropna().empty:
        print(f"Index series '{title}' is empty or all NaN after date filtering. Cannot plot.")
        return

    # 'ME' is the current month-end alias. Pandas versions that predate it
    # reject it with ValueError (not AttributeError), so that is what triggers
    # the fallback to the legacy 'M' alias.
    try:
        monthly_series = index_series.resample('ME').last().dropna()
    except ValueError:
        monthly_series = index_series.resample('M').last().dropna()

    if monthly_series.empty:
        print(f"No data available for monthly plotting for '{title}'.")
        return

    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(monthly_series.index, monthly_series.values, marker='o', linestyle='-', label=title)
    ax.set_title(title)
    ax.set_xlabel('Date')
    ax.set_ylabel('Index Value')
    ax.grid(True)
    ax.legend()

    # Apply the standardized y-axis limits if they are provided.
    if y_limits:
        ax.set_ylim(y_limits)

    # Aim for roughly a dozen date ticks regardless of how many months are shown.
    locator_interval = max(1, len(monthly_series) // 12)
    ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m'))
    ax.xaxis.set_major_locator(mdates.MonthLocator(interval=locator_interval))
    fig.autofmt_xdate()
    plt.show()

## Part 3: Main Script

In [None]:
# --- Main Script ---
# Loads the pre-calculated indices, re-bases them to 100 at the first date on
# which every index has a value, and plots them together and individually.

# 1. DEFINE YOUR PARAMETERS HERE
file_path = "calculated_indices.xlsx"
sheet_names = ["Neo Bank Index", "Challenger Bank Index", "Traditional Bank Index"]
hard_stop_date = pd.to_datetime('2025-05-01')

print(f"--- Loading pre-calculated indices from: {file_path} ---")

# 2. Load each index (first data column of each sheet, indexed by the first column)
loaded_indices = []
try:
    for sheet in sheet_names:
        index_series = pd.read_excel(file_path, sheet_name=sheet, index_col=0, parse_dates=True).iloc[:, 0]
        index_series.name = sheet
        loaded_indices.append(index_series)
        print(f"Successfully loaded sheet: '{sheet}'")
except FileNotFoundError:
    print("\nERROR: The file was not found at the specified path.")
    # exit() is a helper meant for interactive sessions; raising SystemExit
    # stops execution reliably and reports a failure status.
    raise SystemExit(1)
except Exception as e:
    print(f"\nAn error occurred while reading the Excel file: {e}")
    raise SystemExit(1)

# 3. Proceed only if data was loaded
if len(loaded_indices) < 2:
    print("\n--- At least two indices are required for comparison. Script finished. ---")
else:
    # Sorting guarantees that the label-based date slices below are well defined.
    temp_df = pd.concat(loaded_indices, axis=1).sort_index()

    # Dates on which every index has a value, limited to the analysis window.
    common_dates = temp_df.dropna().index
    common_dates = common_dates[common_dates <= hard_stop_date]

    if common_dates.empty:
        # Without this guard the re-basing step below would fail on an empty slice.
        print(f"\n--- No date on or before {hard_stop_date.date()} has values for every index. Nothing to plot. ---")
    else:
        # --- Part 1: Generate the Comparative Chart ---
        first_common_date = common_dates[0]
        print(f"\n--- Aligning all indices to first common valid date: {first_common_date.date()} ---")
        print(f"--- Truncating all data to end on or before: {hard_stop_date.date()} ---")

        final_slice_df = temp_df.loc[first_common_date:hard_stop_date]
        # Re-base so that every index equals 100 on the first common date.
        comparison_df = (final_slice_df / final_slice_df.iloc[0]) * 100

        print("\n--- Aligned and Re-based Index Comparison (First 5 rows) ---")
        print(comparison_df.head())
        plot_multiple_indices(comparison_df, title='Comparative Index Performance (from pre-calculated file)')

        # --- Part 2: Generate Individual Charts with Standardized Y-Axis ---
        print(f"\n--- Generating individual plots from {first_common_date.date()} to {hard_stop_date.date()} ---")

        # The individual charts show the same (un-rebased) window as above.
        individual_plots_df = final_slice_df

        # Common y-axis range across all indices, computed BEFORE plotting.
        global_min = individual_plots_df.min().min()
        global_max = individual_plots_df.max().max()

        # Add 5% padding for better visualization.
        padding = (global_max - global_min) * 0.05
        y_axis_limits = (global_min - padding, global_max + padding)

        print(f"Standardizing individual plot Y-axis from {y_axis_limits[0]:.2f} to {y_axis_limits[1]:.2f}")

        for index_name in individual_plots_df.columns:
            plot_single_index(
                individual_plots_df[index_name],
                title=f"{index_name} (from common start date)",
                y_limits=y_axis_limits
            )

print("\n--- Script Finished ---")

## Saving the equal/aligned indexes to Excel

In [None]:
# Export step: write each re-based index (starting at 100) to its own sheet.

# 1. DEFINE THE OUTPUT FILENAME
output_file_name = 'calculated_indices_EQUAL.xlsx'

# 2. EXPORT ONLY IF THE RE-BASED DATA EXISTS
# 'comparison_df' is only defined when the comparison step ran successfully.
if not ('comparison_df' in locals() and not comparison_df.empty):
    print("\n--- No final re-based data available to save. Skipping file export. ---")

else:
    print(f"\n--- Saving re-based index data (starts at 100) to: {output_file_name} ---")

    try:
        # One workbook, one sheet per index; the sheet is named after the column.
        with pd.ExcelWriter(output_file_name, engine='openpyxl') as writer:
            for index_name in comparison_df.columns:
                # Double brackets keep the selection as a one-column DataFrame.
                single_index_df = comparison_df[[index_name]]
                single_index_df.to_excel(writer, sheet_name=index_name, index=True)
                print(f"  - Saved '{index_name}' to its own sheet.")

        print(f"\nSuccessfully created '{output_file_name}'. Each index starts at 100.")

    except Exception as e:
        # Report the failure instead of aborting the remaining cells.
        print(f"\nERROR: Could not save the Excel file. Reason: {e}")

# Part 4: Stationarity Testing

## Function 4.A: Running ADF and KPSS on the 3 indexes

In [None]:
def run_stationarity_tests(series, series_name):
    """
    Run the Augmented Dickey-Fuller and KPSS tests on a series and print the results.

    Missing values are dropped before testing. Both tests are interpreted at
    the 5% significance level. Nothing is returned; all output is printed.

    Args:
        series: pandas Series of observations to test.
        series_name: Label used in the printed report.
    """
    print(f"\n--- Stationarity Tests for {series_name} ---")

    # Drop missing values for the tests
    series_cleaned = series.dropna()

    # Neither test can run without observations; report it instead of letting
    # the statistical routines fail on an empty input.
    if series_cleaned.empty:
        print("   No non-missing observations available. Skipping tests.")
        return

    # --- Augmented Dickey-Fuller Test ---
    # Null Hypothesis (H0): The series has a unit root (it is non-stationary).
    # Alternative Hypothesis (HA): The series does not have a unit root (it is stationary).
    print("\n1. Augmented Dickey-Fuller Test (ADF):")
    # adfuller returns a tuple: index 0 is the statistic, 1 the p-value and
    # 4 the dictionary of critical values.
    adf_result = adfuller(series_cleaned)
    p_value_adf = adf_result[1]

    print(f"   ADF Statistic: {adf_result[0]}")
    print(f"   p-value: {p_value_adf}")
    print("   Critical Values:")
    for key, value in adf_result[4].items():
        print(f'      {key}: {value}')

    if p_value_adf <= 0.05:
        print("\n   Conclusion: Strong evidence against the null hypothesis (p <= 0.05).")
        print("   Reject the null hypothesis. The series is likely stationary.")
    else:
        print("\n   Conclusion: Weak evidence against the null hypothesis (p > 0.05).")
        print("   Fail to reject the null hypothesis. The series is likely non-stationary.")

    # --- KPSS Test ---
    # With regression='ct' the hypotheses are:
    # Null Hypothesis (H0): The series is stationary around a deterministic trend.
    # Alternative Hypothesis (HA): The series has a unit root (it is not stationary).
    print("\n2. Kwiatkowski-Phillips-Schmidt-Shin Test (KPSS):")
    # kpss returns a tuple: index 0 is the statistic, 1 the p-value and
    # 3 the dictionary of critical values.
    # NOTE: statsmodels documents the KPSS p-value as interpolated from a table
    # and capped to the range 0.01-0.10, so boundary values are not exact.
    kpss_result = kpss(series_cleaned, regression='ct')
    p_value_kpss = kpss_result[1]

    print(f"   KPSS Statistic: {kpss_result[0]}")
    print(f"   p-value: {p_value_kpss}")
    print("   Critical Values:")
    for key, value in kpss_result[3].items():
        print(f'      {key}: {value}')

    if p_value_kpss <= 0.05:
        print("\n   Conclusion: Strong evidence against the null hypothesis (p <= 0.05).")
        print("   Reject the null hypothesis. The series is likely non-stationary.")
    else:
        print("\n   Conclusion: Weak evidence against the null hypothesis (p > 0.05).")
        print("   Fail to reject the null hypothesis. The series is likely trend-stationary.")

# Part 5: Main Script for ADF and KPSS tests

In [None]:
# --- Main Script to Load and Test Indices ---
# Reads the three re-based indices back from the export workbook and runs the
# ADF/KPSS report on each of them.
excel_file_path = 'calculated_indices_EQUAL.xlsx'
try:
    # For each index: load its sheet, then take the first data column as the series.
    neo_index_df = pd.read_excel(excel_file_path, sheet_name='Neo Bank Index', index_col=0)
    challenger_index_df = pd.read_excel(excel_file_path, sheet_name='Challenger Bank Index', index_col=0)
    traditional_index_df = pd.read_excel(excel_file_path, sheet_name='Traditional Bank Index', index_col=0)

    neo_series, challenger_series, traditional_series = (
        neo_index_df.iloc[:, 0],
        challenger_index_df.iloc[:, 0],
        traditional_index_df.iloc[:, 0],
    )

    # Report on the three indices in the same order as they were loaded.
    run_stationarity_tests(neo_series, "Neo Bank Index")
    run_stationarity_tests(challenger_series, "Challenger Bank Index")
    run_stationarity_tests(traditional_series, "Traditional Bank Index")

except FileNotFoundError:
    # The workbook is produced by the export step earlier in this file.
    print(f"Error: The file '{excel_file_path}' was not found.")
    print("Please make sure you have run the first code block to generate it.")

In [None]:
print("--- Step 1: Applying First-Difference Transformation ---")
# .diff() subtracts the previous observation from each value, giving the
# change in index level between consecutive rows (not a percentage return).
# .dropna() removes the first row, which has no predecessor and is NaN.
try:
    neo_diff = neo_series.diff().dropna()
    challenger_diff = challenger_series.diff().dropna()
    traditional_diff = traditional_series.diff().dropna()
    # The message names the result precisely: these are level changes, not returns.
    print("Transformation complete. The new series are first differences (changes in index level).")
except NameError:
    print("Error: The original series (e.g., neo_series) were not found.")
    print("Please make sure you have run the previous code cell first.")


def run_stationarity_tests_on_transformed_data(series, series_name):
    """
    Run ADF and KPSS tests on transformed (differenced) data and print the results.

    KPSS is run with regression='c' (stationarity around a constant level)
    because differencing is expected to have removed any trend. Missing values
    are dropped before testing. Both tests are interpreted at the 5%
    significance level. Nothing is returned; all output is printed.

    Args:
        series: pandas Series of differenced observations.
        series_name: Label used in the printed report.
    """
    print(f"\n--- Stationarity Tests for: {series_name} ---")

    # The statistical routines cannot handle missing or empty input, so clean
    # the data here rather than relying on every caller to do it.
    series_cleaned = series.dropna()
    if series_cleaned.empty:
        print("   No non-missing observations available. Skipping tests.")
        return

    # --- Augmented Dickey-Fuller Test ---
    # Null Hypothesis: The series is NON-STATIONARY.
    # We expect to REJECT this for differenced data.
    print("\n1. Augmented Dickey-Fuller Test (ADF):")
    adf_result = adfuller(series_cleaned)
    p_value_adf = adf_result[1]
    print(f"   p-value: {p_value_adf}")
    if p_value_adf <= 0.05:
        print("   Conclusion: REJECT the null hypothesis. The series is likely STATIONARY.")
    else:
        print("   Conclusion: FAIL to reject the null hypothesis. The series is likely NON-STATIONARY.")

    # --- KPSS Test ---
    # Null Hypothesis: The series is STATIONARY.
    # We expect to FAIL TO REJECT this for differenced data.
    print("\n2. Kwiatkowski-Phillips-Schmidt-Shin Test (KPSS):")
    # regression='c' tests stationarity around a constant (level), not a trend.
    kpss_result = kpss(series_cleaned, regression='c')
    p_value_kpss = kpss_result[1]
    print(f"   p-value: {p_value_kpss}")
    if p_value_kpss <= 0.05:
        print("   Conclusion: REJECT the null hypothesis. The series is likely NON-STATIONARY.")
    else:
        print("   Conclusion: FAIL to reject the null hypothesis. The series is likely STATIONARY.")


# --- Step 2: Re-run tests on the NEW differenced data ---
# The differenced series only exist if the first-difference step succeeded, so
# a missing series is reported instead of surfacing as an unhandled NameError.
print("\n--- Step 2: Running Stationarity Tests on Transformed (Differenced) Data ---")

try:
    _transformed_inputs = [
        (neo_diff, "Neo Bank Index (Transformed)"),
        (challenger_diff, "Challenger Bank Index (Transformed)"),
        (traditional_diff, "Traditional Bank Index (Transformed)"),
    ]
except NameError:
    print("Error: The differenced series (e.g., neo_diff) were not found.")
    print("Please make sure the first-difference step above ran successfully.")
else:
    for _diff_series, _label in _transformed_inputs:
        run_stationarity_tests_on_transformed_data(_diff_series, _label)