In [None]:
import warnings
from typing import Optional, Callable, Dict, List
from functools import partial
from scipy.optimize import minimize
from scipy.cluster.hierarchy import linkage
from scipy.spatial.distance import squareform
import numpy as np
import pandas as pd


class OptMetrics:
    """portfolio optimizer metrics"""

    @staticmethod
    def expected_return(
        weights: np.ndarray,
        expected_returns: np.ndarray,
    ) -> float:
        """
        Portfolio expected return.

        Args:
            weight (np.ndarray): weight of assets.
            expected_returns (np.ndarray): expected return of assets.

        Returns:
            float: portfolio expected return.
        """
        return np.dot(weights, expected_returns)

    @staticmethod
    def expected_variance(
        weights: np.ndarray,
        covariance_matrix: np.ndarray,
    ) -> float:
        """
        Portfolio expected variance.

        Args:
            weight (np.ndarray): weight of assets.
            covariance_matrix (np.ndarray): covariance matrix of assets.

        Returns:
            float: portfolio expected variance.
        """
        return np.linalg.multi_dot((weights, covariance_matrix, weights))

    def expected_volatility(
        self, weights: np.ndarray, covariance_matrix: np.ndarray
    ) -> float:
        """
        Portfolio expected volatility.

        Args:
            weight (np.ndarray): weight of assets.
            covariance_matrix (np.ndarray): covariance matrix of assets.

        Returns:
            float: portfolio expected volatility.
        """
        return np.sqrt(
            self.expected_variance(weights=weights, covariance_matrix=covariance_matrix)
        )

    def expected_sharpe(
        self,
        weights: np.ndarray,
        expected_returns: np.ndarray,
        covariance_matrix: np.ndarray,
        risk_free: float = 0.0,
    ) -> float:
        """
        Portfolio expected sharpe ratio.

        Args:
            weight (np.ndarray): weight of assets.
            expected_returns (np.ndarray): expected return of assets.
            covariance_matrix (np.ndarray): covariance matrix of assets.

        Returns:
            float: portfolio expected sharpe ratio.
        """
        ret = self.expected_return(weights=weights, expected_returns=expected_returns)
        std = self.expected_volatility(
            weights=weights, covariance_matrix=covariance_matrix
        )
        return (ret - risk_free) / std

    @staticmethod
    def l1_norm(vals: np.ndarray, gamma: float = 1) -> float:
        """_summary_

        Args:
            vals (np.ndarray): _description_
            gamma (float, optional): _description_. Defaults to 1.

        Returns:
            float: _description_
        """
        return np.abs(vals).sum() * gamma

    @staticmethod
    def l2_norm(vals: np.ndarray, gamma: float = 1) -> float:
        """
        L2 regularization.

        Args:
            weight (np.ndarray): asset weight in the portfolio.
            gamma (float, optional): L2 regularisation parameter. Defaults to 1.
                Increase if you want more non-negligible weight.

        Returns:
            float: L2 regularization.
        """
        return np.sum(np.square(vals)) * gamma

    @staticmethod
    def exante_tracking_error(
        weights: np.ndarray, weights_bm: np.ndarray, covariance_matrix: np.ndarray
    ) -> float:
        """
        Calculate the ex-ante tracking error.

        Maths:
            formula here.

        Args:
            weight (np.ndarray): asset weight in the portfolio.
            weight_benchmark (np.ndarray): benchmarket weight of the portfolio.
            covaraince_matrix (np.ndarray): asset covariance matrix.

        Returns:
            float: ex-ante tracking error.
        """
        rel_weight = np.subtract(weights, weights_bm)
        tracking_variance = np.dot(np.dot(rel_weight, covariance_matrix), rel_weight)
        tracking_error = np.sqrt(tracking_variance)
        return tracking_error

    @staticmethod
    def expost_tracking_error(
        weights: np.ndarray,
        pri_returns_assets: np.ndarray,
        pri_returns_bm: np.ndarray,
    ) -> float:
        """_summary_

        Args:
            weights (np.ndarray): _description_
            pri_returns_assets (np.ndarray): _description_
            pri_returns_benchmark (np.ndarray): _description_

        Returns:
            float: _description_
        """
        rel_return = np.dot(pri_returns_assets, weights) - pri_returns_bm
        mean = np.sum(rel_return) / len(rel_return)
        return np.sum(np.square(rel_return - mean))

    def risk_contributions(
        self,
        weights: np.ndarray,
        covariance_matrix: np.ndarray,
        sub_covariance_matrix_idx: Optional[List] = None,
    ) -> np.ndarray:
        """risk contributions"""
        volatility = self.expected_volatility(
            weights=weights, covariance_matrix=covariance_matrix
        )
        if sub_covariance_matrix_idx:
            sub_covariance_matrix = covariance_matrix.copy()
            for i, row in enumerate(covariance_matrix):
                for j, val in enumerate(row):
                    if (
                        i not in sub_covariance_matrix_idx
                        and j not in sub_covariance_matrix_idx
                    ):
                        sub_covariance_matrix[i, j] = 0
            return np.dot(sub_covariance_matrix, weights) * weights / volatility

        return np.dot(covariance_matrix, weights) * weights / volatility


