In [1]:
# Utils
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd 
import time 
import random

# Neural Nets
import tensorflow as tf
import tensorflow_probability as tfp
from keras.saving import register_keras_serializable
from sklearn.model_selection import train_test_split

# Config 
import sys
import os
import pickle
from dotenv import load_dotenv
import warnings

parent_dir = os.path.abspath(os.path.join(os.getcwd(), '..'))
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

# Data
from mkt_data_ETL.data_load_and_transform import get_data, get_top_mkt_cap_stocks

load_dotenv()
# Suppress all warnings
warnings.filterwarnings("ignore")






In [2]:

# =============================================================================
# REPRODUCIBILITY SETUP - Set seeds for deterministic results
# =============================================================================
def set_seeds(seed=42):
    """
    Set seeds for reproducible results across all random number generators.
    
    Args:
        seed (int): The seed value to use for all random number generators
    """
    # Set Python's built-in random seed
    random.seed(seed)
    
    # Set NumPy seed
    np.random.seed(seed)
    
    # Set TensorFlow seeds
    tf.random.set_seed(seed)
    
    # Set environment variables for deterministic behavior
    os.environ['PYTHONHASHSEED'] = str(seed)
    os.environ['TF_DETERMINISTIC_OPS'] = '1'
    os.environ['TF_CUDNN_DETERMINISTIC'] = '1'
    
    # Configure TensorFlow for deterministic operations
    tf.config.experimental.enable_op_determinism()
    
    print(f"All seeds set to: {seed}")
    print("Deterministic operations enabled for reproducible results.")

# Set the seed for reproducible results
SEED = 42
set_seeds(SEED)

def verify_seed_settings():
    """
    Verify that all seed settings are properly configured.
    This function can be called to check reproducibility setup.
    """
    print("=== Reproducibility Verification ===")
    print(f"SEED value: {SEED}")
    print(f"PYTHONHASHSEED: {os.environ.get('PYTHONHASHSEED', 'Not set')}")
    print(f"TF_DETERMINISTIC_OPS: {os.environ.get('TF_DETERMINISTIC_OPS', 'Not set')}")
    print(f"TF_CUDNN_DETERMINISTIC: {os.environ.get('TF_CUDNN_DETERMINISTIC', 'Not set')}")
    print("=====================================")

# Uncomment the line below to verify seed settings
# verify_seed_settings()

All seeds set to: 42
Deterministic operations enabled for reproducible results.


In [3]:
# Get data 
stock_prices_df, stock_shares_amount_df, mkt_cap_df, spx_index, removed_companies = get_data()
top_100_mkt_cap_df, top_100_mkt_cap_prices_df=get_top_mkt_cap_stocks(stock_prices_df=stock_prices_df, 
stock_mkt_cap_df=mkt_cap_df)


# Calculate daily returns
stocks_returns    = np.log(top_100_mkt_cap_prices_df / top_100_mkt_cap_prices_df.shift(1))
sp500_idx_returns =  np.log(spx_index / spx_index.shift(1))

# Rolling volatility
window_size = 252*2 # 2 years
Sigma_df= stocks_returns.rolling(window=window_size).cov(pairwise=True)
Sigma_df = Sigma_df.dropna()

# Get the first date in the cleaned DataFrame
START_DATE = Sigma_df.index.get_level_values(0)[0]

# Filter dataframes to start from START_DATE
top_100_mkt_cap_df = top_100_mkt_cap_df.loc[START_DATE:]
stocks_returns     = stocks_returns.loc[START_DATE:]
stock_prices_df    = stock_prices_df.loc[START_DATE:]

assets = stocks_returns.columns
dates  = stocks_returns.index

n = len(assets)
T = len(dates)
Sigma_t = np.empty((T, n, n))

# Fill array with each rolling covariance matrix
for i, t in enumerate(dates):
    Sigma = Sigma_df.loc[t].reindex(index=assets, columns=assets).values
    Sigma_t[i] = Sigma


# Market Weights
mkt_portfolio_weights = top_100_mkt_cap_df.div(top_100_mkt_cap_df.sum(axis=1),axis=0)

