In [None]:
!pip install kaggle

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# Mount Google Drive so that files stored there are reachable from this runtime.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [None]:
! mkdir ~/.kaggle
! cp /content/drive/MyDrive/kaggle/kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json

In [None]:
# Fetch the data archive of the captcha-hacker-2023-spring competition with the
# Kaggle CLI (requires the API token installed in the previous cell).
!kaggle competitions download -c captcha-hacker-2023-spring

Downloading captcha-hacker-2023-spring.zip to /content
 96% 76.0M/79.4M [00:03<00:00, 19.7MB/s]
100% 79.4M/79.4M [00:03<00:00, 21.5MB/s]


In [None]:
!unzip captcha-hacker-2023-spring.zip

In [None]:
import csv
import cv2
import numpy as np
import random
import os

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.datasets import ImageFolder

In [None]:
# Directories holding the training and test images.
TRAIN_PATH = "/content/dataset/train"
TEST_PATH = "/content/dataset/test"
# Sub-folder suffix appended to TRAIN_PATH to reach the task-1 images.
TASK1 = '/task1'

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

In [None]:
# Character vocabulary: lowercase letters, uppercase letters, then digits
# (62 symbols in total).
alphabets = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
# Lookup from a character to its position in `alphabets`.
alphabets2index = dict(zip(alphabets, range(len(alphabets))))

In [None]:
# Noise-removal preview: run the preprocessing steps on one randomly chosen
# task-1 training image.

import random
import glob
from PIL import Image

import matplotlib.pyplot as plt

from google.colab.patches import cv2_imshow

folder_path = TRAIN_PATH + TASK1

file_path = glob.glob(folder_path + '/*.png')
if not file_path:
    raise FileNotFoundError(f"No PNG images found in {folder_path}")

# Pick a random valid index. randrange excludes its upper bound, whereas
# randint(0, len(file_path)) can return len(file_path) and raise IndexError.
random_int = random.randrange(len(file_path))

# Load the image and convert it to grayscale.
image = cv2.imread(file_path[random_int])
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

# Binarise with an inverted Otsu threshold (result contains only 0 and 255).
thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)[1]

# Blank a 5-pixel frame around the image. Each edge needs its own slice:
# a single slice such as thresh[-5:5, :] is empty for images larger than
# 10 pixels and therefore changes nothing.
thresh[:5, :] = 0   # top rows
thresh[-5:, :] = 0  # bottom rows
thresh[:, :5] = 0   # left columns
thresh[:, -5:] = 0  # right columns

kernel = np.ones((3, 3))

# Erode, then dilate, with the same 3x3 kernel.
thresh = cv2.erode(thresh, kernel=kernel, iterations=1)
thresh = cv2.dilate(thresh, kernel=kernel, iterations=1)

# Show the image.
# cv2_imshow(thresh)

In [None]:
# def compute_mean_std(dataset):
#     # 計算平均值和標準差
#     mean = torch.zeros(3)
#     std = torch.zeros(3)

#     for images, _ in dataset:
#         mean += torch.mean(images, dim=[1, 2])
#         std += torch.std(images, dim=[1, 2])

#     mean /= len(dataset)
#     std /= len(dataset)

#     return mean, std

# # 創建 ImageFolder 數據集對象
# transform = transforms.Compose([transforms.ToTensor()])
# dataset = ImageFolder(root=TRAIN_PATH, transform=transform)

# # 計算平均值和標準差
# mean, std = compute_mean_std(dataset)

# print("平均值:", mean)
# print("標準差:", std)

