In [None]:
import warnings
warnings.filterwarnings("ignore")

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import datasets, models, transforms
from torchsummary import summary

import os
import copy
import time
import numpy as np
import matplotlib.pyplot as plt

print("PyTorch Version: ",torch.__version__)
print("Torchvision Version: ",torchvision.__version__)

# Several later cells (training loop, feature extraction, query inference)
# reference a notebook-level `device`; define it once here so the notebook
# survives "Restart Kernel -> Run All".
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
import os
import cv2
import numpy as np
from matplotlib import pyplot as plt

In [None]:
from google.colab import drive

# Mount Google Drive at /gdrive, forcing a fresh mount if one already exists.
drive.mount(mountpoint="/gdrive", force_remount=True)

In [None]:
# Project folder on the mounted Drive; every dataset/model/output path below
# is built relative to it.
root_dir = "/gdrive/My Drive/CV/c2"

import sys

# Make modules stored in the project folder importable. Guarded so that
# re-running this cell does not keep appending duplicate entries.
if root_dir not in sys.path:
    sys.path.append(root_dir)

In [None]:
data_dir = root_dir + "/recaptcha-dataset/Large"
class_names = ['Bicycle', 'Bridge', 'Bus', 'Car',
               'Chimney', 'Crosswalk', 'Hydrant',
               'Motorcycle', 'Palm', 'Traffic Light']

input_size = 224
batch_size = 32     # tune to the available memory
num_workers = 4     # tune to the available CPU cores
split_seed = 42     # fixes the train/val split so reruns are reproducible

# Per-channel statistics used to normalise every image.
norm_mean = [0.485, 0.456, 0.406]
norm_std = [0.229, 0.224, 0.225]

# Training: random crop/flip augmentation followed by normalisation.
# The geometric transforms run on the PIL image, before ToTensor.
data_transforms = transforms.Compose([
        transforms.RandomResizedCrop(input_size),   # random crop resized to input_size
        transforms.RandomHorizontalFlip(),          # left/right flip for augmentation
        transforms.ToTensor(),
        transforms.Normalize(norm_mean, norm_std)
    ])

# Validation: deterministic resize + centre crop, then normalisation only
# (no random augmentation), mirroring the query-image preprocessing used
# later in the notebook.
val_transforms = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(input_size),
        transforms.ToTensor(),
        transforms.Normalize(norm_mean, norm_std)
    ])

print("Initializing Datasets and Dataloaders...")

# Two views over the same image folder that differ only in their transform,
# so the train and val subsets can be preprocessed differently.
image_datasets = datasets.ImageFolder(data_dir, data_transforms)
val_datasets = datasets.ImageFolder(data_dir, val_transforms)

if list(image_datasets.classes) != class_names:
    # The label indices produced by ImageFolder follow `image_datasets.classes`.
    print("Warning: class_names differs from the folder classes:", image_datasets.classes)

num_data = len(image_datasets)
# Seeded random permutation of all sample indices.
indices = np.random.default_rng(split_seed).permutation(num_data)

train_size = int(num_data*0.8)
train_indices = indices[:train_size].tolist()    # first 80% -> training
val_indices = indices[train_size:].tolist()      # remaining 20% -> validation
train_set = torch.utils.data.Subset(image_datasets, train_indices)
val_set = torch.utils.data.Subset(val_datasets, val_indices)

print('Number of training data:', len(train_set))
print('Number of validation data:', len(val_set))

# Only the training loader shuffles; validation order does not need to change.
dataloaders = {'train': torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=num_workers),
                 'val': torch.utils.data.DataLoader(val_set, batch_size=batch_size, shuffle=False, num_workers=num_workers)}

In [None]:
def imshow(imgs, title=None):
    """Show a normalised image tensor with matplotlib.

    The tensor's axes are reordered with (1, 2, 0), the per-channel
    normalisation is undone, values are clipped to [0, 1], and the result is
    drawn with ``plt.imshow``. ``title`` is set on the plot when given.
    """
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])

    # Move the leading axis last: axes 1 and 2 are height and width.
    pixels = np.transpose(imgs.numpy(), (1, 2, 0))
    # Invert Normalize(mean, std) and keep values displayable.
    pixels = np.clip(channel_std * pixels + channel_mean, 0, 1)

    plt.imshow(pixels)
    if title is None:
        return
    plt.title(title)

In [None]:
# Pull a single mini-batch from the training loader and report its shapes.
# inputs.shape is [batch, channel, height, width].
train_iter = iter(dataloaders['train'])
inputs, labels = next(train_iter)
print("inputs.shape:", inputs.shape)
print("labels.shape:", labels.shape)

# Tile the first eight images into one grid and show it with their class names.
preview_count = 8
out = torchvision.utils.make_grid(inputs[:preview_count])
preview_titles = [class_names[x] for x in labels[:preview_count]]

