In [None]:
from importlib import reload
# NOTE(review): names used later in this notebook but not imported explicitly here
# (os, torch, nn, optim, pd, np, device, CNN20d, train_n_epochs, ...) presumably
# arrive through these star imports — confirm, and consider explicit imports.
from model import *
from train import *
from dataset import *
# Re-execute the dataset / utils modules so that source edits are picked up
# without restarting the kernel. reload() does not rebind names that were already
# star-imported above; use the _D / _U aliases to reach the refreshed definitions.
import dataset as _D
reload(_D)
import utils as _U
reload(_U)
from collections import OrderedDict
import yaml

## Training Settings

In [None]:
# Read the experiment's YAML configuration and convert the resulting mapping into
# the `setting` object whose sections (DATASET / TRAIN / TEST) are read below.
with open('configs/I20R5/I20R5_09-11.yml', 'r') as f:
    raw_config = yaml.safe_load(f)
    setting = _U.Dict2ObjParser(raw_config).parse()

In [None]:
# Make sure the directories that will receive the checkpoint and the training log
# exist before anything tries to write into them.
#
# os.makedirs(..., exist_ok=True) replaces the earlier `os.system('mkdir ...')`
# calls: it creates missing parent directories in one step, does nothing when the
# directory already exists, raises on a real failure instead of ignoring the
# shell's exit status, and never passes configuration values through a shell.
# Using the file's own parent directory (instead of the second path component)
# also keeps this correct for paths that are nested deeper or start with './'.
for save_file in (setting.TRAIN.MODEL_SAVE_FILE, setting.TRAIN.LOG_SAVE_FILE):
    save_dir = os.path.dirname(save_file)
    if save_dir:  # a bare file name lives in the working directory: nothing to create
        os.makedirs(save_dir, exist_ok=True)

## Train & Valid

### Dataset Preparation

In [None]:
# Assemble the training dataset from the DATASET section of the configuration.
dataset_cfg = setting.DATASET
train_dataset = _D.ImageDataSet(
    win_size=dataset_cfg.LOOKBACK_WIN,
    start_date=dataset_cfg.START_DATE,
    end_date=dataset_cfg.END_DATE,
    mode=dataset_cfg.MODE,
    indicators=dataset_cfg.INDICATORS,
    show_volume=dataset_cfg.SHOW_VOLUME,
    parallel_num=dataset_cfg.PARALLEL_NUM,
)

In [None]:
# Generate the training samples; DATASET.SAMPLE_RATE is forwarded unchanged to
# generate_images().
image_set = train_dataset.generate_images(setting.DATASET.SAMPLE_RATE)

In [None]:
# Show how many samples were generated.
len(image_set)

In [None]:
# Preview one generated sample.
# NOTE(review): the index 69 is hard-coded — presumably an arbitrary pick; this
# cell fails when fewer than 70 samples were generated.
display_image(image_set[69])

In [None]:
# Split the generated samples into a training part and a validation part according
# to TRAIN.VALID_RATIO, then wrap each part in a shuffling DataLoader.
train_loader_size = int(len(image_set) * (1 - setting.TRAIN.VALID_RATIO))
valid_loader_size = len(image_set) - train_loader_size

# The subsets get their own names so the loader variables only ever hold loaders.
train_subset, valid_subset = torch.utils.data.random_split(
    image_set,
    [train_loader_size, valid_loader_size],
)
train_loader = torch.utils.data.DataLoader(
    dataset=train_subset,
    batch_size=setting.TRAIN.BATCH_SIZE,
    shuffle=True,
)
valid_loader = torch.utils.data.DataLoader(
    dataset=valid_subset,
    batch_size=setting.TRAIN.BATCH_SIZE,
    shuffle=True,
)

### Model Structure

In [None]:
# Instantiate the network and move it to the compute device; the trailing bare
# expression makes the notebook display the model, as the original `.to(device)`
# call did.
model_d20 = CNN20d().to(device)
model_d20

### Training Process

In [None]:
# Binary cross-entropy loss and an Adam optimiser over the model's parameters;
# learning rate and weight decay come from the TRAIN section of the configuration.
criterion = nn.BCELoss()
optimizer = optim.Adam(
    model_d20.parameters(),
    lr=setting.TRAIN.LEARNING_RATE,
    weight_decay=setting.TRAIN.WEIGHT_DECAY,
)

In [None]:
# Run training. The four returned values are unpacked into the loss / accuracy
# histories for the training and validation sets that are plotted and saved below.
train_loss_set, valid_loss_set, train_acc_set, valid_acc_set = train_n_epochs(
    setting.TRAIN.NEPOCH,
    model_d20,
    setting.TRAIN.LABEL,
    train_loader,
    valid_loader,
    criterion,
    optimizer,
    setting.TRAIN.MODEL_SAVE_FILE,
    setting.TRAIN.EARLY_STOP_EPOCH,
)