In [None]:
class Task3Dataset(Dataset):
    """Captcha images of one task, denoised and converted to tensors.

    Every image is read with OpenCV, converted to grayscale, binarised with an
    inverted Otsu threshold, given a blank frame, eroded and dilated with a
    3x3 kernel, resized to 96x96 and finally turned into a tensor.

    Args:
        data: Iterable of ``[filename, label]`` rows (e.g. rows of a CSV
            file). Only rows whose filename starts with ``task_prefix`` are
            kept; empty rows are ignored.
        root: Directory the filenames are relative to.
        return_filename: When true, ``__getitem__`` returns
            ``(image, filename)`` instead of ``(image, label)``.
        augment: Whether to apply the random affine augmentation
            (rotation within +/-30 degrees, translation up to 10%).
            ``None`` (default) augments unless ``return_filename`` is set, so
            that predictions are not computed on randomly distorted images.
    """

    # Filename prefix that selects the rows belonging to this task.
    task_prefix = "task3"
    # Width in pixels of the frame that is blanked on every side of the image.
    border = 5

    def __init__(self, data, root, return_filename=False, augment=None):
        # `sample and ...` skips empty rows, which csv.reader yields for
        # blank lines and which would otherwise fail on sample[0].
        self.data = [
            sample for sample in data
            if sample and sample[0].startswith(self.task_prefix)
        ]
        self.root = root
        self.return_filename = return_filename

        if augment is None:
            augment = not return_filename

        steps = [transforms.Resize((96, 96))]
        if augment:
            steps.append(transforms.RandomAffine(30, (0.1, 0.1)))
        steps.append(transforms.ToTensor())
        self.transform = transforms.Compose(steps)

    @staticmethod
    def _clear_border(mask, margin):
        """Set a ``margin``-pixel frame of the 2-D array ``mask`` to 0 in place.

        Each edge is sliced separately; a combined slice such as
        ``mask[-5:5, :]`` is empty for arrays larger than 10 pixels and would
        leave the border untouched. Returns ``mask`` for convenience.
        """
        if margin > 0:  # mask[-0:] would select the whole array
            mask[:margin, :] = 0
            mask[-margin:, :] = 0
            mask[:, :margin] = 0
            mask[:, -margin:] = 0
        return mask

    def _load_image(self, filename):
        """Read ``filename`` below ``root`` and return the denoised PIL image."""
        path = f"{self.root}/{filename}"
        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals an unreadable or missing file by returning None.
            raise FileNotFoundError(f"Could not read image: {path}")
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Binarise with an inverted Otsu threshold (only 0 and 255 remain).
        thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY_INV | cv2.THRESH_OTSU)[1]

        self._clear_border(thresh, self.border)

        # Erode, then dilate, with the same 3x3 kernel.
        kernel = np.ones((3, 3))
        thresh = cv2.erode(thresh, kernel=kernel, iterations=1)
        thresh = cv2.dilate(thresh, kernel=kernel, iterations=1)

        # The torchvision transforms are applied to a PIL image.
        return Image.fromarray(thresh)

    def _encode_label(self, label):
        """Return a 1-D tensor holding one class index per label character."""
        return torch.tensor([alphabets2index[ch] for ch in label])

    def __getitem__(self, index):
        filename, label = self.data[index]
        img = self.transform(self._load_image(filename))

        if self.return_filename:
            # The label is not encoded here, so placeholder labels (e.g. in a
            # submission template) never have to be valid characters.
            return img, filename

        return img, self._encode_label(label)

    def __len__(self):
        return len(self.data)


class Task1Dataset(Task3Dataset):
    """Same preprocessing as ``Task3Dataset`` for files prefixed ``task1``.

    The label is returned as a scalar class index, the target format
    ``nn.CrossEntropyLoss`` expects for a single classification head.
    """

    task_prefix = "task1"

    def _encode_label(self, label):
        # NOTE(review): assumes every task-1 label is exactly one character;
        # a longer label raises KeyError - confirm against annotations.csv.
        return torch.tensor(alphabets2index[label])

In [None]:
class Model(nn.Module):
    """Small CNN classifier for single-channel images.

    The feature extractor consists of three stages with 16, 32 and 64
    channels. Each stage is two (3x3 conv -> batch norm -> ReLU) units
    followed by 2x2 max pooling. The classifier averages every feature map
    to a single value and maps the 64 averages to ``num_classes`` logits.

    Args:
        num_classes: Number of output logits.
    """

    def __init__(self, num_classes):
        super().__init__()

        # Layers are collected in one flat list so that the module indices
        # (and therefore the state_dict keys) are feature_extractor.0 .. .20.
        layers = []
        in_channels = 1
        for out_channels in (16, 32, 64):
            for _ in range(2):
                layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding='same'))
                layers.append(nn.BatchNorm2d(out_channels))
                layers.append(nn.ReLU())
                in_channels = out_channels
            layers.append(nn.MaxPool2d(kernel_size=2))
        self.feature_extractor = nn.Sequential(*layers)

        head = [
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(64, num_classes),
        ]
        self.classifier = nn.Sequential(*head)

    def forward(self, x):
        """Return the ``(N, num_classes)`` logits for a batch ``x`` of shape ``(N, 1, H, W)``."""
        features = self.feature_extractor(x)
        return self.classifier(features)

# # 建立模型
# model = Model(len(alphabets2index)).to(device)

# from torchsummary import summary

# # 輸出模型摘要
# summary(model, input_size=(1, 96, 96), batch_size=-1, device=device)


In [None]:
train_data, val_data = [], []

# A dedicated, seeded generator makes the 80/20 split identical on every run
# (Restart & Run All) without touching the global `random` state.
SPLIT_SEED = 42
split_rng = random.Random(SPLIT_SEED)

# Assign every annotation row to the training set with probability 0.8,
# otherwise to the validation set.
with open(f'{TRAIN_PATH}/annotations.csv', newline='') as csvfile:
    for row in csv.reader(csvfile, delimiter=','):
        if not row:
            # csv.reader yields [] for blank lines; they carry no sample.
            continue
        if split_rng.random() < 0.8:
            train_data.append(row)
        else:
            val_data.append(row)

