# In [None]:
import glob
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import minimize
from scipy.optimize import linprog
from plotly.subplots import make_subplots
from plotly.offline import init_notebook_mode
import plotly.io as pio
import plotly.figure_factory as ff
import plotly.graph_objects as go

# Run configuration: the market identifier selects the Data/<market>/ and
# Result/<market>/ directories read by PortfolioOptimizer, and the initial
# balance is the starting cash handed to the optimizer.
Market = 'SP104'
InitialBalance = 1_000_000.0

class PortfolioOptimizer:
    """
    Class for optimizing and visualizing portfolio metrics.

    The optimisation methods read two attributes that this class does not
    compute itself and that must be assigned on the instance beforehand:

    - mean_return: one expected return per asset (1-D array-like).
    - cov: the matching asset covariance matrix (D x D array-like).
    """

    # (lower, upper) bound applied to every asset weight; a negative lower
    # bound lets the optimizer take weights below zero, None means unbounded.
    _WEIGHT_BOUNDS = (-0.5, None)

    def __init__(self, market, method, initial_balance):
        """
        Initializes the PortfolioOptimizer object.

        Construction reads the stock list and the metrics CSV from disk (see
        load_stock_names and load_metrics).

        Parameters:
        - market (str): The market identifier.
        - method (str): The optimization method used.
        - initial_balance (float): The initial balance for portfolio optimization.
        """
        self.market = market
        self.method = method
        self.initial_balance = initial_balance
        self.names = self.load_stock_names()
        self.mean_return = None  # must be assigned before optimizing
        self.cov = None  # must be assigned before optimizing
        self.target_returns = None  # filled in by optimize_portfolio
        self.df_metrics = self.load_metrics()
        self.names_HHT = self.get_top_names()

    def load_stock_names(self):
        """
        Loads stock names from the market data.

        Reads the 'Stock' column of Data/<market>/<market>.csv.

        Returns:
        - numpy.ndarray: Array of stock names.
        """
        df_names = pd.read_csv(f'Data/{self.market}/{self.market}.csv', index_col=None)
        return df_names['Stock'].to_numpy()

    def load_metrics(self):
        """
        Loads portfolio metrics from the result data.

        Reads Result/<market>/HHT/metrics.csv (the 'HHT' directory is fixed and
        does not depend on self.method) and orders rows by the 'SR' column,
        highest first.

        Returns:
        - pandas.DataFrame: DataFrame containing portfolio metrics.
        """
        fname = f'Result/{self.market}/HHT/metrics.csv'
        df_metrics = pd.read_csv(fname, index_col=None)
        return df_metrics.sort_values(by=['SR'], ascending=False)

    def get_top_names(self):
        """
        Retrieves the top stock names based on metrics.

        Returns:
        - numpy.ndarray: 'Stock' values of the first ten rows of df_metrics
          (which load_metrics sorted by 'SR', descending).
        """
        return self.df_metrics.iloc[:10]['Stock'].to_numpy()

    def compute_return(self, dataframe):
        """
        Computes returns for the given dataframe.

        Simulates an all-in / all-out strategy driven by the 'Signal' column
        (1 = hold stock, 0 = hold cash) at the 'Close' price. Rows are
        processed by position, so any index type is accepted.

        Parameters:
        - dataframe (pandas.DataFrame): DataFrame containing stock data with
          'Signal' and 'Close' columns.

        Returns:
        - pandas.DataFrame: DataFrame with additional columns for shares, capital, and balance.
          If 'Signal' never equals 1, no trade happens and capital/balance stay
          at the initial balance on every row.
        """
        signal = dataframe['Signal'].to_numpy()
        close = dataframe['Close'].to_numpy()
        n_rows = len(dataframe)

        buy_positions = np.flatnonzero(signal == 1)
        first_buy = int(buy_positions[0]) if buy_positions.size else n_rows

        shares = [0] * n_rows
        balance = [0] * n_rows
        capital = [0] * n_rows

        # Before the first buy the account just sits in cash.
        for i in range(first_buy):
            capital[i] = self.initial_balance
            balance[i] = self.initial_balance

        for i in range(first_buy, n_rows):
            if i == 0:
                # No previous row exists: start flat with the full initial balance.
                prev_signal, prev_shares = 0, 0
                prev_balance = prev_capital = self.initial_balance
            else:
                prev_signal, prev_shares = signal[i - 1], shares[i - 1]
                prev_balance, prev_capital = balance[i - 1], capital[i - 1]

            cur_signal, price = signal[i], close[i]
            if cur_signal == 1 and prev_signal == 0:
                # Buy as many whole shares as the cash allows; keep the remainder.
                shares[i] = prev_balance // price
                balance[i] = prev_balance % price
                capital[i] = shares[i] * price + balance[i]
            elif cur_signal == 1 and prev_signal == 1:
                # Holding: values are carried forward unchanged (capital is not
                # re-valued at the current close while the position is open).
                shares[i] = prev_shares
                capital[i] = prev_capital
                balance[i] = prev_balance
            elif cur_signal == 0 and prev_signal == 1:
                # Sell the whole position at the current close.
                balance[i] = prev_balance + prev_shares * price
                capital[i] = balance[i]
            elif cur_signal == 0 and prev_signal == 0:
                capital[i] = prev_capital
                balance[i] = prev_balance
            # Any other signal combination leaves the row at zero.

        # Arrays are assigned positionally, so a non-default index cannot
        # introduce NaN through label alignment.
        return dataframe.assign(
            Shares=np.asarray(shares),
            Capital=np.asarray(capital),
            Balance=np.asarray(balance),
        )

    def _optimization_inputs(self):
        """
        Validates mean_return / cov and returns them as float arrays.

        Returns:
        - tuple: (mean vector of length D, covariance matrix of shape D x D).

        Raises:
        - ValueError: If either attribute is unset or the shapes do not match.
        """
        mean_return = getattr(self, 'mean_return', None)
        cov = getattr(self, 'cov', None)
        if mean_return is None or cov is None:
            raise ValueError('Assign `mean_return` and `cov` before running an optimization.')

        mu = np.asarray(mean_return, dtype=float)
        sigma = np.asarray(cov, dtype=float)
        if mu.ndim != 1 or sigma.shape != (mu.size, mu.size):
            raise ValueError(
                f'`mean_return` must be 1-D of length D and `cov` D x D; '
                f'got shapes {mu.shape} and {sigma.shape}.'
            )
        return mu, sigma

    def optimize_portfolio(self, num_points=100):
        """
        Optimizes the portfolio for given mean returns.

        Finds the lowest and highest attainable portfolio return with two
        linear programs, then for evenly spaced target returns in between
        minimises the portfolio variance w.dot(cov).dot(w) subject to the
        weights summing to one and hitting the target. The target grid is also
        stored on self.target_returns.

        Parameters:
        - num_points (int): Number of target returns on the grid (default 100).

        Returns:
        - tuple: Tuple containing risks, returns, and optimized risks. risks
          and returns are numpy arrays with one entry per target; optimized
          risks is a list holding the same risk values.

        Raises:
        - ValueError: If mean_return or cov is unset or mis-shaped.
        - RuntimeError: If a return-range linear program fails.
        """
        mu, sigma = self._optimization_inputs()
        D = mu.size
        bounds = [self._WEIGHT_BOUNDS] * D
        budget_lhs, budget_rhs = np.ones((1, D)), np.ones(1)

        # Minimize
        res = linprog(mu, A_eq=budget_lhs, b_eq=budget_rhs, bounds=bounds)
        if not res.success:
            raise RuntimeError(f'Minimum-return linear program failed: {res.message}')
        min_return = res.fun

        # Maximize
        res = linprog(-mu, A_eq=budget_lhs, b_eq=budget_rhs, bounds=bounds)
        if not res.success:
            raise RuntimeError(f'Maximum-return linear program failed: {res.message}')
        max_return = -res.fun

        target_returns = np.linspace(min_return, max_return, num=num_points)
        self.target_returns = target_returns

        def portfolio_variance(weights):
            return weights.dot(sigma).dot(weights)

        risks = np.zeros(num_points)
        returns = np.zeros(num_points)
        optimized_risks = []

        for i, target in enumerate(target_returns):
            constraints = [
                {
                    'type': 'eq',
                    # t=target binds the current value; a plain closure would
                    # see whatever `target` is when the solver calls it.
                    'fun': lambda weights, t=target: weights.dot(mu) - t,
                },
                {
                    'type': 'eq',
                    'fun': lambda weights: weights.sum() - 1,
                },
            ]

            res = minimize(
                fun=portfolio_variance,
                x0=np.ones(D) / D,  # uniform
                method='SLSQP',
                constraints=constraints,
                bounds=bounds,
            )

            # Clamp at zero so solver round-off cannot produce sqrt(negative).
            risk = float(np.sqrt(max(res.fun, 0.0)))
            optimized_risks.append(risk)
            risks[i] = risk
            returns[i] = res.x.dot(mu)

        return risks, returns, optimized_risks

    def visualize_portfolio(self, risks, returns, optimized_risks):
        """
        Visualizes the portfolio using scatter plots.

        Draws a matplotlib figure saved to 'Result/portfolio_HHT-XGB.png' and
        an interactive plotly figure. The frontier line pairs optimized_risks
        with self.target_returns when optimize_portfolio has stored a grid of
        matching length, and with `returns` otherwise.

        Parameters:
        - risks (numpy.ndarray): Array of portfolio risks.
        - returns (numpy.ndarray): Array of portfolio returns.
        - optimized_risks (list): List of optimized risks.

        Returns:
        - None

        Raises:
        - ValueError: If no return series matches optimized_risks in length.
        """
        frontier_returns = getattr(self, 'target_returns', None)
        if frontier_returns is None or len(frontier_returns) != len(optimized_risks):
            frontier_returns = returns
        if len(frontier_returns) != len(optimized_risks):
            raise ValueError(
                'No frontier returns match `optimized_risks`; call optimize_portfolio first '
                'or pass `returns` of the same length.'
            )

        mv_risk, mv_ret, mv_weights = self.get_min_variance_portfolio()

        fig, ax = plt.subplots(figsize=(5, 3))
        ax.scatter(risks, returns, alpha=0.1)
        ax.plot(optimized_risks, frontier_returns, c='black')
        ax.scatter([mv_risk], [mv_ret], c='yellow')
        fig.savefig('Result/portfolio_HHT-XGB.png', format='png')

        fig1 = go.Figure()
        fig1.add_trace(
            go.Scatter(
                mode='markers',
                x=risks,
                y=returns,
                opacity=0.5,
                marker=dict(
                    color='LightSkyBlue',
                    size=5,
                    line=dict(
                        color='MediumPurple',
                        width=2
                    )
                ),
                name='Random Portfolio'
            )
        )

        fig1.update_traces(marker=dict(color='rgba(31, 72, 223, .8)'))
        fig1.add_trace(go.Scatter(x=[mv_risk], y=[mv_ret], name='GMV Portfolio', mode='markers',
                                  marker_color='rgba(255, 255, 0, 0.6)', marker_symbol='circle', marker_size=15))
        fig1.add_trace(go.Scatter(x=optimized_risks, y=frontier_returns, name='Frontier',
                                  line=dict(color='rgba(0, 0, 0, 1)')))
        fig1.update_yaxes(gridcolor='rgb(233,233,233)')
        fig1.update_xaxes(gridcolor='rgb(233,233,233)')
        fig1.update_layout(yaxis_title='Daily Return', xaxis_title='Risk',
                           showlegend=True, paper_bgcolor='rgb(255,255,255)', plot_bgcolor='rgb(255,255,255)')

        fig1.update_layout(legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.02,
            xanchor="right",
            x=1
        ))
        config = {
            'toImageButtonOptions': {
                'format': 'jpeg',
                'filename': 'Portfo',
                'height': 450,
                'width': 800,
                'scale': 3
            }
        }
        fig1.show(config=config)

    def get_min_variance_portfolio(self):
        """
        Computes the minimum variance portfolio.

        Minimises w.dot(cov).dot(w) subject to the weights summing to one and
        the per-asset weight bounds.

        Returns:
        - tuple: Tuple containing minimum variance risk, return, and weights.

        Raises:
        - ValueError: If mean_return or cov is unset or mis-shaped.
        """
        mu, sigma = self._optimization_inputs()
        D = mu.size

        # Minimize
        res = minimize(
            fun=lambda weights: weights.dot(sigma).dot(weights),
            x0=np.ones(D) / D,  # uniform
            method='SLSQP',
            constraints={
                'type': 'eq',
                'fun': lambda weights: weights.sum() - 1,
            },
            bounds=[self._WEIGHT_BOUNDS] * D,
        )

        # Clamp at zero so solver round-off cannot produce sqrt(negative).
        mv_risk = float(np.sqrt(max(res.fun, 0.0)))
        mv_weights = res.x
        mv_ret = mv_weights.dot(mu)

        return mv_risk, mv_ret, mv_weights

    def optimize_sharpe_ratio(self):
        """
        Optimizes the portfolio for the Sharpe ratio.

        Maximises (w.dot(mean_return) - risk_free_rate) / sqrt(w.dot(cov).dot(w))
        scaled by sqrt(252), subject to the weights summing to one and the
        per-asset weight bounds. Prints the rounded weights and ratio.

        Returns:
        - tuple: Tuple containing the best Sharpe ratio (rounded to 4 decimals)
          and corresponding weights (rounded to 2 decimals).

        Raises:
        - ValueError: If mean_return or cov is unset or mis-shaped.
        """
        mu, sigma = self._optimization_inputs()
        D = mu.size

        # NOTE(review): presumably a 0.24 annual rate spread over 252 trading
        # days; confirm it is expressed in the same units as mean_return.
        risk_free_rate = 0.24 / 252

        def neg_sharpe_ratio(weights):
            mean = weights.dot(mu)
            sd = np.sqrt(weights.dot(sigma).dot(weights))
            return -((mean - risk_free_rate) / sd) * ((252) ** 0.5)

        res = minimize(
            fun=neg_sharpe_ratio,
            x0=np.ones(D) / D,  # uniform
            method='SLSQP',
            constraints={
                'type': 'eq',
                'fun': lambda weights: weights.sum() - 1,
            },
            bounds=[self._WEIGHT_BOUNDS] * D,
        )

        best_sr, best_w = -res.fun, res.x
        best_w = np.round(best_w, 2)
        best_sr = np.round(best_sr, 4)
        print(best_w, best_sr)

        return best_sr, best_w


