In [1]:
import yfinance as yf
import os
import mplfinance as mpf
import pandas as pd
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from PIL import Image

In [2]:
# 1. 從 yfinance 獲取 AAPL 股票數據
stock_symbol = "AAPL"
start_date = "2020-01-01"
end_date = "2024-11-26"

stock_data = yf.download(stock_symbol, start=start_date, end=end_date)
stock_data.to_csv("AAPL.csv")
print("AAPL 股價數據已儲存至 AAPL.csv")

[*********************100%***********************]  1 of 1 completed

AAPL 股價數據已儲存至 AAPL.csv





In [3]:
# 2. 每 10 天生成一張 K 線圖並儲存
output_dir = "week11_kline"
os.makedirs(output_dir, exist_ok=True)

N = 2  # 每張 K 線圖的天數
num_chunks = len(stock_data) // N + (1 if len(stock_data) % N > 0 else 0)

for i in range(num_chunks):
    start_idx = i * N
    end_idx = (i + 1) * N
    kline_data = stock_data.iloc[start_idx:end_idx]
    if kline_data.empty:
        continue

    save_path = os.path.join(output_dir, f"AAPL_kline_part_{i+1}.png")
    mpf.plot(
        kline_data,
        type="candle",
        style="charles",
        title=f"AAPL K-Line Part {i+1}",
        ylabel="Price",
        savefig=save_path,
    )
    print(f"K 線圖已儲存至: {save_path}")

K 線圖已儲存至: week11_kline/AAPL_kline_part_1.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_2.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_3.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_4.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_5.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_6.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_7.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_8.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_9.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_10.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_11.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_12.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_13.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_14.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_15.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_16.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_17.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_18.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_19.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_20.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_21.png
K 線圖已儲存至: week11_kline/AAPL_kline_part_22.p

In [5]:
# 3. 新增圖片名稱和 Label 欄位到 CSV
stock_data["Image"] = ""
stock_data["Label"] = 0

for i in range(num_chunks):
    start_idx = i * N
    end_idx = min((i + 1) * N, len(stock_data))
    if end_idx - start_idx < 2:
        continue
    stock_data.iloc[start_idx:end_idx, stock_data.columns.get_loc("Image")] = f"AAPL_kline_part_{i+1}.png"
    start_close = stock_data.iloc[start_idx]["Close"]
    end_close = stock_data.iloc[end_idx - 1]["Close"]
    stock_data.iloc[end_idx - 1, stock_data.columns.get_loc("Label")] = int(end_close > start_close)

stock_data.to_csv("AAPL_labeled.csv")
print("新增 Image 和 Label 欄位後的數據已儲存至 AAPL_labeled.csv")

新增 Image 和 Label 欄位後的數據已儲存至 AAPL_labeled.csv


In [6]:
# 4. 分割數據集
labeled_data = pd.read_csv("AAPL_labeled.csv")
train_data, temp_data = train_test_split(labeled_data, test_size=0.3, random_state=42)
val_data, test_data = train_test_split(temp_data, test_size=0.5, random_state=42)

train_data.to_csv("AAPL_train.csv", index=False)
val_data.to_csv("AAPL_val.csv", index=False)
test_data.to_csv("AAPL_test.csv", index=False)

print(train_data['Label'].value_counts())
print(val_data['Label'].value_counts())


print(f"訓練集大小: {len(train_data)}")
print(f"驗證集大小: {len(val_data)}")
print(f"測試集大小: {len(test_data)}")

Label
0    652
1    211
Name: count, dtype: int64
Label
0    135
1     50
Name: count, dtype: int64
訓練集大小: 863
驗證集大小: 185
測試集大小: 186


In [7]:
# 5. 定義數據集類
class MultiModalDataset(Dataset):
    def __init__(self, csv_file, image_dir, transform=None):
        self.data = pd.read_csv(csv_file)
        self.image_dir = image_dir
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        row = self.data.iloc[idx]
        structured_data = row[['Open', 'High', 'Low', 'Close', 'Volume']].values.astype(float)
        image_path = os.path.join(self.image_dir, f"{row['Image']}")
        image = Image.open(image_path).convert("RGB")
        if self.transform:
            image = self.transform(image)
        label = row['Label']
        return torch.tensor(structured_data, dtype=torch.float32), image, torch.tensor(label, dtype=torch.long)

In [8]:
# 6. 定義數據轉換和加載器
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

image_dir = "week11_kline"

train_dataset = MultiModalDataset("AAPL_train.csv", image_dir, transform=transform)
val_dataset = MultiModalDataset("AAPL_val.csv", image_dir, transform=transform)
test_dataset = MultiModalDataset("AAPL_test.csv", image_dir, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=16, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

In [9]:
# 7. 定義多模態模型
class MultiModalModel(nn.Module):
    def __init__(self, num_structured_features, num_classes):
        super(MultiModalModel, self).__init__()
        self.image_model = models.resnet18(pretrained=True)
        self.image_model.fc = nn.Identity()
        self.structured_fc = nn.Sequential(
            nn.Linear(num_structured_features, 128),
            nn.ReLU(),
            nn.Dropout(0.2)
        )
        self.fc = nn.Sequential(
            nn.Linear(512 + 128, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, num_classes)
        )
    
    def forward(self, structured_data, images):
        image_features = self.image_model(images)
        structured_features = self.structured_fc(structured_data)
        combined_features = torch.cat((image_features, structured_features), dim=1)
        return self.fc(combined_features)

In [10]:
# 8. 初始化模型和訓練
model = MultiModalModel(num_structured_features=5, num_classes=2)
criterion = nn.CrossEntropyLoss(weight=torch.tensor([1.0, 5.0]).cuda())
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(10):
    model.train()
    for structured_data, images, labels in train_loader:
        if torch.cuda.is_available():
            model = model.cuda()
            structured_data = structured_data.cuda()
            images = images.cuda()
            labels = labels.cuda()
        outputs = model(structured_data, images)
        loss = criterion(outputs, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")



Epoch 1, Loss: 735942.0625
Epoch 2, Loss: 467334.6250
Epoch 3, Loss: 27402.7227
Epoch 4, Loss: 10118.1836
Epoch 5, Loss: 12812.3701
Epoch 6, Loss: 2055.9563
Epoch 7, Loss: 1161.1523
Epoch 8, Loss: 430.0906
Epoch 9, Loss: 293.5457
Epoch 10, Loss: 277.5284


In [11]:
# 9. 驗證模型性能
model.eval()
all_preds, all_labels = [], []

with torch.no_grad():
    for structured_data, images, labels in val_loader:
        if torch.cuda.is_available():
            structured_data, images, labels = structured_data.cuda(), images.cuda(), labels.cuda()
        outputs = model(structured_data, images)
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

accuracy = accuracy_score(all_labels, all_preds)
precision = precision_score(all_labels, all_preds)
recall = recall_score(all_labels, all_preds)
f1 = f1_score(all_labels, all_preds)

print(f"驗證集性能指標：\n準確率: {accuracy:.2f}\n精確率: {precision:.2f}\n召回率: {recall:.2f}\nF1 分數: {f1:.2f}")

驗證集性能指標：
準確率: 0.27
精確率: 0.27
召回率: 1.00
F1 分數: 0.43