class Optimizer:
    """portfolio optimizer"""

    def __init__(
        self,
        expected_returns: Optional[pd.Series] = None,
        covariance_matrix: Optional[pd.DataFrame] = None,
        risk_free: float = 0.0,
        prices_assets: Optional[pd.DataFrame] = None,
        prices_bm: Optional[pd.Series] = None,
        weights_bm: Optional[pd.Series] = None,
        min_weight: float = 0.0,
        max_weight: float = 1.0,
        sum_weight: float = 1.0,
        min_return: Optional[float] = None,
        max_return: Optional[float] = None,
        min_volatility: Optional[float] = None,
        max_volatility: Optional[float] = None,
        active_weight: Optional[float] = None,
        exante_tracking_error: Optional[float] = None,
        expost_tracking_error: Optional[float] = None,
    ) -> None:
        """initialization"""

        if expected_returns is not None:
            self.expected_returns = expected_returns
            self.assets = self.expected_returns.index

        if covariance_matrix is not None:
            self.covariance_matrix = covariance_matrix

        self.expected_returns = expected_returns
        self.covariance_matrix = covariance_matrix
        self.prices_assets = prices_assets
        self.risk_free = risk_free
        self.prices_bm = prices_bm
        self.weights_bm = weights_bm
        self.constraints: List = []
        self.metrics: OptMetrics = OptMetrics()

        self.set_min_weight(min_weight=min_weight)
        self.set_max_weight(max_weight=max_weight)
        self.set_sum_weight(sum_weight=sum_weight)

        if min_return:
            self.set_min_return(min_return=min_return)

        if max_return:
            self.set_max_return(max_return=max_return)

        if min_volatility:
            self.set_min_volatility(min_volatility=min_volatility)

        if max_volatility:
            self.set_max_volatility(max_volatility=max_volatility)

        if active_weight:
            self.set_max_active_weight(active_weight=active_weight)

        if exante_tracking_error:
            self.set_exante_tracking_error(exante_tracking_error=exante_tracking_error)

        if expost_tracking_error:
            self.set_max_expost_tracking_error(
                expost_tracking_error=expost_tracking_error
            )

    @property
    def expected_returns(self) -> pd.Series:
        """expected_returns"""
        return self._expected_returns

    @expected_returns.setter
    def expected_returns(self, expected_returns: Optional[pd.Series] = None) -> None:
        self._expected_returns = expected_returns
        if expected_returns is not None:
            self.assets = self.expected_returns.index

    @property
    def covariance_matrix(self) -> pd.DataFrame:
        """covariance_matrix"""
        return self._covariance_matrix

    @covariance_matrix.setter
    def covariance_matrix(
        self, covariance_matrix: Optional[pd.DataFrame] = None
    ) -> None:
        self._covariance_matrix = covariance_matrix
        if covariance_matrix is not None:
            self.assets = self.covariance_matrix.index
            self.assets = self.covariance_matrix.columns

    @property
    def prices_assets(self) -> pd.DataFrame:
        """prices_assets"""
        return self._prices_assets

    @prices_assets.setter
    def prices_assets(self, prices_assets: Optional[pd.DataFrame] = None) -> None:
        self._prices_assets = prices_assets
        if prices_assets is not None:
            self.assets = self.prices_assets.columns

    @property
    def assets(self) -> pd.Index:
        """assets"""
        try:
            return self._assets
        except AttributeError:
            return None

    @assets.setter
    def assets(self, assets: pd.Index) -> None:
        """assets setter"""

        if self.assets is not None:
            assert self.assets.equals(assets)
            return
        self._assets = assets

    @property
    def num_asset(self) -> int:
        """return number of asset"""
        return len(self.assets)

    def set_min_weight(self, min_weight: float = 0.0) -> None:
        """set minimum weights constraint"""
        self.constraints.append(
            {
                "type": "ineq",
                "fun": lambda w: w - min_weight,
            }
        )

    def set_max_weight(self, max_weight: float = 1.0) -> None:
        """set maximum weights constraint"""
        self.constraints.append(
            {
                "type": "ineq",
                "fun": lambda w: max_weight - w,
            }
        )

    def set_sum_weight(self, sum_weight: float = 1.0) -> None:
        """set summation weights constriant"""
        self.constraints.append(
            {
                "type": "eq",
                "fun": lambda w: np.sum(w) - sum_weight,
            }
        )

    def set_min_return(self, min_return: float = 0.05) -> None:
        """set minimum return constraint"""
        if self.expected_returns is None:
            warnings.warn("unable to set minimum return constraint.")
            warnings.warn("expected returns is null.")
        self.constraints.append(
            {
                "type": "ineq",
                "fun": lambda w: self.metrics.expected_return(
                    weights=w, expected_returns=self.expected_returns.values
                )
                - min_return,
            }
        )

    def set_max_return(self, max_return: float = 0.05) -> None:
        """set maximum return constraint"""
        if self.expected_returns is None:
            warnings.warn("unable to set maximum return constraint.")
            warnings.warn("expected returns is null.")
        self.constraints.append(
            {
                "type": "ineq",
                "fun": lambda w: max_return
                - self.metrics.expected_return(
                    weights=w, expected_returns=self.expected_returns.values
                ),
            }
        )

    def set_min_volatility(self, min_volatility: float = 0.05) -> None:
        """set minimum volatility constraint"""
        if self.expected_returns is None:
            warnings.warn("unable to set minimum volatility constraint.")
            warnings.warn("expected returns is null.")
        self.constraints.append(
            {
                "type": "ineq",
                "fun": lambda w: self.metrics.expected_volatility(
                    weights=w, covariance_matrix=self.covariance_matrix.values
                )
                - min_volatility,
            }
        )

    def set_max_volatility(self, max_volatility: float = 0.05) -> None:
        """set maximum volatility constraint"""
        if self.expected_returns is None:
            warnings.warn("unable to set maximum volatility constraint.")
            warnings.warn("expected returns is null.")
        self.constraints.append(
            {
                "type": "ineq",
                "fun": lambda w: max_volatility
                - self.metrics.expected_volatility(
                    weights=w, covariance_matrix=self.covariance_matrix.values
                ),
            }
        )

    def set_max_active_weight(self, active_weight: float = 0.10) -> None:
        """set maximum active weight against benchmark"""
        if self.weights_bm is None:
            warnings.warn("unable to set maximum active weight constraint.")
            warnings.warn("benchmark weights is null.")
        self.constraints.append(
            {
                "type": "ineq",
                "fun": lambda w: active_weight
                - np.sum(np.abs(w - self.weights_bm.values)),
            }
        )

    def set_max_exante_tracking_error(
        self, max_exante_tracking_error: float = 0.02
    ) -> None:
        """set maximum exante tracking error constraint"""

        self.constraints.append(
            {
                "type": "ineq",
                "fun": lambda w: max_exante_tracking_error
                - self.metrics.exante_tracking_error(
                    weights=w,
                    weights_bm=self.weights_bm.values,
                    covariance_matrix=self.covariance_matrix.values,
                ),
            }
        )

    def set_max_expost_tracking_error(
        self, max_expost_tracking_error: float = 0.02
    ) -> None:
        """set maximum expost tracking error constraint"""

        itx = self.prices_assets.dropna().index.intersection(
            self.prices_bm.dropna().index
        )

        pri_returns_assets = self.prices_assets.loc[itx].pct_change().fillna(0)
        pri_returns_bm = self.prices_bm.loc[itx].pct_change().fillna(0)

        self.constraints.append(
            {
                "type": "ineq",
                "fun": lambda w: max_expost_tracking_error
                - self.metrics.expost_tracking_error(
                    weights=w,
                    pri_returns_assets=pri_returns_assets.values,
                    pri_returns_bm=pri_returns_bm.values,
                ),
            }
        )

    def solve(
        self, objective: Callable, extra_constraints: Optional[List[Dict]] = None
    ) -> Optional[pd.Series]:
        constraints = self.constraints.copy()
        if extra_constraints:
            constraints.extend(extra_constraints)
        problem = minimize(
            fun=objective,
            method="SLSQP",
            constraints=constraints,
            x0=np.ones(shape=self.num_asset) / self.num_asset,
        )

        if problem.success:
            return pd.Series(data=problem.x, index=self.assets, name="weights").round(6)
        return None

    def maximized_return(self) -> Optional[pd.Series]:
        """calculate max return weights"""
        return self.solve(
            objective=partial(
                self.metrics.expected_return,
                expected_returns=self.expected_returns.values * -1,
            )
        )

    def minimized_volatility(self) -> Optional[pd.Series]:
        """_summary_

        Returns:
            Optional[pd.Series]: _description_
        """
        return self.solve(
            objective=partial(
                self.metrics.expected_volatility,
                covariance_matrix=self.covariance_matrix.values,
            )
        )

    def maximized_sharpe_ratio(self) -> Optional[pd.Series]:
        """_summary_

        Returns:
            Optional[pd.Series]: _description_
        """
        return self.solve(
            objective=partial(
                self.metrics.expected_sharpe,
                expected_returns=self.expected_returns.values,
                covariance_matrix=self.covariance_matrix.values,
                risk_free=self.risk_free,
            )
        )

    def hierarchical_risk_parity(self) -> Optional[pd.Series]:
        """calculate herc weights"""
        corr = cov_to_corr(self.covariance_matrix.values)
        dist = np.sqrt((1 - corr).round(5) / 2)
        clusters = linkage(squareform(dist), method="single")
        cluster_sets = [
            (
                get_cluster_assets(clusters, cluster[0], self.num_asset),
                get_cluster_assets(clusters, cluster[1], self.num_asset),
            )
            for cluster in clusters
        ]
        

        return self.solve(
            objective=lambda w: self.metrics.l2_norm(
                np.array(
                    [
                        np.sum(
                            self.metrics.risk_contributions(
                                weights=w,
                                covariance_matrix=covariance_matrix.values,
                                sub_covariance_matrix_idx=left_idx,
                            )
                        )
                        - np.sum(
                            self.metrics.risk_contributions(
                                weights=w,
                                covariance_matrix=covariance_matrix.values,
                                sub_covariance_matrix_idx=right_idx,
                            )
                        )
                        for left_idx, right_idx in cluster_sets
                    ]
                )
            )
        )

    def risk_parity(self, budgets: Optional[np.ndarray] = None) -> Optional[pd.Series]:
        """_summary_

        Returns:
            Optional[pd.Series]: _description_
        """
        if budgets is None:
            budgets = np.ones(self.num_asset) / self.num_asset
        return self.solve(
            objective=lambda w: self.metrics.l2_norm(
                np.subtract(
                    self.metrics.risk_contributions(
                        weights=w, covariance_matrix=self.covariance_matrix.values
                    ),
                    np.multiply(
                        budgets,
                        self.metrics.expected_volatility(
                            weights=w, covariance_matrix=self.covariance_matrix.values
                        ),
                    ),
                )
            )
        )

    def inverse_variance(self) -> Optional[pd.Series]:
        """_summary_

        Returns:
            Optional[pd.Series]: _description_
        """
        inv_var_weights = 1 / np.diag(covariance_matrix)
        inv_var_weights /= inv_var_weights.sum()
        return self.solve(
            objective=lambda w: self.metrics.l2_norm(np.subtract(w, inv_var_weights))
        )


