Import & Settings

In [84]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from scipy.stats import spearmanr
import tensorflow as tf
from itertools import product
from keras.models import Model
from keras.layers import Input, Dense
import matplotlib.pyplot as plt
from scipy.stats import spearmanr
from tensorflow.keras.layers import Layer
from tensorflow.keras import layers, Model
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import TimeSeriesSplit

1. Load and process data

In [85]:
from google.colab import files
uploaded = files.upload()

Saving market_data.xlsx to market_data (3).xlsx


In [86]:
df = pd.read_excel("market_data.xlsx")
# Show first rows
print(df.columns.tolist())
# Delete columns Date and target _MKT
X_full = df.drop(columns=["Date", "_MKT"])
# Remove extra spaces
df.columns = df.columns.str.strip()

['Date', 'EMP', 'PE', 'CAPE', 'DY', 'Rho', 'MOV ', 'IR', 'RR', 'Y02', 'Y10', 'STP', 'CF', 'MG', 'RV', 'ED', 'UN', 'GDP', 'M2', 'CPI', 'DIL', 'YSS', 'NYF', '_AU', '_DXY', '_LCP', '_TY', '_OIL', '_MKT', '_VA', '_GR']


Define signal sets

In [87]:
signal_sets = {
    "Set 1: Valuation"        : ['PE', 'CAPE', 'DY'],
    "Set 2: Interest Rates"   : ['IR', 'RR', 'Y02', 'Y10', 'STP'],
    "Set 3: Macro Conditions" : ['GDP', 'M2', 'CPI', 'UN', 'CF'],
    "Set 4: Corporate Health" : ['MG', 'RV', 'ED'],
    "Set 5: Risk Sentiment"   : ['Rho', 'MOV', 'YSS', 'NYF', '_DXY', '_OIL'],
    "Set 6: Style & Asset"    : ['_VA', '_GR', '_AU', '_TY'],
    "Set 7: Full Set"         : [col for col in df.columns if col not in ['Date', '_MKT']]
}

Apply standard AE

In [88]:
autoencoder_results = {}

for set_name, columns in signal_sets.items():

    # Prepare data
    data = df[columns].dropna()
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(data)

    # simple AE architecture
    input_dim = X_scaled.shape[1]
    input_layer = Input(shape=(input_dim,))
    encoded = Dense(4, activation='relu')(input_layer)
    decoded = Dense(input_dim, activation='linear')(encoded)
    autoencoder = Model(inputs=input_layer, outputs=decoded)
    autoencoder.compile(optimizer='adam', loss='mse')

    # Training
    history = autoencoder.fit(X_scaled, X_scaled, epochs=100, batch_size=16, verbose=0)

    # Save results
    loss = history.history['loss']
    autoencoder_results[set_name] = {
        "loss": loss,
        "final_loss": loss[-1]
    }


In [89]:
latent_codes = {}
target_returns = {}

for set_name, columns in signal_sets.items():

    # Prepare data
    data = df[columns + ['_MKT']].dropna()
    X = data[columns].values
    y = data['_MKT'].shift(-1).dropna()  # target

    # Align X e y
    X = X[:-1]
    y = y.values

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Architecture
    input_dim = X_scaled.shape[1]
    input_layer = Input(shape=(input_dim,))
    encoded_layer = Dense(4, activation='relu')(input_layer)
    decoded_layer = Dense(input_dim, activation='linear')(encoded_layer)
    autoencoder = Model(inputs=input_layer, outputs=decoded_layer)
    autoencoder.compile(optimizer='adam', loss='mse')
    autoencoder.fit(X_scaled, X_scaled, epochs=100, batch_size=16, verbose=0)

    # Extract latent code
    encoder = Model(inputs=input_layer, outputs=encoded_layer)
    latent = encoder.predict(X_scaled)

    latent_codes[set_name] = latent
    target_returns[set_name] = y


[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step  
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step  
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step  
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step  
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 1ms/step  
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step  
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step  


Information Coefficient

In [107]:
latent_codes = latent_ae
target_returns = target_returns_ae
n_splits = 5
ic_results_ae = {}

for set_name in latent_codes:
    X = latent_codes[set_name]
    y = target_returns[set_name]

    tscv = TimeSeriesSplit(n_splits=n_splits)
    ic_folds = []

    for train_idx, test_idx in tscv.split(X):
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]

        model = LinearRegression()
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)

        if np.std(y_pred) > 0 and np.std(y_test) > 0:
            ic = spearmanr(y_pred, y_test).correlation
            ic_folds.append(ic)
        else:
            ic_folds.append(np.nan)


        ic_results_ae[set_name] = np.nanmean(ic_folds)


