<p style='text-align: right;'>初項：2024年x月x日</p>
<h2 align="center">Skewを考慮した最適ポートフォリオとBLモデル<br> 平均分散法の拡張</h2>
<p style='text-align: right;'>藤原　哉</p>

## 資産価格の生成
$$
\begin{align*}
f_X(x) = 2\phi(x;\mu, \Sigma) \Phi(\lambda_1 \Sigma^{\frac{1}{2}}(x-\mu))
\end{align*}
$$

In [None]:
import numpy as np
from scipy.stats import multivariate_normal, norm

def generate_truncated_skew_normal(mu: np.ndarray, lambda1: np.ndarray, Sigma: np.ndarray, size: int) -> np.ndarray:
    """
    Truncated Skew Normal分布に従う資産リターンを生成します。

    Args:
        mu (np.ndarray): 平均ベクトル。
        lambda1 (np.ndarray): 歪度パラメータベクトル。
        Sigma (np.ndarray): 共分散行列。
        size (int): 生成するサンプル数。

    Returns:
        np.ndarray: 生成された資産リターンの配列。
    """
    N_x = len(mu)  # 資産数を取得
    returns = np.zeros((size, N_x))  # リターン配列を初期化
    chol_Sigma = np.linalg.cholesky(Sigma)  # 共分散行列のCholesky分解
    inv_sqrt_Sigma = np.linalg.inv(chol_Sigma)  # Cholesky因子の逆行列を計算
    for i in range(size):
        z = multivariate_normal.rvs(mean=mu, cov=Sigma)  # 多変量正規分布からサンプルを生成
        delta = lambda1 @ inv_sqrt_Sigma  # 歪度パラメータの変換
        u = norm.cdf(delta @ (z - mu))  # 累積分布関数を計算
        returns[i] = z if np.random.rand() < 2 * u else -z  # 条件に応じてzまたは-zを選択
    return returns

def generate_random_correlation_matrix(N_x: int) -> np.ndarray:
    """
    ランダムな正定値相関行列を生成します。

    Args:
        N_x (int): 資産数。

    Returns:
        np.ndarray: 正定値の相関行列。
    """
    A = np.random.randn(N_x, N_x)
    Sigma = np.dot(A, A.T)  # 対称正定値行列を生成
    D_inv = np.diag(1 / np.sqrt(np.diag(Sigma)))  # 対角成分の逆数の平方根の対角行列
    R = D_inv @ Sigma @ D_inv  # 相関行列を計算
    np.fill_diagonal(R, 1)  # 対角成分を1に設定
    return R

def generate_asset_returns(N_x: int, N_Y: int) -> np.ndarray:
    """
    指定された資産数と年数に基づいて月次リターンを生成します。

    Args:
        N_x (int): 資産数。
        N_Y (int): 年数。

    Returns:
        np.ndarray: 生成された月次リターンの配列。
    """
    mu = np.random.uniform(-0.005, 0.02, N_x)  # 平均リターンを-0.5%から2%の範囲で設定（月次）
    lambda1 = np.random.uniform(-2, 2, N_x)  # 歪度パラメータを-2から2の範囲で設定
    std_devs = np.random.uniform(0.0144, 0.0866, N_x)  # 月次標準偏差を1.44%から8.66%の範囲で設定
    R = generate_random_correlation_matrix(N_x)  # ランダムな相関行列を生成
    Sigma = np.outer(std_devs, std_devs) * R  # 共分散行列を計算
    size = N_Y * 12  # 総月数を計算
    returns = generate_truncated_skew_normal(mu, lambda1, Sigma, size)  # リターンを生成
    return returns

# 乱数シードを固定
np.random.seed(42)

# パラメータ例
N_x = 6  # 資産数
N_Y = 20  # 年数

# 月次リターンを生成
monthly_returns = generate_asset_returns(N_x, N_Y)

# 各資産の年率標準偏差を計算して表示
annual_std_devs = np.std(monthly_returns, axis=0) * np.sqrt(12)
print("各資産の年率標準偏差:")
for i, std in enumerate(annual_std_devs):
    print(f"資産 {i+1}: {std:.2%}")