def cov_to_corr(covariance_matrix: np.ndarray) -> np.ndarray:
    """correlation matrix from covariance matrix"""
    vol = np.sqrt(np.diag(covariance_matrix))
    corr = np.divide(covariance_matrix, np.outer(vol, vol))
    corr[corr < -1], corr[corr > 1] = -1, 1
    return corr


def get_cluster_assets(clusters, node, num_assets) -> List:
    if node < num_assets:
        return [int(node)]
    row = clusters[int(node - num_assets)]
    return get_cluster_assets(clusters, row[0], num_assets) + get_cluster_assets(
        clusters, row[1], num_assets
    )

import yfinance as yf

prices = yf.download("SPY, QQQ, IWV, MTUM, QUAL, VLUE, IVV")["Adj Close"].dropna()
expected_returns = prices.pct_change().fillna(0).mean() * 252
covariance_matrix = prices.pct_change().fillna(0).cov() * (252)

optimizer = Optimizer(
    expected_returns=expected_returns,
    covariance_matrix=covariance_matrix,
)
optimizer.hierarchical_risk_parity()

In [None]:
import riskfolio as rf

port = rf.Portfolio(returns=prices.pct_change().fillna(0).dropna())
port.optimization(model="HERC")

In [72]:
def quasi_diagnalization(clusters, num_assets, curr_index):
    """Rearrange the assets to reorder them according to hierarchical tree clustering order"""

    if curr_index < num_assets:
        return [curr_index]

    left = int(clusters[curr_index - num_assets, 0])
    right = int(clusters[curr_index - num_assets, 1])

    return quasi_diagnalization(clusters, num_assets, left) + quasi_diagnalization(
        clusters, num_assets, right
    )

