In [1]:
from statsmodels.regression.rolling import RollingOLS
import statsmodels.api as sm

from quantdev.data import Databank
from quantdev.backtest import *
from quantdev.analysis import *
# from quantdev.trade import PortfolioManager, Position, Portfolio

from typing import Literal
import pandas as pd
import numpy as np

In [2]:
# Handle to the project's Databank; later cells read datasets through `db.read_dataset(...)`.
db = Databank()
# Maintenance calls, intentionally left disabled. Presumably they refresh the stored
# datasets — confirm what each one touches before uncommenting.
# db._update_stock_sector()
# db.update_databank()
# db._update_factor_model()

In [20]:
# Variant 1: every component factor is built inside the filtered universe, then combined.
# '收盤價' = closing price.
close = get_factor('收盤價')
# Universe mask: '稅後淨利' (net income after tax) > 0 AND close at or above its 60-row rolling mean.
universe = (get_factor('稅後淨利') >0) & (close>=close.rolling(60).mean())
roe = get_factor('roe', universe=universe)
# '股價淨值比' = price-to-book ratio. asc=False presumably flips the ranking direction so that a
# low P/B scores high — TODO confirm against get_factor.
pbr = get_factor('股價淨值比', universe=universe, asc=False)
mtm = get_factor('mtm_3m', universe=universe)
# Composite factor: the sum of the three components is passed back through get_factor.
f = get_factor(roe+pbr+mtm)

In [23]:
# Variant 2: components are built without a universe filter; the universe is applied only
# when the composite is formed. `universe` is whatever an earlier-run cell last bound to that name.
roe = get_factor('roe')
# '股價淨值比' = price-to-book ratio (asc=False: presumably reversed ranking — TODO confirm).
pbr = get_factor('股價淨值比', asc=False)
mtm = get_factor('mtm_3m')
f = get_factor(roe+pbr+mtm, universe=universe)

In [28]:
# Variant 3: the universe is reduced to the profitability screen only ('稅後淨利' = net income after tax).
# Relies on `roe`, `pbr` and `mtm` left in the kernel by a previously run factor cell.
universe = (get_factor('稅後淨利') >0)
f = get_factor(roe+pbr+mtm, universe=universe)
# Backtest the boolean selection `f >= 0.99`. 'QR' is passed positionally; presumably a
# rebalance-schedule code — TODO confirm against backtesting().
strategy = backtesting(f>=0.99, 'QR')

In [29]:
# Display the strategy-vs-benchmark summary statistics (rendered as the table below).
strategy.summary

Unnamed: 0,strategy,0050.TT
Annual return,29.16%,10.83%
Total return,17743.60%,733.53%
Max drawdown,-53.54%,-55.66%
Annual volatility,20.51%,19.20%
Sharpe ratio,1.4,0.56
Calmar ratio,0.54,0.19
beta,0.6,-


In [30]:
# Display the strategy report object (rendered as a Bokeh model in the output below).
strategy.report