# 生成されたリターンを表示
print(monthly_returns)

In [None]:
import pandas as pd
import plotly.express as px

# monthly_returns、N_x、N_Yが既に定義されているとします。

size: int = N_Y * 12  # 総月数を計算
months_elapsed: np.ndarray = np.arange(size)  # 0から始まる経過月数の配列を作成

# monthly_returnsをデータフレームに変換
df_returns: pd.DataFrame = pd.DataFrame(monthly_returns, columns=[f'Asset {i+1}' for i in range(N_x)])
df_returns['Month'] = months_elapsed  # 経過月数の列を追加

# データを長い形式に変換
df_long: pd.DataFrame = df_returns.melt(id_vars='Month', var_name='Asset', value_name='Return')

# 資産ごとのリターンをラインプロット
fig = px.line(df_long, x='Month', y='Return', color='Asset', title='資産の月次リターン（経過月数）')

# グラフを表示
fig.show()

In [None]:
import numpy as np
import pandas as pd
import plotly.graph_objs as go
from plotly.subplots import make_subplots

# monthly_returns、N_xが既に定義されているとします。

# サブプロットの行数と列数を設定
cols = 2  # 1行あたりのグラフ数
rows = (N_x + cols - 1) // cols  # 必要な行数を計算

# サブプロットを作成
fig = make_subplots(rows=rows, cols=cols, subplot_titles=[f'Asset {i+1}' for i in range(N_x)])

for i in range(N_x):
    row = i // cols + 1
    col = i % cols + 1
    fig.add_trace(
        go.Histogram(
            x=monthly_returns[:, i],
            name=f'Asset {i+1}',
            nbinsx=30  # ヒストグラムのビン数を設定
        ),
        row=row,
        col=col
    )

# レイアウトを更新
fig.update_layout(
    title_text='各資産の月次リターンのヒストグラム',
    height=300 * rows,  # グラフの高さを調整
    showlegend=False
)

# グラフを表示
fig.show()

### pyroでの実装

In [None]:
import torch
import torch.special  # 追加
import pyro
import pyro.distributions as dist
from pyro.distributions import constraints
from pyro.infer import MCMC, NUTS
from torch.distributions import Distribution
from pyro.distributions.torch_distribution import TorchDistributionMixin
from pyro.infer.autoguide.initialization import init_to_value

# デバイスの指定
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class TruncatedSkewNormal(Distribution, TorchDistributionMixin):
    arg_constraints = {
        "mu": constraints.real_vector,
        "lambda1": constraints.real_vector,
        "Sigma": constraints.dependent,  # Sigma は正定値対称行列で、dependent に設定
    }
    support = constraints.real_vector
    has_rsample = False  # リサンプリング不可

    def __init__(self, mu, lambda1, Sigma, validate_args=None):
        self.mu = mu
        self.lambda1 = lambda1
        self.Sigma = Sigma
        self.Nx = mu.size(-1)
        batch_shape = mu.shape[:-1]
        event_shape = (self.Nx,)
        super().__init__(batch_shape=batch_shape, event_shape=event_shape, validate_args=validate_args)
        # Cholesky 分解
        self.Sigma_sqrt = torch.linalg.cholesky(Sigma)
        # delta の計算
        self.delta = self.Sigma_sqrt @ self.lambda1.unsqueeze(-1)  # [Nx, 1]
        self.delta = self.delta.squeeze(-1)  # [Nx]
        # 標準正規分布の定数
        self._2_pi_log = torch.log(torch.tensor(2 * torch.pi, device=mu.device))

    def expand(self, batch_shape, _instance=None):
        new = self._get_checked_instance(TruncatedSkewNormal, _instance)
        batch_shape = torch.Size(batch_shape)
        new.mu = self.mu.expand(batch_shape + self.event_shape)
        new.lambda1 = self.lambda1.expand(batch_shape + self.event_shape)
        new.Sigma = self.Sigma
        new.Nx = self.Nx
        new.delta = self.delta
        new.Sigma_sqrt = self.Sigma_sqrt
        new._2_pi_log = self._2_pi_log
        super(TruncatedSkewNormal, new).__init__(batch_shape=batch_shape, event_shape=self.event_shape, validate_args=False)
        return new

    def log_prob(self, value):
        device = value.device
        delta = value - self.mu  # [batch_size, Nx]
        # 多変量正規分布の対数密度関数
        solve_result = torch.linalg.solve(self.Sigma, delta.unsqueeze(-1))
        log_phi = -0.5 * (delta.unsqueeze(-1) * solve_result).sum(dim=-2).squeeze(-1)
        logdet_Sigma = torch.logdet(self.Sigma)
        log_phi += -0.5 * self.Nx * self._2_pi_log - 0.5 * logdet_Sigma
        # temp の計算
        temp = solve_result.squeeze(-1)
        w = torch.sqrt(1 + (self.lambda1 * temp).sum(dim=-1))
        arg = (self.lambda1 * delta).sum(dim=-1) / w
        # 数値安定性を考慮した log_cdf の計算
        log_Phi = torch.special.log_ndtr(arg)
        # 対数確率密度関数
        log_2 = torch.log(torch.tensor(2.0, device=device))
        logp = log_2 + log_phi + log_Phi
        return logp

    def sample(self, sample_shape=torch.Size()):
        shape = sample_shape + self.mu.shape
        device = self.mu.device
        # 標準正規乱数の生成
        Z = torch.randn(shape, device=device)
        U_shape = sample_shape + self.mu.shape[:-1]
        U = torch.abs(torch.randn(U_shape, device=device))  # |U| の形状を調整
        # サンプルの生成
        X = self.mu + (self.Sigma_sqrt @ Z.unsqueeze(-1)).squeeze(-1) + self.delta * U.unsqueeze(-1)
        return X

