In [1]:
import pandas as pd
import numpy as np
from datetime import datetime

## Data Preprocessing

In [2]:
%%time
data = pd.read_csv("data.csv", low_memory=False)
data.columns = ["Code", "Company", "Industry", "Date", "Open", "High", "Low", "Close", "Volume", "MV", "PB"]
data.Date = data.Date.apply(lambda x : datetime.strptime(x, "%Y/%m/%d"))
data.head()

Wall time: 60 s


**MV** is the market value (in millions of NTD); **PB** is the price-to-book ratio.

In [3]:
#find which years are included in the data (kept in order of first appearance)
data_year = data.Date.dt.year.unique()

#find the unique trading dates in the data
data_date = pd.DataFrame(data.Date.unique(), columns = ["Date"])

#find the first trading date of each year, these dates will be used as entry points/exit points during backtesting
#a single grouped min() replaces the per-year scan over every date;
#sort = False keeps the groups in order of first appearance, i.e. the same order as data_year
first_date_each_year = data_date.Date.groupby(data_date.Date.dt.year, sort = False).min().tolist()

## Backtest

In [57]:
def _find_exit_row(data, entry_rows, code, date_out, least_volume, trade_mode):
    """Build the one-row exit record (columns suffixed with "_out") for one selected company.

    Parameters
    ----------
    data : DataFrame holding every trading row (needs "Code", "Date" and "Volume" columns).
    entry_rows : DataFrame with the entry rows selected for the current year.
    code : value of the "Code" column identifying the company.
    date_out : appointed exit date; the first row dated on or after it is used.
    least_volume : minimum volume the exit row must have to be accepted as is.
    trade_mode : "A" or "B", see selected_company.

    Returns
    -------
    DataFrame with exactly one row and the columns of data suffixed with "_out".
    """
    out_columns = data.columns + "_out"
    company = data[data.Code == code]
    #rows dated on or after the appointed exit date
    later_rows = company[company.Date >= date_out]

    #no such row: the company has no trading data from date_out onwards (e.g. unlisted), set exit price = 0
    if later_rows.empty:
        return pd.DataFrame([[0] * len(data.columns)], columns = out_columns)

    #closest exit point on or after the appointed date; always a single row, even if a date is duplicated
    exit_row = later_rows.iloc[[later_rows.Date.values.argmin()]].copy()
    exit_row.columns = out_columns

    #if the exit point's trade volume doesn't meet the requirement of least volume, then do:
    if exit_row["Volume_out"].iloc[0] < least_volume:
        #in trade mode "A", set exit price = 0 (more realistic, because we may be unable to sell at such a low liquidity)
        if trade_mode == "A":
            exit_row = pd.DataFrame([[0] * len(data.columns)], columns = out_columns)
        #in trade mode "B", set exit price = entry price
        elif trade_mode == "B":
            exit_row = entry_rows[entry_rows.Code == code].copy()
            exit_row.columns = out_columns
        #any other trade mode keeps the actual exit row

    return exit_row


