In [1]:
import os
import tqdm
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

import Set
import Utility
import SQLSentence

%load_ext autoreload
%autoreload 1
%aimport Set
%aimport Indicators

In [2]:
# Constants
# --- database tables queried by this notebook ---
TABLE_TYPE = "stock.stock_type"            # read for its `id` / `name` columns
TABLE_CODE = "stock.stock_code"            # read for its `code` / `name` columns
TABLE_CODE_TYPE = "stock.stock_code_type"  # read for its `stock_type_id` column

# --- filesystem ---
PATH_CHECKPOINT = "checkpoint"             # root directory for saved model weights

In [3]:
# Build Basic Data
connect = Utility.connect_to_database()
main_data = Utility.GetAllData(connect)

# Distinct stock-type ids referenced by the code<->type table.
list_stock_type = (SQLSentence.QuerySQL(TABLE_CODE_TYPE, connect, ['distinct stock_type_id']))['stock_type_id'].tolist()

# Lookup: stock code -> stock name.
df_code_name = SQLSentence.QuerySQL(TABLE_CODE, connect)
dict_code_name = dict(zip(df_code_name.code, df_code_name.name))
del df_code_name

# Lookup: stock type id -> type name.
# The previous version dropped the `id` column and keyed the dict by the
# DataFrame's positional index, but every consumer looks entries up with a
# `type_id` taken from `stock_type_id`. Keying by `id` keeps the lookup
# correct even when ids do not start at 0 or are not contiguous.
# NOTE(review): assumes `stock_type_id` references `stock_type.id` - confirm
# against the schema.
df_type_name = SQLSentence.QuerySQL(TABLE_TYPE, connect)
dict_type_name = dict(zip(df_type_name['id'], df_type_name['name']))
del df_type_name

# Build Environment
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# exist_ok avoids the check-then-create race and makes the cell re-runnable.
os.makedirs(PATH_CHECKPOINT, exist_ok=True)