def model(data):
    Nx = data.size(1)
    device = data.device
    mu = pyro.sample('mu', dist.Normal(torch.tensor(0., device=device), torch.tensor(10., device=device)).expand([Nx]).to_event(1))
    lambda1 = pyro.sample('lambda1', dist.Normal(torch.tensor(0., device=device), torch.tensor(10., device=device)).expand([Nx]).to_event(1))
    sd = pyro.sample('sd', dist.HalfCauchy(torch.tensor(2.5, device=device)).expand([Nx]).to_event(1))
    L_omega = pyro.sample('L_omega', dist.LKJCholesky(Nx, concentration=torch.tensor(1.0, device=device)))
    L = sd.unsqueeze(-1) * L_omega
    Sigma = L @ L.transpose(-1, -2)
    with pyro.plate('data', data.size(0)):
        pyro.sample('obs', TruncatedSkewNormal(mu, lambda1, Sigma), obs=data)

# データの読み込みとデバイスへの移動
monthly_returns_pyro = torch.tensor(monthly_returns, dtype=torch.float32).to(device)

# 初期パラメータの設定
Nx = monthly_returns.size(1)
initial_params = {
    'mu': torch.mean(monthly_returns, dim=0),
    'lambda1': torch.zeros(Nx, device=device),
    'sd': torch.std(monthly_returns, dim=0),
}

# NUTS カーネルの作成
nuts_kernel = NUTS(model, init_strategy=init_to_value(values=initial_params))

# MCMC の実行
mcmc_pyro = MCMC(nuts_kernel, num_samples=1000, warmup_steps=200, num_chains=4)
mcmc_pyro.run(monthly_returns_pyro)


### pyroでの結果確認

In [None]:
import torch
import pyro
import pyro.distributions as dist
from pyro.infer import MCMC, NUTS
import matplotlib.pyplot as plt
import arviz as az
import numpy as np

# サンプルの取得
samples_pyro = mcmc_pyro.get_samples(group_by_chain=True)

# muの平均値
mu_samples = samples_pyro['mu']  # shape: (num_chains, num_samples, Nx)
mu_mean = mu_samples.mean(dim=(0, 1))
print("muの平均値:", mu_mean)

# lambda1の平均値
lambda1_samples = samples_pyro['lambda1']
lambda1_mean = lambda1_samples.mean(dim=(0, 1))
print("lambda1の平均値:", lambda1_mean)