In [None]:
# Plot the histories that are still in memory from the training run above.
plot_loss_and_acc({
    "temp train": [train_loss_set, train_acc_set],
    "temp valid": [valid_loss_set, valid_acc_set],
})

In [None]:
# Persist the histories as CSV: one row per metric, labelled by the metric name.
log = pd.DataFrame(
    [train_loss_set, train_acc_set, valid_loss_set, valid_acc_set],
    index=['train_loss', 'train_acc', 'valid_loss', 'valid_acc'],
)
log.to_csv(setting.TRAIN.LOG_SAVE_FILE)

In [None]:
# Read the saved history back and transpose it so that every metric becomes a
# column (the CSV stores one metric per row).
logs = pd.read_csv(setting.TRAIN.LOG_SAVE_FILE, index_col=0).transpose()

In [None]:
# Plot the histories as reloaded from the log file.
plot_loss_and_acc({
    "train": [logs['train_loss'], logs['train_acc']],
    "valid": [logs['valid_loss'], logs['valid_acc']],
})

## Test

In [None]:
# Rebuild the architecture and restore the weights written during training.
model_best = CNN20d()
model_best.to(device)

# Load Model
# map_location remaps the stored tensors onto the current device, so a checkpoint
# that was written on a GPU machine can still be restored on a CPU-only one.
state_dict = torch.load(setting.TRAIN.MODEL_SAVE_FILE, map_location=device)
model_best.load_state_dict(state_dict['model_state_dict'])