myseed = 42069  # set a random seed for reproducibility
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
np.random.seed(myseed)
torch.manual_seed(myseed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(myseed)

In [4]:
# Dataset
class StockDataset(Dataset):
    """Sliding-window dataset producing (flattened feature window, binary label).

    Each sample consists of ``window_length_train`` consecutive rows of the
    feature columns (every column located to the right of ``volume``),
    flattened into one 1-D float32 tensor.  The label looks at the row
    immediately after the window and is 1.0 when the relative move from
    ``open`` to ``high`` of that row reaches ``target``:

    * ``target >= 0``: label is 1.0 when ``(high - open) / open >= target``
    * ``target <  0``: label is 1.0 when ``(high - open) / open <= target``

    Args:
        data: DataFrame containing at least ``code``, ``date``, ``open``,
            ``high`` and ``volume`` columns.
        target: relative-move threshold as a fraction (0.02 means 2 %).
        window_length_train: number of consecutive rows per input window.

    Attributes:
        data: 2-D numpy array of the processed rows.
        dim: length of the flattened feature vector returned per sample.

    NOTE(review): ``code`` is dropped before windowing, so when ``data`` holds
    rows of several stocks a window can span the boundary between two stocks.
    NOTE(review): ``Set.Feature`` is defined outside this file; presumably it
    appends indicator columns after ``volume`` - confirm.
    """

    def __init__(self, data, target, window_length_train):
        self.target = target
        self.window_length_train = window_length_train

        # Identifier columns are not model inputs.
        frame = data.drop(['code', 'date'], axis=1)
        frame = Set.Feature(frame)
        # Rows with missing values are removed so windows are contiguous
        # in the resulting array.
        frame = frame.dropna().reset_index(drop=True)

        self.index_col_open = frame.columns.get_loc("open")
        self.index_col_high = frame.columns.get_loc("high")
        # Feature columns start right after `volume`.
        self.index_col_volume = frame.columns.get_loc("volume")

        self.data = frame.to_numpy()
        n_feature_cols = self.data.shape[1] - (self.index_col_volume + 1)
        self.dim = n_feature_cols * self.window_length_train

    def __getitem__(self, index):
        # Reject out-of-range (including negative) indices explicitly; a
        # negative index would otherwise slice a truncated/empty window and
        # silently return a malformed sample.
        if not 0 <= index < len(self):
            raise IndexError(f'index {index} is out of range for dataset of size {len(self)}')

        window_end = index + self.window_length_train
        features = torch.tensor(
            self.data[index:window_end, self.index_col_volume + 1:],
            dtype=torch.float32,
        ).flatten()

        # The label row is the first row after the window.
        open_price = self.data[window_end, self.index_col_open]
        high_price = self.data[window_end, self.index_col_high]
        change = (high_price - open_price) / open_price

        if self.target >= 0:
            hit = change >= self.target
        else:
            hit = change <= self.target
        label = torch.tensor(int(hit), dtype=torch.float32)
        return features, label

    def __len__(self):
        # One row after each window is needed for the label. Clamp at zero so
        # that data shorter than the window yields an empty dataset instead of
        # a negative length (which makes len() raise ValueError).
        return max(0, len(self.data) - self.window_length_train)

In [5]:
# Model
class Model_1(nn.Module):
    """Fully connected binary classifier ending in a sigmoid.

    Layer widths grow ``input_dim -> 2x -> 4x -> 8x`` and are then projected
    to a single output unit, so ``forward`` returns values in (0, 1) with a
    trailing dimension of size 1.

    NOTE(review): there is no non-linearity between the Linear layers, so the
    stack is mathematically equivalent to a single affine map followed by the
    sigmoid. Left unchanged here because inserting activations would alter
    model behaviour and shift the ``state_dict`` keys of saved checkpoints.
    """

    def __init__(self, input_dim):
        super().__init__()

        # Widths of the successive hidden representations.
        widths = [input_dim * factor for factor in (1, 2, 4, 8)]

        # Layers are created in forward order so parameter initialisation
        # consumes the RNG in the same sequence as a hand-written Sequential.
        layers = [
            nn.Linear(n_in, n_out)
            for n_in, n_out in zip(widths[:-1], widths[1:])
        ]
        layers.append(nn.Linear(widths[-1], 1))
        layers.append(nn.Sigmoid())

        self.net = nn.Sequential(*layers)

    def forward(self, x):
        return self.net(x)

In [10]:
# Some Functions
def ConcatDataSet(main_data, code_list):
    """Split every requested stock 90/10 and stack the pieces.

    For each code in ``code_list`` the rows of ``main_data`` whose ``code``
    column equals it are taken in their existing order; the first
    ``int(0.9 * n)`` rows go to the train set and the rest to the test set.

    Args:
        main_data: DataFrame with a ``code`` column.
        code_list: iterable of codes to include, in output order.

    Returns:
        ``(train_data, test_data)`` - two DataFrames, each the concatenation
        of the per-code pieces with a fresh 0..n-1 index.

    NOTE(review): the split is positional, so it is only a chronological
    split if ``main_data`` is already sorted by date within each code.
    """
    per_code_frames = [
        main_data[main_data['code'] == code].copy() for code in code_list
    ]
    split_points = [int(0.9 * len(frame)) for frame in per_code_frames]

    train_pieces = [
        frame.iloc[:cut] for frame, cut in zip(per_code_frames, split_points)
    ]
    test_pieces = [
        frame.iloc[cut:] for frame, cut in zip(per_code_frames, split_points)
    ]

    # ignore_index=True discards the original row labels and renumbers the
    # combined frame from 0.
    train_data = pd.concat(train_pieces, ignore_index=True)
    test_data = pd.concat(test_pieces, ignore_index=True)
    return train_data, test_data

def build_dataloader(main_data, code_list, target, batch_size, window_length_train):
    """Create the train and test DataLoaders for a set of stock codes.

    The data is split with ``ConcatDataSet`` and each part is wrapped in a
    ``StockDataset``.  The train loader shuffles; the test loader does not.

    Returns:
        ``(train_loader, test_loader)``
    """
    split_frames = ConcatDataSet(main_data, code_list)

    # Generator unpacking builds the train dataset first, then the test one.
    train_set, test_set = (
        StockDataset(frame, target, window_length_train) for frame in split_frames
    )

    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)
    return train_loader, test_loader