from scipy.cluster.hierarchy import to_tree, linkage, dendrogram

prices = yf.download("SPY, QQQ, IWV, MTUM, QUAL, VLUE, IVV")["Adj Close"].dropna()
expected_returns = prices.pct_change().fillna(0).mean() * 252
covariance_matrix = prices.pct_change().fillna(0).cov() * (252)
corr = cov_to_corr(covariance_matrix.values)
dist = np.sqrt((1 - corr).round(5) / 2)
clusters = linkage(squareform(dist), method="single")
sorted_index = list(to_tree(clusters, rd=False).pre_order())
# clustered_assets = [[self.assets[x] for x in sorted_index]]
# print(sorted_index)
# dendrogram(clusters)


[*********************100%***********************]  7 of 7 completed


In [75]:
def bisect(sorted_index):
    
    if len(sorted_index) < 3:
        return None
    
    num = len(sorted_index)
    bis = int(num/2)
    
    left = sorted_index[0: bis]
    right = sorted_index[bis:]
    return [(left, right), bisect(left), bisect(right)]
print(bisect(sorted_index))

[([2, 6, 3], [4, 1, 0, 5]), [([2], [6, 3]), None, None], [([4, 1], [0, 5]), None, None]]


In [None]:
sorted_index
def divide_chunks(l, n):
     
    # looping till length l
    for i in range(0, len(l), n):
        yield l[i:i + n]


