In [2]:
import os
import re
import sys
import time
import datetime
import numpy as np
import pandas as pd
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms
from PIL import Image
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
import seaborn as sns
import matplotlib.pyplot as plt

In [16]:
import os
import yaml
import argparse
from zipfile import ZipFile
from importlib import reload
import warnings
warnings.filterwarnings('ignore')

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from tqdm import tqdm
from joblib import Parallel, delayed

import time
import datetime
from contextlib import contextmanager
from collections import namedtuple
from collections import OrderedDict

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np

In [7]:
# Parameter settings
# use_gpu: when True, the GPU-selection cell runs and a missing CUDA device is treated as an error.
use_gpu = True
# use_random_split: when False, main() uses the full dataset for both training and evaluation
# instead of an 80/20 random split.
use_random_split = False
# use_dataparallel: when True and more than one CUDA device is visible, main() wraps the model
# in nn.DataParallel.
use_dataparallel = True
# Seed PyTorch's global RNG. As the last expression of the cell, the returned Generator is echoed.
torch.manual_seed(42)

<torch._C.Generator at 0x2c2d292fe90>

In [8]:
# Image dimension constants
# IMG_HEIGHT / IMG_WIDTH map a key (5, 20 or 60) to a size in pixels; the dataset's default
# transform resizes to the entries for key 5.
# NOTE(review): the keys presumably denote the chart look-back window in days — confirm.
IMG_HEIGHT = {5: 32, 20: 64, 60: 96}
IMG_WIDTH = {5: 15, 20: 60, 60: 180}
# Number of channels per image.
IMG_CHANNELS = 1

In [9]:
# Check whether a GPU is available and restrict CUDA to the GPUs with enough free memory.
if use_gpu:
    def query_gpu(qargs=None):
        """Query ``nvidia-smi`` and return its raw CSV output, one line per GPU.

        Args:
            qargs: Optional iterable of extra ``--query-gpu`` field names, appended
                after the always-requested ``index``, ``gpu_name`` and ``memory.free``.

        Returns:
            list[str]: Output lines of the command; empty when the command prints
            nothing (for example when ``nvidia-smi`` is not on the PATH).
        """
        # ``None`` default instead of a shared mutable ``[]``.
        fields = ['index', 'gpu_name', 'memory.free'] + list(qargs or [])
        cmd = 'nvidia-smi --query-gpu={} --format=csv,noheader'.format(','.join(fields))
        # Close the pipe deterministically instead of leaving it to the garbage collector.
        with os.popen(cmd) as pipe:
            return pipe.readlines()

    def select_gpu(results, thres=4096):
        """Return the positions of the GPUs whose reported memory exceeds ``thres`` MiB.

        Args:
            results: Lines as returned by ``query_gpu``.
            thres: Minimum amount of memory, in MiB, a GPU must report to be selected.

        Returns:
            list[int]: Zero-based line positions of the qualifying GPUs. Lines that
            cannot be parsed are skipped, so the result is an empty list (never a
            string) when nothing qualifies.
        """
        available = []
        for i, line in enumerate(results):
            found = re.findall(r'(.*), (.*?) MiB', line)
            if not found:
                # Not a "<...>, <number> MiB" line (e.g. an error message); ignore it.
                continue
            try:
                free_mib = int(found[0][-1])
            except ValueError:
                continue
            if free_mib > thres:
                available.append(i)
        return available

    _selected_gpus = select_gpu(query_gpu())
    if _selected_gpus:
        os.environ["CUDA_VISIBLE_DEVICES"] = ','.join(str(gpu) for gpu in _selected_gpus)
    else:
        # Writing an empty CUDA_VISIBLE_DEVICES would hide every GPU and make the
        # availability check below fail with a misleading "CUDA not installed" error,
        # so the environment is left untouched when nothing could be selected.
        print("警告: 無法透過 nvidia-smi 選出符合記憶體門檻的 GPU，保留原有的 CUDA_VISIBLE_DEVICES 設定。")

if not torch.cuda.is_available() and use_gpu:
    raise RuntimeError("此程式需要 GPU 才能運行！請確認 CUDA 環境是否正確安裝。")

DEVICE = torch.device('cuda:0' if torch.cuda.is_available() and use_gpu else 'cpu')
print(f"使用裝置: {DEVICE}")


