In [18]:
from functools import partial
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
# from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
# from sklearn.metrics import accuracy_score
from dynamax.hidden_markov_model import GaussianHMM
import jax.numpy as jnp
import jax.random as jr
import optax
from jax import vmap

In [25]:
num_train_batches = 3
num_test_batches = 1
num_timesteps = 100

true_num_states = 3
emission_dim = 5
hmm = GaussianHMM(true_num_states, emission_dim)

initial_probs = jnp.ones(true_num_states) / true_num_states
transition_matrix = 0.80 * jnp.eye(true_num_states) \
    + 0.15 * jnp.roll(jnp.eye(true_num_states), 1, axis=1) \
    + 0.05 / true_num_states
emission_means = jnp.column_stack([
    jnp.cos(jnp.linspace(0, 2 * jnp.pi, true_num_states + 1))[:-1],
    jnp.sin(jnp.linspace(0, 2 * jnp.pi, true_num_states + 1))[:-1],
    jnp.zeros((true_num_states, emission_dim - 2)),
    ])
emission_covs = jnp.tile(0.1**2 * jnp.eye(emission_dim), (true_num_states, 1, 1))

true_params, props = hmm.initialize(initial_probs=initial_probs,
                                transition_matrix=transition_matrix,
                                emission_means=emission_means,
                                emission_covariances=emission_covs)

# Sample train, validation, and test data
train_key, val_key, test_key = jr.split(jr.PRNGKey(0), 3)
f = vmap(partial(hmm.sample, true_params, num_timesteps=num_timesteps))
train_true_states, train_emissions = f(jr.split(train_key, num_train_batches))
test_true_states,  test_emissions  = f(jr.split(test_key, num_test_batches))

hmm.fit_em(true_params, props, train_emissions, num_iters=10)

