Các thư viện cần thiết, trong đó có `yfinance` để lấy data

In [1]:
import numpy as np 
import pandas as pd
import datetime
import matplotlib.pyplot as plt
import matplotlib
import seaborn as sns
import yfinance as yf # lib để lấy data

import warnings
warnings.filterwarnings("ignore")

In [2]:

time_range = '5y' # khoảng thời gian làm backtest 
start_time = '2019-12-31' 
end_time = '2024-01-01'


In [3]:
# For S$P 500 stock

# tickers = pd.read_html( 'https://en.wikipedia.org/wiki/List_of_S%26P_500_companies')[0]

# # stock_list = tickers['Symbol'].to_list()
# print(tickers)
# stock_list = tickers['Symbol'][:50]

hàm lấy data, là cổ phiếu của 50 công ty trên sàn `EURO_STOXX_50` \
Kết quả trả về là 1 dataframe có dạng m dòng, 50 cột với m là time range 

In [None]:
def EU_Stock_data():
    """Lấy dữ liệu giá Close của 50 công ty trên sàn Euro_STOXX 50 vào thời gian cho trước"""

    stock_list = pd.read_html( 'https://en.wikipedia.org/wiki/EURO_STOXX_50')[4]['Ticker'][1:].to_list()
    futures = pd.DataFrame(columns= stock_list) # danh sách mã
    
    # đặt index 
    time_index = list(yf.Ticker(stock_list[0]).history(period = time_range,start = start_time, end = end_time).index) 

    # xét từng mã
    for symbol in stock_list:
        df = yf.Ticker(symbol).history(period = time_range, start = start_time, end = end_time)
        df = pd.DataFrame(df['Close']) # lấy giá close
        i = 0
        daily_return = []
        # tinh daily return, = 0 trong ngày đầu tiên 
        for k in df['Close']:
            if i != 0:
                daily_return.append(float((k-i)/i))
            else:
                daily_return.append(float(0))
            i = k
        try:
            futures[symbol] = daily_return
        except:
            while len(daily_return) < len(futures):
                daily_return.insert(0,np.nan)
            futures[symbol] = daily_return

    futures.index = time_index

    futures['Date'] = pd.to_datetime(futures.index, format='%Y-%m-%d')
    futures.set_index('Date', inplace=True)

    return futures

Hàm thực hiện tính toán để lấy về giá trị volatility (biến động) của mỗi ngày

In [5]:
def Volatility_scale(data, ignore_na=False, adjust = True, com = 60, min_periods=0):
    """Scale data using ex ante volatility"""

    # Lưu trữ index, tức thời gian 
    std_index = data.index

    # chứa kết quả
    daily_index = pd.DataFrame(index=std_index)

    # xét từng cổ phiếu
    for oo in data.columns:
        returns = data[oo]  # Lấy ra các return
        returns.dropna(inplace=True)  # xử lý null bằng zero

        # Tính cumulative (cum) return , nhưng ko có thành phần - 1
        ret_index = (1 + returns).cumprod()

        # Tính daily volatility (vol)
        day_vol = returns.ewm(ignore_na=ignore_na,
                              adjust=adjust,
                              com=com,
                              min_periods=min_periods).std(bias=False)
        
        vol = day_vol * np.sqrt(252)  # scale lại theo 252 ngày active trading

        # Join cum return và vol
        ret_index = pd.concat([ret_index, vol], axis=1)
        ret_index.columns = [oo, oo + '_Vol']  # Đặt tên cột cum return là tên cổ phiếu, bên cạnh là vol 

        # Join 
        daily_index = pd.concat([daily_index, ret_index], axis=1)

    return daily_index


Hàm implement chiến lược TSMOM, với logic cụ thể như sau:
Tại ngày t ta so  với ngày t - k về trước, cụ thể ta có thể lấy giá close,
 hoặc cumulative return (nhưng không có thành phần - 1, tức $\text{cum return}_t = \prod_{i = 0}^{t} (1 + r_i)$), 
ở đây xét `cum_return_t` với của k ngày trước
`cum_return_{t-k}`
  - Giả sử `cum_return_t` > `cum_return_{t-k}` tức `sign(cum_return_t - cum_return_{t - k}) = 1` (hàm dấu trả về 1 nếu input > 0)  thì ta có signal = 1, tức đó là tín hiệu để vào lệnh long vào ngày mai 
(ngày t + 1), ngược lại thì signal = -1, là tín hiệu vào short
  -  Sau đó hold trong h -1 ngày tiếp theo (ngày t + 1 vào long đã bắt đầu tính là hold). 
  - Trong các ngày này (tức t + i với i từ 1 đến h), đều có sinh ra Profit and Loss (PnL)  tính theo công thức:\
 ` 0.4/ vol_t * return_{t, t + i}` với `return_{t, t + i}` là return trong giai đoạn t đến t + i, tính tùy vào trường hợp long hay short:
      - nếu long, `return_{t, t + i}` = 1 - `cum_return_t / cum_return_{t + i}`
      - nếu short, `return_{t, t + i}` =  1 - `cum_return_{t + i} / cum_return_t` 
      
    và Leverage, là ` target_vol / vol_t`   (target_vol đang để là 0.4)
 
 Tóm lại, Các kết quả trả về lần lượt là: 