imshow(out, title=preview_titles)

In [None]:
# Pull a single mini-batch from the validation loader and report its shapes.
val_iter = iter(dataloaders['val'])
inputs, labels = next(val_iter)
print("inputs.shape:", inputs.shape)
print("labels.shape:", labels.shape)

# Tile the first eight images into one grid (display is currently disabled).
out = torchvision.utils.make_grid(inputs[:8])

# imshow(out, title=[class_names[x] for x in labels[:8]])

In [None]:
class Block(nn.Module):
    """Residual block: two 3x3 conv + batch-norm stages plus a skip connection.

    Args:
        in_channels: channels of the incoming tensor.
        out_channels: channels produced by both convolutions.
        identity_downsample: optional module applied to the skip path so its
            shape matches the main path; ``None`` leaves the input untouched.
        stride: stride of the first convolution (the second always uses 1).
    """

    def __init__(self, in_channels, out_channels, identity_downsample=None, stride=1):
        super().__init__()
        conv_kwargs = dict(kernel_size=3, padding=1, bias=False)
        self.conv1 = nn.Conv2d(in_channels, out_channels, stride=stride, **conv_kwargs)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, stride=1, **conv_kwargs)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.identity_downsample = identity_downsample

    def forward(self, x):
        """Return relu(main_path(x) + skip(x))."""
        # Main path: conv -> bn -> relu -> conv -> bn.
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        # Skip path: the raw input, optionally projected.
        shortcut = x
        if self.identity_downsample is not None:
            shortcut = self.identity_downsample(shortcut)

        out += shortcut
        return self.relu(out)