# Market portfolio return
mkt_return = (mkt_portfolio_weights.shift(1) * stocks_returns).sum(axis=1)

In [None]:
def compute_relative_covariance(sigma: np.ndarray, pi: np.ndarray) -> np.ndarray:
    """
    Calcula a matriz de covariância relativa τ_ij^π(t) com base na covariância σ_ij(t)
    e no vetor de pesos do portfólio π(t).
    
    Args:
        sigma (np.ndarray): Matriz de covariância dos ativos (n x n).
        pi (np.ndarray): Vetor de pesos do portfólio (n,).

    Returns:
        np.ndarray: Matriz de covariância relativa τ^π (n x n).
    """
    # Covariância ativo i com portfólio: sigma_iπ = sigma @ pi
    sigma_i_pi = sigma @ pi        # (n,)
    sigma_pi_pi = pi.T @ sigma @ pi  # escalar

    # Matriz τ_ij^π = σ_ij - σ_iπ - σ_jπ + σ_ππ
    n = len(pi)
    tau = np.zeros((n, n))
    for i in range(n):
        for j in range(n):
            tau[i, j] = sigma[i, j] - sigma_i_pi[i] - sigma_i_pi[j] + sigma_pi_pi
    return tau

print("initializing tau_t calculation")
start_time = time.time()

tau_t = np.empty((T, n, n))
for t in range(T):
    tau_t[t] = compute_relative_covariance(Sigma_t[t], mkt_portfolio_weights.iloc[t].values)

end_time = time.time()
elapsed_seconds = end_time - start_time
print(f"Elapsed time: {elapsed_seconds:.2f} seconds")


# Copying data generated previously 
mu_t_df = mkt_portfolio_weights.copy()
R_t_df =  stocks_returns.copy()


initializing tau_t calculation


In [None]:
def G_func(mu, p=0.5):
    S = np.sum(mu**p, axis=1)                # (T,)
    return S**(1.0/p)                          # (T,)

def grad_G(mu: np.ndarray, p: float = 0.5) -> np.ndarray: 
    S = np.sum(mu**p, axis=1)                     # (T,)
    pref = S**(1.0/p - 1.0)                       # (T,)
    return (mu**(p - 1.0)) * pref[:, None]        # (T, N)         

def grad_log_G(mu, p=0.5):
    S = np.sum(mu**p, axis=1)   
    return (mu**(p - 1.0)) / S[:, None] 

def dwp_weights(mu: np.ndarray, p: float=0.5) -> np.ndarray:
    num = mu**p
    den = num.sum(axis=1, keepdims=True)
    return num / den

def dwp_simple_returns(mu: np.ndarray, r: np.ndarray, p: float = 0.5) -> np.ndarray:
    pi = dwp_weights(mu, p)                  # (T,N)
    w = np.empty_like(pi)
    w[1:] = pi[:-1]                          # shift(1)
    w[0]  = np.nan
    return (w * r).sum(axis=1)    


def hess_G_dwp(mu: np.ndarray, p: float = 0.5) -> np.ndarray:
    """
    Hessiana de G(μ) para o Diversity-Weighted Portfolio:
        G(μ) = (∑ μ_i^p)^(1/p).
    Retorna tensor (T, n, n) com D_{ij}G(μ_t).
    """
    T, n = mu.shape
    S   = np.sum(mu**p, axis=1)                         # (T,)
    Sa  = S**(1.0/p - 1.0)                              # S^{a}, a = 1/p - 1
    # termo diagonal: (p-1) S^{a} * diag( μ^{p-2} )
    diag_power = mu**(p - 2.0)                          # (T,n)
    term1 = (p - 1.0) * Sa[:, None, None] * np.einsum('ti,ij->tij', diag_power, np.eye(n))

    # termo de rank-1: (1-p) S^{a-1} * (μ^{p-1} ⊗ μ^{p-1})
    v     = mu**(p - 1.0)                               # (T,n)
    term2 = (1.0 - p) * (Sa / S)[:, None, None] * (v[:, :, None] * v[:, None, :])

    return term1 + term2

