<a href="https://colab.research.google.com/github/BaBa0525/machine-learning/blob/dev/HW5/hw5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Import package

In [1]:
import csv
import cv2
import numpy as np
import random
import os

from tqdm import tqdm

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

## Mount Google Drive

In [2]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [3]:
%cd /content/drive/MyDrive/Colab\ Notebooks/ML-HW5
!ls

/content/drive/MyDrive/Colab Notebooks/ML-HW5
captcha-hacker.zip  hw5.ipynb  input  submission.csv


In [4]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cpu


## Check dataset

In [None]:
# import os

# for dirname, _, filenames in os.walk('./input'):
#     for filename in filenames[:3]:
#         print(os.path.join(dirname, filename))
#     if len(filenames) > 3:
#         print("...")


In [5]:
TRAIN_PATH = "./input/train"
TEST_PATH = "./input/test"

## Dataset Class

In [6]:
class TaskDataset(Dataset):
  def __init__(self, data, root, task_type, return_filename=False):
    self.data = [sample for sample in data if sample[0].startswith(task_type)]
    self.return_filename = return_filename
    self.root = root

  def __getitem__(self, index):
    filename, label = self.data[index]
    img = cv2.imread(f"{self.root}/{filename}")
    img = cv2.resize(img, (32, 32))
    img = np.mean(img, axis=2)
    if self.return_filename:
      return torch.FloatTensor((img - 128) / 128), filename

    return torch.FloatTensor((img - 128) / 128), int(label)

  def __len__(self):
    return len(self.data)
    

## Model Class

In [7]:
class Model(nn.Module):
  def __init__(self, out_feature):
    super().__init__()
    self.layers = nn.Sequential(
        nn.Linear(1024, 512),
        nn.LeakyReLU(),
        nn.Linear(512, out_features=out_feature)
    )

  def forward(self, x):
    b, h, w = x.shape
    x = x.view(b, h*w)
    return self.layers(x)

## Process training data

In [10]:
train_data = []
val_data = []

with open(f"{TRAIN_PATH}/annotations.csv", newline="") as csvfile:
  for row in csv.reader(csvfile, delimiter=","):
    if random.random() < 0.7:
      train_data.append(row)
    else:
      val_data.append(row)


In [11]:
train_dl = []
val_dl = []

for i in range(1, 4):
  train_ds = TaskDataset(train_data, TRAIN_PATH, task_type=f"task{i}")
  train_dl.append(DataLoader(train_ds, batch_size=500, drop_last=True, shuffle=True))

## Training

In [13]:
def train_model(n_epoch: int, model: Model, optimizer: torch.optim.Adam, train_dl: DataLoader, val_dl: DataLoader):
  for epoch in tqdm(range(n_epoch)):
    print(f"Epoch: [{epoch}]")

    model.train()
    for image, label in train_dl:
      image = image.to(device)
      label = label.to(label)

      pred = model(image)
      loss = loss_fn(pred, label)

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

    sample_count = 0
    correct_count = 0

    model.eval()
    for image, label in val_dl:
      image = image.to(device)
      label = label.to(label)

      pred = model(image)
      loss = loss_fn(pred, label)

      pred = torch.argmax(pred, dim=1)

      sample_count += len(image)
      correct_count += (label == pred).sum()

    print(f"accuracy(validation): {correct_count / sample_count}")

In [14]:
models = [ Model().to(device) for _ in range(3) ]
optimizers = [ torch.optim.Adam(params=models[i].parameters(), lr = 1e-3) for i in range(3) ]
loss_fn = nn.CrossEntropyLoss()

for task in range(3):
  train_model(100, models[i], optimizers[i], train_dl[i], val_dl[i])

TypeError: ignored

## Output prediction

In [None]:
test_data = []

with open(f"{TEST_PATH}/../sample_submission.csv", newline="") as csvfile:
  for row in csv.reader(csvfile, delimiter=","):
    test_data.append(row)

test_ds = Task1Dataset(test_data, TEST_PATH, return_filename=True)
test_dl = DataLoader(test_ds, batch_size=500, num_workers=2, drop_last=False, shuffle=False)

if os.path.exists("submission.csv"):
  fo = open("submission.csv", mode="a", newline="")
  csv_writer = csv.writer(fo)
else:
  fo = open("submission.csv", mode="a", newline="")
  csv_writer = csv.writer(fo)
  csv_writer.writerow(["filename", "label"])

model.eval()
for image, filenames in test_dl:
  image = image.to(device)

  pred = model(image)
  pred = torch.argmax(pred, dim=1)

  for ind, filename in enumerate(filenames):
    if (len(filename) < 3):
      print(filename)
    csv_writer.writerow([filename, str(pred[ind].item())])



In [None]:
for filename, _ in test_data:
  if filename.startswith("task2") or filename.startswith("task3"):
    # print(filename)
    csv_writer.writerow([filename, 0])

fo.close()

In [None]:
!ls

captcha-hacker.zip  hw5.ipynb  input  submission.csv
