# 재구축 데이터셋 Scratch



In [1]:
import torch
import torch.utils.data as data
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms, datasets, models
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import TensorDataset
from PIL import Image
import numpy as np
from tqdm import tqdm

In [None]:
# 구축된 .npy파일을 Pytorch DataLoader을 사용할 수 있도록 CUSTOM DATASET을 만듬.
import numpy as np
from google.colab import drive
from sklearn.model_selection import train_test_split

default_path = "/content/drive/MyDrive/인공지능 수업/final/"

CUB200_TYPE_TRAIN = 1
CUB200_TYPE_TEST = 2
CUB200_TYPE_SUBMIT = 3

drive.mount('/content/drive')
class CUB200(data.Dataset):

    def __init__(self, type, transform = None):
        super(CUB200, self).__init__()
        """
        type : int = 1, 2, 3
        """

        # train_data = np.load(default_path + 'train_image.npy')
        # train_label = np.load(default_path + 'train_label.npy')

        original_train_data = np.load(default_path + 'train_image.npy')
        original_train_label = np.load(default_path + 'train_label.npy')

        train_data, test_data, train_label, test_label = train_test_split(
            original_train_data,
            original_train_label,
            test_size = 0.3,
            random_state = 1)
        
        if type == CUB200_TYPE_TRAIN:
          self.image = train_data
          self.label = train_label
        elif type == CUB200_TYPE_TEST:
          self.image = test_data
          self.label = test_label
        elif type == CUB200_TYPE_SUBMIT:
          self.image = np.load(default_path + 'test_image.npy')
          self.label = np.zeros(500)
        
        self.transform = transform

    def __getitem__(self, index):
        img, target = self.image[index], self.label[index]
        img = Image.fromarray(img)

        if self.transform is not None:
            img = self.transform(img)

        return img, target

    def __len__(self):
        return len(self.image)

Mounted at /content/drive


In [None]:
trainCUB = CUB200(CUB200_TYPE_TRAIN)
print(trainCUB.image.shape)
print(trainCUB.label.shape)

testCUB = CUB200(CUB200_TYPE_TEST)
print(testCUB.image.shape)
print(testCUB.label.shape)
print(np.max(testCUB.label), np.min(testCUB.label))

submitCUB = CUB200(CUB200_TYPE_SUBMIT)
print(submitCUB.image.shape)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
# train_data에만 data augmentaion을 적용
transform_train = transforms.Compose([
        transfroms.Rs
        transforms.RandomCrop(224),
        transforms.ToTensor(),
        transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))])

transform_test = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))])

In [None]:
linear1 = nn.Linear(2048, 1024, bias=True)
linear1_1 = nn.Linear(2048, 50, bias=True)
linear1_2 = nn.Linear(2048, 50, bias=True)
linear1_3 = nn.Linear(2048, 50, bias=True)
linear2 = nn.Linear(1024, 50, bias=True)
linear3 = nn.Linear(512, 50, bias=True)
relu = nn.ReLU()

# xavier initialization
nn.init.xavier_uniform_(linear1.weight)
nn.init.xavier_uniform_(linear2.weight)
nn.init.xavier_uniform_(linear3.weight)

In [None]:
# CUSTOM DATASET을 이용하여 train_loader, test_loader을 구축

batch_size = 4

train_loader = torch.utils.data.DataLoader(
    dataset = CUB200(CUB200_TYPE_TRAIN, transform = transform_train),
    batch_size = batch_size,
    shuffle = True
)

test_loader = torch.utils.data.DataLoader(
    dataset = CUB200(CUB200_TYPE_TEST, transform = transform_test),
    batch_size = batch_size,
    shuffle = False
)

submit_loader = torch.utils.data.DataLoader(
    dataset = CUB200(CUB200_TYPE_SUBMIT, transform = transform_test),
    batch_size = batch_size,
    shuffle = False
)

In [None]:
def training_model(model, criterion, optimizer, scheduler, num_epochs = 25):

    for epoch in range(num_epochs):
        scheduler.step()

        running_loss = 0.0
        for i, data in enumerate(train_loader, 0):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            if i % 60 == 59:
                print('[%d, %5d] loss: %.7f' %
                    (epoch + 1, (i + 1), running_loss / 20))
                running_loss = 0.0
        
        train_correct = 0
        train_total = 0
        for i, data in enumerate(train_loader, 0):
            inputs, labels = data
            inputs = inputs.squeeze()
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)

            _, predicted = torch.max(outputs.data, 1)
            train_total += labels.size(0)
            train_correct += (predicted == labels).sum().item()

        print('[%d epoch] Accuracy of the network on the train images: %d %%' %
              (epoch + 1, 100 * train_correct / train_total))
        
    print("End Training do it eval_accuracy")
    return model

