# ポートフォリオ最適化

今回取り扱う問題は、ポートフォリオ最適化と呼ばれる最適化問題です。

ポートフォリオ最適化とは、全資産のうちどの金融商品にどれだけ投資すれば利益を最大化できるのかを決定する問題です。

次のデモでは、NASDAQ銘柄を構成する100個の銘柄のうち、99銘柄の過去(2021/1/14〜2021/5/14)の価格データを用いて、最適なポートフォリオを決定します。

「Run」ボタンを押すと実行されます。
「Options」から実行時のパラメーターの変更が可能です。

---
参考文献

https://en.wikipedia.org/wiki/Nasdaq


In [None]:
%%html
<style>
/* Layout for elements carrying the "portfolio" class. */
.portfolio {
    width: 100%;
    margin: 0 auto;
    /* A second declaration, "font-size: 1.vw", used to follow here. "1." is
       not a valid CSS number, so browsers discarded it and this 100% value
       was the one in effect. */
    font-size: 100%;
}
</style>

In [None]:
from IPython.core.display import display
from ipywidgets import (
    Button,
    FloatSlider,
    IntSlider,
    interactive_output,
    VBox,
    HBox,
    Output,
    Label,
    Accordion,
    IntProgress,
    GridBox,
    HTML,
    GridspecLayout,
)
import matplotlib.pyplot as plt

In [None]:
class AmplifyProblem:
    """Common base for problems that are submitted to the Fixstars client.

    Subclasses provide ``construct`` and ``solve``; ``_solve`` is the shared
    helper that sends an already-built model to the solver.
    """

    def __init__(self, timeout, *args, **kwargs):
        """Remember ``timeout``; any further arguments are accepted and ignored."""
        self.timeout = timeout

    def construct(self):
        """Build the model. Must be overridden."""
        raise NotImplementedError()

    def solve(self):
        """Solve the problem. Must be overridden."""
        raise NotImplementedError()

    def _solve(self, model):
        """Run ``model`` on a freshly configured ``FixstarsClient`` and return the solver result."""
        fixstars_client = FixstarsClient()
        # When running locally, set your Amplify AE access token here:
        # fixstars_client.token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
        assert isinstance(self.timeout, int)
        fixstars_client.parameters.timeout = self.timeout
        # Wrap the client in a solver and run it on the model.
        return Solver(fixstars_client).solve(model)

In [None]:
from amplify import einsum, BinarySymbolGenerator
from amplify import Solver
from amplify.constraint import equal_to
from amplify import gen_symbols
from amplify.client import FixstarsClient
import numpy as np
from ipywidgets import HTML
import glob
import os
import pandas as pd


def read_csv_data_yahoo(root):
    """Load one price series per CSV file found directly under ``root``.

    Every ``*.csv`` file is read with pandas and must provide ``High`` and
    ``Low`` columns. The price used for each row is the mean of the two.

    Parameters
    ----------
    root: str
        Directory that contains the CSV files (one file per brand).

    Returns
    -------
    asset: np.ndarray
        Mean of High and Low, shape is (num of brands, num of rows per file).
    names: list of str
        File names without their extension, in the same order as the rows
        of ``asset``.

    Raises
    ------
    ValueError
        If ``root`` contains no CSV file, or (raised by numpy) if the files
        do not all have the same number of rows.
    """
    # glob yields files in an arbitrary, platform-dependent order; sorting
    # keeps the brand order reproducible between runs.
    paths = sorted(glob.glob(f"{root}/*.csv"))
    if not paths:
        raise ValueError(f"no CSV files found in {root!r}")

    rows = []
    for path in paths:
        df = pd.read_csv(path)
        high_value = df["High"].to_numpy().reshape(1, -1)
        low_value = df["Low"].to_numpy().reshape(1, -1)
        # Mean of the high and low price of each row.
        rows.append((high_value + low_value) * 0.5)

    asset = np.concatenate(rows, axis=0)

    # splitext removes only the trailing extension, so a ".csv" that appears
    # in the middle of a file name is left alone.
    names = [os.path.splitext(os.path.basename(path))[0] for path in paths]
    return asset, names