# Training batches are shuffled and the last incomplete batch is dropped.
train_dataset = Task1Dataset(train_data, root=TRAIN_PATH)
train_Loader = DataLoader(train_dataset, batch_size=16, num_workers=2, drop_last=True, shuffle=True)

# Validation keeps every sample in a fixed order.
val_dataset = Task1Dataset(val_data, root=TRAIN_PATH)
val_Loader = DataLoader(val_dataset, batch_size=20, num_workers=2, drop_last=False, shuffle=False)

In [None]:
class EarlyStoppingCallback:
    """Checkpoint the best model and signal when training should stop.

    Call the instance once per epoch with the model and its validation
    accuracy. Whenever the accuracy exceeds the best value seen so far, the
    model's ``state_dict`` is written to ``save_path``. After ``patience``
    consecutive calls without improvement ``stop_training`` becomes true.

    Args:
        patience: Number of consecutive non-improving calls tolerated before
            ``stop_training`` is set.
        save_path: File that receives the best ``state_dict``.
    """

    def __init__(self, patience=10, save_path="best_model.pt"):
        self.patience = patience      # max. consecutive calls without improvement
        self.save_path = save_path    # checkpoint file for the best weights
        self.counter = 0              # consecutive calls without improvement so far
        self.best_accuracy = 0.0      # best accuracy seen so far
        self.stop_training = False    # set to True once patience is exhausted

    def __call__(self, model, accuracy):
        # Accept plain numbers as well as one-element tensors; storing a float
        # avoids keeping a (possibly GPU) tensor alive as the best value.
        accuracy = float(accuracy)

        if accuracy > self.best_accuracy:
            self.best_accuracy = accuracy
            self.counter = 0
            torch.save(model.state_dict(), self.save_path)
            print("已保存最佳模型。")
        else:
            self.counter += 1

        if self.counter >= self.patience:
            print("觸發提前停止。")
            self.stop_training = True

In [None]:
model = Model(len(alphabets2index)).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()

# Saves the best weights to best_model.pt and stops training after 10 epochs
# without a better validation accuracy.
callback = EarlyStoppingCallback(patience=10)

for epoch in range(30):
    print(f"Epoch [{epoch}]")

    # ---- training ----
    model.train()
    for image, label in train_Loader:
        image = image.to(device)
        label = label.to(device)

        pred = model(image)
        loss = loss_fn(pred, label)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # ---- validation ----
    model.eval()
    sample_count = 0
    correct_count = 0

    # No gradients are needed for evaluation; skipping them saves memory.
    with torch.no_grad():
        for image, label in val_Loader:
            image = image.to(device)
            label = label.to(device)

            pred = torch.argmax(model(image), dim=1)

            sample_count += len(image)
            correct_count += (label == pred).sum().item()

    # Computed once per epoch as a plain float; 0.0 when the validation set
    # is empty so that `accuracy` is always defined.
    accuracy = correct_count / sample_count if sample_count else 0.0
    print("accuracy (validation):", accuracy)

    # Check the accuracy at the end of every epoch.
    callback(model, accuracy)

    if callback.stop_training:
        break

# The loop ends with the weights of the last epoch, which may be worse than
# the checkpointed ones. Restore the best weights so later cells predict with
# them. best_accuracy > 0 means a checkpoint was written during this run.
if callback.best_accuracy > 0:
    model.load_state_dict(torch.load("best_model.pt", map_location=device))

In [None]:
# Rows of the submission template; blank lines are dropped.
with open(f'{TEST_PATH}/../sample_submission.csv', newline='') as csvfile:
    test_data = [row for row in csv.reader(csvfile, delimiter=',') if row]

test_dataset = Task1Dataset(test_data, root=TEST_PATH, return_filename=True)
test_Loader = DataLoader(test_dataset, batch_size=500, num_workers=4, drop_last=False, shuffle=False)

# Append when submission.csv already exists, otherwise create it and write
# the header first.
# NOTE(review): because of the append mode, re-running this cell adds the same
# rows again - delete submission.csv first if a fresh file is wanted.
is_new_file = not os.path.exists('submission.csv')

model.eval()

# The context manager closes the file, which guarantees that all rows are
# flushed to disk when the cell finishes.
with open('submission.csv', 'w' if is_new_file else 'a', newline='') as submission_file:
    csv_writer = csv.writer(submission_file)
    if is_new_file:
        csv_writer.writerow(["filename", "label"])

    # Inference only: no gradients required.
    with torch.no_grad():
        for image, filenames in test_Loader:
            image = image.to(device)

            pred = torch.argmax(model(image), dim=1).tolist()

            for filename, class_index in zip(filenames, pred):
                csv_writer.writerow([filename, alphabets[class_index]])

    # Files of the other tasks get the placeholder label 0.
    for row in test_data:
        filename = row[0]
        if filename.startswith("task2") or filename.startswith("task3"):
            csv_writer.writerow([filename, 0])