In [None]:
def eval_accuracy(model):
    class_correct = list(0. for i in range(50))
    class_total = list(0. for i in range(50))

    correct = 0
    total = 0
    
    model.eval()
    with torch.no_grad():
        for i, data in enumerate(test_loader, 0):
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            c = (predicted == labels).squeeze()
                    
            for i in range(labels.shape[0]):
                label = labels[i]
                class_correct[label] += c[i].item()
                class_total[label] += 1
                
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

    print('Accuracy of the network on test images: %d %%' % (
        100 * correct / total))            
                
    return 

In [None]:
model_ft = models.resnext101_32x8d(pretrained=True)
for param in model_ft.parameters():
  param.requires_grad = False
print(model_ft)
model_ft.fc = nn.Sequential(
                      linear1
)
print(model_ft)

In [None]:
#0.001: 72퍼
#0.05 = 72퍼
#0.0075 25에폭 : 74퍼
#0.0008 이미지 사이즈 400 70퍼
num_epochs = 25
model_ft.to(device)
criterion = nn.CrossEntropyLoss()
#optimizer = optim.Adam(model_ft.parameters(), lr=0.01, betas=(0.9, 0.999), eps=1e-08, weight_decay=0.3, amsgrad=False)
optimizer = optim.SGD(model_ft.parameters(), lr=0.1, momentum=0.9)
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size = 16, gamma = 0.1)

model_ft = training_model(model_ft, criterion, optimizer, lr_scheduler, num_epochs)

In [None]:
eval_accuracy(model_ft)

In [None]:
import itertools

def get_result(model):
  result=[]
  model.eval()
  with torch.no_grad():
    for i, data in enumerate(submit_loader, 0):
      images, _ = data
      images = images.to(device)
      outputs = model(images)
      _, predicted = torch.max(outputs, 1)
      result.append(predicted.cpu().numpy())
  return list(itertools.chain(*result))

In [None]:
submit_result = get_result(model_ft)

In [None]:
pip install pycryptodomex --no-binary :all:

In [None]:
import json
from base64 import b64encode
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import pad

def read_txt(fileName):
    with open(fileName, 'rt') as f:
        list_data = [a.strip('\n\r') for a in f.readlines()]
    return list_data

def write_json(fileName, data):
    with open(fileName, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)

def load_key(key_path):
    with open(key_path, "rb") as f:
        key = f.read()
    return key

def encrypt_data(key_path, ans_list, encrypt_store_path='ans.json'):
    key = load_key(key_path)
    print(key)
    data = " ".join([str(i) for i in ans_list])
    encode_data = data.encode()
    cipher = AES.new(key, AES.MODE_CBC)
    ct_bytes = cipher.encrypt(pad(encode_data, AES.block_size))
    iv = b64encode(cipher.iv).decode('utf-8')
    ct = b64encode(ct_bytes).decode('utf-8')
    write_json(encrypt_store_path, {'iv':iv, 'ciphertext':ct})

if __name__=="__main__":
    # 1.이메일을 통해서 전달 받은 키 파일의 경로 입력
    #key_path = default_path + "team9.pem"
    key_path = "/content/drive/MyDrive/인공지능 수업/team9.pem"
    # 2. 예측한 결과를 텍스트 파일로 저장했을 경우 리스트로 다시 불러오기
    # 본인이 원하는 방식으로 리스트 형태로 예측 값을 불러오기만 하면 됨(순서를 지킬것)
    #raw_ans_path = "ans.txt"
    #ans = read_txt(raw_ans_path)
    #ans에 result 저장한 리스트 넣기
    ans = submit_result
    # 3. 암호화된 파일을 저장할 위치
    encrypt_ans_path = default_path + "ai_4_answer.json"
    # 4. 암호화!(pycrytodome 설치)
    encrypt_data(key_path, ans, encrypt_ans_path)
    print("finished!")

0 0 0 0 ... 50
...
500

np.torch([0 0 0 0 0])