def calculate_error_percentage(outputs, label, threshold=0.5):
    """Compute per-class error rates (in percent) for one batch.

    A prediction is positive when ``outputs > threshold``.

    * "Type1" here means: label is 1 but the prediction is 0 (missed positive).
    * "Type2" here means: label is 0 but the prediction is 1 (false alarm).

    NOTE(review): this naming is the reverse of the usual statistical
    convention (Type I = false positive); kept as-is for compatibility.

    Args:
        outputs: tensor of shape (N, 1) with model scores.
        label: tensor of shape (N,) holding 0/1 labels; other values are
            ignored by both rates.
        threshold: decision threshold applied to ``outputs``.

    Returns:
        ``(type1_error_percentage, type2_error_percentage)`` - each rounded to
        two decimals, or ``None`` when the batch has no sample of that class.
    """
    predicted_positive = outputs.squeeze(1) > threshold

    is_positive = label == 1
    is_negative = label == 0

    # Vectorised counting: one reduction per quantity instead of a Python
    # loop with a tensor comparison (and device sync) per element.
    type1_count = int(is_positive.sum().item())
    type2_count = int(is_negative.sum().item())
    type1_error = int((is_positive & ~predicted_positive).sum().item())
    type2_error = int((is_negative & predicted_positive).sum().item())

    type1_error_percentage = round((type1_error / type1_count) * 100, 2) if type1_count > 0 else None
    type2_error_percentage = round((type2_error / type2_count) * 100, 2) if type2_count > 0 else None

    return type1_error_percentage, type2_error_percentage

In [7]:
# Get data from single Type

# --- hyper-parameters ---
BATCH_SIZE = 128
N_EPOCHS = 10
lr = 1e-4
window_length_train = 20  # rows per input window
target_pct = 0.02  # Target rise as a fraction (0.02 == 2 %)

# --- stock type to work on ---
type_id = 1

# All codes belonging to the selected type, then the loaders built from them.
list_unique_code_from_data = SQLSentence.GetCodeByTypeId(type_id, connect)['code'].tolist()
datalaoder_train, datalaoder_test = build_dataloader(
    main_data=main_data,
    code_list=list_unique_code_from_data,
    target=target_pct,
    batch_size=BATCH_SIZE,
    window_length_train=window_length_train,
)

In [16]:
# Train Model
def train_model(model, dataloader, optimizer, criterion, device, threshold):
    """Run one optimisation epoch over ``dataloader`` and report its metrics.

    The model is put in train mode and its parameters ARE updated, so this
    must only be called with training data.

    Args:
        model: module mapping a batch of inputs to scores of shape (N, 1).
        dataloader: iterable yielding ``(inputs, label)`` batches.
        optimizer: optimiser stepping ``model``'s parameters.
        criterion: loss called as ``criterion(scores, label)`` with 1-D scores.
        device: device the batches are moved to.
        threshold: decision threshold used for accuracy and error rates.

    Returns:
        ``(avg_loss, accuracy, type1_error_ratio, type2_error_ratio)`` where
        ``avg_loss`` is the mean batch loss (6 decimals), ``accuracy`` is a
        percentage over all samples, and the two error ratios are the mean of
        the per-batch percentages from ``calculate_error_percentage`` taken
        over the batches where that rate was defined (0.0 if there were none).

    Raises:
        ValueError: if ``dataloader`` yields no batches.
    """
    model.train()
    total_loss = 0.0
    correct = 0
    sample_count = 0
    num_batches = 0
    type1_sum = 0.0
    type2_sum = 0.0
    # Batches lacking a class return None for that rate; count the valid ones
    # separately so such batches do not drag the average towards zero.
    type1_batches = 0
    type2_batches = 0

    for inputs, label in tqdm.tqdm(dataloader):
        inputs, label = inputs.to(device), label.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs.squeeze(1), label)
        loss.backward()
        optimizer.step()

        # Metrics use the scores computed before the parameter update.
        scores = outputs.detach()
        predicted = (scores.squeeze(1) > threshold).float()
        total_loss += loss.item()
        correct += (predicted == label).sum().item()
        sample_count += label.size(0)

        type1_pct, type2_pct = calculate_error_percentage(scores, label, threshold)
        if type1_pct is not None:
            type1_sum += type1_pct
            type1_batches += 1
        if type2_pct is not None:
            type2_sum += type2_pct
            type2_batches += 1

        num_batches += 1

    if num_batches == 0:
        raise ValueError('dataloader produced no batches')

    avg_loss = round(total_loss / num_batches, 6)
    accuracy = round((correct / sample_count) * 100, 2)
    type1_error_ratio = round(type1_sum / type1_batches, 2) if type1_batches else 0.0
    type2_error_ratio = round(type2_sum / type2_batches, 2) if type2_batches else 0.0

    return avg_loss, accuracy, type1_error_ratio, type2_error_ratio