- profit and loss `pnl` 
- `leverage`
- `signal`

In [None]:
def backtest_strategy(data, k, h, vol_flag = 1, target_vol = 0.2, tolerance = 0,ignore_na = False, adjust = True, com = 60, min_periods = 0):
    """Replicate TSMOM strategy
    Input:
        - data là daily return sau khi lấy
        - k là lookback, h là holding period
        - target volatility 0.4...

    Return: 3 list
        - daily profit and loss PnL 
        - leverage 
        - signal 
    """
    pnl = pd.DataFrame(index=data.index)
    leverage = pd.DataFrame(index = data.index)
    signal = pd.DataFrame(index = data.index)

    # gọi hàm Volatility scale
    daily_index = Volatility_scale(data,ignore_na=ignore_na,
                          adjust=adjust,
                          com=com,   
                          min_periods = min_periods)

    company = data.columns

    # Volatility settings
    vol_flag = vol_flag    # Set flag to 1 for vol targeting
    if vol_flag == 1:
        target_vol = target_vol 
    else:
        target_vol = 'no target vol'
    

    for oo in company:
        flag_h = 0
        flag_k = k+1
        df = pd.concat([daily_index[oo], daily_index[oo+"_Vol"]], axis=1)
        df['rolling returns'] = df[oo].pct_change(k) # so sánh thay đổi ở ngày t với k ngày trước đó (tức t - k)
        df['pnl'] = 0. 
        df['leverage'] = 0.
        df['signal'] = 0.
        for x, v in enumerate(df['rolling returns']):
            if flag_h != 0:
                # Bỏ qua giai đoạn hold, tránh bị tính lặp lại
                flag_h = flag_h - 1
                continue
            # Bỏ qua thời gian cty chưa được lên sàn (nêu có)
            if df[oo].isnull().iloc[x] == False:
                # bỏ qua k ngày đầu vì chưa đủ k lookback
                if flag_k != 0:
                    flag_k = flag_k - 1
                    continue
            else: continue
            try:
                if df['rolling returns'].iloc[x-1] < tolerance:
                    for h_period in range(0,h):
                        # rolling return < 0, short rồi giữ trong h ngày, tính pnl, leverage///
                        df['signal'].iloc[x + h_period] = -1
                        if vol_flag == 1:
                            # 
                            df['pnl'].iloc[x + h_period] = (1 - df[oo].iloc[x + h_period] / df[oo].iloc[x - 1 + h_period]) * \
                                target_vol / df[oo+"_Vol"].iloc[x -1] 
                            df['leverage'].iloc[x + h_period] = target_vol / df[oo+"_Vol"].iloc[x -1]
                        else:
                            df['pnl'].iloc[x + h_period] = (1 - df[oo].iloc[x + h_period] / df[oo].iloc[x - 1 + h_period])
                            df['leverage'].iloc[x+h_period] = 1
                elif df['rolling returns'].iloc[x-1] > tolerance:
                    for h_period in range(0,h):
                        # rolling return > 0, long rồi giữ trong h ngày, tính pnl, leverage///
                        df['signal'].iloc[x + h_period] = 1
                        if vol_flag == 1:
                            df['pnl'].iloc[x + h_period] = (df[oo].iloc[x + h_period] / df[oo].iloc[x - 1 + h_period] - 1) * \
                                    target_vol / df[oo+"_Vol"].iloc[x - 1]
                            df['leverage'].iloc[x+h_period] = target_vol / df[oo+"_Vol"].iloc[x -1]
                        else:
                            df['pnl'].iloc[x + h_period] = (df[oo].iloc[x + h_period] / df[oo].iloc[x - 1 + h_period] - 1)
                            df['leverage'].iloc[x+h_period] = 1
            except:pass
            

            # Đặt flag holding là h - 1, để qua vòng for mới bỏ qua ngày hold, tránh bị tính lặp lại
            if df['rolling returns'].iloc[x-1] != tolerance: flag_h = h - 1



        leverage = pd.concat([leverage, df['leverage']], axis = 1)
        pnl = pd.concat([pnl, df['pnl']], axis=1)
        signal = pd.concat([signal, df['signal']], axis=1)

    pnl.columns = data.columns
    leverage.columns = data.columns
    signal.columns = data.columns


    return [pnl,leverage,signal]

Cuối cùng, ta lấy mean của 50 cổ phiếu để có `PnL` đại diện 

In [7]:
def strategy_daily_return(pnl):
    
    return pnl.mean(skipna = False, axis=1)

In [None]:
daily_return = EU_Stock_data()
daily_index = Volatility_scale(daily_return)

# print ra result là pnl, leverage, signal của hàm backtest_strategy(), với k = 3, h = 3, target volatility = 0.4
LOOKBACK = 3
HOLDING = 3
TARGET_VOL = 0.4
result = backtest_strategy(daily_return, k = LOOKBACK, h = HOLDING, target_vol = TARGET_VOL)

print(f'pnl với k = {LOOKBACK} , h = {HOLDING}, target volatility = {TARGET_VOL}:')
result[0]

print(f'leverage với k = {LOOKBACK} , h = {HOLDING}, target volatility = {TARGET_VOL}:')
result[1]

print(f'signal với k = {LOOKBACK} , h = {HOLDING}, target volatility = {TARGET_VOL}:')
result[2]