In [14]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler

def gaf(X):

    X_normalized = X.reshape(-1, 1).flatten()

    # Pairwise differences
    X_diff = np.expand_dims(X_normalized, axis=0) - np.expand_dims(X_normalized, axis=1)

    # Gramian Angular Field
    GAF = np.cos(X_diff)

    return GAF


In [3]:
import yfinance as yf
import numpy as np
from tqdm import tqdm

def fetch_stock_price(stock_symbol, start_date, end_date):
    # 使用 yf.Ticker() 建立 Ticker 對象
    stock = yf.Ticker(stock_symbol)

    # 使用 history() 方法取得歷史價格資訊
    stock_data = stock.history(start=start_date, end=end_date)

    return stock_data

stock_symbol = '5871.TW'

# 起始日期和結束日期
end_date = '2024-12-31'

# 擷取股票價格資訊
stock_price_data = fetch_stock_price(stock_symbol=stock_symbol, start_date='2012-01-02',end_date=end_date)


In [4]:
stock_price_data['do'] = stock_price_data['Open'].pct_change()
stock_price_data['dh'] = stock_price_data['High'].pct_change()
stock_price_data['dl'] = stock_price_data['Low'].pct_change()
stock_price_data['dc'] = stock_price_data['Close'].pct_change()
stock_price_data['dv'] = stock_price_data['Volume'].pct_change()
stock_price_data['oc'] = stock_price_data['Open']-stock_price_data['Close']

stock_price_data['curr_bar_state'] = np.sign(stock_price_data['oc'])
stock_price_data = stock_price_data.dropna()

In [5]:
p_forward = 10
for i in range(1, p_forward+1):
    stock_price_data[f'bar_state_{str(i)}'] = stock_price_data['curr_bar_state'].shift(-i-p_forward+1)

In [6]:
# df = stock_price_data.iloc[:,7:]
df = stock_price_data
# Replace infinite values with NaN
df.replace([np.inf, -np.inf], np.nan, inplace=True)

# Drop rows with NaN values
df = df.dropna()

In [7]:
df_test = df.loc['2023':]
df = df.loc[:'2022']

In [8]:
from sklearn.preprocessing import Normalizer
scaler = Normalizer()
df[['do', 'dh', 'dl', 'dc', 'dv']] = scaler.fit_transform(df[['do', 'dh', 'dl', 'dc', 'dv']])
df_test[['do', 'dh', 'dl', 'dc', 'dv']] = scaler.fit_transform(df_test[['do', 'dh', 'dl', 'dc', 'dv']])

In [104]:
window_size = 10

x1_list, y1_list = [], []

# Iterate over the DataFrame to create the training and testing sets
for i in tqdm(range(len(df)-window_size+1)):
    window = df.iloc[i:i+window_size]  # Extract the window of data
    # print(window.T.values)
    x1_values = window[['do', 'dh', 'dl', 'dc', 'dv']].T.values  # Adjust column names as needed
    # print(x1_values)
    # print(window[['bar_state_1', 'bar_state_2', 'bar_state_3', 'bar_state_4', 'bar_state_5']])
    # y1_values = window[['bar_state_1', 'bar_state_2', 'bar_state_3', 'bar_state_4', 'bar_state_5']].iloc[0].T.values # Take the last value of 'bar_state_1' as the output
    y1_values = window[['bar_state_1']].iloc[0].T.values
    x1_list.append(x1_values)
    y1_list.append(y1_values)

# Convert the lists to NumPy arrays
x = np.array(x1_list)
y = np.array(y1_list)

  0%|          | 0/2669 [00:00<?, ?it/s]

100%|██████████| 2669/2669 [00:07<00:00, 351.75it/s]


In [105]:
X = []
for i in range(len(x)):
    X_element = []
    for j in range(len(x[i])):
        X_element.append(gaf(x[i][j]))
        # print(gaf(x[i][j]))

    X.append(X_element)
X = np.array(X)

In [107]:
import random
# valid
percentage = 20
num_numbers = int((percentage / 100) * len(X))

# Generate a list of randomly selected numbers
valid_numbers = random.sample(range(0, len(X)), num_numbers)
training_numbers = [num for num in range(0, len(X)) if num not in valid_numbers]