Sharpe Ratio

In [91]:
latent_codes = latent_ae
target_returns = target_returns_ae
n_splits = 5
sharpe_results_ae = {}

for set_name in latent_codes:
    X = latent_codes[set_name]
    y = target_returns[set_name]

    tscv = TimeSeriesSplit(n_splits=n_splits)
    pnl_all = []

    for train_idx, test_idx in tscv.split(X):
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]

        model = LinearRegression()
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)

        pnl = y_pred * y_test  # uso continuo, come FFNN
        pnl_all.extend(pnl)

    pnl_all = np.array(pnl_all)
    sharpe = np.mean(pnl_all) / np.std(pnl_all) * np.sqrt(252)
    sharpe_results_ae[set_name] = sharpe


Results

In [104]:
# Reconstruct results
results = []

for set_name in ic_results_ae:
    avg_ic = ic_results_ae[set_name]
    sharpe = sharpe_results_ae[set_name]

    results.append({
        "Signal Set": set_name,
        "Avg IC": round(avg_ic, 4),
        "Avg Sharpe": round(sharpe, 4)
    })

results_df_ae = pd.DataFrame(results).sort_values(
    ["Avg IC", "Avg Sharpe"], ascending=False
)

print(results_df_ae)


                Signal Set  Avg IC  Avg Sharpe
5     Set 6: Style & Asset  0.9664      9.6023
1    Set 2: Interest Rates  0.3280     12.9849
3  Set 4: Corporate Health  0.2442     12.3166
6          Set 7: Full Set  0.2436     10.7590
0         Set 1: Valuation  0.1896     11.2905
4    Set 5: Risk Sentiment -0.2656     11.3266
2  Set 3: Macro Conditions -0.3357     10.6931


Apply Contractive Autoencoder

In [93]:
class ContractiveBottleneck(Layer):
    def __init__(self, units, lam=1e-4, **kwargs):
        super(ContractiveBottleneck, self).__init__(**kwargs)
        self.units = units
        self.lam = lam
        self.dense = Dense(units, activation='sigmoid', name='bottleneck')

    def call(self, inputs):
        h = self.dense(inputs)
        dh = h * (1 - h)  # derivative of sigmoid
        contractive_loss = self.lam * tf.reduce_sum(tf.square(self.dense.kernel)) * tf.reduce_sum(tf.square(dh))
        self.add_loss(contractive_loss)
        return h


In [94]:
latent_cae = {}
target_returns_cae = {}

for set_name, columns in signal_sets.items():

    # Prepare data
    data = df[columns + ['_MKT']].dropna()
    X = data[columns].values
    y = data['_MKT'].shift(-1).dropna()
    X = X[:-1]
    y = y.values

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # CAE architecture definition
    input_dim = X_scaled.shape[1]
    input_layer = Input(shape=(input_dim,))
    encoded = Dense(12, activation='relu')(input_layer)
    encoded = Dense(6, activation='relu')(encoded)
    bottleneck = ContractiveBottleneck(3)(encoded)
    decoded = Dense(6, activation='relu')(bottleneck)
    decoded = Dense(12, activation='relu')(decoded)
    output_layer = Dense(input_dim, activation='linear')(decoded)

    autoencoder_cae = Model(inputs=input_layer, outputs=output_layer)
    autoencoder_cae.compile(optimizer='adam', loss='mse')

    autoencoder_cae.fit(X_scaled, X_scaled, epochs=100, batch_size=16, verbose=0)

    # Extract latent signals
    encoder_cae = Model(inputs=input_layer, outputs=bottleneck)
    latent_cae[set_name] = encoder_cae.predict(X_scaled)
    target_returns_cae[set_name] = y



[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step  
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 4ms/step


Information Coefficient

In [108]:
latent_codes = latent_cae
target_returns = target_returns_cae
n_splits = 5
ic_results_cae = {}

for set_name in latent_codes:
    X = latent_codes[set_name]
    y = target_returns[set_name]

    tscv = TimeSeriesSplit(n_splits=n_splits)
    ic_folds = []

    for train_idx, test_idx in tscv.split(X):
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]

        model = LinearRegression()
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)

        if np.std(y_pred) > 0 and np.std(y_test) > 0:
            ic = spearmanr(y_pred, y_test).correlation
            ic_folds.append(ic)
        else:
            ic_folds.append(np.nan)


        ic_results_cae[set_name] = np.nanmean(ic_folds)


Sharpe Ratio

In [96]:
latent_codes = latent_cae
target_returns = target_returns_cae
n_splits = 5
sharpe_results_cae = {}