In [18]:
# Test Model
def test_model(model, dataloader, optimizer, criterion, device, threshold):
    """Evaluate ``model`` on ``dataloader`` without updating its parameters.

    The model is put in eval mode and the forward passes run under
    ``torch.no_grad()``.

    Args:
        model: module mapping a batch of inputs to scores of shape (N, 1).
        dataloader: iterable yielding ``(inputs, label)`` batches.
        optimizer: unused; accepted so the signature matches ``train_model``.
        criterion: loss called as ``criterion(scores, label)`` with 1-D scores.
        device: device the batches are moved to.
        threshold: decision threshold used for accuracy and error rates.

    Returns:
        ``(avg_loss, accuracy, type1_error_ratio, type2_error_ratio)`` where
        ``avg_loss`` is the mean batch loss (6 decimals), ``accuracy`` is a
        percentage over all samples, and the two error ratios are the mean of
        the per-batch percentages from ``calculate_error_percentage`` taken
        over the batches where that rate was defined (0.0 if there were none).

    Raises:
        ValueError: if ``dataloader`` yields no batches.
    """
    model.eval()
    total_loss = 0.0
    correct = 0
    sample_count = 0
    num_batches = 0
    type1_sum = 0.0
    type2_sum = 0.0
    # Batches lacking a class return None for that rate; count the valid ones
    # separately so such batches do not drag the average towards zero.
    type1_batches = 0
    type2_batches = 0

    with torch.no_grad():
        for inputs, label in tqdm.tqdm(dataloader):
            inputs, label = inputs.to(device), label.to(device)
            outputs = model(inputs)
            loss = criterion(outputs.squeeze(1), label)

            predicted = (outputs.squeeze(1) > threshold).float()
            total_loss += loss.item()
            correct += (predicted == label).sum().item()
            sample_count += label.size(0)

            type1_pct, type2_pct = calculate_error_percentage(outputs, label, threshold)
            if type1_pct is not None:
                type1_sum += type1_pct
                type1_batches += 1
            if type2_pct is not None:
                type2_sum += type2_pct
                type2_batches += 1

            num_batches += 1

    if num_batches == 0:
        raise ValueError('dataloader produced no batches')

    avg_loss = round(total_loss / num_batches, 6)
    accuracy = round((correct / sample_count) * 100, 2)
    type1_error_ratio = round(type1_sum / type1_batches, 2) if type1_batches else 0.0
    type2_error_ratio = round(type2_sum / type2_batches, 2) if type2_batches else 0.0

    return avg_loss, accuracy, type1_error_ratio, type2_error_ratio

In [None]:
# Select single Type to adjust the model and the optimizer
model = Model_1(datalaoder_train.dataset.dim).to(device)
criterion = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr, betas=(0.5, 0.999))

threshold=0.8
best_test_loss = float('inf')

# torch.save does not create missing parent directories, so make the
# per-type checkpoint folder up front.
checkpoint_dir = f'{PATH_CHECKPOINT}/type_{type_id}'
checkpoint_path = f'{checkpoint_dir}/type_{type_id}_{model.__class__.__name__}_Target_{target_pct}'
os.makedirs(checkpoint_dir, exist_ok=True)

