In [1]:
# Carrega bibliotecas necessárias
import numpy as np
import pandas as pd
import scipy.stats as stats


# M é o numero de Regimes
# K é o numero de variaveis exogenas (contando a constante)
# Beta é um vetor de coeficientes de dimensao (k x M)


In [2]:
def Eta_t(Y : np.ndarray, X : np.ndarray, Regimes: dict) -> np.ndarray:
    """
    Calcula as "quase densidades" para cada regime em um modelo de Markov Switching.
    
    Y: matriz/array (n x V) com valores da variável dependente.
    X: matriz/array (n x K) com as variáveis explicativas (inclua coluna de constante, se houver).
    Regimes = {
        "Regime1": {"Beta": np.array(1 x K), "Omega": np.array([V x V])},
        "Regime2": {"Beta": np.array(1 x K), "Omega": np.array([V x V])},
    }
    Retorna matriz (M x 1) com as "quase densidades" para cada regime.
    Exemplo de uso:
    Regimes = {
        "Regime1": {"Beta": np.array([1, 1]), "Omega": np.array([0.1])},
        "Regime2": {"Beta": np.array([1, 2]), "Omega": np.array([0.5])},
    }
    mEta1 = Etat(Y, X, Regimes)
    """

    # Validation of inputs
    for regime_name, regime_params in Regimes.items():
        # check dimensions of Betas vs X
        if X.shape[1] != regime_params["Beta"].shape[0]:
            raise ValueError(f"Número de variáveis exógenas em X não corresponde ao número de betas para {regime_name}.")
        
        # Check dimensions of Omega
        if regime_params["Omega"].ndim == 2 and regime_params["Omega"].shape[0] != regime_params["Omega"].shape[1]:
            raise ValueError(f"Regime {regime_name}: Omega deve ser uma matriz quadrada.")

        if not np.allclose(regime_params["Omega"], regime_params["Omega"].T):
            raise ValueError(f"Regime {regime_name}: Omega deve ser simétrica.")

        # Check dimensions of Omega vs Y
        if Y.shape[1] != regime_params["Omega"].shape[0]:
            raise ValueError(f"Regime {regime_name}: Número de variáveis dependentes em Y não corresponde a dimensão de omega.")

    # ToDo: function should be Multivariate Normal, not univariate
    Betas = np.column_stack([regime["Beta"] for regime in Regimes.values()])
    Omegas = np.column_stack([regime["Omega"] for regime in Regimes.values()])
    
    p = stats.norm.pdf(x = (Y - (X @ Betas)) / Omegas, loc=0, scale=1)
    # scipy.stats.multivariate_normal.pdf(x = Y, mean = X @ Betas, cov = Omegas)

    return p

In [3]:
# Read a CSV file
df = pd.read_csv('matrizYX.csv')
print(df.head())

# df.describe(include='all')
Y = df['Var1'].values.reshape((-1, 1))
X = df[['Var2', 'Var3']].values.reshape((-1, 2))
print(Y[0:3, :])
print(X[0:3, :])

print(X.shape)

   Unnamed: 0      Var1  Var2      Var3
0           1  0.502841     1  0.224886
1           2  4.283640     1  1.740050
2           3 -0.169335     1 -0.204261
3           4  0.671845     1 -0.917602
4           5 -2.014495     1 -0.674167
[[ 0.50284076]
 [ 4.28364042]
 [-0.16933504]]
[[ 1.          0.22488637]
 [ 1.          1.74004993]
 [ 1.         -0.20426111]]
(100, 2)