In [None]:
class ResNet_18(nn.Module):
    """ResNet-18-style classifier built from ``Block`` residual units.

    Structure: 7x7 stem convolution, batch-norm, ReLU and max-pool, followed
    by four stages of two ``Block``s each (64, 128, 256, 512 channels),
    adaptive average pooling to 1x1, and a linear classifier.

    Args:
        image_channels: number of channels of the input images.
        num_classes: size of the output logit vector.
    """

    def __init__(self, image_channels, num_classes):

        super(ResNet_18, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Residual stages.
        self.layer1 = self.__make_layer(64, 64, stride=1)
        self.layer2 = self.__make_layer(64, 128, stride=2)
        self.layer3 = self.__make_layer(128, 256, stride=2)
        self.layer4 = self.__make_layer(256, 512, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def __make_layer(self, in_channels, out_channels, stride):
        """Build one stage: a (possibly strided) Block followed by a plain Block."""
        identity_downsample = None
        # The skip path needs a projection whenever the main path changes the
        # tensor shape, i.e. on a spatial stride or a channel-count change.
        if stride != 1 or in_channels != out_channels:
            identity_downsample = self.identity_downsample(in_channels, out_channels, stride=stride)

        return nn.Sequential(
            Block(in_channels, out_channels, identity_downsample=identity_downsample, stride=stride),
            Block(out_channels, out_channels)
        )

    def forward(self, x):
        """Map a batch of images to a (batch, num_classes) tensor of logits."""
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        # Collapse the pooled (batch, 512, 1, 1) map to (batch, 512) for the classifier.
        x = x.flatten(1)
        x = self.fc(x)
        return x

    def identity_downsample(self, in_channels, out_channels, stride=2):
        """Return a 1x1 conv + batch-norm projection for the skip connection.

        ``stride`` defaults to 2, matching the previous hard-coded value, and
        should equal the stride of the block's first convolution.
        """
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
            nn.BatchNorm2d(out_channels)
        )

In [None]:
# Instantiate the hand-written ResNet-18 for 3-channel images and 10 classes,
# then print a per-layer summary for a 3x224x224 input on the CPU.
model = ResNet_18(3, 10)
summary(model, input_size=(3, 224, 224), device='cpu')

In [None]:
def set_parameter_requires_grad(model, feature_extracting):
    """Freeze every parameter of ``model`` when ``feature_extracting`` is truthy.

    When the flag is falsy the model is left untouched. Returns ``None``.
    """
    if not feature_extracting:
        return
    for weight in model.parameters():
        weight.requires_grad_(False)

In [None]:
def initialize_model(model_name, num_classes, feature_extract, use_pretrained=True):
    """Build a torchvision model whose final layer outputs ``num_classes`` logits.

    Args:
        model_name: one of "resnet", "alexnet", "vgg", "squeezenet".
        num_classes: number of output classes for the replaced final layer.
        feature_extract: when truthy, the backbone parameters are frozen
            (via ``set_parameter_requires_grad``) before the final layer is
            replaced, so only the new layer remains trainable.
        use_pretrained: forwarded as ``pretrained=`` to the torchvision
            constructor.

    Returns:
        ``(model_ft, input_size)`` where ``input_size`` is 224 for every
        supported architecture.

    Raises:
        ValueError: if ``model_name`` is not a supported architecture. (This
            previously printed a message and called ``exit()``, which tears
            down a notebook kernel instead of reporting the error.)
    """
    if model_name == "resnet":
        # ResNet-18: replace the fully connected head.
        model_ft = models.resnet18(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)

    elif model_name == "alexnet":
        # AlexNet: replace the last classifier layer.
        model_ft = models.alexnet(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs,num_classes)

    elif model_name == "vgg":
        # VGG11 with batch-norm: replace the last classifier layer.
        model_ft = models.vgg11_bn(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs,num_classes)

    elif model_name == "squeezenet":
        # SqueezeNet 1.0: the classifier is a 1x1 convolution over 512 channels.
        model_ft = models.squeezenet1_0(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        model_ft.classifier[1] = nn.Conv2d(512, num_classes, kernel_size=(1,1), stride=(1,1))
        model_ft.num_classes = num_classes

    else:
        raise ValueError(
            "Invalid model name {!r}; expected one of: resnet, alexnet, vgg, squeezenet".format(model_name)
        )

    input_size = 224
    return model_ft, input_size

In [None]:
# Architectures handled by initialize_model: [resnet, alexnet, vgg, squeezenet]
model_name = "resnet"

num_classes = 10
num_epochs = 15

# Feature-extraction switch: False fine-tunes the whole network,
# True trains only the newly attached output layer.
feature_extract = False

# Build the model for this run.
model_ft, input_size = initialize_model(
    model_name=model_name,
    num_classes=num_classes,
    feature_extract=feature_extract,
    use_pretrained=True,
)

# Show a per-layer summary of the freshly built model.
summary(model_ft, input_size=(3, 224, 224), device='cpu')

In [None]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25, device=None):
    """Train ``model`` and keep the weights with the best validation accuracy.

    Each epoch runs a 'train' phase (with gradient updates) followed by a
    'val' phase (evaluation only). Loss and accuracy are averaged over
    ``len(dataloaders[phase].dataset)``.

    Args:
        model: the network to train; moved to ``device`` before training.
        dataloaders: dict with 'train' and 'val' data loaders yielding
            ``(inputs, labels)`` batches.
        criterion: loss called as ``criterion(outputs, labels)``.
        optimizer: optimizer stepping the model's parameters.
        num_epochs: number of epochs to run.
        device: device to train on. When ``None``, a notebook-level ``device``
            variable is used if one exists; otherwise CUDA is chosen when
            available, else the CPU.

    Returns:
        ``(model, val_acc_history)``: the model with the best-validation
        weights loaded, and the per-epoch validation accuracies (0-d tensors).
    """
    if device is None:
        # Honour a notebook-level `device` if one was defined (the behaviour
        # this function originally relied on), otherwise auto-detect.
        device = globals().get("device")
    if device is None:
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    since = time.time()

    val_acc_history = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    model = model.to(device)
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and a validation phase.
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            if is_train:
                model.train()  # enable training-mode layers
            else:
                model.eval()   # switch to evaluation mode

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Gradients are only tracked (and cleared/applied) while training.
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)

                    _, preds = torch.max(outputs, 1)

                    if is_train:
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                # Weight the batch-mean loss by batch size so the epoch
                # average is per-sample.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            if phase == 'val':
                # Snapshot the weights whenever validation accuracy improves.
                if epoch_acc > best_acc:
                    best_acc = epoch_acc
                    best_model_wts = copy.deepcopy(model.state_dict())
                val_acc_history.append(epoch_acc)

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:.4f}'.format(float(best_acc)))

    # Restore the best-performing weights before returning.
    model.load_state_dict(best_model_wts)
    return model, val_acc_history

In [None]:
# Collect the parameters that still require gradients and list their names.
trainable = [(name, param) for name, param in model_ft.named_parameters()
             if param.requires_grad == True]

print("Params to learn:")
for name, _ in trainable:
    print("\t",name)

# When feature extracting, optimise only the trainable parameters;
# otherwise hand every model parameter to the optimizer.
if feature_extract:
    params_to_update = [param for _, param in trainable]
else:
    params_to_update = model_ft.parameters()

# SGD with momentum over the selected parameters.
optimizer_ft = optim.SGD(params_to_update, lr=0.001, momentum=0.9)

In [None]:
# # Setup the loss fxn
# criterion = nn.CrossEntropyLoss()

# # Train and evaluate
# model_ft, hist = train_model(model_ft, dataloaders, criterion, optimizer_ft, num_epochs=num_epochs)

In [None]:
# torch.save(model_ft, root_dir + '/resnet18_ft.pt')      # this is the call that was used to save the fine-tuned model