# Deliberately a single epoch here (N_EPOCHS is not used in this cell).
epoch = 0
while epoch < 1:
    epoch+=1
    print(f'Train {dict_type_name[type_id]} Epoch: {epoch}')

    # Train
    train_avg_loss, train_accuracy, train_type1_error_ratio, train_type2_error_ratio = train_model(model, datalaoder_train, optimizer, criterion, device, threshold)

    print(f'\nTrain Loss: {train_avg_loss}')
    print(f"Accuracy: {train_accuracy}%")
    print(f"Type1 Error Ratio: {train_type1_error_ratio}%")
    print(f"Type2 Error Ratio: {train_type2_error_ratio}%\n")

    # Test
    # Evaluate with test_model: calling train_model here would run the
    # optimiser on the held-out data and invalidate the reported test metrics.
    test_avg_loss, test_accuracy, test_type1_error_ratio, test_type2_error_ratio = test_model(model, datalaoder_test, optimizer, criterion, device, threshold)
    print(f'Test Loss: {test_avg_loss}')
    print(f"Accuracy: {test_accuracy}%")
    print(f"Type1 Error Ratio: {test_type1_error_ratio}%")
    print(f"Type2 Error Ratio: {test_type2_error_ratio}%\n")

    if test_avg_loss < best_test_loss:
        best_test_loss = test_avg_loss
        # Save first so the message is only printed after a successful write.
        torch.save(model.state_dict(), checkpoint_path)
        print(f'Model has saved in {checkpoint_path}\n')

In [None]:
# Train all data
BATCH_SIZE = 128
N_EPOCHS = 10

lr = 1e-4
target_pct = 0.02 # Target rise as a fraction (0.02 == 2 %)
window_length_train = 20
# Defined here so the cell does not depend on a value left over from an
# earlier cell (it was previously only set in the single-type cell).
threshold = 0.8

for type_id in list_stock_type:
    print(f'Train {dict_type_name[type_id]}')

    list_unique_code_from_data = (SQLSentence.GetCodeByTypeId(type_id, connect))['code'].tolist()
    # NOTE(review): build_dataloader performs this same split internally and
    # these two frames are not used in this cell; kept in case later cells
    # read `train_data` / `test_data`. Remove if nothing else needs them.
    train_data, test_data = ConcatDataSet(main_data, list_unique_code_from_data)
    datalaoder_train, datalaoder_test = build_dataloader(main_data, list_unique_code_from_data, target_pct, BATCH_SIZE, window_length_train)

    # Fresh model and optimiser for every stock type.
    model = Model_1(datalaoder_train.dataset.dim).to(device)
    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, betas=(0.5, 0.999))

    best_test_loss = float('inf')

    # torch.save does not create missing parent directories, so make the
    # per-type checkpoint folder up front.
    checkpoint_dir = f'{PATH_CHECKPOINT}/type_{type_id}'
    checkpoint_path = f'{checkpoint_dir}/type_{type_id}_{model.__class__.__name__}_Target_{target_pct}'
    os.makedirs(checkpoint_dir, exist_ok=True)

    epoch = 0
    while epoch < N_EPOCHS:
        epoch+=1

        # Train
        train_avg_loss, train_accuracy, train_type1_error_ratio, train_type2_error_ratio = train_model(model, datalaoder_train, optimizer, criterion, device, threshold)

        print(f'\nTrain Loss: {train_avg_loss}')
        print(f"Accuracy: {train_accuracy}%")
        print(f"Type1 Error Ratio: {train_type1_error_ratio}%")
        print(f"Type2 Error Ratio: {train_type2_error_ratio}%\n")

        # Test
        # Evaluate with test_model: calling train_model here would run the
        # optimiser on the held-out data and invalidate the test metrics used
        # for checkpoint selection.
        test_avg_loss, test_accuracy, test_type1_error_ratio, test_type2_error_ratio = test_model(model, datalaoder_test, optimizer, criterion, device, threshold)
        print(f'Test Loss: {test_avg_loss}')
        print(f"Accuracy: {test_accuracy}%")
        print(f"Type1 Error Ratio: {test_type1_error_ratio}%")
        print(f"Type2 Error Ratio: {test_type2_error_ratio}%\n")

        if test_avg_loss < best_test_loss:
            best_test_loss = test_avg_loss
            # Save first so the message is only printed after a successful write.
            torch.save(model.state_dict(), checkpoint_path)
            print(f'Test Loss: {best_test_loss}, Model has saved in {checkpoint_path}')