for set_name in latent_codes:
    X = latent_codes[set_name]
    y = target_returns[set_name]

    tscv = TimeSeriesSplit(n_splits=n_splits)
    pnl_all = []

    for train_idx, test_idx in tscv.split(X):
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]

        model = LinearRegression()
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)

        pnl = y_pred * y_test
        pnl_all.extend(pnl)

    pnl_all = np.array(pnl_all)
    sharpe = np.mean(pnl_all) / np.std(pnl_all) * np.sqrt(252)
    sharpe_results_cae[set_name] = sharpe


Result Contractive Autoencoder

In [105]:
# Reconstruct results CAE
results_cae = []

for set_name in ic_results_cae:
    avg_ic = np.mean(ic_results_cae[set_name])
    sharpe = sharpe_results_cae[set_name]
    results_cae.append({
        "Signal Set": set_name,
        "Avg IC": round(avg_ic, 4),
        "Avg Sharpe": round(sharpe, 4)
    })

results_df_cae = pd.DataFrame(results_cae).sort_values(
    ["Avg IC", "Avg Sharpe"], ascending=False
)

print(results_df_cae)

                Signal Set  Avg IC  Avg Sharpe
5     Set 6: Style & Asset  0.8815     12.3638
1    Set 2: Interest Rates  0.3391     13.2221
0         Set 1: Valuation  0.2496     11.5058
3  Set 4: Corporate Health  0.1202     11.1546
6          Set 7: Full Set  0.1034     10.7920
4    Set 5: Risk Sentiment -0.2932     12.1900
2  Set 3: Macro Conditions -0.4675     10.4853


Apply Variational Autoencoder

In [98]:
class Sampling(layers.Layer):
    def call(self, inputs):
        z_mean, z_log_var = inputs
        epsilon = tf.random.normal(shape=tf.shape(z_mean))
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon

class VAE(Model):
    def __init__(self, original_dim, latent_dim=3):
        super(VAE, self).__init__()
        self.original_dim = original_dim
        self.latent_dim = latent_dim

        self.encoder = tf.keras.Sequential([
            layers.InputLayer(shape=(original_dim,)),
            layers.Dense(64, activation='relu'),
            layers.Dense(32, activation='relu'),
            layers.Dense(latent_dim + latent_dim),  # z_mean + z_log_var
        ])

        self.decoder = tf.keras.Sequential([
            layers.InputLayer(shape=(latent_dim,)),
            layers.Dense(32, activation='relu'),
            layers.Dense(64, activation='relu'),
            layers.Dense(original_dim),
        ])
        self.sampler = Sampling()

    def call(self, inputs):
        z_params = self.encoder(inputs)
        z_mean, z_log_var = tf.split(z_params, num_or_size_splits=2, axis=1)
        z = self.sampler((z_mean, z_log_var))
        reconstruction = self.decoder(z)

        # KL divergence
        kl_loss = -0.5 * tf.reduce_sum(1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var), axis=1)
        self.add_loss(tf.reduce_mean(kl_loss))
        return reconstruction

In [99]:
latent_vae = {}
target_returns_vae = {}

for set_name, columns in signal_sets.items():

    # Prepare data and future return target
    data = df[columns + ['_MKT']].copy()
    data['_target'] = data['_MKT'].shift(-1)
    data = data.dropna()

    X = data[columns].values
    y = data['_target'].values

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Create VAE model
    original_dim = X_scaled.shape[1]
    vae = VAE(original_dim=original_dim, latent_dim=3)  # make sure 'shape=' is used inside VAE class
    vae.compile(optimizer='adam', loss='mse')

    # Train the VAE
    vae.fit(X_scaled, X_scaled, epochs=100, batch_size=16, verbose=0)

    # Extract latent representation (z_mean)
    z_params = vae.encoder.predict(X_scaled)
    z_mean, z_log_var = np.split(z_params, 2, axis=1)

    latent_vae[set_name] = z_mean
    target_returns_vae[set_name] = y