In [4]:
# def Xi_filter(Eta : ArrayLike, transitionMatrix : ArrayLike, unconditional_state_probs : Optional[ArrayLike] = None) -> np.ndarray:
def Xi_filter(Eta : np.ndarray, transitionMatrix : np.ndarray, unconditional_state_probs  = None) -> np.ndarray:
    """
    Calcula o Xi filtrado (regime probabilities p(S_t | info até t) em um modelo de Markov Switching.
    
    Parâmetros:
    - Eta: array (T x M) com as "quase densidades" por regime no tempo t.
    - transitionMatrix: array (M x M) com as probabilidades de transição entre estados.
    - unconditional_state_probs: array (M,) com as probabilidades incondicionais dos estados (opcional).

    Retorna:
    - Xi: array (T x M) com as probabilidades filtradas por regime.
    """

    Eta = np.asarray(Eta, dtype=float)
    P = np.asarray(transitionMatrix, dtype=float)
    T, M = Eta.shape

    if unconditional_state_probs is None:
        unconditional_state_probs = np.ones(Eta.shape[1])/Eta.shape[1]

    # Validate dimensions
    if transitionMatrix.shape[0] != transitionMatrix.shape[1]:
        raise ValueError("Transition matrix must be square")
        
    if transitionMatrix.shape[0] != Eta.shape[1]:
        raise ValueError(f"Transition matrix dimension ({transitionMatrix.shape[0]}) must match number of regimes in Eta ({Eta.shape[1]})")
        
    if len(unconditional_state_probs) != Eta.shape[1]:
        raise ValueError(f"Unconditional state probabilities length ({len(unconditional_state_probs)}) must match number of regimes in Eta ({Eta.shape[1]})")

    Xi = np.full_like(Eta, 0.0)

    for i in range(Eta.shape[0]):
        if i == 0:
            Xi_t1 = unconditional_state_probs
        else:
            Xi_t1 = transitionMatrix.T @ Xi[i-1]
        
        # print("N=", Eta[i] * Xi_t1)
        # print("D=", Eta[i] @ Xi_t1)
        Xi[i] = (Eta[i] * Xi_t1)/(Eta[i] @ Xi_t1)

    return Xi

In [5]:
def Xi_smooth(Xi_filtered, transitionMatrix) :
    # unconditional_state_probs = np.array([0.5, 0.5])

    Xi_s = Xi_filtered.copy()

    for i in range(Xi_filtered.shape[0]-2, 0, -1):
        Xi_s[i] = (transitionMatrix @ (Xi_s[i+1] / (transitionMatrix.T @ Xi_filtered[i]))) * Xi_filtered[i]

    return Xi_s

In [6]:
def EstimateUnconditionalStateProbs(Xi_smoothed) :
    return Xi_smoothed.mean(axis=0)

In [7]:
Eta = Eta_t(Y, X, Regimes={
    "Regime1": {"Beta": np.array([1, 1]), "Omega": np.array([0.1])},
    "Regime2": {"Beta": np.array([1, 2]), "Omega": np.array([0.5])},
})

transitionMatrix = np.array([[0.9, 0.1],
                  [0.2, 0.8]])

Xi_filtered = Xi_filter(Eta, transitionMatrix)

Xi_smoothed = Xi_smooth(Xi_filtered, transitionMatrix)


In [8]:
def EstimateTransitionMatrix(Xi_smoothed) :

    # get the number of regimes from Xi_smoothed
    M = Xi_smoothed.shape[1]

    # Create a zero matrix for transition counts
    transitionMatrix = np.zeros((M, M))

    #  Estimate the most likely state at each time point
    estimated_state = np.argmax(Xi_smoothed, axis=1)

    # Convert the estimation to a column vector
    estimated_state = estimated_state.reshape(-1, 1)

    # Calculate the unconditional state probabilities. This will be used to estimate the initial state probabilities.
    UnconditionalStateProbs = EstimateUnconditionalStateProbs(Xi_smoothed)

    # Estimate the initial state as the state with the highest unconditional probability
    estimated_initial_state = np.argmax(UnconditionalStateProbs, axis=0)

    # create lagged version of estimated_state, the first value is the estimated initial state
    lagged_state = np.roll(estimated_state, 1, axis=0)
    lagged_state[0] = estimated_initial_state

    #  create a new Xi_smoothed by adding estimated_state and lagged_state as new columns to Xi_smoothed
    Xi_smoothed_helper = np.hstack([
        Xi_smoothed,
        estimated_state,
        lagged_state
    ])

    # Goes through the transition probabilities positions and estimate them based on the smoothed probabilities
    for state_i in range(M):
        for state_j in range(M):

            # Count_ij : P(S_t = j, S_t-1 = i)
            Count_ij = Xi_smoothed_helper[(Xi_smoothed_helper[:, M] == state_j) & (Xi_smoothed_helper[:, M+1] == state_i),].shape[0]

            # Count_i : P(S_t-1 = i)
            Count_i = Xi_smoothed_helper[Xi_smoothed_helper[:, M+1] == state_i, ].shape[0]

            # Update the transition counts matrix
            transitionMatrix[state_i, state_j] = Count_ij / Count_i

    return transitionMatrix