# sdの平均値
sd_samples = samples_pyro['sd']
sd_mean = sd_samples.mean(dim=(0, 1))
print("sdの平均値:", sd_mean)

# Sigmaの計算
L_omega_samples = samples_pyro['L_omega']
def compute_sigma(sd, L_omega):
    L = sd.unsqueeze(-1) * L_omega
    Sigma = torch.matmul(L, L.transpose(-1, -2))
    return Sigma

Sigma_samples = compute_sigma(sd_samples, L_omega_samples)
Sigma_mean = Sigma_samples.mean(dim=(0, 1))
print("Sigmaの平均値:\n", Sigma_mean)

# arvizを使用した解析
samples_numpy = {k: v.detach().cpu().numpy() for k, v in samples_pyro.items()}
idata = az.from_dict(posterior=samples_numpy)

# SigmaをInferenceDataに追加
Sigma_samples_numpy = Sigma_samples.detach().cpu().numpy()
idata.add_posterior({'Sigma': Sigma_samples_numpy})

# 推定結果のサマリーを表示
summary = az.summary(idata, var_names=["mu", "lambda1", "sd"])
print(summary)

# トレースプロット
az.plot_trace(idata, var_names=["mu", "lambda1", "sd"])
plt.show()

# Rhatの確認
rhat = az.rhat(idata, var_names=["mu", "lambda1", "sd"])
print("Rhat:\n", rhat)

# 有効サンプルサイズの確認
ess = az.ess(idata, var_names=["mu", "lambda1", "sd"])
print("有効サンプルサイズ:\n", ess)


### numpyro 実装

In [None]:
import numpy as np
import jax.numpy as jnp
from jax import random, lax
import jax.scipy.special
import numpyro
from numpyro import distributions as dist
from numpyro.distributions import constraints
from numpyro.distributions.distribution import Distribution
from numpyro.infer import MCMC, NUTS


In [6]:
class TruncatedSkewNormal(Distribution):
    arg_constraints = {
        "mu": constraints.real_vector,
        "lambda1": constraints.real_vector,
        "Sigma": constraints.dependent,  # Sigmaは他のパラメータに依存
    }
    support = constraints.real_vector
    reparametrized_params = ["mu", "lambda1", "Sigma"]

    def __init__(self, mu, lambda1, Sigma, validate_args=None):
        self.mu = mu
        self.lambda1 = lambda1
        self.Sigma = Sigma
        self.Nx = mu.shape[-1]
        batch_shape = jnp.shape(mu)[:-1]
        event_shape = (self.Nx,)
        super().__init__(batch_shape=batch_shape, event_shape=event_shape, validate_args=validate_args)
        # Cholesky分解
        self.Sigma_sqrt = jnp.linalg.cholesky(Sigma)
        # deltaの計算
        self.delta = jnp.matmul(self.Sigma_sqrt, self.lambda1[..., jnp.newaxis])[..., 0]

    def log_prob(self, value):
        delta = value - self.mu  # shape: [batch_size, Nx]
        # 多変量正規分布の対数密度関数
        solve_result = jnp.linalg.solve(self.Sigma, delta[..., jnp.newaxis])
        quad_form = jnp.sum(delta[..., jnp.newaxis] * solve_result, axis=(-2, -1))
        logdet_Sigma = 2.0 * jnp.sum(jnp.log(jnp.diagonal(self.Sigma_sqrt, axis1=-2, axis2=-1)), axis=-1)
        log_phi = -0.5 * (self.Nx * jnp.log(2 * jnp.pi) + logdet_Sigma + quad_form)
        # tempの計算
        #temp = solve_result[..., 0]
        #w = jnp.sqrt(1 + jnp.sum(self.lambda1 * temp, axis=-1))
        arg = jnp.sum(self.lambda1 * delta, axis=-1) #/ w
        # 数値安定性を考慮したlog_cdfの計算
        log_Phi = jax.scipy.special.log_ndtr(arg)
        # 対数確率密度関数
        logp = jnp.log(2.0) + log_phi + log_Phi
        return logp

    def _validate_sample(self, value):
        pass  # サンプルの検証をスキップ（必要に応じて実装）

    @staticmethod
    def infer_shapes(mu, lambda1, Sigma):
        event_shape = mu.shape[-1:]
        batch_shape = lax.broadcast_shapes(mu.shape[:-1], lambda1.shape[:-1], Sigma.shape[:-2])
        return batch_shape, event_shape