In [128]:
x_train = X[training_numbers]
x_valid = X[valid_numbers]

y_train = y[training_numbers]
y_valid = y[valid_numbers]

# Define model

In [193]:
import torch
import torch.nn as nn

# Define the basic block with skip connection
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != self.expansion * out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, self.expansion * out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * out_channels)
            )

    def forward(self, x):
        residual = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out += self.shortcut(residual)
        out = self.relu(out)
        return out

# Define the ResNet model
class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=1):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(5, 64, kernel_size=3, stride=1, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        layers = []
        layers.append(block(self.in_channels, out_channels, stride))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))
        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return torch.tanh(x)

# Create the ResNet-50 model
def resnet50():
    return ResNet(BasicBlock, [3, 4, 6, 3])

# Instantiate the model
model = resnet50()

In [194]:
x_train_tensor = torch.tensor(x_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)

x_val_tensor = torch.tensor(x_valid, dtype=torch.float32)
y_val_tensor = torch.tensor(y_valid, dtype=torch.float32)

In [195]:
from torch.utils.data import DataLoader, TensorDataset
dataset_train = TensorDataset(x_train_tensor, y_train_tensor)
dataset_valid = TensorDataset(x_val_tensor, y_val_tensor)

dataloader_train = DataLoader(dataset_train , batch_size=128, shuffle=True)
dataloader_valid = DataLoader(dataset_valid , batch_size=128, shuffle=True)

In [196]:
import torch.optim as optim
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.0001, weight_decay=0.01)

In [197]:
num_epochs = 500
for epoch in range(num_epochs):
    # Training phase
    model.train()
    for batch_x, batch_y in dataloader_train:
        optimizer.zero_grad()
        outputs = model(batch_x)

        loss = criterion(outputs, batch_y) 
        loss.backward()
        optimizer.step()
    model.eval()
    val_loss = []

    with torch.no_grad():
        for batch_x_val, batch_y_val in dataloader_valid:
            outputs_val = model(batch_x_val)
            loss_val = criterion(outputs_val, batch_y_val)
            val_loss.append(loss_val.item())


        # Print statistics
        print(f'Epoch [{epoch+1}/{num_epochs}]', 
            f'Training Loss: {loss.item():.10f}',
            f'Valid Loss: {sum(val_loss)/64:.10f}')

Epoch [1/500] Training Loss: 1.0956386328 Valid Loss: 0.0736543480
Epoch [2/500] Training Loss: 0.8983930349 Valid Loss: 0.0733987680
Epoch [3/500] Training Loss: 0.7699970603 Valid Loss: 0.0847673863


KeyboardInterrupt: 

In [198]:
window_size = 10

x2_list, y2_list = [], []

# Iterate over the DataFrame to create the training and testing sets
for i in tqdm(range(len(df_test)-window_size+1)):
    window = df_test.iloc[i:i+window_size]  # Extract the window of data
    # print(window.T.values)
    x2_values = window[['do', 'dh', 'dl', 'dc', 'dv']].T.values  # Adjust column names as needed
    # print(x1_values)
    # print(window[['bar_state_1', 'bar_state_2', 'bar_state_3', 'bar_state_4', 'bar_state_5']])
    # y1_values = window[['bar_state_1', 'bar_state_2', 'bar_state_3', 'bar_state_4', 'bar_state_5']].iloc[0].T.values # Take the last value of 'bar_state_1' as the output
    y2_values = window[['bar_state_1']].iloc[0].T.values
    x2_list.append(x2_values)
    y2_list.append(y2_values)

# Convert the lists to NumPy arrays
x2 = np.array(x2_list)
y2 = np.array(y2_list)

100%|██████████| 230/230 [00:00<00:00, 416.15it/s]


In [199]:
X2 = []
for i in range(len(x2)):
    X_element = []
    for j in range(len(x2[i])):
        X_element.append(gaf(x2[i][j]))

    X2.append(X_element)
X2 = np.array(X2)

In [200]:
x_test_tensor = torch.tensor(X2, dtype=torch.float32)
y_test_tensor = torch.tensor(y2, dtype=torch.float32)

In [201]:
b = torch.sign(model(x_test_tensor)) - y_test_tensor

In [202]:
1-len(torch.nonzero(torch.sum(b, dim=1)))/len(b)

0.4739130434782609