In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import sys
import numpy as np

In [None]:
# Load and clean the EUSTER index series.
# Steps, in order: rename the raw columns to "Value"/"Date", parse the dates,
# sort chronologically, coerce non-numeric values to NaN, drop the rows whose
# value is NaN, and pin the date column to datetime64[ns].
euster_df = (
    pd.read_csv('./data/euster.csv')
    .rename(columns={"ECBESTRVOLWGTTRMDMNRT": "Value", "DATE": "Date"})
    .assign(Date=lambda frame: pd.to_datetime(frame['Date']))
    .sort_values('Date')
    .assign(Value=lambda frame: pd.to_numeric(frame['Value'], errors='coerce'))
    .dropna(subset=['Value'])
    .astype({'Date': 'datetime64[ns]'})
)

# Load and clean the fund series: parse the dates and sort chronologically.
fond_df = (
    pd.read_csv('./data/fond.csv')
    .assign(Date=lambda frame: pd.to_datetime(frame['Date']))
    .sort_values(by='Date')
)

In [None]:
def caclculate_annualized_return(window_df, window_size_actual):
    """Annualize the price change between the first and last row of a window.

    Args:
        window_df: DataFrame with a 'Price' column; only its first and last
            rows are read.
        window_size_actual: Length of the window. It is used as the number of
            days the window spans (presumably one row per day -- TODO confirm).

    Returns:
        The annualized return as a fraction (0.05 means 5%), computed as
        (last_price / first_price) ** (365 / window_size_actual) - 1.
    """
    prices = window_df['Price']
    growth_factor = prices.iloc[-1] / prices.iloc[0]
    periods_per_year = 365 / window_size_actual
    return growth_factor ** periods_per_year - 1
    