def historical_data_method(asset, D):
    """Estimate expected returns and their covariance from historical prices.

    For every start index ``j`` with ``0 <= j <= N - D`` the rate of return of
    a window is ``(asset[:, j + D - 1] - asset[:, j]) / asset[:, j]``, i.e. the
    window spans ``D`` consecutive samples including both ends. With ``D = 1``
    start and end coincide, so every rate is 0.

    Parameters
    ----------
    asset: np.array
        Past prices of each brand, shape is (num of brands, N).
    D: int
        Number of samples between investing and collecting (inclusive).

    Returns
    -------
    expected_rate_of_return: array
        Mean rate of return per brand, shape is (num of brands,).
    covariance_rate_of_return: array
        ``np.cov`` of the per-window rates (rows are brands). With a single
        window (``D == N``) numpy cannot estimate it and returns NaN.
    rate_of_return_per_term: array
        Rate of return of every window, shape is (num of brands, N - D + 1).

    Raises
    ------
    TypeError
        If ``D`` is not an integer.
    ValueError
        If ``D`` is outside ``1 <= D <= N``.
    """
    if not isinstance(D, (int, np.integer)):
        raise TypeError(f"D must be an integer, got {type(D).__name__}")
    _, N = asset.shape
    # D < 1 would make the end index negative and silently wrap around to the
    # last columns; D > N leaves no complete window at all.
    if not 1 <= D <= N:
        raise ValueError(f"D must satisfy 1 <= D <= {N}, got {D}")

    # There are N - D + 1 windows; take every start/end column at once
    # instead of looping over the windows.
    num_terms = N - D + 1
    start_price = asset[:, :num_terms]
    end_price = asset[:, D - 1 :]
    rate_of_return_per_term = np.divide(end_price - start_price, start_price)

    # Expected rate of return of each brand.
    expected_rate_of_return = np.mean(rate_of_return_per_term, axis=1)

    # Covariance between brands.
    covariance_rate_of_return = np.cov(rate_of_return_per_term)

    return expected_rate_of_return, covariance_rate_of_return, rate_of_return_per_term


