In [1]:
import yfinance as yf
import matplotlib.pyplot as plt
import os
import shutil
import torch
import torch.nn as nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import torch.optim as optim
from sklearn.model_selection import train_test_split

# Stock Data Preparation

In [2]:

taiwan_2330_stock = yf.Ticker("2330.TW")
data = taiwan_2330_stock.history(period="5y")

if data.empty:
    raise ValueError("Stock data is empty. Please check the ticker symbol or network connection.")

K_data = data[['Open', 'Close', 'High', 'Low']]
ten_data = []
labels = []

for i in range(len(K_data.values) - 19):
    segment = K_data.values[i:i+20]
    ten_data.append(segment)

    if i < len(K_data.values) - 20:
        next_segment = K_data.values[i+1:i+21]
        next_close_prices = [day[1] for day in next_segment]  # Extract close prices
        MA20 = sum(next_close_prices) / len(next_close_prices)
        MA10 = sum(next_close_prices[-10:]) / len(next_close_prices[-10:])
        labels.append(1 if MA10 > MA20 else 0)


# K-Line Chart Generation

In [3]:
image_path = './image'
os.makedirs(image_path, exist_ok=True)

for idx, segment in enumerate(ten_data[:-1]):
    for day_idx, day in enumerate(segment):
        open_price, close_price, high_price, low_price = map(int, day)
        color = "red" if close_price > open_price else "green"

        plt.bar(day_idx, abs(close_price - open_price), bottom=min(open_price, close_price), color=color, width=0.5)
        plt.bar(day_idx, high_price - low_price, bottom=low_price, color=color, width=0.1)

    rise_or_fall = "rise" if labels[idx] == 1 else "fail"
    plt.axis('off')
    plt.savefig(f'{image_path}/{idx:05d}_{rise_or_fall}.png', format='png', dpi=100)
    plt.close()


# Dataset Preparation

In [4]:
#%% Dataset Preparation
train_rise_folder = './train_folder_path/rise'
train_fail_folder = './train_folder_path/fail'
test_rise_folder = './test_folder_path/rise'
test_fail_folder = './test_folder_path/fail'

for folder in [train_rise_folder, train_fail_folder, test_rise_folder, test_fail_folder]:
    os.makedirs(folder, exist_ok=True)

image_files = os.listdir(image_path)
X_train, X_test, y_train, y_test = train_test_split(ten_data[:-1], labels, test_size=0.2, random_state=42)
image_train, image_test = train_test_split(image_files, test_size=0.2, random_state=42)

for img in image_train:
    src = os.path.join(image_path, img)
    dest = os.path.join(train_rise_folder if 'rise' in img else train_fail_folder, img)
    shutil.copy(src, dest)

for img in image_test:
    src = os.path.join(image_path, img)
    dest = os.path.join(test_rise_folder if 'rise' in img else test_fail_folder, img)
    shutil.copy(src, dest)


# Fusion Model Definition

In [5]:
class FusionModel(nn.Module):
    def __init__(self, lstm_input_size, cnn_input_shape):
        super(FusionModel, self).__init__()
        self.lstm = nn.LSTM(input_size=lstm_input_size, hidden_size=64, num_layers=2, batch_first=True)
        self.lstm_fc = nn.Linear(64, 128)
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        cnn_height, cnn_width = cnn_input_shape[1] // 4, cnn_input_shape[2] // 4
        self.cnn_fc = nn.Linear(32 * cnn_height * cnn_width, 128)
        self.fusion_fc = nn.Linear(128 + 128, 2)

    def forward(self, time_data, image_data):
        lstm_out, _ = self.lstm(time_data)
        lstm_features = self.lstm_fc(lstm_out[:, -1, :])
        cnn_out = self.cnn(image_data)
        cnn_features = self.cnn_fc(cnn_out.view(cnn_out.size(0), -1))
        fused_features = torch.cat((lstm_features, cnn_features), dim=1)
        return self.fusion_fc(fused_features)


In [6]:
#%% Dataset and DataLoader
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor()
])

class FusionDataset(torch.utils.data.Dataset):
    def __init__(self, lstm_data, lstm_labels, image_folder, transform=None):
        self.lstm_data = lstm_data
        self.lstm_labels = lstm_labels
        self.image_data = datasets.ImageFolder(root=image_folder, transform=transform)

    def __len__(self):
        return len(self.lstm_data)

    def __getitem__(self, idx):
        lstm_data = self.lstm_data[idx]
        lstm_label = self.lstm_labels[idx]
        cnn_image, cnn_label = self.image_data[idx]
        return torch.FloatTensor(lstm_data), cnn_image, lstm_label

train_dataset = FusionDataset(X_train, y_train, 'train_folder_path/', transform=transform)
test_dataset = FusionDataset(X_test, y_test, 'test_folder_path/', transform=transform)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

#  Training Loop

In [7]:

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = FusionModel(lstm_input_size=4, cnn_input_shape=(3, 128, 128)).to(device)
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(10):
    model.train()
    total_loss, correct, total = 0, 0, 0

    for lstm_data, cnn_data, labels in train_loader:
        lstm_data, cnn_data, labels = lstm_data.to(device), cnn_data.to(device), labels.to(device)
        optimizer.zero_grad()
        output = model(lstm_data, cnn_data)
        loss = loss_func(output, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        _, predicted = torch.max(output, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

    accuracy = correct / total
    print(f"Epoch [{epoch + 1}/10], Loss: {total_loss:.4f}, Accuracy: {accuracy:.4f}")

Epoch [1/10], Loss: 54.7348, Accuracy: 0.5617
Epoch [2/10], Loss: 40.9926, Accuracy: 0.5785
Epoch [3/10], Loss: 40.9409, Accuracy: 0.5785
Epoch [4/10], Loss: 40.9128, Accuracy: 0.5785
Epoch [5/10], Loss: 40.9307, Accuracy: 0.5785
Epoch [6/10], Loss: 40.9222, Accuracy: 0.5785
Epoch [7/10], Loss: 40.9126, Accuracy: 0.5785
Epoch [8/10], Loss: 40.9815, Accuracy: 0.5785
Epoch [9/10], Loss: 40.8697, Accuracy: 0.5785
Epoch [10/10], Loss: 40.9123, Accuracy: 0.5785


### Evaluation

In [8]:
model.eval()
correct, total = 0, 0

with torch.no_grad():
    for lstm_data, cnn_data, labels in test_loader:
        lstm_data, cnn_data, labels = lstm_data.to(device), cnn_data.to(device), labels.to(device)
        output = model(lstm_data, cnn_data)
        _, predicted = torch.max(output, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)

test_accuracy = correct / total
print(f"Test Accuracy: {test_accuracy:.4f}")


Test Accuracy: 0.5750