(ParamsGaussianHMM(initial=ParamsStandardHMMInitialState(probs=Array([0.6363637 , 0.33333337, 0.03030304], dtype=float32)), transitions=ParamsStandardHMMTransitions(transition_matrix=Array([[0.74240583, 0.21992712, 0.03766707],
        [0.03153611, 0.7945066 , 0.17395729],
        [0.1543052 , 0.00937766, 0.8363171 ]], dtype=float32)), emissions=ParamsGaussianHMMEmissions(means=Array([[ 1.0027838 , -0.00900382, -0.01286408, -0.00475318,  0.00263187],
        [-0.5063755 ,  0.859224  ,  0.01620091, -0.01208951, -0.0147169 ],
        [-0.5088892 , -0.87816954,  0.02022544,  0.00228761,  0.00467192]],      dtype=float32), covs=Array([[[ 8.52393266e-03,  1.50035555e-03, -1.02203339e-04,
           1.50570611e-03,  3.39067592e-05],
         [ 1.50035555e-03,  1.13528995e-02, -1.63830002e-03,
           1.12502417e-03, -1.59981055e-03],
         [-1.02203339e-04, -1.63830002e-03,  6.49747020e-03,
          -5.40972804e-04,  1.03812595e-03],
         [ 1.50570634e-03,  1.12502428e-03, -5.4097

In [61]:
from sys import last_exc
from httpx import post


print(test_emissions.shape)
print(test_emissions[0].shape)
# print(test_emissions[0])

state = hmm.most_likely_states(true_params, test_emissions[0])
print(state)
print(state[-1])

last_state = state[-1]

posteriors = hmm.smoother(true_params, test_emissions[0])
print(posteriors.smoothed_probs.shape)
print(posteriors.smoothed_probs[-1][last_state])

(1, 100, 5)
(100, 5)
[1 1 1 1 1 1 2 2 0 0 0 2 1 2 1 1 1 1 1 1 1 2 2 2 2 2 2 2 0 0 0 0 0 1 1 1 1
 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 0 0 0 0 0 1 1 1 1 1 1 1 1 1 1 1 1 1 1 2
 2 2 2 2 0 0 0 0 1 1 1 1 1 2 2 2 2 2 2 2 2 2 0 0 0 0]
0
(100, 3)
1.0


In [2]:
def calculate_basic_features(df):
    features = {
        'returns': df['close'].pct_change(),
        'log_returns': np.log(df['close']).diff(),
        'price_range': df['high'] - df['low'],
        'price_range_pct': (df['high'] - df['low']) / df['open'],
        'volume_price_ratio': df['volume'] / df['close'],
        'volume_change': df['volume'].pct_change()
    }
    return pd.DataFrame(features).fillna(0)

In [4]:
def apply_kalman_filter(features_df):
    # 初始化Kalman Filter (狀態維度等於特徵數)
    n_features = features_df.shape[1]
    kf = KalmanFilterCore(
        dim_state=n_features,
        process_noise_cov=np.eye(n_features) * 0.001,
        measure_noise_cov=np.eye(n_features) * 0.001
    )
    
    # 存儲濾波後的特徵
    filtered_features = []
    
    # 對每個時間點的特徵向量進行濾波
    for _, row in features_df.iterrows():
        measurement = row.values
        _, _ = kf.update(measurement)
        filtered_features.append(kf.x.copy())  # 儲存濾波後的狀態估計
        
    return np.array(filtered_features)

In [5]:
def process_features(filtered_features, n_components=None):
    # 標準化
    scaler = StandardScaler()
    scaled_features = scaler.fit_transform(filtered_features)
    
    # PCA降維
    if n_components is not None:
        pca = PCA(n_components=n_components)
        processed_features = pca.fit_transform(scaled_features)
    else:
        processed_features = scaled_features
        
    return processed_features

In [6]:
def train_hmm(processed_features, n_states=3):
    # 初始化HMM
    hmm = HMMCore(n_states=n_states)
    
    # 使用處理後的特徵矩陣訓練HMM
    hmm.fit(processed_features)
    
    return hmm

In [7]:
def generate_ohlcv_data(n_samples=10000, volatility=0.001, volume_mean=1000, seed=42):
    """
    生成模擬的1分鐘OHLCV數據
    
    Parameters
    ----------
    n_samples : int
        數據點數量
    volatility : float
        價格波動率
    volume_mean : float
        平均成交量
    seed : int
        隨機種子
    
    Returns
    -------
    pd.DataFrame
        包含OHLCV數據的DataFrame
    """
    np.random.seed(seed)
    
    # 生成價格序列（使用幾何布朗運動）
    dt = 1/1440  # 1分鐘
    drift = 0.1  # 年化漂移率
    prices = np.exp(
        (drift - volatility**2/2)*dt + 
        volatility*np.random.normal(0, np.sqrt(dt), n_samples)
    ).cumprod() * 100  # 起始價格100
    
    # 生成OHLCV數據
    data = {
        'timestamp': pd.date_range(
            start='2024-01-01', 
            periods=n_samples, 
            freq='1min'
        ),
        'open': prices,
        'high': prices * (1 + np.random.uniform(0, volatility, n_samples)),
        'low': prices * (1 - np.random.uniform(0, volatility, n_samples)),
        'close': prices * (1 + np.random.normal(0, volatility, n_samples)),
        'volume': np.random.lognormal(
            np.log(volume_mean), 
            0.5, 
            n_samples
        )
    }
    
    df = pd.DataFrame(data)
    
    # 確保high/low的邏輯正確性
    df['high'] = df[['open', 'close', 'high']].max(axis=1)
    df['low'] = df[['open', 'close', 'low']].min(axis=1)
    
    # df = df.set_index('timestamp', inplace=True)
    
    return df

# 生成數據
df = generate_ohlcv_data(n_samples=10000)

# 分割訓練集
train_df = df.iloc[:2000]
test_df = df.iloc[2000:]

In [None]:
print('訓練集數據：', train_df.head())
print('測試集數據：', test_df.head())

In [None]:
train_df.set_index('timestamp', inplace=True)
test_df.set_index('timestamp', inplace=True)

print(train_df.head())
print(test_df.head())

In [None]:
# 1. 計算基礎特徵
basic_features = calculate_basic_features(train_df)

# 2. Kalman Filter處理
filtered_features = apply_kalman_filter(basic_features)

# 3. 標準化和PCA
processed_features = process_features(filtered_features, n_components=5)

print(processed_features)
print(processed_features.shape)
# 4. HMM訓練
hmm = train_hmm(processed_features)

In [None]:
def plot_time_series_with_states(df, features, states, n_states):
    """
    繪製原始時間序列數據和對應的HMM狀態。
    
    Parameters
    ----------
    df : pd.DataFrame
        包含原始OHLCV數據的DataFrame，帶有時間索引
    features : list
        要繪製的特徵名稱列表
    states : np.ndarray
        HMM狀態序列，長度應該與df相同
    n_states : int
        HMM的狀態數量
    """
    # 創建子圖
    fig, axes = plt.subplots(len(features) + 1, 1, 
                            figsize=(15, 3*len(features)), 
                            sharex=True)
    
    # 繪製每個原始特徵的時間序列
    for i, feature in enumerate(features):
        axes[i].plot(df.index, df[feature], label=feature)
        axes[i].legend()
        axes[i].grid(True)
        axes[i].set_ylabel(feature)
    
    # 繪製HMM狀態
    scatter = axes[-1].scatter(df.index, 
                             np.zeros_like(states),  # y座標都是0，只用顏色區分狀態
                             c=states, 
                             cmap='tab10',  # 使用離散的顏色圖
                             vmin=0, 
                             vmax=n_states-1)
    axes[-1].set_ylabel('HMM States')
    plt.colorbar(scatter, ax=axes[-1], 
                ticks=range(n_states),
                label='State')
    
    # 設置x軸標籤
    axes[-1].set_xlabel('Time')
    
    plt.tight_layout()
    plt.show()

# 使用訓練好的HMM模型獲取states
# 注意：這裡不需要predict_stream，因為我們已經用這些數據訓練了模型
emission_matrix = hmm._gaussian_probability(processed_features, hmm.means, hmm.variances)
log_probs = np.log(emission_matrix)
states = np.argmax(log_probs, axis=1)  # 選擇最可能的狀態

# 繪製時間序列和對應的狀態
plot_time_series_with_states(train_df, 
                           ['open', 'high', 'low', 'close', 'volume'], 
                           states, 
                           n_states=3)

In [12]:
def plot_state_transitions(states, n_states):
    """繪製狀態轉換熱圖"""
    # 計算轉換矩陣
    transitions = np.zeros((n_states, n_states))
    for i in range(len(states)-1):
        transitions[states[i], states[i+1]] += 1
    
    # 正規化
    transitions = transitions / transitions.sum(axis=1, keepdims=True)
    
    # 繪製熱圖
    plt.figure(figsize=(8, 6))
    sns.heatmap(transitions, annot=True, cmap='YlOrRd', 
                xticklabels=range(n_states), 
                yticklabels=range(n_states))
    plt.title('State Transition Probabilities')
    plt.xlabel('To State')
    plt.ylabel('From State')
    plt.show()

In [13]:
def plot_feature_space_states(features, states, n_states):
    """在特徵空間中可視化狀態分布"""
    fig = plt.figure(figsize=(15, 5))
    
    # 2D投影
    ax1 = fig.add_subplot(131)
    scatter = ax1.scatter(features[:, 0], features[:, 1], 
                         c=states, cmap='tab10', 
                         alpha=0.6, vmin=0, vmax=n_states-1)
    ax1.set_xlabel('Feature 1')
    ax1.set_ylabel('Feature 2')
    plt.colorbar(scatter)
    
    # 3D視圖
    ax2 = fig.add_subplot(132, projection='3d')
    scatter = ax2.scatter(features[:, 0], features[:, 1], features[:, 2],
                         c=states, cmap='tab10',
                         alpha=0.6, vmin=0, vmax=n_states-1)
    ax2.set_xlabel('Feature 1')
    ax2.set_ylabel('Feature 2')
    ax2.set_zlabel('Feature 3')
    
    plt.tight_layout()
    plt.show()

In [14]:
def plot_state_durations(states):
    """分析並繪製每個狀態的持續時間分布"""
    n_states = len(np.unique(states))
    durations = []
    current_state = states[0]
    current_duration = 1
    
    # 計算持續時間
    for state in states[1:]:
        if state == current_state:
            current_duration += 1
        else:
            durations.append((current_state, current_duration))
            current_state = state
            current_duration = 1
    durations.append((current_state, current_duration))
    
    # 按狀態分組
    state_durations = [[] for _ in range(n_states)]
    for state, duration in durations:
        state_durations[state].append(duration)
    
    # 繪製箱型圖和小提琴圖
    plt.figure(figsize=(12, 6))
    
    # 左側箱型圖
    plt.subplot(121)
    plt.boxplot(state_durations)
    plt.title('State Duration Distribution (Box Plot)')
    plt.xlabel('State')
    plt.ylabel('Duration (time steps)')
    
    # 右側小提琴圖
    plt.subplot(122)
    plt.violinplot(state_durations)
    plt.title('State Duration Distribution (Violin Plot)')
    plt.xlabel('State')
    plt.ylabel('Duration (time steps)')
    
    plt.tight_layout()
    plt.show()

In [15]:
def plot_state_distributions(features, states, n_states):
    """可視化每個狀態的特徵分布"""
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    
    for i in range(3):  # 對每個特徵
        for state in range(n_states):  # 對每個狀態
            state_data = features[states == state, i]
            sns.kdeplot(data=state_data, ax=axes[i], label=f'State {state}')
        
        axes[i].set_title(f'Feature {i+1} Distribution by State')
        axes[i].legend()
    
    plt.tight_layout()
    plt.show()

In [16]:
def plot_transition_matrix(hmm):
    """繪製HMM轉換矩陣熱圖"""
    plt.figure(figsize=(10, 8))
    sns.heatmap(hmm.transition_matrix, 
                annot=True, 
                fmt='.2f',
                cmap='YlOrRd',
                xticklabels=[f'State {i}' for i in range(hmm.n_states)],
                yticklabels=[f'State {i}' for i in range(hmm.n_states)])
    plt.title('State Transition Matrix')
    plt.xlabel('To State')
    plt.ylabel('From State')
    plt.show()

In [17]:
def plot_state_feature_distributions(processed_features, states, feature_names):
    """繪製每個狀態下各特徵的分布"""
    n_states = len(np.unique(states))
    n_features = processed_features.shape[1]
    
    fig, axes = plt.subplots(n_states, n_features, 
                            figsize=(4*n_features, 3*n_states))
    
    for state in range(n_states):
        state_data = processed_features[states == state]
        for j in range(n_features):
            ax = axes[state, j]
            sns.histplot(state_data[:, j], ax=ax, bins=30)
            ax.set_title(f'State {state} - Feature {feature_names[j]}')
            
    plt.tight_layout()
    plt.show()

In [18]:
def plot_3d_state_space(processed_features, states):
    """在3D空間中可視化狀態分布"""
    fig = plt.figure(figsize=(12, 8))
    ax = fig.add_subplot(111, projection='3d')
    
    scatter = ax.scatter(processed_features[:, 0],
                        processed_features[:, 1],
                        processed_features[:, 2],
                        c=states,
                        cmap='tab10')
    
    ax.set_xlabel('Feature 1')
    ax.set_ylabel('Feature 2')
    ax.set_zlabel('Feature 3')
    plt.colorbar(scatter, label='State')
    plt.title('State Distribution in Feature Space')
    plt.show()

In [None]:
# 獲取states
emission_matrix = hmm._gaussian_probability(processed_features, hmm.means, hmm.variances)
states = np.argmax(emission_matrix, axis=1)

# 1. 轉換矩陣
plot_transition_matrix(hmm)

# 2. 特徵分布
feature_names = ['Feature 1', 'Feature 2', 'Feature 3', 'Feature 4', 'Feature 5']
plot_state_feature_distributions(processed_features, states, feature_names)

# 3. 持續時間分析
plot_state_durations(states)

# 4. 3D特徵空間
plot_3d_state_space(processed_features, states)