# Driver: load per-stock trade results, estimate mean returns and covariance,
# then run and plot the portfolio optimizations.
method = 'XGB'
portfolio_optimizer = PortfolioOptimizer(market=Market, method=method, initial_balance=InitialBalance)


# One column of percentage returns per stock. Each column keeps the Date index
# of its own trade file, so rows are aligned by date instead of by position.
fname_prefix = f'Result/{Market}/Trade/'
fname_suffix = f'_trade_{method}.csv'
return_columns = []
for name in portfolio_optimizer.names:
    fname = fname_prefix + name + fname_suffix
    print(fname)
    df_trade = pd.read_csv(fname, index_col='Date', parse_dates=['Date'])
    pct_return = (df_trade['Return'] * 100).rename(name)
    # Keep the first row per date so the column-wise concat has a unique index.
    return_columns.append(pct_return[~pct_return.index.duplicated(keep='first')])

# Align all stocks on date, then keep only dates where every stock has a return.
returns_df = pd.concat(return_columns, axis=1).sort_index().dropna()
cov = returns_df.cov()
cov_np = cov.to_numpy()

portfolio_optimizer.mean_return = returns_df.mean()
portfolio_optimizer.cov = cov_np

risks, returns, optimized_risks = portfolio_optimizer.optimize_portfolio()
portfolio_optimizer.visualize_portfolio(risks, returns, optimized_risks)

best_sr, best_w = portfolio_optimizer.optimize_sharpe_ratio()


# optimize_portfolio returns one (risk, return) pair per frontier point, so
# `returns` is the y-series that matches `optimized_risks`.
mv_risk, mv_ret, mv_weights = portfolio_optimizer.get_min_variance_portfolio()
plt.scatter(risks, returns, alpha=0.1)
plt.plot(optimized_risks, returns, c='black')
plt.scatter([mv_risk], [mv_ret], c='yellow')