def drift_g(mu: np.ndarray, tau: np.ndarray, p: float = 0.5) -> np.ndarray:
    """
    Calcula g_t = -(1/(2 G)) * sum_{i,j} D_{ij}G * μ_i μ_j τ_{ij}, para t=1..T.
    Entradas:
      - mu  : (T, n) pesos de mercado μ(t)
      - tau : (T, n, n) covariância relativa τ^{μ}(t)
    Saída:
      - g   : (T,) série temporal do integrando do drift
    """
    # G(μ)
    G = (np.sum(mu**p, axis=1))**(1.0/p)                # (T,)

    # Hessiana D^2 G(μ)
    H = hess_G_dwp(mu, p=p)                             # (T, n, n)

    # sum_{i,j} D_{ij}G * μ_i * μ_j * τ_{ij}  (evita construir μ⊗μ explicitamente)
    summed = np.einsum('tij,ti,tj,tij->t', H, mu, mu, tau)   # (T,)

    g = -0.5 * summed / G
    return g 
import numpy as np

def integrate_g_simpson_or_trap(dg_t: np.ndarray, dt: float = 1.0) -> float:
    """
    Aproxima ∫ g(t) dt a partir de amostras dg_t[0..T-1].
    - Se T ímpar: Simpson composto com passo dt.
    - Se T par : Trapézio composto com passo dt.
    Requer passo uniforme (dt escalar). 
    """
    dg_t = np.asarray(dg_t, dtype=float)
    T = dg_t.shape[0]
    if T == 1:
        return 0.0

    if T % 2 == 1:
        # Simpson: h/3*(f0 + fT + 4*sum(f_odd) + 2*sum(f_even))
        s_odd  = dg_t[1:-1:2].sum()   # índices 1,3,5,...
        s_even = dg_t[2:-1:2].sum()   # índices 2,4,6,...
        return (dt/3.0) * (dg_t[0] + dg_t[-1] + 4.0*s_odd + 2.0*s_even)
    else:
        # Trapézio composto: h*(0.5*f0 + sum(f_mid) + 0.5*fT)
        return dt * (0.5*dg_t[0] + dg_t[1:-1].sum() + 0.5*dg_t[-1])

def integrate_g_hybrid(dg_t: np.ndarray, dt: float = 1.0) -> float:
    """
    Híbrido: se T par, aplica Simpson nos primeiros T-1 pontos e
    trapézio no último intervalo (T-2,T-1). dt uniforme.
    """
    dg_t = np.asarray(dg_t, dtype=float)
    T = dg_t.shape[0]
    if T <= 1:
        return 0.0
    if T % 2 == 1:
        # Simpson em toda a malha
        return integrate_g_simpson_or_trap(dg_t, dt=dt)
    else:
        # Simpson nos 0..T-2 (tem T-1 pontos => ímpar)
        part_simpson = integrate_g_simpson_or_trap(dg_t[:-1], dt=dt)
        # Trapézio no último intervalo
        part_trap = dt * 0.5 * (dg_t[-2] + dg_t[-1])
        return part_simpson + part_trap


In [None]:
p= 0.5 
# Mercado 
mu = mu_t_df.values # Pesos mercado
ret_mkt = np.exp(np.sum(mkt_return.values))

# G_p 
G_p = G_func(mu, p=0.5)
grad_G_p = grad_G(mu)
grad_log_G_p = grad_log_G(mu)


# DWP e retorno do DWP
pi_dwp = dwp_weights(mu, p=p)
dwp_ret = dwp_simple_returns(pi_dwp, stocks_returns.values)
dwp_ret = np.nan_to_num(dwp_ret)
dwp_cumulative = np.exp(np.sum(dwp_ret))


# Master equation 
g_t   = drift_g(mu, tau_t, p=p)
integral_g = integrate_g_hybrid(g_t, dt=1.0)


left_hand_side = np.log(dwp_cumulative)- np.log(ret_mkt)
right_hand_side = (np.log(G_p[-1])- np.log(G_p[0])) + integral_g


diff_master_formula = left_hand_side - right_hand_side
print(diff_master_formula)

-0.008008495330664167