In [None]:
def model_test(model, label_type, classes, criterion):
    """Evaluate ``model`` on the configured test period and print a report.

    The period from ``setting.TEST.START_DATE`` to ``setting.TEST.END_DATE`` is
    processed in consecutive segments, split at 1 April, 1 July and 1 October of
    the end date's year. A separate dataset is generated for every segment and
    the loss / per-class accuracy are accumulated over all of them.

    Args:
        model: Network to evaluate. It is called as ``model(batch)`` and must
            return one row with two class scores per sample.
        label_type: ``'RET5'`` or ``'RET20'``; selects which of the two labels
            yielded by the data loader is used as the target.
        classes: Display names for class 0 and class 1, in that order.
        criterion: Loss callable, invoked as ``criterion(output, target)`` with a
            float32 one-hot target of shape ``(batch, 2)``.

    Returns:
        None. The loss and the accuracies are printed.

    Raises:
        AssertionError: If ``label_type`` is neither ``'RET5'`` nor ``'RET20'``.
    """
    # Validate once, up front, instead of on every batch.
    assert label_type in ['RET5', 'RET20'], f"Wrong Label Type: {label_type}"

    # Running totals over *all* segments.
    test_loss = 0.0
    total_samples = 0
    class_correct = [0, 0]
    class_total = [0, 0]

    model.eval()
    # iterate over test data
    # NOTE(review): this assumes START_DATE lies before 1 April of END_DATE's year;
    # otherwise the boundaries are not increasing — confirm against the config.
    # NOTE(review): every inner boundary is the end of one segment and the start of
    # the next; if ImageDataSet treats both ends as inclusive, samples dated on a
    # boundary are evaluated twice — confirm.
    sub_points = [setting.TEST.START_DATE] + [int(setting.TEST.END_DATE//1e4 * 1e4) + i*100 + 1 for i in range(4, 13, 3)] + [setting.TEST.END_DATE]

    # Evaluation needs no gradients; disabling them avoids building the autograd
    # graph for every batch.
    with torch.no_grad():
        for m_idx in range(len(sub_points)-1):
            print(f"Testing: {sub_points[m_idx]} - {sub_points[m_idx+1]}")
            test_dataset = _D.ImageDataSet(win_size = setting.DATASET.LOOKBACK_WIN,
                                start_date = sub_points[m_idx],
                                end_date = sub_points[m_idx+1],
                                mode = 'default',
                                indicators = setting.DATASET.INDICATORS,
                                show_volume = setting.DATASET.SHOW_VOLUME,
                                parallel_num=setting.DATASET.PARALLEL_NUM)
            test_imageset = test_dataset.generate_images(1.0)
            test_loader = torch.utils.data.DataLoader(dataset=test_imageset, batch_size=setting.TRAIN.BATCH_SIZE, shuffle=False)

            for data, ret5, ret20 in test_loader:
                labels = ret5 if label_type == 'RET5' else ret20

                # One-hot encode: column 0 is (1 - label), column 1 is label. This
                # is the same matrix the former pair of outer products produced.
                target = torch.stack([1 - labels, labels], dim=1).to(torch.float32)

                # move tensors to GPU if CUDA is available
                data, target = data.to(device), target.to(device)
                # forward pass: compute predicted outputs by passing inputs to the model
                output = model(data)
                # calculate the batch loss
                loss = criterion(output, target)
                # Weight the batch loss by the batch size so that the final
                # division by the sample count yields a per-sample average.
                batch_size = data.size(0)
                test_loss += loss.item() * batch_size
                total_samples += batch_size

                # convert output probabilities to predicted class
                pred = torch.argmax(output, 1)
                truth = torch.argmax(target, 1)
                hits = pred.eq(truth)
                # Per-class tallies via boolean masks; unlike indexing a squeezed
                # numpy array this also works for a batch holding a single sample.
                for cls in range(2):
                    in_class = truth.eq(cls)
                    class_total[cls] += int(in_class.sum().item())
                    class_correct[cls] += int((hits & in_class).sum().item())

    # Average over every evaluated sample. The previous divisor was the size of the
    # last segment's dataset only, which inflated the reported loss.
    if total_samples > 0:
        test_loss = test_loss / total_samples
    print('Test Loss: {:.6f}\n'.format(test_loss))

    for i in range(2):
        if class_total[i] > 0:
            print('Test Accuracy of %5s: %2d%% (%2d/%2d)' % (
                classes[i], 100 * class_correct[i] / class_total[i],
                class_correct[i], class_total[i]))
        else:
            print('Test Accuracy of %5s: N/A (no test examples)' % (classes[i]))

    overall_correct = sum(class_correct)
    overall_total = sum(class_total)
    if overall_total > 0:
        print('\nTest Accuracy (Overall): %2d%% (%2d/%2d)' % (
            100. * overall_correct / overall_total,
            overall_correct, overall_total))
    else:
        # Nothing was evaluated; avoid a division by zero.
        print('\nTest Accuracy (Overall): N/A (no test examples)')

In [None]:
# Evaluate the restored model with a binary cross-entropy loss; class 0 is
# reported as 'down' and class 1 as 'up'.
criterion = nn.BCELoss()
model_test(model_best, setting.TRAIN.LABEL, ['down', 'up'], criterion)

## Inference

In [None]:
def model_inference(model):
    """Score every inference sample of the configured test period with ``model``.

    The period from ``setting.TEST.START_DATE`` to ``setting.TEST.END_DATE`` is
    processed in consecutive segments, split at 1 April, 1 July and 1 October of
    the end date's year. For every entry of a segment's inference set the second
    output column of the model is recorded as ``up_factor``.

    Args:
        model: Network used for scoring. It is called as ``model(batch)`` and
            column 1 of its output is taken as the score.

    Returns:
        pandas.DataFrame with the columns ``code``, ``date`` and ``up_factor``,
        one row per scored sample. The frame is empty (but keeps its columns)
        when nothing was scored.
    """
    model.eval()
    # iterate over test data
    # NOTE(review): this assumes START_DATE lies before 1 April of END_DATE's year;
    # otherwise the boundaries are not increasing — confirm against the config.
    sub_points = [setting.TEST.START_DATE] + [int(setting.TEST.END_DATE//1e4 * 1e4) + i*100 + 1 for i in range(4, 13, 3)] + [setting.TEST.END_DATE]

    columns = ['code', 'date', 'up_factor']
    # Collect the per-entry frames and concatenate once at the end; concatenating
    # inside the loop re-copies everything gathered so far on every iteration.
    factor_frames = []

    # Pure inference: no gradients are needed.
    with torch.no_grad():
        for m_idx in range(len(sub_points)-1):
            print(f"Inferencing: {sub_points[m_idx]} - {sub_points[m_idx+1]}")

            inference_dataset = _D.ImageDataSet(win_size = setting.DATASET.LOOKBACK_WIN,
                                            start_date = sub_points[m_idx],
                                            end_date = sub_points[m_idx+1],
                                            mode = 'inference',
                                            indicators = setting.DATASET.INDICATORS,
                                            show_volume = setting.DATASET.SHOW_VOLUME,
                                            parallel_num=setting.DATASET.PARALLEL_NUM)
            inference_imageset = inference_dataset.generate_images(1.0)

            # Visit every entry. The earlier `range(len(...) - 1)` stopped one
            # short and silently dropped the final entry of each segment.
            for entry_idx in range(len(inference_imageset)):
                entry = inference_imageset[entry_idx]
                # Each entry is read as (code, samples, dates); the image of a
                # sample is its first element.
                code, samples, dates = entry[0], entry[1], entry[2]
                if len(samples) == 0:
                    continue

                images = [sample[0] for sample in samples]
                batch = torch.Tensor(np.array(images)).to(device)
                # Score with the model that was passed in. The earlier code used
                # the global `model_best` here and ignored the argument.
                up_factors = model(batch)[:, 1].tolist()

                factor_frames.append(
                    pd.DataFrame([[code] * len(samples), dates, up_factors], index=columns).T
                )

    if not factor_frames:
        # Nothing scored: return an empty frame that still carries the columns.
        return pd.DataFrame([], index=columns).T
    return pd.concat(factor_frames, axis=0)

In [None]:
# Score the test period with the restored model; the result is a DataFrame with
# the columns code / date / up_factor.
up_factors = model_inference(model_best)