def calculate_non_overlapping_window_return(df, window_size_or_windows=20):
    """Compute the annualized return (in percent) of consecutive windows of `df`.

    Args:
        df: DataFrame with 'Date' and 'Price' columns. Rows are taken in their
            existing order, so the frame is presumably sorted by date -- verify
            against the caller.
        window_size_or_windows: Either
            * a positive int: `df` is cut into consecutive chunks of that many
              rows; trailing rows that do not fill a whole chunk are ignored; or
            * a list of (start_date, end_date) pairs: each window holds the rows
              whose 'Date' lies in the inclusive range [start_date, end_date].
              Pairs that match no row are skipped.

    Returns:
        DataFrame with one row per non-empty window and the columns
        'Date' (datetime64[ns], the date of the window's middle row) and
        'Annualized Return' (float64, in percent). The index is the position
        of the window in the input sequence. Rows containing NaN are dropped.

    Raises:
        ValueError: if the argument is neither an int nor a list, or if an int
            window size is not positive.
    """
    if isinstance(window_size_or_windows, int):
        if window_size_or_windows <= 0:
            # A size of 0 would otherwise surface as a ZeroDivisionError below.
            raise ValueError("window_size must be a positive integer")
        num_windows = len(df) // window_size_or_windows
        candidate_windows = (
            (i, df.iloc[i * window_size_or_windows:(i + 1) * window_size_or_windows])
            for i in range(num_windows)
        )
    elif isinstance(window_size_or_windows, list):
        candidate_windows = (
            (i, df[(df['Date'] >= start_date) & (df['Date'] <= end_date)])
            for i, (start_date, end_date) in enumerate(window_size_or_windows)
        )
    else:
        raise ValueError("window_size must be an integer or a list of date pairs")

    # Collect the rows first and build the frame once: growing a DataFrame cell
    # by cell is slow and leaves the columns with object dtype.
    labels = []
    records = []
    for label, window_df in candidate_windows:
        if window_df.empty:
            continue
        window_size_actual = len(window_df)
        annualized_return = caclculate_annualized_return(window_df, window_size_actual)
        labels.append(label)
        records.append({
            # The middle row's date represents the whole window.
            'Date': window_df['Date'].iloc[window_size_actual // 2],
            'Annualized Return': annualized_return * 100,
        })

    results_df = pd.DataFrame(records, columns=['Date', 'Annualized Return'], index=labels)
    results_df = results_df.dropna()
    results_df['Date'] = results_df['Date'].astype('datetime64[ns]')
    results_df['Annualized Return'] = results_df['Annualized Return'].astype('float64')
    return results_df



In [None]:
# Keep only the observations dated on or after the analysis start date.
start_date_filter = pd.to_datetime("2022-08-01")

euster_filtered_df = euster_df.loc[euster_df['Date'].ge(start_date_filter)]
fond_filtered_df = fond_df.loc[fond_df['Date'].ge(start_date_filter)]

In [None]:
def calculate_median_tracking_difference(index_df,fond_df):
    """Median of (index value - fund annualized return) over the shared dates.

    Args:
        index_df: DataFrame with 'Date' and 'Value' columns.
        fond_df: DataFrame with 'Date' and 'Annualized Return' columns.

    Returns:
        The median of 'Value' minus 'Annualized Return' over the rows whose
        'Date' appears in both frames (inner join on 'Date').
    """
    joined = index_df.merge(fond_df, on='Date')
    tracking_difference = joined['Value'] - joined['Annualized Return']
    return tracking_difference.median()

In [None]:
# Search for the window size whose median tracking difference is closest to zero.
MAX_WINDOW_SIZE = 100  # largest window size (in rows) to try, inclusive

best_td_df = None
best_td_window_size = None
best_td_median_tracking_difference = None  # signed median of the best window size
# Magnitude of the best median so far. It is tracked separately from the signed
# value: comparing abs(candidate) against a stored negative median could never
# succeed, which would freeze the search at the first negative result.
best_td_abs_difference = float('inf')

# Check window sizes from 1 to MAX_WINDOW_SIZE (inclusive)
for window_size in range(1, MAX_WINDOW_SIZE + 1):
    # Calculate the annualized return for each window
    annual_returns_df = calculate_non_overlapping_window_return(fond_filtered_df, window_size)
    median_tracking_difference = calculate_median_tracking_difference(euster_filtered_df, annual_returns_df)

    # A NaN median (no shared dates) never compares as smaller, so such window
    # sizes are skipped automatically.
    if abs(median_tracking_difference) < best_td_abs_difference:
        best_td_abs_difference = abs(median_tracking_difference)
        best_td_median_tracking_difference = median_tracking_difference
        best_td_df = annual_returns_df
        best_td_window_size = window_size

In [None]:
def show_chart(df,window_size,median_tracking_difference,label):
    """Plot a fund's annualized returns together with the EUSTER values.

    Args:
        df: DataFrame with 'Date' and 'Annualized Return' columns to plot.
        window_size: Window size shown in the legend entry.
        median_tracking_difference: Number shown in the legend entry,
            formatted with two decimals.
        label: Prefix of the legend entry for the fund series.

    The EUSTER series is read from the notebook-level `euster_filtered_df`.
    """
    _, ax = plt.subplots(figsize=(12, 6))

    # Fund series
    fund_legend = f'{label} (window size = {window_size}, tracking difference={median_tracking_difference:.2f}%)'
    ax.plot(df['Date'], df['Annualized Return'], label=fund_legend)

    # EUSTER series
    ax.plot(euster_filtered_df['Date'], euster_filtered_df['Value'], label='EUSTER Value')

    ax.set_title('Annualized Returns and EUSTER Value over Time')
    ax.set_xlabel('Date')
    ax.set_ylabel('Percentage (%)')
    ax.legend()
    ax.grid(True)
    plt.show()

In [None]:
# Chart the window size that gave the smallest median tracking difference.
show_chart(
    df=best_td_df,
    window_size=best_td_window_size,
    median_tracking_difference=best_td_median_tracking_difference,
    label="Best tracking difference",
)

In [None]:
def create_windows(df,change_threshold):
    """Split a series into date windows separated by large value changes.

    A new window starts at every row whose 'Value' differs from the previous
    row's 'Value' by more than `change_threshold` in absolute terms.

    Args:
        df: DataFrame with 'Date' and 'Value' columns, in chronological order.
            Its index may be arbitrary (for example after rows were dropped);
            rows are matched by a boolean mask, not by index label.
        change_threshold: Minimum absolute row-to-row change in 'Value' that
            starts a new window.

    Returns:
        List of (start_date, end_date) tuples. Each window ends one day before
        the next one starts; the last window ends at the last date in `df`.
        An empty `df` yields an empty list.
    """
    if df.empty:
        return []

    # The first row's diff is NaN, which never exceeds the threshold.
    is_significant_change = df['Value'].diff().abs() > change_threshold

    windows = []
    start_date = df['Date'].iloc[0]
    for change_date in df.loc[is_significant_change, 'Date']:
        windows.append((start_date, change_date - pd.Timedelta(days=1)))
        start_date = change_date
    windows.append((start_date, df['Date'].iloc[-1]))
    return windows


# Build the date windows from the EUSTER series (0.05 is passed as the change threshold)
windows = create_windows(euster_df, 0.05)

# Mean span (end - start) of the windows that start after the filter date
average_window_size = np.mean(
    [end - start for start, end in windows if start > start_date_filter]
)

# Annualized fund return per window, keeping only results dated from start_date_filter on
ester_window_annualized_df = calculate_non_overlapping_window_return(fond_df, windows).loc[
    lambda frame: frame['Date'] >= start_date_filter
]

median_tracking_difference = calculate_median_tracking_difference(
    euster_filtered_df, ester_window_annualized_df
)

show_chart(
    ester_window_annualized_df,
    average_window_size,
    median_tracking_difference,
    "Index change window",
)