for i in range(len(sorted_index) - 2, 0, -1):
    
    print(i)
    
    print(list(divide_chunks(sorted_index, i)))
    # print(list(divide_chunks(sorted_index, i)))

In [None]:
def inverse_variance_weights(covariance_matrix: np.ndarray) -> np.ndarray:
    """calculate weights of inverse variance. (tot weights = 100%)

    Args:
        covariance_matrix (np.ndarray): _description_

    Returns:
        np.ndarray: weights of
    """
    inv_var_weights = 1 / np.diag(covariance_matrix)
    inv_var_weights /= inv_var_weights.sum()
    return inv_var_weights

inverse_variance_weights(covariance_matrix.values)

In [None]:
import pandas as pd

meta = pd.read_excel("database.xlsx", sheet_name="tb_meta")

import yfinance as yf

# for id, m in meta.iterrows():
#     tic = yf.Ticker(m.ticker)
#     ff = tic.isin
#     print(ff)

for id, row in meta.iterrows():
    print(row.ticker)
    if row.__ticker.endswith(".KS"):
        continue

    t = yf.Ticker(row.ticker)

    hist = t.history(period="1y")
    try:
        splits = hist["Stock Splits"]

        test = (splits != 0).any()
        if test:
            print("Has splits")
            print(row.ticker)
    except:
        continue