In [9]:
for i in Xi_smoothed :
    print(i)

[2.86997584e-11 1.00000000e+00]
[6.15627809e-54 1.00000000e+00]
[5.93238963e-22 1.00000000e+00]
[8.37266497e-08 9.99999916e-01]
[9.36887454e-119 1.00000000e+000]
[1.50096177e-142 1.00000000e+000]
[4.53577976e-16 1.00000000e+00]
[6.13715088e-04 9.99386285e-01]
[8.11969835e-110 1.00000000e+000]
[1.90309395e-11 1.00000000e+00]
[1.02467351e-18 1.00000000e+00]
[8.99978482e-268 1.00000000e+000]
[0.0103891 0.9896109]
[1.81926799e-22 1.00000000e+00]
[3.75207571e-08 9.99999962e-01]
[1.19563647e-22 1.00000000e+00]
[1.3357281e-220 1.0000000e+000]
[0.00695867 0.99304133]
[1.41031324e-108 1.00000000e+000]
[3.7023138e-14 1.0000000e+00]
[3.20195781e-100 1.00000000e+000]
[0.00189201 0.99810799]
[1.21369286e-78 1.00000000e+00]
[0.01339134 0.98660866]
[1.23755226e-31 1.00000000e+00]
[3.16346366e-21 1.00000000e+00]
[8.70868511e-05 9.99912913e-01]
[6.71357982e-100 1.00000000e+000]
[0.71910152 0.28089848]
[4.2095464e-66 1.0000000e+00]
[0.17663079 0.82336921]
[4.76243733e-49 1.00000000e+00]
[1.07902839e-27 

In [None]:
def EstimateBeta(Xi_smoothed,
                Y : np.ndarray,
                X : np.ndarray) -> np.ndarray :
    
    # Initialize Beta matrix to store estimated coefficients for each regime
    # Number of regimes from Xi_smoothed columns
    M = Xi_smoothed.shape[1]

    # Number of variables (columns in X)
    K = X.shape[1]
        
    # Initialize an empty array with K rows and 0 columns
    Betas_estimated = np.zeros((K, 0))
    
    #  For each regime run a OLS to estimate the regression
    for regime_number in range(Xi_smoothed.shape[1]):

        # Check dimensions of Omega
        Weights_Matrix = np.diag(Xi_smoothed[:, regime_number])

        A = (X.T @ Weights_Matrix @ X)

        try:
            A_inv = np.linalg.inv(A)
        except np.linalg.LinAlgError:
            A_inv = np.linalg.pinv(A)

        B = A_inv @ X.T @ Weights_Matrix @ Y

        print(B)

        # para cada regime, empilhar os Betas baseado na estimacao.
        Betas_estimated = np.column_stack([Betas_estimated, B])
        # Betas_estimated = np.column_stack([Betas_estimated, B])
                # np.kron(B, I)

    return Betas_estimated

In [26]:
Xi_smoothed
EstimateBeta(Xi_smoothed,
                Y,
                X)

[[0.95335225]
 [1.01609255]]
[[0.31774779]
 [1.69680899]]


array([[0.95335225, 0.31774779],
       [1.01609255, 1.69680899]])

In [None]:

Eta = Eta_t(Y, X, Regimes={
    "Regime1": {"Beta": np.array([1, 1]), "Omega": np.array([0.1])},
    "Regime2": {"Beta": np.array([1, 2]), "Omega": np.array([0.5])},
})

transitionMatrix = np.array([[0.9, 0.1],
                  [0.2, 0.8]])
print (Eta[0:3, :])

Xi_filtered = Xi_filter(Eta, transitionMatrix)
print(Xi_filtered[0:3, :])
print(Xi_filtered.shape)

Xi_smoothed = Xi_smooth(Xi_filtered, transitionMatrix)


[[1.90520546e-12 6.63840243e-02]
 [7.27535083e-53 3.69305464e-01]
 [2.37967559e-21 1.25354342e-01]]
[[2.86997584e-11 1.00000000e+00]
 [4.92502247e-53 1.00000000e+00]
 [4.74589780e-21 1.00000000e+00]]
(100, 2)


In [33]:
np.random.randn(3 + 1, 2)

array([[-0.28785926, -0.47902126],
       [ 0.19083475, -0.95100733],
       [-0.09708877,  0.00697424],
       [-0.55698994, -0.56878768]])

In [None]:


# # Dados
# # X: [constante, x_t]
# X = np.concatenate([
#     np.ones((iT, 1)),
#     np.random.randn(iT, iV)
# ], axis=1)

# # Y = X[:,0] + X[:,1] + ruído
# Y = (X[:, 0] + X[:, 1])[:, None] + np.random.randn(iT, 1)


# Chutes iniciais
# Matriz (K+1) x M de betas
mBetas = np.random.randn(qtdVariavelExo + 1, qtdRegimes)

# Matriz V x M para omegas (aqui não usada na Etat)
mOmegas = np.ones((qtdVariavelDep, qtdRegimes))

# Vetor de probabilidades a priori dos regimes (uniforme)
mPI = (1.0 / qtdRegimes) * np.ones((qtdRegimes, 1))

# Empilha parâmetros em um único vetor coluna (como vec() em Ox, por colunas)
mTheta = np.concatenate([
    mBetas.flatten(order='F'),  # vec por colunas
    mOmegas.flatten(order='F')
])[:, None]  # vira coluna

# Vetor mCar = [T, M, K, V]
mCar = [qtd_T, qtdRegimes, qtdVariavelExo, qtdVariavelDep]

# Matriz de transição P (2x2) com 0.5 em todas entradas
P = 0.5 * np.ones((qtdRegimes, qtdRegimes))

# Filtragem
mPsiFtt = None   # armazenará p(S_t | info até t)
mPsiFt1t = None  # armazenará p(S_t | info até t-1)
mPsit1t = None   # p(S_t | info até t-1) em cada passo



In [45]:
print("mBetas:")
print(mBetas)

print("Extracao de Betas:")
print(mTheta[0:((qtdVariavelExo + 1) * qtdRegimes), 0])

mBetas:
[[ 0.60982297  2.37000751]
 [-1.08740034 -0.37670498]]
Extracao de Betas:
[ 0.60982297 -1.08740034  2.37000751 -0.37670498]


In [46]:
t=3
print(Y[0:3, :])
# print(X)


mDados = np.concatenate([Y[t:t+1, :], X[t:t+1, :]], axis=1)

print(mDados)

mDados[0, 1:(1 + 2)]

[[ 0.50284076]
 [ 4.28364042]
 [-0.16933504]]
[[ 0.67184462  1.         -0.91760196]]


array([ 1.        , -0.91760196])

In [None]:
for t in range(iT):
    # Em Ox: Y[t][0] ~ X[t][:]
    # Aqui: mDados = [Y_t, X_t...]
    mDados = np.concatenate([, ], axis=1)
    print(mDados)
    
    mEtatt = Etat(mDados, mTheta, mCar,
                  y_t = Y[t:t+1, :],
                   x_t = X[t:t+1, :],
                     mBeta = mBetas, mOmega = mOmegas)  # “densidades” por regime

    if t == 0: 
        # mPsitt = (mEtatt .* mPI) / (mEtatt' * mPI)
        num = mEtatt * mPI
        den = float(mEtatt.T @ mPI)
        mPsitt = num / den

        # mPsit1t = P' * mPsitt
        mPsit1t = P.T @ mPsitt

        mPsiFtt = mPsitt.T        # 1 x M
        mPsiFt1t = mPsit1t.T      # 1 x M

    else:
        # mPsitt = (mEtatt .* mPsit1t) / (mEtatt' * mPsit1t)
        num = mEtatt * mPsit1t
        den = float(mEtatt.T @ mPsit1t)
        mPsitt = num / den

        # mPsit1t = P' * mPsitt
        mPsit1t = P.T @ mPsitt

        # Empilha por linhas (equivalente a | em Ox)
        mPsiFtt = np.concatenate([mPsiFtt, mPsitt.T], axis=0)
        mPsiFt1t = np.concatenate([mPsiFt1t, mPsit1t.T], axis=0)

# Em Ox: println(mPsiFtt~mPsiFt1t);
# Aqui: concatenação horizontal
resultado = np.concatenate([mPsiFtt, mPsiFt1t], axis=1)
print(resultado)


# if __name__ == "__main__":
#     main()


[[0.50284076 1.         0.22488637]]
[[4.28364042 1.         1.74004993]]
[[-0.16933504  1.         -0.20426111]]
[[ 0.67184462  1.         -0.91760196]]
[[-2.01449451  1.         -0.67416682]]
[[-1.92464743  1.         -0.34352802]]
[[0.39768939 1.         0.2233494 ]]
[[ 0.57686187  1.         -0.14139257]]
[[-1.4479281   1.         -0.18338381]]
[[0.97218372 1.         0.68035335]]
[[1.9762311  1.         0.09055805]]
[[-3.3727296   1.         -0.83327536]]
[[2.0064268  1.         0.81349984]]
[[3.0863699  1.         1.11738191]]
[[1.83881555 1.         0.31498606]]
[[-0.47459947  1.         -0.50031242]]
[[-3.81393617  1.         -1.62677083]]
[[1.37498886 1.         0.61943121]]
[[-2.67665719  1.         -1.45743084]]
[[-1.57290505  1.         -1.80348832]]
[[5.12628755 1.         2.00155947]]
[[1.82506307 1.         0.57911638]]
[[-1.6011158   1.         -0.70797022]]
[[1.74934207 1.         0.59335959]]
[[-0.76213588  1.         -0.58939351]]
[[1.40393878 1.         1.46743432]]

  den = float(mEtatt.T @ mPI)
  den = float(mEtatt.T @ mPsit1t)


In [55]:
print(resultado)

[[9.82088880e-01 1.79111202e-02 5.00000000e-01 5.00000000e-01]
 [9.99999996e-01 4.40091376e-09 5.00000000e-01 5.00000000e-01]
 [8.18068441e-01 1.81931559e-01 5.00000000e-01 5.00000000e-01]
 [9.92892976e-01 7.10702423e-03 5.00000000e-01 5.00000000e-01]
 [4.49473801e-03 9.95505262e-01 5.00000000e-01 5.00000000e-01]
 [5.24199063e-03 9.94758009e-01 5.00000000e-01 5.00000000e-01]
 [9.73132571e-01 2.68674291e-02 5.00000000e-01 5.00000000e-01]
 [9.87761824e-01 1.22381764e-02 5.00000000e-01 5.00000000e-01]
 [2.98160862e-02 9.70183914e-01 5.00000000e-01 5.00000000e-01]
 [9.96853494e-01 3.14650642e-03 5.00000000e-01 5.00000000e-01]
 [9.99946235e-01 5.37648744e-05 5.00000000e-01 5.00000000e-01]
 [2.80819050e-05 9.99971918e-01 5.00000000e-01 5.00000000e-01]
 [9.99949401e-01 5.05992802e-05 5.00000000e-01 5.00000000e-01]
 [9.99999355e-01 6.45443222e-07 5.00000000e-01 5.00000000e-01]
 [9.99905741e-01 9.42591083e-05 5.00000000e-01 5.00000000e-01]
 [6.05823103e-01 3.94176897e-01 5.00000000e-01 5.000000