使用裝置: cuda:0


In [10]:
class CustomImageDataset(Dataset):
    """Image/label dataset backed by an image directory and an Excel label sheet.

    Column 0 of the sheet holds the image file name (with or without extension)
    and column 1 holds the integer class label.

    Args:
        img_dir: Directory containing the image files.
        labels_file: Path of the Excel file read with ``pandas.read_excel``.
        transform: Optional callable applied to each PIL image. When omitted, the
            image is resized to ``(IMG_HEIGHT[window], IMG_WIDTH[window])`` and
            converted to a tensor.
        window: Key into ``IMG_HEIGHT`` / ``IMG_WIDTH`` selecting the image size used
            by the default transform and by the blank fallback image. Defaults to 5.
    """

    # Extensions tried, in order, when the name from the sheet does not exist as-is.
    _FALLBACK_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.bmp', '.tiff')

    def __init__(self, img_dir, labels_file, transform=None, window=5):
        self.img_dir = img_dir
        self.labels = pd.read_excel(labels_file)
        # PIL expects (width, height). IMG_WIDTH / IMG_HEIGHT are dicts, so they must be
        # indexed before being used as a size.
        self.blank_size = (IMG_WIDTH[window], IMG_HEIGHT[window])
        self.transform = transform or transforms.Compose([
            transforms.Resize((IMG_HEIGHT[window], IMG_WIDTH[window])),
            transforms.ToTensor(),
        ])

    def __len__(self):
        """Return the number of rows in the label sheet."""
        return len(self.labels)

    def _resolve_path(self, name):
        """Build the image path for ``name``, trying known extensions if it is missing."""
        # str() so that numeric identifiers read from Excel can still be joined.
        img_path = os.path.join(self.img_dir, str(name))
        if os.path.exists(img_path):
            return img_path
        for ext in self._FALLBACK_EXTENSIONS:
            candidate = img_path + ext
            if os.path.exists(candidate):
                return candidate
        # Nothing found: return the bare path so the caller reports it and falls back.
        return img_path

    def _blank_image(self):
        """Return an all-black 1-bit placeholder image of the configured size."""
        return Image.new('1', self.blank_size, 0)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``.

        An image that cannot be opened or transformed is reported on stdout and
        replaced by a blank placeholder, so one bad file does not abort an epoch.
        """
        img_path = self._resolve_path(self.labels.iloc[idx, 0])

        try:
            # The context manager releases the file handle; convert() returns an
            # independent, fully loaded copy.
            with Image.open(img_path) as handle:
                image = handle.convert('1')
        except Exception as e:
            print(f"Error loading image {img_path}: {e}")
            image = self._blank_image()

        if self.transform:
            try:
                image = self.transform(image)
            except Exception as e:
                print(f"Error transforming image {img_path}: {e}")
                image = self.transform(self._blank_image())

        label = int(self.labels.iloc[idx, 1])
        return image, label

In [20]:
# Default input size (height x width, in pixels) expected by CNN5d.
IMG_HEIGHT_SELECTED = 32
IMG_WIDTH_SELECTED = 15


class CNN5d(nn.Module):
    """Two-block convolutional classifier producing 2 logits per image.

    Each block is Conv2d(5x3) -> BatchNorm2d -> ReLU -> MaxPool2d((2, 1)); the
    flattened feature map then goes through dropout and one linear layer.

    Args:
        img_height: Height of the input images. Defaults to ``IMG_HEIGHT_SELECTED``.
        img_width: Width of the input images. Defaults to ``IMG_WIDTH_SELECTED``.
    """

    def init_weights(self, m):
        """Xavier-initialise the weights of Linear/Conv2d modules and set the bias to 0.01."""
        if isinstance(m, (nn.Linear, nn.Conv2d)):
            torch.nn.init.xavier_uniform_(m.weight)
            # Layers built with bias=False have no bias tensor to fill.
            if m.bias is not None:
                m.bias.data.fill_(0.01)

    def __init__(self, img_height=IMG_HEIGHT_SELECTED, img_width=IMG_WIDTH_SELECTED):
        super().__init__()
        self.conv1 = nn.Sequential(OrderedDict([
            ('Conv', nn.Conv2d(1, 64, (5, 3), padding=(2, 1), stride=(1, 1), dilation=(1, 1))),
            ('BN', nn.BatchNorm2d(64, affine=True)),
            ('ReLU', nn.ReLU()),
            ('Max-Pool', nn.MaxPool2d((2, 1)))
        ]))
        self.conv1 = self.conv1.apply(self.init_weights)

        self.conv2 = nn.Sequential(OrderedDict([
            ('Conv', nn.Conv2d(64, 128, (5, 3), padding=(2, 1), stride=(1, 1), dilation=(1, 1))),
            ('BN', nn.BatchNorm2d(128, affine=True)),
            ('ReLU', nn.ReLU()),
            ('Max-Pool', nn.MaxPool2d((2, 1)))
        ]))
        self.conv2 = self.conv2.apply(self.init_weights)

        # Infer the flattened feature size with a dummy pass. Run it in eval mode and
        # without autograd so the BatchNorm running statistics are not updated by an
        # all-zero sample before any real data has been seen.
        was_training = self.training
        self.eval()
        with torch.no_grad():
            dummy_input = torch.zeros(1, 1, img_height, img_width)
            flattened_size = self.conv2(self.conv1(dummy_input)).flatten(1).shape[1]
        self.train(was_training)

        self.DropOut = nn.Dropout(p=0.5)
        self.FC = nn.Linear(flattened_size, 2)
        self.FC.apply(self.init_weights)

    def forward(self, x):
        """Compute class logits.

        Args:
            x: Batch shaped ``[N, H, W]`` or ``[N, 1, H, W]``.

        Returns:
            Tensor of shape ``[N, 2]`` with unnormalised class scores.
        """
        if x.ndim == 3:
            # Add the missing channel axis: [N, H, W] -> [N, 1, H, W].
            x = x.unsqueeze(1)
        x = x.to(torch.float32)
        x = self.conv1(x)
        x = self.conv2(x)
        # Flatten everything but the batch axis before dropout and the classifier.
        x = self.DropOut(x.flatten(1))
        return self.FC(x)



In [None]:
# IMG_HEIGHT_SELECTED = 32
# IMG_WIDTH_SELECTED = 15

# class Net(nn.Module):
#     def __init__(self):
#         super(Net, self).__init__()

#         self.layer1 = nn.Sequential(
#             nn.Conv2d(1, 64, kernel_size=(5, 3), stride=(1, 1), padding=(2, 1)),
#             nn.ReLU(inplace=True),
#             nn.MaxPool2d((2, 1), stride=(2, 1))
#         )
        
#         self.layer2 = nn.Sequential(
#             nn.Conv2d(64, 128, kernel_size=(5, 3), stride=(1, 1), padding=(2, 1)),
#             nn.ReLU(inplace=True),
#             nn.MaxPool2d((2, 1), stride=(2, 1))
#         )
        
#         # Calculate the flattened size before FC layer
#         self.fc = nn.Linear(128 * (IMG_HEIGHT_SELECTED // 4) * IMG_WIDTH_SELECTED, 15360)
#         self.softmax = nn.Softmax(dim=1)

#     def forward(self, x):
#         x = x.view(-1, 1, IMG_HEIGHT_SELECTED, IMG_WIDTH_SELECTED)
#         x = self.layer1(x)
#         x = self.layer2(x)
#         x = x.view(x.size(0), -1)  # Flatten
#         x = self.fc(x)
#         x = self.softmax(x)
#         return x

In [7]:
# old model

# IMG_HEIGHT_SELECTED = 32  # Set to the fixed height of your image format
# IMG_WIDTH_SELECTED = 15   # Set to the fixed width of your image format

# class Net(nn.Module):
#     def __init__(self):
#         super(Net, self).__init__()

#         self.layer1 = nn.Sequential(
#             nn.Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
#             nn.LayerNorm([32, IMG_HEIGHT_SELECTED, IMG_WIDTH_SELECTED]),
#             nn.LeakyReLU(negative_slope=0.01, inplace=True),
#             nn.MaxPool2d((2, 2), stride=(2, 2))
#         )
#         self.layer2 = nn.Sequential(
#             nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1)),
#             nn.LayerNorm([64, IMG_HEIGHT_SELECTED // 2, IMG_WIDTH_SELECTED // 2]),
#             nn.LeakyReLU(negative_slope=0.01, inplace=True),
#             nn.MaxPool2d((2, 2), stride=(2, 2))
#         )
#         # Update the linear layer to match the reduced size after pooling
#         self.fc1 = nn.Sequential(
#             nn.Dropout(p=0.5),
#             nn.Linear(64 * (IMG_HEIGHT_SELECTED // 4) * (IMG_WIDTH_SELECTED // 4), 2)
#         )

#     def forward(self, x):
#         x = x.view(-1, 1, IMG_HEIGHT_SELECTED, IMG_WIDTH_SELECTED)
#         x = self.layer1(x)
#         x = self.layer2(x)
#         x = x.view(x.size(0), -1)  # Flatten for the fully connected layer
#         return self.fc1(x)


In [None]:
def train_model(model, train_loader, test_loader, num_epochs=30, device=None,
                checkpoint_path='best_model.pth'):
    """Train ``model`` with Adam and cross-entropy, checkpointing the best epoch.

    After every epoch the model is evaluated on ``test_loader``; the mean test loss
    drives a ``ReduceLROnPlateau`` scheduler and, whenever it improves, a checkpoint
    is written to ``checkpoint_path``.

    Args:
        model: Network returning class logits for a batch of inputs.
        train_loader: Loader yielding ``(inputs, labels)`` training batches.
        test_loader: Loader yielding ``(inputs, labels)`` evaluation batches.
        num_epochs: Number of passes over ``train_loader``.
        device: Device to train on (``torch.device`` or string). Defaults to the
            notebook-level ``DEVICE``.
        checkpoint_path: File the best checkpoint is saved to.

    Returns:
        tuple[list[float], list[float]]: Mean training and test loss per epoch.

    Raises:
        ValueError: If either loader yields no batches.
    """
    if device is None:
        device = DEVICE
    if not isinstance(device, torch.device):
        device = torch.device(device)

    # Fail early with a clear message instead of a ZeroDivisionError after a full epoch.
    if len(train_loader) == 0 or len(test_loader) == 0:
        raise ValueError("train_loader 與 test_loader 都必須至少包含一個批次。")

    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.00001)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=5)
    torch.backends.cudnn.benchmark = True

    train_losses, test_losses = [], []
    best_test_loss = float('inf')

    for epoch in range(num_epochs):
        # ---- training pass ----
        model.train()
        running_loss = 0.0
        for batch_idx, (inputs, labels) in enumerate(train_loader):
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

            if batch_idx % 10 == 0:
                print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{batch_idx}/{len(train_loader)}], Loss: {loss.item():.4f}')

        # ---- evaluation pass ----
        model.eval()
        test_loss = 0.0
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                test_loss += criterion(outputs, labels).item()

        train_loss = running_loss / len(train_loader)
        test_loss /= len(test_loader)
        train_losses.append(train_loss)
        test_losses.append(test_loss)

        # CUDA memory statistics are only meaningful when training on a CUDA device.
        if device.type == 'cuda':
            print(f'GPU 記憶體使用: {torch.cuda.memory_allocated(device) / 1024**2:.1f}MB')
        print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}')

        scheduler.step(test_loss)

        if test_loss < best_test_loss:
            best_test_loss = test_loss
            # NOTE: if ``model`` is wrapped in nn.DataParallel, the saved keys carry a
            # 'module.' prefix and must be loaded into a wrapped model as well.
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': test_loss,
            }, checkpoint_path)

    return train_losses, test_losses

In [9]:
def evaluate_model(model, test_loader):
    """Evaluate a binary classifier and display its confusion matrix and report.

    Args:
        model: Network returning 2 logits per sample.
        test_loader: Loader yielding ``(inputs, labels)`` batches.

    Returns:
        tuple: ``(accuracy, conf_matrix)`` where ``conf_matrix`` is always 2x2,
        with rows as actual and columns as predicted classes 0 and 1.
    """
    class_ids = [0, 1]
    class_names = ['Class 0', 'Class 1']

    model = model.to(DEVICE)
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            predicted = model(inputs).argmax(dim=1)
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)
    # Passing the label set explicitly keeps the matrix 2x2 (matching the tick labels)
    # even when one class never appears among the labels or predictions.
    conf_matrix = confusion_matrix(all_labels, all_preds, labels=class_ids)

    plt.figure(figsize=(8, 6))
    sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.ylabel('Actual')
    plt.xlabel('Predicted')
    plt.title('Confusion Matrix')
    plt.show()

    print("\nClassification Report:")
    # Without ``labels``, classification_report raises when fewer than two classes are
    # present, because the observed classes no longer match ``target_names``.
    print(classification_report(all_labels, all_preds, labels=class_ids, target_names=class_names))

    return accuracy, conf_matrix

In [None]:
def main(img_dir='C:\\Users\\680-9000\\Desktop\\finance\\datav3\\Image1',
         labels_file='C:\\Users\\680-9000\\Desktop\\finance\\datav3\\X_5_5.xlsx'):
    """Load the data, train a CNN5d model and evaluate it.

    Args:
        img_dir: Directory containing the images. Defaults to the original
            machine-specific location; pass another path to run elsewhere.
        labels_file: Excel file with image names and labels. Defaults to the
            original machine-specific location.

    Raises:
        RuntimeError: Re-raised after a hint is printed (including out-of-memory).
        Exception: Any other error is re-raised after being printed.
    """
    try:
        torch.cuda.empty_cache()

        print("正在載入數據...")
        dataset = CustomImageDataset(img_dir=img_dir, labels_file=labels_file)
        train_size = int(0.8 * len(dataset))
        test_size = len(dataset) - train_size
        if use_random_split:
            train_dataset, test_dataset = random_split(dataset, [train_size, test_size])
        else:
            # Same data for training and evaluation: the reported test metrics are then
            # measured on samples the model was trained on.
            train_dataset, test_dataset = dataset, dataset

        # Pinned host memory only helps host-to-GPU copies, so request it on CUDA only.
        pin = DEVICE.type == 'cuda'
        train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=0, pin_memory=pin)
        test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False, num_workers=0, pin_memory=pin)

        print("初始化模型...")
        model = CNN5d()
        if use_dataparallel and torch.cuda.device_count() > 1:
            model = nn.DataParallel(model)

        print("開始訓練...")
        train_losses, test_losses = train_model(model, train_loader, test_loader)

        print("評估模型...")
        accuracy, conf_matrix = evaluate_model(model, test_loader)
        print(f"\nFinal Test Accuracy: {accuracy:.4f}")

    except RuntimeError as e:
        if "out of memory" in str(e):
            print("GPU 記憶體不足！嘗試減少 batch size 或圖片大小。")
        else:
            print(f"運行時錯誤: {str(e)}")
        # Bare ``raise`` re-raises the active exception with its original traceback.
        raise
    except Exception as e:
        print(f"發生錯誤: {str(e)}")
        raise
    finally:
        torch.cuda.empty_cache()

if __name__ == "__main__":
    main()


正在載入數據...
初始化模型...
開始訓練...
Epoch [1/10], Batch [0/20508], Loss: 1.8046
Epoch [1/10], Batch [10/20508], Loss: 1.2332
Epoch [1/10], Batch [20/20508], Loss: 1.1095
Epoch [1/10], Batch [30/20508], Loss: 1.1553
Epoch [1/10], Batch [40/20508], Loss: 1.0263
Epoch [1/10], Batch [50/20508], Loss: 0.9939
Epoch [1/10], Batch [60/20508], Loss: 1.0370
Epoch [1/10], Batch [70/20508], Loss: 0.9212
Epoch [1/10], Batch [80/20508], Loss: 1.0727
Epoch [1/10], Batch [90/20508], Loss: 1.0174
Epoch [1/10], Batch [100/20508], Loss: 0.9905
Epoch [1/10], Batch [110/20508], Loss: 1.0257
Epoch [1/10], Batch [120/20508], Loss: 0.9981
Epoch [1/10], Batch [130/20508], Loss: 1.1537
Epoch [1/10], Batch [140/20508], Loss: 0.9738
Epoch [1/10], Batch [150/20508], Loss: 0.9750
Epoch [1/10], Batch [160/20508], Loss: 1.0172
Epoch [1/10], Batch [170/20508], Loss: 1.1036
Epoch [1/10], Batch [180/20508], Loss: 0.8635
Epoch [1/10], Batch [190/20508], Loss: 1.0031
Epoch [1/10], Batch [200/20508], Loss: 0.9749
Epoch [1/10], Batc