def selected_company(data = data, num_selected = 10, by = "MV", ascending = True, least_volume = 10, trade_mode = "A"):
    """Select companies at the first trading date of each year and pair each with its exit trade.

    Entry points are read from the notebook-level list first_date_each_year. The exit point of an
    entry is the next item of that list; for the last item it is the latest date in the
    notebook-level data_date frame.

    Parameters
    ----------
    data : DataFrame of trading rows; must contain "Code", "Date", "Volume", "Close" and the `by` column.
    num_selected : number of companies picked at each entry point.
    by : column used to rank the companies at the entry point.
    ascending : sort direction of the ranking (True picks the smallest values of `by`).
    least_volume : minimum "Volume" required at the entry point and at the exit point.
    trade_mode : what to do when the exit row's volume is below least_volume:
        "A" replaces the exit row with zeros, "B" reuses the entry row (exit price = entry price),
        any other value keeps the actual exit row.

    Returns
    -------
    DataFrame with one row per (entry point, company): the entry row followed by the exit row,
    whose columns carry the "_out" suffix. The index restarts at 0 for every entry point.
    An empty DataFrame is returned when nothing was selected.
    """
    #the liquidity requirement does not depend on the year, so filter once outside the loop
    liquid_data = data[data.Volume >= least_volume]

    yearly_results = []
    for i, date_in in enumerate(first_date_each_year):

        #exit point == a year after entry point (use the latest date of data when dealing with last year)
        if i + 1 < len(first_date_each_year):
            date_out = first_date_each_year[i + 1]
        else:
            date_out = data_date.Date.max()

        #deal with entry trade
        #selected companies need to possess trading data on the entry date
        #and also meet the requirement of least trading volume (in case of liquidity risk)
        temp_data_in = liquid_data[liquid_data.Date == date_in]
        temp_data_in = temp_data_in.sort_values(by = by, ascending = ascending).reset_index(drop = True)
        temp_data_in = temp_data_in.iloc[0:num_selected, :]

        #deal with exit trade: one exit row per selected company, in the same order as the entry rows
        exit_rows = [
            _find_exit_row(data, temp_data_in, code, date_out, least_volume, trade_mode)
            for code in temp_data_in.Code
        ]
        if exit_rows:
            temp_data_out = pd.concat(exit_rows, axis = 0).reset_index(drop = True)
        else:
            #nothing was selected at this entry point
            temp_data_out = pd.DataFrame([])

        #entry and exit rows share the positional index 0..n-1, so they line up side by side
        yearly_results.append(pd.concat([temp_data_in, temp_data_out], axis = 1))

    if not yearly_results:
        return pd.DataFrame([])
    return pd.concat(yearly_results, axis = 0)

In [58]:
%%time
#run the backtest: at each entry date pick the 10 companies with the smallest MV,
#requiring a volume of at least 10, and use trade mode "A" for exits below that volume
test = selected_company(data=data, num_selected=10, by='MV', ascending=True, least_volume=10, trade_mode='A')

Wall time: 9.79 s


In [64]:
#first entry date that appears in the backtest result
test_date = test["Date"].unique()[0]

In [None]:
#FIXME: unfinished scratch cell -- the divisor was never written, so running it raised a SyntaxError.
#Kept commented out so that Restart & Run All can get past this cell.
#100000 /

In [66]:
#entry close prices of the companies selected on test_date
test.loc[test.Date == test_date, "Close"]

0     1.11
1     0.66
2     4.30
3     3.17
4     2.83
5    89.46
6     3.49
7    11.28
8    25.95
9     9.91
Name: Close, dtype: float64

In [70]:
#rows of the first date that appears in data
temp = data.loc[data["Date"] == data["Date"].unique()[0]]

In [71]:
#number of rows in temp
temp.shape[0]

10

In [75]:
#share of each row's MV in the total MV of temp
temp["MV"].div(temp["MV"].sum())

0    0.046942
1    0.063300
2    0.064723
3    0.073257
4    0.089616
5    0.109531
6    0.124467
7    0.137269
8    0.145092
9    0.145804
Name: MV, dtype: float64

In [None]:
def calculate_return(data, weight_mode = "equal"):
    """Build the portfolio weights for every date found in data.

    data : DataFrame with a "Date" column (and an "MV" column when weight_mode is "MV").
    weight_mode : "equal" gives every row of a date the same weight,
        "MV" weights each row by its share of that date's total MV.

    NOTE(review): the visible part of this cell only builds the weights; they are not yet
    combined with prices -- confirm the intended return calculation.
    """
    
    for date in data.Date.unique():
        temp = data[data.Date == date]
        
        #one weight per row of this date; sized from the frame itself instead of a hard-coded 10
        if weight_mode == "equal":
            weight_array = [1/len(temp)] * len(temp)
        elif weight_mode == "MV":
            weight_array = temp.MV / temp.MV.sum()
            
        