In [None]:
import yfinance as yf

t = yf.Ticker("HYMB")
hist = t.history(period="1y")
splits = hist["Stock Splits"]
splits = splits[splits != 0]
splits


In [None]:
""" base strategy class """

import warnings
from typing import Any, List, Optional, Dict
import numpy as np
import pandas as pd


class Account:
    """strategy account"""

    def __init__(self):
        self.date: List = []
        self.value: List = []
        self.weights: List[Dict] = []
        self.reb_weights: List[Dict] = []
        self.trade_weights: List[Dict] = []


class BaseStrategy:
    """
    BaseStrategy class is an algorithmic trading strategy that sequentially allocates capital among
    group of assets based on pre-defined allocatioin method.

    BaseStrategy shall be the parent class for all investment strategies with period-wise
    rebalancing scheme.

    Using this class requires following pre-defined methods:
    1. rebalance(self, price_asset, date, **kwargs):
        the method shall be in charge of calculating new weights based on prices
        that are avaiable.
    2. monitor (self, ...):
        the method shall be in charge of monitoring the strategy status, and if
        necessary call the rebalance method to re-calculate weights. aka irregular rebalancing.
    """

    def __init__(
        self,
        price_asset: pd.DataFrame,
        frequency: str = "M",
        min_assets: int = 2,
        min_periods: int = 2,
        investment: float = 1000.0,
        commission: int = 0,
        currency: str = "KRW",
        name: str = "strategy",
        account: Optional[Account] = None,
    ) -> None:
        """Initialization"""
        self.price_asset: pd.DataFrame = self.check_price_df(price_asset)
        self.frequency: str = frequency
        self.min_assets: int = min_assets
        self.min_periods: int = min_periods
        self.name: str = name
        self.commission = commission
        self.currency = currency

        # account information
        self.idx: int = 0
        self.date: Any = None
        self.value: float = investment
        self.weights: pd.Series = pd.Series(dtype=float)
        self.reb_weights: pd.Series = pd.Series(dtype=float)
        self.trade_weights: pd.Series = pd.Series(dtype=float)
        self.account: Account = account or Account()

    ################################################################################################
    @property
    def value_df(self):
        """values dataframe"""
        return pd.DataFrame(
            data=self.account.value, index=self.account.date, columns=["value"]
        )

    @property
    def weights_df(self):
        """weights dataframe"""
        return pd.DataFrame(data=self.account.weights, index=self.account.date)

    @property
    def reb_weights_df(self):
        """weights dataframe"""
        return pd.DataFrame(data=self.account.reb_weights, index=self.account.date)

    @property
    def trade_weights_df(self):
        """weights dataframe"""
        return pd.DataFrame(data=self.account.trade_weights, index=self.account.date)

    ################################################################################################

    def update_book(self) -> None:
        """update the account value based on the current date"""
        prices = self.price_asset.loc[self.date]
        if not self.weights.empty:
            print(f"update book values")
            pre_prices = self.price_asset.iloc[
                self.price_asset.index.get_loc(self.date) - 1
            ]
            capitals = self.weights * self.value
            pri_return = prices / pre_prices
            new_capitals = capitals * pri_return.loc[capitals.index]
            profit_loss = new_capitals - capitals
            self.value += profit_loss.sum()
            self.weights = new_capitals / self.value

        if not self.reb_weights.empty:
            print(f"make re-allocation {self.reb_weights.to_dict()}")
            # reindex to contain the same asset.
            union_assets = self.reb_weights.index.union(self.weights.index)
            self.weights = self.weights.reindex(union_assets, fill_value=0)
            self.reb_weights = self.reb_weights.reindex(union_assets, fill_value=0)
            self.trade_weights = self.reb_weights.subtract(self.weights)
            trade_capitals = self.value * self.trade_weights
            trade_costs = trade_capitals.abs() * self.commission / 10_000
            trade_cost = trade_costs.sum()
            # update the account metrics.
            self.value -= trade_cost
            self.weights = self.reb_weights

        # do nothing if no account data.
        if self.weights.empty and self.reb_weights.empty:
            return
        # loop through all variables in account history
        for name in vars(self.account).keys():
            getattr(self.account, name).append(getattr(self, name))
        # clear the rebalancing weights.
        self.reb_weights = pd.Series(dtype=float)
        self.trade_weights = pd.Series(dtype=float)

    ################################################################################################

    def simulate(self, start: ... = None, end: ... = None) -> ...:
        """simulate historical strategy perfromance"""
        start = start or self.price_asset.index[0]
        end = end or self.price_asset.index[-1]
        reb_dates = pd.date_range(start=start, end=end, freq=self.frequency)
        for self.date in pd.date_range(start=start, end=end, freq="D"):
            print(self.date)
            if self.date in self.price_asset.index:
                self.update_book()
            if self.weights.empty or self.monitor() or self.date in reb_dates:
                if not self.reb_weights.empty:
                    continue
                self.reb_weights = self.allocate(self.date)
                print(f"rebalancing weights: {self.reb_weights.to_dict()}")
        return self

    def allocate(self, date: ... = None) -> pd.Series:
        """allocate weights based on date if date not provided use latest"""
        # pylint: disable=multiple-statements
        date = date or self.price_asset.index[-1]
        price_slice = self.price_asset.loc[:date].dropna(
            thresh=self.min_periods, axis=1
        )
        if price_slice.empty:
            return pd.Series(dtype=float)
        reb_weights = self.rebalance(price_asset=price_slice)
        if reb_weights is None:
            return pd.Series(dtype=float)
        return self.clean_weights(reb_weights, decimals=4)

    def rebalance(self, price_asset: pd.DataFrame) -> pd.Series:
        """Default rebalancing method"""
        asset = price_asset.columns
        uniform_weight = np.ones(len(asset))
        uniform_weight /= uniform_weight.sum()
        weight = pd.Series(index=asset, data=uniform_weight)
        return weight

    def monitor(self) -> bool:
        """Default monitoring method."""
        return False

    ################################################################################################
    @staticmethod
    def check_price_df(price_df: pd.DataFrame) -> pd.DataFrame:
        """Check the price_df.

        Args:
            price_df (pd.DataFrame): _description_

        Raises:
            TypeError: if price_df is not pd.DataFrame.

        Returns:
            pd.DataFrame: price_df
        """
        if not isinstance(price_df, pd.DataFrame):
            raise TypeError("price_df must be a pd.DataFrame.")
        if not isinstance(price_df.index, pd.DatetimeIndex):
            warnings.warn("converting price_df's index to pd.DatetimeIndex.")
            price_df.index = pd.to_datetime(price_df.index)
        return price_df

    @staticmethod
    def clean_weights(weights: pd.Series, decimals: int = 4) -> pd.Series:
        """Clean weights based on the number decimals and maintain the total of weights.

        Args:
            weights (pd.Series): asset weights.
            decimals (int, optional): number of decimals to be rounded for
                weight. Defaults to 4.

        Returns:
            pd.Series: clean asset weights.
        """
        # clip weight values by minimum and maximum.
        tot_weight = weights.sum().round(4)
        weights = weights.round(decimals=decimals)
        # repeat round and weight calculation.
        for _ in range(10):
            weights = weights / weights.sum() * tot_weight
            weights = weights.round(decimals=decimals)
            if weights.sum() == tot_weight:
                return weights
        # if residual remains after repeated rounding.
        # allocate the the residual weight on the max weight.
        residual = tot_weight - weights.sum()
        # !!! Error may occur when there are two max weights???
        weights[np.argmax(weights)] += np.round(residual, decimals=decimals)
        return weights