BokehModel(combine_events=True, render_bundle={'docs_json': {'df4205cb-529c-438c-b4b4-a8c979a14e17': {'version…

##### Brinson model

In [24]:
# Explicit import of the two attribution helpers (they may already be in scope through the
# star import of quantdev.analysis at the top of the notebook).
from quantdev.analysis import calc_portfolio_style, calc_brinson_model
# Inputs for plot_return_attribution; both depend on `strategy` from the backtest cell.
portfolio_style = calc_portfolio_style(strategy.daily_return)
brinson_model = calc_brinson_model(strategy.portfolio_df, strategy.return_df, strategy.benchmark)

In [31]:
def plot_return_attribution(portfolio_style, brinson_model):
    """Build a three-panel Plotly figure: style analysis plus Brinson attribution.

    Panels:
      * top (full width): one line per style beta on the secondary y-axis and
        the compounded ``const`` column ("alpha") on the primary y-axis;
      * bottom-left: compounded allocation effect, selection effect and their
        per-date sum ("excess return");
      * bottom-right: polar chart of the compounded effects per broad sector.

    Args:
        portfolio_style: DataFrame with a ``const`` column and one column per
            style beta; its index is used as the x-axis of the top panel.
        brinson_model: DataFrame with ``allocation_effect`` and
            ``selection_effect`` columns in which ``t_date`` can be grouped on
            and ``sector`` becomes a column after ``reset_index()``.

    Returns:
        The figure created by ``make_subplots`` with all traces and layout applied.
    """
    # Look the plotting parameters up once instead of constructing PlotMaster per use;
    # only its `fig_param` mapping is read.
    fig_param = PlotMaster().fig_param
    palette = fig_param['colors']
    allocation_color = palette['Bright Magenta']
    selection_color = palette['Light Blue']
    effect_cols = ['allocation_effect', 'selection_effect']

    fig = make_subplots(
        rows=2, cols=2,
        specs=[
            [{"secondary_y": True}, None],
            [{}, {"type": "scatterpolar"}],
        ],
        vertical_spacing=0.05,
        horizontal_spacing=0.1,
    )

    # ---- Style analysis (top panel) ----
    betas = portfolio_style.drop(columns=['const']).round(2)
    alpha = (1 + portfolio_style['const']).cumprod() - 1

    colors = sample_colorscale('Spectral', len(betas.columns))
    for i, col in enumerate(betas.columns):
        fig.add_trace(
            go.Scatter(
                x=betas.index,
                y=betas[col],
                name=col,
                line=dict(color=colors[i]),
                showlegend=False,
            ),
            secondary_y=True,
            row=1, col=1,
        )

        # Label each beta line at its first point. `.iloc[0]` is an explicit positional
        # lookup; a bare `[0]` on a non-integer index is a deprecated label/position guess.
        fig.add_annotation(
            x=betas.index[0],
            y=betas[col].iloc[0] + 0.05,
            xref="x",
            yref="y",
            text=col,
            showarrow=False,
            xanchor="left",
            yanchor="middle",
            font=dict(color="white", size=12),
            secondary_y=True,
            row=1, col=1)

    fig.add_trace(
        go.Scatter(
            x=alpha.index,
            y=alpha.values,
            name='Alpha',
            line=dict(color=palette['Light Grey'], width=2),
            showlegend=False,
        ),
        secondary_y=False,
        row=1, col=1,
    )

    fig.add_annotation(
        x=alpha.index[0],
        y=alpha.values[0] + 0.1,
        xref="x",
        yref="y",
        text='Alpha (rhs)',
        showarrow=False,
        xanchor="left",
        yanchor="middle",
        font=dict(color="white", size=12),
        secondary_y=False,
        row=1, col=1)

    # ---- Brinson model: effects through time (bottom-left panel) ----
    # Sum the effects over sectors per date, add their total, then compound.
    bm_trend = brinson_model[effect_cols]\
        .groupby('t_date')\
        .sum()\
        .assign(excess_return=lambda x: x.sum(axis=1))
    bm_trend = (1 + bm_trend).cumprod() - 1

    fig.add_trace(
        go.Scatter(
            x=bm_trend.index,
            y=bm_trend['allocation_effect'].values,
            name='Allocation Effect',
            line=dict(color=allocation_color, width=2),
            legendgroup='allocation',
            showlegend=True,
        ),
        row=2, col=1,
    )

    fig.add_trace(
        go.Scatter(
            x=bm_trend.index,
            y=bm_trend['selection_effect'].values,
            name='Selection Effect',
            line=dict(color=selection_color, width=2),
            legendgroup='selection',
            showlegend=True,
        ),
        row=2, col=1,
    )

    fig.add_trace(
        go.Scatter(
            x=bm_trend.index,
            y=bm_trend['excess_return'].values,
            name='Excess Return',
            line=dict(color=palette['Dark Blue'], width=2),
            showlegend=True,
        ),
        row=2, col=1,
    )

    # ---- Brinson model: effects per broad sector (bottom-right polar panel) ----
    # Collapse the detailed sector labels into four broad groups.
    sector_mapping = {
        # Electronics & technology
        '電子': '電子',
        '數位': '電子',
        '電器': '電子',
        '電機': '電子',

        # Financials
        '金融': '金融',
        '證券': '金融',

        # Traditional industries
        '化學': '傳產',
        '塑膠': '傳產',
        '建材': '傳產',
        '橡膠': '傳產',
        '水泥': '傳產',
        '玻璃': '傳產',
        '紡織': '傳產',
        '航運': '傳產',
        '汽車': '傳產',
        '油電': '傳產',
        '鋼鐵': '傳產',
        '農業': '傳產',

        # Consumer
        '居家': '消費',
        '百貨': '消費',
        '觀光': '消費',
        '貿易': '消費',
        '運動': '消費',
        '食品': '消費',
        '文化': '消費',

        # "Other" is folded into traditional industries by default
        '其它': '傳產'
    }

    by_sector = brinson_model\
        .reset_index()\
        .assign(sec=lambda x: x['sector'].map(sector_mapping))
    # Compounded effect per broad sector. A grouped product gives the final value of the
    # running product directly and never hands the string `sec` column to arithmetic.
    # Rows whose sector is not in the mapping get a NaN key and are left out by groupby;
    # min_count=1 keeps an all-NaN group as NaN rather than turning it into 0.
    bm_scatter = (1 + by_sector[effect_cols]).groupby(by_sector['sec']).prod(min_count=1) - 1

    fig.add_trace(go.Scatterpolar(
        r=bm_scatter['allocation_effect'],
        theta=bm_scatter.index,
        line=dict(color=allocation_color, width=2),
        name='Allocation Effect',
        mode='lines',
        fill='toself',
        legendgroup='allocation',
        showlegend=False,
    ),
    row=2, col=2,
    )

    fig.add_trace(go.Scatterpolar(
        r=bm_scatter['selection_effect'],
        theta=bm_scatter.index,
        line=dict(color=selection_color, width=2),
        name='Selection Effect',
        mode='lines',
        fill='toself',
        legendgroup='selection',
        showlegend=False,
    ),
    row=2, col=2,
    )

    # ---- Axes ----
    fig.update_yaxes(tickformat=".0%", row=1, col=1, secondary_y=False, showgrid=False)
    fig.update_yaxes(tickformat=".0%", row=2, col=1, showgrid=False)

    # Manual panel placement (fractions of the figure area).
    fig.update_xaxes(domain=[0.025, 0.975], row=1, col=1)
    fig.update_xaxes(domain=[0.025, 0.575], row=2, col=1)

    fig.update_yaxes(domain=[0.55, 0.975], row=1, col=1)
    fig.update_yaxes(domain=[0, 0.4], row=2, col=1)

    # ---- Panel titles ----
    fig.add_annotation(text=('Style Analysis'), x=0, y=1, yshift=30, xref="x domain", yref="y domain", showarrow=False, row=1, col=1)
    fig.add_annotation(text=('BF Model - subperiod'), x=0, y=1, yshift=30, xref="x domain", yref="y domain", showarrow=False, row=2, col=1)
    # The polar panel's title is anchored to the bottom-left subplot and pushed right by 400 px.
    fig.add_annotation(text=('BF Model - subsector'), x=0, y=1, yshift=30, xshift=400, xref="x domain", yref="y domain", showarrow=False, row=2, col=1)

    # ---- Layout ----
    fig.update_layout(
        legend=dict(x=0.05, y=0.4, xanchor='left', yanchor='top', bgcolor='rgba(0,0,0,0)', tracegroupgap=2.5),
        width=fig_param['size']['w'],
        height=fig_param['size']['h'],
        margin=fig_param['margin'],
        template=fig_param['template'],
        # Primary axis (alpha) is drawn on the right, secondary axis (betas) on the left.
        yaxis=dict(side='right'),
        yaxis2=dict(side='left'),
        polar=dict(
            domain=dict(x=[0.6, 0.975], y=[0, 0.35]),  # size and position of the polar plot
            radialaxis_angle=90,
        )
    )
    fig.update_polars(radialaxis_showline=False)

    return fig

# Render the figure. `brinson_model` must still be the per-sector frame (with a `sector`
# level); the "other method" cell further down rebinds that name to a per-date aggregate.
plot_return_attribution(portfolio_style, brinson_model)

In [None]:
# Scratch cell with no execution count. `scatter_data` is not defined in any visible cell, and
# `sector_mapping` only exists as a local inside plot_return_attribution, so on a fresh
# kernel this raises NameError. TODO: define both at notebook scope or delete the cell.
# Tag each row with its broad sector, looked up from the index labels.
scatter_data['new_sector'] = scatter_data.index.map(sector_mapping)

# Then group by the new sectors
scatter_data = scatter_data.groupby('new_sector').sum()

In [21]:
# Portfolio weight per sector and date, rescaled so the weights of each date sum to 1.
# `portfolio_sector` and `portfolio_weight` come from the "other method" Brinson cell.
sector_data = portfolio_sector
weight_data = portfolio_weight
sector_weight = (
    sector_data
    .merge(weight_data, on=['stock_id', 't_date'], how='left')
    .groupby(['t_date', 'sector'], as_index=False)['weight']
    .sum()
    .assign(weight=lambda df: df['weight'] / df.groupby('t_date')['weight'].transform('sum'))
    .set_index(['t_date', 'sector'])
    .unstack()  # sectors become columns, one row per date
)

##### Brinson model (alternative implementation)

In [78]:
# Step-by-step Brinson attribution built directly from the `strategy` object.
# NOTE: this cell rebinds the notebook-level names `return_df` and `brinson_model`;
# the result here is aggregated per date (no sector level).

# Portfolio weights in long form; zero positions are masked to NaN before stacking.
portfolio_df = strategy.portfolio_df
portfolio_df = portfolio_df.where(portfolio_df != 0).stack()

# Sector labels of the benchmark constituents (rows flagged is_tw50 == 'Y').
benchmark_sector = db.read_dataset(
    'stock_sector',
    columns=['stock_id', 't_date', 'sector'],
    filters=[['is_tw50', '==', 'Y']],
)

# Sector labels of every stock that appears in the portfolio.
portfolio_sector = db.read_dataset(
    'stock_sector',
    columns=['stock_id', 't_date', 'sector'],
    filters=[['stock_id', 'in', portfolio_df.index.get_level_values(1).unique()]],
)

# Benchmark weights are taken from the '個股市值(元)' (market value) column.
benckmark_weight = db.read_dataset(
    'stock_trading_data',
    columns=['t_date', 'stock_id', '個股市值(元)'],
    filters=[['stock_id', 'in', benchmark_sector['stock_id'].unique()]],
).rename(columns={'個股市值(元)': 'weight'})

portfolio_weight = portfolio_df.reset_index().rename(columns={0: 'weight', 'level_1': 'stock_id'})

# Long-form returns, one row per (t_date, stock).
return_df = strategy.return_df.stack().reset_index().rename(columns={0: 'rtn'})

# Benchmark side: sector weight within each date and weight-averaged sector return.
benchmark_data = (
    benchmark_sector
    .merge(benckmark_weight, on=['t_date', 'stock_id'])
    .merge(return_df, on=['t_date', 'stock_id'])
    .dropna()
    .assign(w_rtn=lambda df: df['weight'] * df['rtn'])
    .groupby(['t_date', 'sector'], as_index=False)[['weight', 'w_rtn']]
    .sum()
    .assign(
        benchmark_w=lambda df: df['weight'] / df.groupby('t_date')['weight'].transform('sum'),
        benchmark_r=lambda df: df['w_rtn'] / df['weight'],
    )
    .set_index(['t_date', 'sector'])
    [['benchmark_w', 'benchmark_r']]
)

# Portfolio side: identical treatment.
portfolio_data = (
    portfolio_sector
    .merge(portfolio_weight, on=['t_date', 'stock_id'])
    .merge(return_df, on=['t_date', 'stock_id'])
    .dropna()
    .assign(w_rtn=lambda df: df['weight'] * df['rtn'])
    .groupby(['t_date', 'sector'], as_index=False)[['weight', 'w_rtn']]
    .sum()
    .assign(
        portfolio_w=lambda df: df['weight'] / df.groupby('t_date')['weight'].transform('sum'),
        portfolio_r=lambda df: df['w_rtn'] / df['weight'],
    )
    .set_index(['t_date', 'sector'])
    [['portfolio_w', 'portfolio_r']]
)

# Side-by-side table; a sector missing on one side gets weight 0 and return 0.
# The benchmark's own total return (first entry of strategy.benchmark) is attached per date.
data = (
    pd.concat([benchmark_data, portfolio_data], axis=1)
    .fillna(0)
    .merge(
        strategy.return_df[strategy.benchmark[0]].rename('benchmark_total_r').to_frame(),
        on='t_date',
        how='left',
    )
)

# BHB: allocation and selection effects, summed per date, plus their total.
# (An explicit interaction term, (portfolio_w - benchmark_w) * (portfolio_r - benchmark_r),
# is not computed here.)
brinson_model = (
    data
    .assign(
        allocation_effect=lambda df: (df['portfolio_w'] - df['benchmark_w']) * (df['benchmark_r'] - df['benchmark_total_r']),
        selection_effect=lambda df: df['portfolio_w'] * (df['portfolio_r'] - df['benchmark_r']),
    )
    .groupby('t_date')
    .sum()
    [['allocation_effect', 'selection_effect']]
    .assign(excess_return=lambda df: df.sum(axis=1))
)
# Quick look at the compounded effects: ((1+brinson_model).cumprod()-1).plot()

In [None]:
def _stack_to_long(wide: pd.DataFrame, value_name: str) -> pd.DataFrame:
    """Reshape a wide frame (dates on the index, stocks on the columns) to long form.

    The key columns are always named ``t_date`` and ``stock_id`` regardless of how
    (or whether) the axes of ``wide`` are named, so later merges can rely on them.
    NaN cells are dropped explicitly so the result does not depend on the stack()
    implementation in use.
    """
    long = wide.stack().dropna()
    long.index = long.index.set_names(['t_date', 'stock_id'])
    return long.rename(value_name).reset_index()


def _sector_weight_and_return(sector: pd.DataFrame, weight: pd.DataFrame,
                              rtn: pd.DataFrame, prefix: str) -> pd.DataFrame:
    """Aggregate stock-level weights and returns to ``<prefix>_w`` / ``<prefix>_r`` per (t_date, sector)."""
    keys = ['t_date', 'stock_id']
    per_sector = (
        sector
        .merge(weight, on=keys)
        .merge(rtn, on=keys)
        .dropna()
        .assign(w_rtn=lambda x: x['weight'] * x['rtn'])
        .groupby(['t_date', 'sector'], as_index=False)
        .agg({'weight': 'sum', 'w_rtn': 'sum'})
    )
    w_col, r_col = f'{prefix}_w', f'{prefix}_r'
    # Sector weight as a share of the date's total weight.
    per_sector[w_col] = per_sector['weight'] / per_sector.groupby('t_date')['weight'].transform('sum')
    # Weight-averaged return of the sector.
    per_sector[r_col] = per_sector['w_rtn'] / per_sector['weight']
    return per_sector.set_index(['t_date', 'sector'])[[w_col, r_col]]


def calc_brinson_model(portfolio_df:pd.DataFrame, return_df:pd.DataFrame, benchmark:list[str]):
    """Compute per-date, per-sector allocation and selection effects.

    Args:
        portfolio_df: Wide frame of portfolio weights (dates on the index, stocks
            on the columns). Zero entries are treated as "not held".
        return_df: Wide frame of returns with the same orientation. It must also
            contain a column named ``benchmark[0]`` holding the benchmark's return.
        benchmark: Only the first entry is used, as the column of ``return_df``
            that holds the benchmark's total return. The benchmark constituents
            are always the rows of the ``stock_sector`` dataset flagged
            ``is_tw50 == 'Y'``, weighted by the '個股市值(元)' (market value)
            column of ``stock_trading_data``.

    Returns:
        DataFrame indexed by ``(t_date, sector)`` with the columns
        ``allocation_effect`` and ``selection_effect``.
    """
    from quantdev.data import Databank
    db = Databank()

    portfolio_weight = _stack_to_long(portfolio_df[portfolio_df != 0], 'weight')
    rtn = _stack_to_long(return_df, 'rtn')

    benchmark_sector = db.read_dataset('stock_sector',
                    columns=['stock_id', 't_date', 'sector'],
                    filters=[['is_tw50', '==', 'Y']])

    portfolio_sector = db.read_dataset('stock_sector',
                    columns=['stock_id', 't_date', 'sector'],
                    filters=[['stock_id', 'in', portfolio_weight['stock_id'].unique()]])

    benchmark_weight = db.read_dataset('stock_trading_data',
                    columns=['t_date', 'stock_id', '個股市值(元)'],
                    filters=[['stock_id', 'in', benchmark_sector['stock_id'].unique()]])\
                    .rename(columns={'個股市值(元)':'weight'})

    benchmark_data = _sector_weight_and_return(benchmark_sector, benchmark_weight, rtn, 'benchmark')
    portfolio_data = _sector_weight_and_return(portfolio_sector, portfolio_weight, rtn, 'portfolio')

    # Side-by-side table; a sector missing on one side gets weight 0 and return 0.
    # join (rather than merge) on the `t_date` level keeps the (t_date, sector) index.
    data = pd.concat([benchmark_data, portfolio_data], axis=1)\
        .fillna(0)\
        .join(
            pd.DataFrame(return_df[benchmark[0]].rename('benchmark_R')),
            on='t_date',
            how='left'
        )

    # BF: allocation is measured against the benchmark's total return. Selection uses the
    # portfolio weight, so the interaction term is folded into it.
    brinson_model = data.assign(
        allocation_effect=lambda x: (x['portfolio_w'] - x['benchmark_w']) * (x['benchmark_r'] - x['benchmark_R']),
        selection_effect=lambda x: x['portfolio_w'] * (x['portfolio_r'] - x['benchmark_r']),
    )\
    [['allocation_effect', 'selection_effect']]

    return brinson_model