[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step  
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step  
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step  
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step  
[1m61/61[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 2ms/step  


Information Coefficient

In [110]:
latent_codes = latent_vae
target_returns = target_returns_vae
n_splits = 5
ic_results_vae = {}

for set_name in latent_codes:
    X = latent_codes[set_name]
    y = target_returns[set_name]

    tscv = TimeSeriesSplit(n_splits=n_splits)
    ic_folds = []

    for train_idx, test_idx in tscv.split(X):
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]

        model = LinearRegression()
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)

        if np.std(y_pred) > 0 and np.std(y_test) > 0:
            ic = spearmanr(y_pred, y_test).correlation
            ic_folds.append(ic)
        else:
            ic_folds.append(np.nan)


    ic_results_vae[set_name] = np.mean(ic_folds)

print("Information Coefficient VAE:")
print(pd.DataFrame([
    {"Signal Set": k, "Improved IC": round(v, 4)}
    for k, v in ic_results_vae.items()
]).sort_values(by="Improved IC", ascending=False))


Information Coefficient VAE:
                Signal Set  Improved IC
5     Set 6: Style & Asset       0.4587
4    Set 5: Risk Sentiment       0.0801
0         Set 1: Valuation      -0.0071
2  Set 3: Macro Conditions      -0.0537
3  Set 4: Corporate Health      -0.0687
1    Set 2: Interest Rates      -0.0918
6          Set 7: Full Set          NaN


  ic = spearmanr(y_pred, y_test).correlation


Sharpe Ratio

In [111]:
latent_codes = latent_vae
target_returns = target_returns_vae
n_splits = 5
sharpe_results_vae = {}

for set_name in latent_codes:
    X = latent_codes[set_name]
    y = target_returns[set_name]

    tscv = TimeSeriesSplit(n_splits=n_splits)
    pnl_all = []

    for train_idx, test_idx in tscv.split(X):
        X_train, X_test = X[train_idx], X[test_idx]
        y_train, y_test = y[train_idx], y[test_idx]

        model = LinearRegression()
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)

        pnl = y_pred * y_test
        pnl_all.extend(pnl)

    pnl_all = np.array(pnl_all)
    sharpe = np.mean(pnl_all) / np.std(pnl_all) * np.sqrt(252)
    sharpe_results_vae[set_name] = sharpe


Results Variational Autoencoder

In [102]:

results_vae = []

for set_name in ic_results_vae:
    avg_ic = np.mean(ic_results_vae[set_name])
    sharpe = sharpe_results_vae[set_name]
    results_vae.append({
        "Signal Set": set_name,
        "Avg IC": round(avg_ic, 4),
        "Avg Sharpe": round(sharpe, 4)
    })

results_df_vae = pd.DataFrame(results_vae).sort_values(
    ["Avg IC", "Avg Sharpe"], ascending=False
)

print(results_df_vae)

                Signal Set  Avg IC  Avg Sharpe
5     Set 6: Style & Asset  0.4587      9.6224
4    Set 5: Risk Sentiment  0.0801     10.3169
0         Set 1: Valuation -0.0071     13.8121
2  Set 3: Macro Conditions -0.0537     13.0336
3  Set 4: Corporate Health -0.0687     13.4984
1    Set 2: Interest Rates -0.0918     13.1109
6          Set 7: Full Set     NaN     13.2970


Final Results

In [106]:
combined_results = []

for set_name in signal_sets.keys():
    result = {"Signal Set": set_name}

    # AE
    result["AE Sharpe"] = round(sharpe_results_ae.get(set_name, float('nan')), 4)
    result["AE IC"] = round(ic_results_ae.get(set_name, float('nan')), 4)

    # CAE
    result["CAE Sharpe"] = round(sharpe_results_cae.get(set_name, float('nan')), 4)
    result["CAE IC"] = round(ic_results_cae.get(set_name, float('nan')), 4)

    # VAE
    result["VAE Sharpe"] = round(sharpe_results_vae.get(set_name, float('nan')), 4)
    result["VAE IC"] = round(ic_results_vae.get(set_name, float('nan')), 4)

    combined_results.append(result)

# Create DataFrame
summary_df = pd.DataFrame(combined_results).sort_values(by="VAE Sharpe", ascending=False)

# Show
print("📊 Final Comparison AE vs CAE vs VAE:")
display(summary_df)


📊 Final Comparison AE vs CAE vs VAE:


Unnamed: 0,Signal Set,AE Sharpe,AE IC,CAE Sharpe,CAE IC,VAE Sharpe,VAE IC
0,Set 1: Valuation,11.2905,0.1896,11.5058,0.2496,13.8121,-0.0071
3,Set 4: Corporate Health,12.3166,0.2442,11.1546,0.1202,13.4984,-0.0687
6,Set 7: Full Set,10.759,0.2436,10.792,0.1034,13.297,
1,Set 2: Interest Rates,12.9849,0.328,13.2221,0.3391,13.1109,-0.0918
2,Set 3: Macro Conditions,10.6931,-0.3357,10.4853,-0.4675,13.0336,-0.0537
4,Set 5: Risk Sentiment,11.3266,-0.2656,12.19,-0.2932,10.3169,0.0801
5,Set 6: Style & Asset,9.6023,0.9664,12.3638,0.8815,9.6224,0.4587