In [7]:
def model(data):
    Nx = data.shape[1]
    mu = numpyro.sample('mu', dist.Normal(0., 0.1).expand([Nx]))
    lambda1 = numpyro.sample('lambda1', dist.Normal(0., 1.).expand([Nx]))
    sd = numpyro.sample('sd', dist.HalfCauchy(0.2).expand([Nx]))
    # コレスキー分解を用いた共分散行列の生成
    L_omega = numpyro.sample('L_omega', dist.LKJCholesky(Nx, concentration=1.0))
    L = sd[..., jnp.newaxis] * L_omega
    Sigma = jnp.matmul(L, jnp.transpose(L, axes=(-1, -2)))
    with numpyro.plate('data', data.shape[0]):
        numpyro.sample('obs', TruncatedSkewNormal(mu, lambda1, Sigma), obs=data)


In [None]:
# JAXのデバイス配列に変換
monthly_returns_numpyro = jnp.array(monthly_returns)

# NUTSカーネルの作成
nuts_kernel = NUTS(model,adapt_step_size=True)
#mcmc_numpyro = MCMC(nuts_kernel, num_warmup=200, num_samples=1000, num_chains=1)

# CPU マルチコアの場合
mcmc_numpyro = MCMC(nuts_kernel, num_warmup=2000, num_samples=20000, num_chains=4, chain_method='vectorized')
# GPUの場合
#mcmc_numpyro = MCMC(nuts_kernel, num_warmup=200, num_samples=1000, num_chains=4, chain_method='parallel')

# MCMCの実行
mcmc_numpyro.run(random.PRNGKey(0), data=monthly_returns_numpyro)

# MCMCの結果を取得
samples_numpyro = mcmc_numpyro.get_samples()

In [None]:
print(samples_numpyro.keys())

# muの平均値
mu_samples = samples_numpyro['mu']  # shape: (num_samples, Nx)
mu_mean = jnp.mean(mu_samples, axis=0)  # 平均値を計算
print("muの平均値:", mu_mean)

# lambda_1 の平均値
lambda1_samples = samples_numpyro['lambda1']
lambda1_mean = jnp.mean(lambda1_samples, axis=0)
print("lambda1の平均値:", lambda1_mean)

# sd L_omega のサンプル 
sd_samples = samples_numpyro['sd']  # shape: (num_samples, Nx)
L_omega_samples = samples_numpyro['L_omega']  # shape: (num_samples, Nx, Nx)

from jax import vmap

def compute_sigma(sd, L_omega):
    L = sd[:, jnp.newaxis] * L_omega  # (Nx, Nx)
    Sigma = jnp.matmul(L, L.T)  # (Nx, Nx)
    return Sigma

# `vmap` を使用して全サンプルに対して計算
Sigma_samples = vmap(compute_sigma)(sd_samples, L_omega_samples)  # shape: (num_samples, Nx, Nx)

Sigma_mean = jnp.mean(Sigma_samples, axis=0)  # shape: (Nx, Nx)
print("Sigmaの平均値:\n", Sigma_mean)

sd_mean = jnp.mean(sd_samples, axis=0)
L_omega_mean = jnp.mean(L_omega_samples, axis=0)

Sigma_mean2 = compute_sigma(sd_mean, L_omega_mean)
print("sdの平均値：　",sd_mean)
print("Sigmaの平均値2:\n", Sigma_mean2)

In [None]:
import arviz as az
import matplotlib.pyplot as plt

# NumPyroのMCMC結果をInferenceDataに変換
idata = az.from_numpyro(mcmc_numpyro)

# サマリーの表示
summary = az.summary(idata, var_names=["mu", "lambda1", "sd"])
print(summary)

# トレースプロットの表示
az.plot_trace(idata, var_names=["mu", "lambda1", "sd"])
plt.show()