####################################################################################################
def backtest(allocation_df: pd.DataFrame, price_df: pd.DataFrame) -> BaseStrategy:
    """provide a backtested strategy.

    Args:
        allocation_df (pd.DataFrame): allocation dataframe.
        price_df (pd.DataFrame): asset price dataframe.

    Returns:
        BaseStrategy: backtested strategy.
    """

    class Backtest(BaseStrategy):
        """backtest class"""

        def rebalance(self, price_asset: pd.DataFrame) -> pd.Series:
            if self.date in allocation_df.index:
                weights = allocation_df.loc[self.date].dropna()
                return weights[weights != 0]
            return pd.Series(dtype=float)

    strategy = Backtest(price_asset=price_df, frequency="D").simulate(
        start=allocation_df.index[0]
    )

    return strategy


path = r"C:\Users\vip\OneDrive\DWS\ABL_RESULT\MLP_ALLOCATION.xlsx"

import pandas_datareader as pdr

allocation = pd.read_excel(path, "allocation", index_col=[1, 2, 0], parse_dates=True)
allocation = allocation.unstack().unstack()["weights"]

stra = "US_5"

allo = allocation[stra].dropna(how="all", axis=1)

tickers = ", ".join(allo.columns.tolist())
tickers = tickers.replace(".KS", "")
tickers = (
    tickers
    if isinstance(tickers, (list, set, tuple))
    else tickers.replace(",", " ").upper().split()
)

prices = []
if stra.startswith("KR"):
    for ticker in tickers:
        price = pdr.DataReader(ticker, "naver", start="1990-1-1").astype(float)["Close"]
        price.name = ticker + ".KS"
        prices.append(price)

    prices = pd.concat(prices, axis=1)

    prices.index = pd.to_datetime(prices.index)
if stra.startswith("US"):
    import yfinance as yf

    prices = yf.download(tickers)["Adj Close"]

result = backtest(allocation_df=allo, price_df=prices)

with pd.ExcelWriter(f"{stra}.xlsx", engine="openpyxl") as writer:
    # Save each dataframe to a separate sheet in the Excel file
    value = result.value_df
    value.index.name = "date"
    weights = result.weights_df
    weights.index.name = "date"
    al = result.reb_weights_df.dropna(how="all")
    al.index.name = "date"

    value.to_excel(writer, sheet_name="value")
    weights.to_excel(writer, sheet_name="weights")
    al.to_excel(writer, sheet_name="allocations")

value.plot()