In [None]:
# Load the saved fine-tuned model and use it as a frozen feature extractor.

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# map_location lets a checkpoint saved on a GPU be opened on a CPU-only runtime.
# NOTE(review): the file is a whole pickled model; recent PyTorch releases may
# require passing weights_only=False to torch.load for this -- confirm against
# the installed version. Only load checkpoints you trust.
model_ft = torch.load(root_dir + '/resnet18_ft.pt', map_location=device)

# Drop the final child (the classification layer) and keep everything before it.
modules = list(model_ft.children())[:-1]
resnet18_feat = nn.Sequential(*modules).to(device)
for p in resnet18_feat.parameters():
    p.requires_grad = False
# Make inference mode explicit instead of relying on the state stored in the file.
resnet18_feat.eval()

# Accumulators must live outside the loop; creating them per batch would keep
# only the final batch.
data_features = []
data_labels = []

with torch.no_grad():
    for inputs, labels in dataloaders['val']:
        inputs = inputs.to(device)
        h = resnet18_feat(inputs)   # hidden feature map, e.g. [32, 512, 1, 1] = [batch, channel, height, width]

        # Flatten to one feature vector per image and store it with its label.
        h = h.view(h.size(0), -1)
        data_features.extend(h.cpu().numpy())
        data_labels.extend(labels.numpy())

data_features = np.array(data_features)
data_labels = np.array(data_labels)


In [None]:
# Persist the extracted features and their labels as .npy files under root_dir.

for suffix, array in (('/data_features.npy', data_features),
                      ('/data_labels.npy', data_labels)):
    np.save(root_dir + suffix, array)

In [None]:
# task 1: fit a k-nearest-neighbours classifier on the extracted features

from sklearn.neighbors import KNeighborsClassifier
from joblib import dump, load

# Create the 3-neighbour classifier and train it on (features, labels).
# Query-image features can later be classified with knn.predict(test_features).
knn = KNeighborsClassifier(n_neighbors=3).fit(data_features, data_labels)

# Save the fitted KNN model under root_dir.
dump(knn, root_dir + '/knn_model_task1.joblib')



In [None]:
import re

from PIL import Image
from torchvision import transforms

# Preprocessing applied to every query image.
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Directory holding the query images.
query_dir = root_dir + '/query'


def _natural_key(name):
    """Sort key that orders embedded numbers numerically (query2 before query10)."""
    return [int(tok) if tok.isdigit() else tok.lower() for tok in re.split(r'(\d+)', name)]


# os.listdir returns entries in arbitrary order. Sort them naturally so the
# position of each file is deterministic and follows its number.
query_images = sorted(os.listdir(query_dir), key=_natural_key)

In [None]:
import torch.nn.functional as F

labels = ['Bicycle', 'Bridge', 'Bus', 'Car', 'Chimney', 'Crosswalk', 'Hydrant', 'Motorcycle', 'Palm', 'Traffic Light']

# Run the extractor on whatever device its weights live on, in inference mode.
feat_device = next(resnet18_feat.parameters()).device
resnet18_feat.eval()

test_features = []

with torch.no_grad():
    for img_name in query_images:
        # Load the image as RGB and release the file handle right away.
        img_path = os.path.join(query_dir, img_name)
        with Image.open(img_path) as raw_img:
            img = raw_img.convert("RGB")

        # Preprocess and add the batch dimension (the model expects a 4D tensor).
        img = transform(img).unsqueeze(0).to(feat_device)

        # Extract the feature map and pool it to 1x1 spatially.
        features = resnet18_feat(img)
        features = F.adaptive_avg_pool2d(features, (1, 1))

        # Flatten to shape (1, num_features) as a NumPy array.
        features = features.view(features.size(0), -1).cpu().numpy()

        test_features.append(features)

# NOTE(review): this loads 'knn_model_task1_2.joblib', while the KNN cell above
# saves 'knn_model_task1.joblib' -- confirm which model is intended.
knn = load(root_dir + '/knn_model_task1_2.joblib')

# Classify all query features in one call; results stay in query_images order.
predict_labels = []
if test_features:
    predicted = knn.predict(np.concatenate(test_features, axis=0))
    predict_labels = [labels[int(idx)] for idx in predicted]

print(predict_labels)



In [None]:
# task 1: write one "<image file name>,<predicted label>" row per query image

import csv

if len(query_images) != len(predict_labels):
    raise ValueError("query_images and predict_labels have different lengths")

# newline='' is what the csv module expects for files it writes (it prevents
# blank rows on platforms that translate line endings).
with open(root_dir + '/c2_t1_a2.csv', 'w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)
    # Label each row with the actual file the prediction was computed from,
    # rather than a name derived from the row position.
    writer.writerows(zip(query_images, predict_labels))