class MarkowitzProblem(AmplifyProblem):
    """Mean-variance (Markowitz) portfolio selection problem.

    The constructor loads the price data with ``read_csv_data_yahoo``,
    estimates the per-brand expected returns and their covariance with
    ``historical_data_method`` and stores everything in ``self.config``.

    Parameters
    ----------
    D: int
        Holding period passed to ``historical_data_method``.
    K: int
        Maximum number of investment units; also the number of binary
        variables allocated per brand.
    gamma: float
        Weight of the risk (variance) term in the objective.
    *args, **kwargs
        Forwarded to ``AmplifyProblem.__init__`` (e.g. ``timeout``).
    """

    def __init__(self, D, K, gamma, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # K units in the formulation.
        maximum_asset = K
        asset, names = read_csv_data_yahoo(root="../../storage/portfolio")
        num_brand, num_date = asset.shape

        # Return statistics of every brand.
        (
            expected_rate_of_return,
            variance_rate_of_return,
            rate_of_return_per_term,
        ) = historical_data_method(asset, D)

        self.config = dict(
            num_brand=num_brand,
            num_date=num_date,
            variance_rate_of_return=variance_rate_of_return,
            expected_rate_of_return=expected_rate_of_return,
            rate_of_return_per_term=rate_of_return_per_term,
            asset=asset,
            maximum_asset=maximum_asset,
            names=names,
        )

        self.gamma = gamma

    def construct(self):
        """Declare the variables and return ``objective + priority * constraint``."""
        # Variables: one row of ``maximum_asset`` binary symbols per brand.
        # The row sum ``w[i]`` is the number of units invested in brand i.
        x = BinarySymbolGenerator().array(
            self.config["num_brand"], self.config["maximum_asset"]
        )
        w = x.sum(axis=1)
        self.w = w
        self.x = x

        # Objective and constraint (no variables are fixed beforehand).
        objective = self.setObjective()
        constraint = self.setConstraint()

        # Cost function: the constraint is weighted by ``self.priority``.
        self.priority = 0.05

        model = objective + self.priority * constraint
        return model

    def setObjective(self):
        """Return the objective to be minimised.

        Notes
        -----
        The goal is to maximise the expected return E_w(R) while keeping the
        expected risk V_w(R) small, i.e. to maximise
        E_w(R) - gamma / 2 * V_w(R). The returned expression is its negation,
        ``-profit + gamma * 0.5 * risk``, where ``gamma`` is ``self.gamma``.
        """
        expected_rate_of_return = self.config["expected_rate_of_return"]
        variance_rate_of_return = self.config["variance_rate_of_return"]

        # Expected return: sum_i w_i * r_i
        profit = einsum("i,i->", self.w, expected_rate_of_return)

        # Risk: sum_ij w_i * w_j * cov_ij
        risk = einsum("i,j,ij->", self.w, self.w, variance_rate_of_return)

        objective = -profit + self.gamma * 0.5 * risk

        return objective

    def setConstraint(self):
        r"""Return the budget constraint.

        Notes
        -----
        Exactly K units are invested in total:
            \sum_{i=1}^n w_i = K
        where K is ``self.config["maximum_asset"]``.
        NOTE(review): this relies on ``equal_to`` treating the array ``w`` as
        the sum of its elements -- confirm against the Amplify version in use.
        """
        K = self.config["maximum_asset"]
        constraint = equal_to(self.w, K)

        return constraint

    def solve(self):
        """Build the model, run the solver and return its result.

        Raises
        ------
        RuntimeError
            If the solver returns no solution.
        """
        model = self.construct()
        result = self._solve(model)

        if len(result) == 0:
            raise RuntimeError("The given constraints are not satisfied")

        return result

    def eval(self, result):
        """Turn the first solution of ``result`` into displayable objects.

        Parameters
        ----------
        result: amplify.SolverResult
            Result returned by the solver; only ``result[0]`` is used.

        Returns
        -------
        display_df: pandas.DataFrame
            Brands with a positive unit count (see ``showSortedDataFrame``).
        msg: tuple of str
            HTML fragments of the expected-return headline; join them before
            displaying.
        display_fig_expected_return: matplotlib Axes
            Plot of the historical return rates (see ``showHistorical``).
        """
        # The number of units per brand is the solution.
        w_values = self.w.decode(result[0].values).astype(int)

        # Expected return of the chosen allocation.
        expected_rate_of_return = self.config["expected_rate_of_return"]
        result_profit = np.dot(expected_rate_of_return, w_values)

        msg = ("<h4>", "[期待収益率]: ", f"{result_profit}", "</h4>")

        # Table of the diversified investment.
        display_df = self.showSortedDataFrame(
            names=self.config["names"],
            return_rates=expected_rate_of_return,
            w_values=w_values,
        )

        # Historical return rates of the selected brands.
        display_fig_expected_return = self.showHistorical(
            names=self.config["names"],
            rate_of_return_per_term=self.config["rate_of_return_per_term"],
            w_values=w_values,
        )

        return display_df, msg, display_fig_expected_return

    @staticmethod
    def showSortedDataFrame(names, return_rates, w_values):
        """Return a table of the invested brands, largest unit count first.

        The columns are ``brand_name``, ``return_rate`` and ``count``; brands
        with a count of 0 are dropped and the index is reset.
        """
        df = pd.DataFrame(
            dict(brand_name=names, return_rate=return_rates, count=w_values.tolist())
        )

        # Sort by number of units, descending.
        sorted_df = df.sort_values("count", ascending=False)

        # Brands without any investment are not shown.
        display_df = sorted_df[sorted_df["count"] > 0]
        return display_df.reset_index(drop=True)

    @staticmethod
    def showHistorical(names, rate_of_return_per_term, w_values):
        """Plot the historical return rates of every brand with ``w_values > 0``.

        Returns the matplotlib Axes; the figure is reachable through
        ``ax.figure``.
        """
        # Indices of the selected brands.
        selected_indices = np.where(w_values > 0)[0]

        fig = plt.figure(figsize=(12, 12))
        ax = fig.add_subplot()
        for index in selected_indices:
            name = names[index]
            return_rate = rate_of_return_per_term[index]
            ax.plot(return_rate, label=name, marker="o")

        ax.set_title("return rate (historical)", fontsize=16)
        # Attach the legend to the axes created here rather than to whatever
        # pyplot currently considers the active axes.
        ax.legend(fontsize=16)
        return ax

In [None]:
from ipywidgets import Button, IntSlider, interactive_output, HBox, Output, Box


class BaseDemo:
    """Skeleton of a widget demo: a "Run" button plus two output areas.

    ``problem_out`` and ``problem_result_out`` start as ``None`` and must be
    assigned before ``show_problem`` / ``show_result`` are called. Subclasses
    implement ``_show_problem`` and ``_show_result``.
    """

    def __init__(self):
        super().__init__()

        self.problem_out = None
        self.problem_result_out = None

        # Clicking the button triggers show_result.
        self.run_btn = Button(
            description="Run", button_style="", tooltip="Run", icon="check"
        )
        self.run_btn.on_click(self.show_result)

    def show_problem(self, *args, **kwargs):
        """Run ``_show_problem`` inside the ``problem_out`` context."""
        target = self.problem_out
        assert target is not None
        with target:
            self._show_problem(*args, **kwargs)

    def show_result(self, *args, **kwargs):
        """Clear ``problem_result_out`` and run ``_show_result`` inside it."""
        target = self.problem_result_out
        assert target is not None

        with target:
            target.clear_output()
            self._show_result(*args, **kwargs)

    def _show_problem(self, *args, **kwargs):
        """Visualise the input. Must be overridden."""
        raise NotImplementedError()

    def _show_result(self, *args, **kwargs):
        """Visualise the execution result. Must be overridden."""
        raise NotImplementedError()

In [None]:
class DemoPortfolio(BaseDemo):
    """Portfolio demo: builds a ``MarkowitzProblem`` and shows its solution."""

    name = "demo_portfolio"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Two output areas tagged with the demo name ...
        input_area = Output()
        result_area = Output()
        for area in (input_area, result_area):
            area.add_class(self.name)
        # ... and one for the historical return-rate plot.
        return_rate_area = Output()
        return_rate_area.add_class("expected_return_rate")

        self.problem_out = input_area
        self.problem_result_out = result_area
        self.out_expectedReturnRate = return_rate_area

    def _show_problem(self, K, D, gamma, *args, **kwargs):
        """Create the problem (this loads the data) and keep it in ``self.problem``.

        K: int
            Maximum number of investment units.
        D: int
            Period until the investment is collected.
        gamma: float
            Forwarded to ``MarkowitzProblem``.
        """
        self.problem = MarkowitzProblem(*args, K=K, D=D, gamma=gamma, **kwargs)

    def show_result(self, *args, **kwargs):
        """Override: solve first, then fill both output areas."""
        assert self.problem_result_out is not None

        table, headline_parts, return_rate_axes = self._show_result(*args, **kwargs)

        result_area = self.problem_result_out
        with result_area:
            result_area.clear_output()
            for item in (
                HTML("<h2><b>実行結果</b></h2>"),
                HTML("<h4>各金融商品に対する最適な投資口数を出力します。</h4>"),
                HTML("".join(headline_parts)),
                table,
            ):
                display(item)

        plot_area = self.out_expectedReturnRate
        with plot_area:
            plot_area.clear_output()
            for item in (
                HTML("<h2><b>入力</b></h2>"),
                HTML("<h4>選ばれた金融銘柄の過去の収益率を可視化します。</h4>"),
                HTML("<h4><br></h4>"),
                return_rate_axes.figure,
            ):
                display(item)
            # Clear and close pyplot's current figure once it has been displayed.
            plt.clf()
            plt.close()

    def _show_result(self, *args, **kwargs):
        """Solve the stored problem and return what ``problem.eval`` produces."""
        return self.problem.eval(self.solve())

    def showReturnRate(self):
        """Visualise past return rates (placeholder, does nothing)."""

    def solve(self, *args, **kwargs):
        """Solve the stored problem and return the solver result."""
        return self.problem.solve()

In [None]:
# Sliders for the run-time parameters shown under "Options".
gamma_slider = FloatSlider(
    value=10,
    min=0,
    max=100.0,
    step=1,
    disabled=False,
    continuous_update=False,
    orientation="horizontal",
    readout=True,
    readout_format="f",
)
K_slider = IntSlider(
    value=10,
    min=1,
    max=15,
    step=1,
    disabled=False,
    continuous_update=False,
    orientation="horizontal",
    readout=True,
    readout_format="d",
)
D_slider = IntSlider(
    value=7,
    min=1,
    max=30,
    step=1,
    disabled=False,
    continuous_update=False,
    orientation="horizontal",
    readout=True,
    readout_format="d",
)
timeout_slider = IntSlider(
    value=1000,
    min=0,
    max=10000,
    step=1000,
    disabled=False,
    continuous_update=False,
    orientation="horizontal",
    readout=True,
    # Integer slider: use the integer format like the other IntSliders
    # ("f" rendered the value with a fractional part).
    readout_format="d",
)

problem = DemoPortfolio()

# Rebuild the problem whenever one of the sliders changes; the slider values
# are passed to show_problem as keyword arguments.
out = interactive_output(
    problem.show_problem,
    {"gamma": gamma_slider, "K": K_slider, "D": D_slider, "timeout": timeout_slider},
)
problem.problem_out = out

# Collapsed "Options" accordion holding the labelled sliders.
options1 = [
    Label(value="gamma: 分散を考慮するパラメータ"),
    gamma_slider,
    Label(value="timeout: 制限時間[ ms ]"),
    timeout_slider,
]
options2 = [Label(value="K: 投資する全資産数"), K_slider]
options3 = [Label(value="D: 投資してから回収するまでの期間(日付)"), D_slider]
options = [GridBox(options1), GridBox(options2), GridBox(options3)]
options = Accordion(children=[HBox(options)])
options.set_title(0, "Options")
options.selected_index = None

# 10x10 grid: Run button in the top-left cell, the return-rate plot in the
# left `right` columns below it and the result table in the remaining columns.
grid = GridspecLayout(10, 10)
right = 6

grid[0, 0] = problem.run_btn
grid[1:, :right] = problem.out_expectedReturnRate
grid[1:, right:] = problem.problem_result_out

display(VBox([options, grid]))