In [None]:
# Download the dataset repository from GitHub.
!git clone https://github.com/ndb796/Scene-Classification-Dataset
# Move into the cloned folder so the relative dataset path used below resolves.
%cd Scene-Classification-Dataset

In [None]:
import os
import pandas as pd


# Dataset root, relative to the current working directory (set by the %cd above).
path = 'train-scene classification/'

# Print the total number of files in the train/ image folder.
file_list = os.listdir(path + 'train/')
print('전체 이미지의 개수:', len(file_list))

# Load the label table (train.csv) and preview its first rows.
data = pd.read_csv(path + 'train.csv')
print('학습 이미지의 개수:', len(data))
print('학습 이미지별 클래스 정보')
data.head()

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 20% of the labelled rows; the fixed random_state keeps the split reproducible.
X_train,X_val=train_test_split(data,test_size=0.2,random_state=42)

In [None]:
# Split the hold-out set in half into validation and test parts.
# NOTE: this overwrites X_val, so re-running this cell alone halves X_val again;
# re-run the previous split cell first.
X_val,X_test=train_test_split(X_val,test_size=0.5,random_state=42)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
import torch.optim as optim
import torchvision
from torch.utils.data import DataLoader, Dataset
from skimage import io, color
import pandas as pd
import os
from tqdm.notebook import tqdm

In [None]:
import os
import pandas as pd
from torchvision.io import read_image
from torch.utils.data import Dataset,DataLoader

class CustomImageDataset(Dataset):
    """Map-style dataset pairing image files on disk with labels from a table.

    Args:
        labels: table supporting ``len()`` and positional ``.iloc[row, col]``
            access (a pandas DataFrame in this notebook). Column 0 holds the
            image file name, column 1 the label.
        img_dir: directory that contains the image files.
        transform: optional callable applied to the decoded image tensor.
        target_transform: optional callable applied to the label.
    """

    def __init__(self, labels, img_dir, transform=None, target_transform=None):
        self.img_labels = labels
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Return the number of rows in the label table."""
        return len(self.img_labels)

    def __getitem__(self, idx):
        """Load the sample at positional index ``idx``.

        Returns:
            ``(image, label)`` tuple. A tensor image is cast to float32 so that
            a dataset built without a transform does not hand raw uint8 pixels
            to a float model; the cast does not rescale values, and a float32
            tensor is returned unchanged. Non-tensor transform outputs are
            passed through untouched.
        """
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
        image = read_image(img_path)
        label = self.img_labels.iloc[idx, 1]
        if self.transform is not None:
            image = self.transform(image)
        if self.target_transform is not None:
            label = self.target_transform(label)
        if torch.is_tensor(image):
            image = image.float()
        return image, label

In [None]:
from torchvision import transforms
# Training pipeline: tensor -> PIL image -> 224x224 resize -> AugMix and
# RandAugment augmentation -> float tensor.
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize(size=(224,224)),
    transforms.AugMix(),
    transforms.RandAugment(),
    transforms.ToTensor(), 
#     transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)), # image normalization (currently disabled)
])

# Evaluation pipeline: same resize and tensor conversion, no random augmentation.
test_transform=transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize(size=(224,224)),
    transforms.ToTensor(), 
])

In [None]:
# Training split with the augmenting transform; batches of 64, reshuffled every epoch.
# NOTE: the image directory is an absolute path, so this only works when the
# repository was cloned under /content.
train_dataset = CustomImageDataset(X_train,"/content/Scene-Classification-Dataset/train-scene classification/train",transform=transform )
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)

In [None]:
# Validation split with the non-augmenting transform.
# Evaluation does not benefit from shuffling; a fixed order keeps runs reproducible.
# NOTE: the image directory is an absolute path, so this only works when the
# repository was cloned under /content.
val_dataset = CustomImageDataset(X_val,"/content/Scene-Classification-Dataset/train-scene classification/train",transform=test_transform )
val_dataloader = DataLoader(val_dataset, batch_size=64, shuffle=False)

In [None]:
# Test split with the non-augmenting transform.
# Evaluation does not benefit from shuffling; a fixed order keeps runs reproducible.
# NOTE: the image directory is an absolute path, so this only works when the
# repository was cloned under /content.
test_dataset = CustomImageDataset(X_test,"/content/Scene-Classification-Dataset/train-scene classification/train" ,transform=test_transform )
test_dataloader = DataLoader(test_dataset, batch_size=256, shuffle=False)

In [None]:
# import resnet
import torchvision.models.resnet as resnet
import torch.nn as nn
import torch.optim as optim

# Short aliases for the torchvision ResNet building blocks used by the custom ResNet class.
conv1x1=resnet.conv1x1
Bottleneck = resnet.Bottleneck
BasicBlock= resnet.BasicBlock

In [None]:
class ResNet(nn.Module):
    """ResNet variant with a narrow stride-1 stem and stage widths 32/64/128/256.

    The stem is a 3x3, stride-1 convolution with 32 output channels (sized down
    because the dataset is smaller than ImageNet), followed by a stride-2
    max-pool. Four stages follow; stage 1 is built with stride 1 and stages 2-4
    with stride 2. Each stage outputs ``width * block.expansion`` channels.

    Args:
        block: residual block class exposing ``expansion`` and accepting
            ``(inplanes, planes, stride, downsample)``.
        layers: indexable with four integers, the number of blocks per stage.
        num_classes: output size of the final linear layer.
        zero_init_residual: if true, zero the scale of the last BatchNorm in
            every ``Bottleneck`` / ``BasicBlock`` so each residual branch starts
            at zero (see https://arxiv.org/abs/1706.02677).
    """

    def __init__(self, block, layers, num_classes=1000, zero_init_residual=True):
        super().__init__()
        stem_width = 32
        # Channel count entering the next block; _make_layer advances it.
        self.inplanes = stem_width

        self.conv1 = nn.Conv2d(3, stem_width, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(stem_width)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # (base width, stride) for layer1..layer4, registered in that order.
        stage_specs = ((32, 1), (64, 2), (128, 2), (256, 2))
        for position, (width, stride) in enumerate(stage_specs):
            stage = self._make_layer(block, width, layers[position], stride=stride)
            setattr(self, "layer{}".format(position + 1), stage)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(256 * block.expansion, num_classes)

        # He initialisation for convolutions, identity affine for BatchNorm.
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
                continue
            if isinstance(module, nn.BatchNorm2d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)

        if not zero_init_residual:
            return
        # Must run as a second pass: the loop above resets every BatchNorm scale to 1.
        for module in self.modules():
            if isinstance(module, Bottleneck):
                last_bn = module.bn3
            elif isinstance(module, BasicBlock):
                last_bn = module.bn2
            else:
                continue
            nn.init.constant_(last_bn.weight, 0)

    def _make_layer(self, block, planes, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks with base width ``planes``.

        The first block receives ``stride`` and, when the stride or the channel
        count changes, a 1x1-conv + BatchNorm projection for the shortcut. The
        remaining blocks use the block's default stride and no projection.
        """
        out_channels = planes * block.expansion
        if stride != 1 or self.inplanes != out_channels:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, out_channels, stride),
                nn.BatchNorm2d(out_channels),
            )
        else:
            downsample = None

        stage = [block(self.inplanes, planes, stride, downsample)]
        self.inplanes = out_channels
        stage.extend(block(self.inplanes, planes) for _ in range(1, blocks))
        return nn.Sequential(*stage)

    def forward(self, x):
        """Map an image batch ``(N, 3, H, W)`` to logits ``(N, num_classes)``."""
        # Stem: the conv keeps H x W (stride 1, padding 1); the max-pool has stride 2.
        features = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            features = stage(features)

        # Global average pool to 1x1, then flatten to (N, channels) for the classifier.
        pooled = self.avgpool(features)
        return self.fc(pooled.view(pooled.size(0), -1))

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

In [None]:
# Bottleneck blocks in a [3, 4, 6, 3] stage layout, 6 output classes,
# zero_init_residual=True; moved to the selected device.
model = ResNet(resnet.Bottleneck, [3, 4, 6, 3], 6, True).to(device)

In [None]:
def check_accuracy(loader, model):
    """Evaluate ``model`` on ``loader``, print and return its top-1 accuracy.

    Relies on the notebook-level ``device`` and ``tqdm`` names.

    Args:
        loader: iterable yielding ``(inputs, targets)`` batches.
        model: ``nn.Module`` producing per-class scores with classes on dim 1.

    Returns:
        Accuracy as a float in ``[0, 1]``; ``0.0`` when the loader yields no
        samples (instead of dividing by zero).
    """
    num_correct = 0
    num_samples = 0
    # Remember the current mode so evaluation does not silently flip an
    # inference-mode model into training mode.
    was_training = model.training
    model.eval()

    try:
        with torch.no_grad():
            for data, targets in tqdm(loader):
                data = data.to(device)
                targets = targets.to(device)
                scores = model(data)
                _, predictions = scores.max(1)
                # Accumulate plain Python ints so the report prints numbers,
                # not device tensors.
                num_correct += int((predictions == targets).sum().item())
                num_samples += predictions.size(0)
    finally:
        # Restore the previous mode even if evaluation raised.
        model.train(was_training)

    accuracy = num_correct / num_samples if num_samples else 0.0
    print("Correct: {}, Total: {}, Accuracy: {}".format(num_correct, num_samples, accuracy))
    return accuracy

In [None]:
if __name__ == '__main__':
    # Trains the notebook-level `model` on `train_dataloader` and reports
    # accuracy on the training and validation loaders after every epoch.
    LEARNING_RATE = 0.0001
    # You could try playing around with the batch size(say 16) and learning rate(say 0.001) for faster convergence.
    # NOTE: BATCH_SIZE is not read below; the effective batch size is whatever
    # train_dataloader was built with.
    BATCH_SIZE = 64
    EPOCHS = 10
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # Not used by the loop below (the datasets carry their own transforms);
    # kept in case other cells reference it.
    transform_img = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor()
    ])

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    torch.cuda.empty_cache()

    for epoch in tqdm(range(EPOCHS)):
        model.train()
        losses = []
        with tqdm(total=len(train_dataloader)) as pbar:
            # Named images/labels so the loop does not overwrite the label
            # DataFrame `data` created when the CSV was loaded.
            for images, labels in train_dataloader:
                images = images.to(device=device)
                labels = labels.to(device=device)

                optimizer.zero_grad()

                scores = model(images)
                loss = criterion(scores, labels)

                # backprop
                loss.backward()
                optimizer.step()

                # Store a Python float: appending the tensor itself would keep
                # every batch's autograd graph alive until the end of the epoch.
                losses.append(loss.item())
                pbar.update(1)
        print("Cost at epoch {} is {}".format(epoch, sum(losses) / len(losses)))
        check_accuracy(train_dataloader, model)
        check_accuracy(val_dataloader, model)

In [None]:
# Download the dataset repository from GitHub.
!git clone https://github.com/ndb796/Scene-Classification-Dataset
# Move into the cloned folder so the relative dataset path used below resolves.
%cd Scene-Classification-Dataset

In [None]:
import os
import pandas as pd


# Dataset root, relative to the current working directory (set by the %cd above).
path = 'train-scene classification/'

# Print the total number of files in the train/ image folder.
file_list = os.listdir(path + 'train/')
print('전체 이미지의 개수:', len(file_list))

# Load the label table (train.csv) and preview its first rows.
data = pd.read_csv(path + 'train.csv')
print('학습 이미지의 개수:', len(data))
print('학습 이미지별 클래스 정보')
data.head()

In [None]:
from sklearn.model_selection import train_test_split
# Hold out 20% of the labelled rows; the fixed random_state keeps the split reproducible.
X_train,X_val=train_test_split(data,test_size=0.2,random_state=42)

In [None]:
# Split the hold-out set in half into validation and test parts.
# NOTE: this overwrites X_val, so re-running this cell alone halves X_val again;
# re-run the previous split cell first.
X_val,X_test=train_test_split(X_val,test_size=0.5,random_state=42)

In [None]:
import os
import pandas as pd
from torchvision.io import read_image
from torch.utils.data import Dataset,DataLoader

class CustomImageDataset(Dataset):
    """Dataset that reads images from a folder and labels from a table.

    ``labels`` is accessed positionally via ``.iloc``: column 0 is the image
    file name inside ``img_dir`` and column 1 is the label. ``transform`` and
    ``target_transform`` are optional callables applied to the decoded image
    and to the label respectively.
    """

    def __init__(self, labels, img_dir, transform=None, target_transform=None):
        self.img_labels, self.img_dir = labels, img_dir
        self.transform, self.target_transform = transform, target_transform

    def __len__(self):
        """Number of rows in the label table."""
        return len(self.img_labels)

    def __getitem__(self, idx):
        """Return ``(image as float tensor, label)`` for positional index ``idx``."""
        table = self.img_labels
        file_name = table.iloc[idx, 0]
        image = read_image(os.path.join(self.img_dir, file_name))
        label = table.iloc[idx, 1]

        image = self.transform(image) if self.transform else image
        label = self.target_transform(label) if self.target_transform else label
        # .float() changes the dtype only; it does not rescale pixel values.
        return image.float(), label

In [None]:
from torchvision import transforms
# Training pipeline: tensor -> PIL image -> 224x224 resize -> AugMix and
# RandAugment augmentation -> float tensor.
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize(size=(224,224)),
    transforms.AugMix(),
    transforms.RandAugment(),
    transforms.ToTensor(), 
#     transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)), # image normalization (currently disabled)
])

# Evaluation pipeline: same resize and tensor conversion, no random augmentation.
test_transform=transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize(size=(224,224)),
    transforms.ToTensor(), 
])

In [None]:

# Training split with the augmenting transform; batches of 64, reshuffled every epoch.
# NOTE: the image directory is an absolute path, so this only works when the
# repository was cloned under /content.
train_dataset = CustomImageDataset(X_train,"/content/Scene-Classification-Dataset/train-scene classification/train",transform=transform )
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)

In [None]:
# Validation split with the non-augmenting transform.
# Evaluation does not benefit from shuffling; a fixed order keeps runs reproducible.
# NOTE: the image directory is an absolute path, so this only works when the
# repository was cloned under /content.
val_dataset = CustomImageDataset(X_val,"/content/Scene-Classification-Dataset/train-scene classification/train",transform=test_transform )
val_dataloader = DataLoader(val_dataset, batch_size=64, shuffle=False)

In [None]:
# Test split with the non-augmenting transform.
# Evaluation does not benefit from shuffling; a fixed order keeps runs reproducible.
# NOTE: the image directory is an absolute path, so this only works when the
# repository was cloned under /content.
test_dataset = CustomImageDataset(X_test,"/content/Scene-Classification-Dataset/train-scene classification/train" ,transform=test_transform )
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy




# Use the GPU when CUDA is available, otherwise fall back to the CPU.
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")

In [None]:
# Device configuration (re-derives the same `device` as the previous cell)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
# Hyper-parameters
# num_epochs is passed to train_model in the final cell. learning_rate is not
# read by any visible cell: the optimizer cell hard-codes lr=0.0001.
num_epochs = 10
learning_rate = 0.001


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
import torch.optim as optim
import torchvision
from torch.utils.data import DataLoader, Dataset
from skimage import io, color
import pandas as pd
import os
from tqdm.notebook import tqdm

In [None]:
# Takes inputs with dims = (N, C, *)
# Gives outputs with dims = (N, C, *)
class LocalResponseNormalization(nn.Module):
    """Local response normalization across neighbouring channels.

    Every element is divided by
    ``(noise_k + alpha * mean(x**2 over the channel window)) ** beta``.
    The window spans ``neighbourhood_length`` adjacent channels (dimension 1);
    it is zero-padded at both ends of the channel axis and the mean always
    divides by the full window length.

    Args:
        neighbourhood_length: number of adjacent channels in the window.
        normalisation_const_alpha: multiplier applied to the windowed mean.
        contrast_const_beta: exponent applied to the denominator.
        noise_k: additive constant in the denominator.
    """

    def __init__(self, neighbourhood_length, normalisation_const_alpha, contrast_const_beta, noise_k):
        super().__init__()
        self.nbd_len = neighbourhood_length
        self.alpha = normalisation_const_alpha
        self.beta = contrast_const_beta
        self.k = noise_k

    # Hand-written equivalent of PyTorch's own local response norm, kept for study purposes.
    def forward(self, x):
        """Normalize ``x`` of shape ``(N, C, *)``; raises ValueError below 3 dimensions."""
        rank = x.dim()
        if rank < 3:
            raise ValueError("Expected tensor of atleast 3 dimensions, found only {}".format(rank))

        window = self.nbd_len
        pad_front, pad_back = window // 2, (window - 1) // 2
        # Insert a singleton dim so the channel axis can be pooled like a spatial axis.
        squared = x.pow(2).unsqueeze(1)

        if rank > 3:
            shape = x.size()
            # Collapse everything after the first spatial dim: (N, 1, C, D1, rest).
            squared = squared.view(shape[0], 1, shape[1], shape[2], -1)
            # F.pad lists pads from the last dim backwards; only the channel axis is padded.
            squared = F.pad(squared, (0, 0, 0, 0, pad_front, pad_back))
            local_mean = F.avg_pool3d(squared, (window, 1, 1), stride=1)
            local_mean = local_mean.squeeze(1).view(shape)
        else:
            # (N, 1, C, L): pad and pool along the channel axis only.
            squared = F.pad(squared, (0, 0, pad_front, pad_back))
            local_mean = F.avg_pool2d(squared, (window, 1), stride=1)
            local_mean = local_mean.squeeze(1)

        scale = (local_mean * self.alpha + self.k) ** self.beta
        return x / scale

In [None]:
# Expects input tensor to be of dimensions (batch_size, 3, 224, 224)
class Alexnet(nn.Module):
    """AlexNet-style CNN producing six class logits.

    Five convolutions are followed by three linear layers. Local response
    normalization is applied after conv1, conv2 and conv5; max-pooling after
    conv1 and conv2. An adaptive average pool fixes the final feature map at
    6x6 so the first linear layer always sees ``256 * 6 * 6`` features.
    Dropout is applied once, between fc2 and fc3.
    """

    def __init__(self):
        super().__init__()
        # Feature extractor.
        self.conv1 = nn.Conv2d(3, 96, kernel_size=11, stride=4, padding=2)
        self.conv2 = nn.Conv2d(96, 256, kernel_size=5, stride=1, padding=2)
        self.conv3 = nn.Conv2d(256, 384, kernel_size=3, stride=1, padding=1)
        self.conv4 = nn.Conv2d(384, 384, kernel_size=3, stride=1, padding=1)
        self.conv5 = nn.Conv2d(384, 256, kernel_size=3, stride=1, padding=1)
        # Classifier head; the last layer has one output per class.
        self.fc1 = nn.Linear(256 * 6 * 6, 4096)
        self.fc2 = nn.Linear(4096, 4096)
        self.fc3 = nn.Linear(4096, 6)
        # Parameter-free layers shared across the forward pass.
        self.max_pool = nn.MaxPool2d(kernel_size=3, stride=2)
        # Pins the feature map to 6x6 regardless of the input resolution.
        self.adaptive_pool = nn.AdaptiveAvgPool2d(output_size=(6, 6))
        self.norm = LocalResponseNormalization(
            neighbourhood_length=5,
            normalisation_const_alpha=1e-4,
            contrast_const_beta=0.75,
            noise_k=1.0,
        )
        self.dropout = nn.Dropout()

    def forward(self, x):
        """Return logits of shape ``(batch_size, 6)`` for an image batch ``x``."""
        def conv_relu_norm(conv, tensor):
            # conv -> ReLU -> local response normalization
            return self.norm(F.relu(conv(tensor)))

        feats = self.max_pool(conv_relu_norm(self.conv1, x))
        feats = self.max_pool(conv_relu_norm(self.conv2, feats))
        feats = F.relu(self.conv3(feats))
        feats = F.relu(self.conv4(feats))
        feats = self.adaptive_pool(conv_relu_norm(self.conv5, feats))

        hidden = torch.flatten(feats, 1)
        hidden = F.relu(self.fc1(hidden))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(self.dropout(hidden))

In [None]:
# Instantiate the AlexNet-style classifier and move it to the selected device.
model = Alexnet().to(device)

In [None]:
# Loss and optimizer
# NOTE: lr is hard-coded to 0.0001 here; the `learning_rate = 0.001` set in the
# hyper-parameter cell is not used.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)
# StepLR multiplies the learning rate by 0.1 after every 7 calls to scheduler.step().
exp_lr_scheduler = lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

In [None]:
def train_model(model, criterion, train, val, optimizer, scheduler, num_epochs=25):
    """Train ``model`` and return it loaded with its best-validation weights.

    Each epoch makes one optimisation pass over ``train``, advances the
    learning-rate scheduler once, then evaluates on ``val`` without gradient
    tracking. The weights with the highest validation accuracy are restored
    before returning. Relies on the notebook-level ``device``.

    Args:
        model: ``nn.Module`` producing per-class scores with classes on dim 1.
        criterion: loss callable taking ``(outputs, labels)``.
        train: iterable of ``(inputs, labels)`` training batches.
        val: iterable of ``(inputs, labels)`` validation batches.
        optimizer: optimizer bound to ``model``'s parameters.
        scheduler: learning-rate scheduler, stepped once per epoch.
        num_epochs: number of epochs to run.

    Returns:
        The same ``model`` object, holding the best-validation weights.
    """
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Training phase.
        model.train()
        train_loss, train_acc = _run_epoch(model, criterion, train, optimizer)
        # Step per epoch, not per batch: stepping inside the batch loop makes
        # StepLR's step_size count batches and collapses the learning rate
        # within the first epoch.
        scheduler.step()
        print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                "train", train_loss, train_acc*100))

        # Validation phase: no gradients needed, so skip autograd bookkeeping.
        model.eval()
        with torch.no_grad():
            val_loss, val_acc = _run_epoch(model, criterion, val)
        print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                "Val", val_loss, val_acc*100))

        # Keep a deep copy of the best weights seen so far.
        if val_acc > best_acc:
            best_acc = val_acc
            best_model_wts = copy.deepcopy(model.state_dict())

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # Load the best model weights.
    model.load_state_dict(best_model_wts)
    return model


def _run_epoch(model, criterion, loader, optimizer=None):
    """Run one pass over ``loader``; update weights only if ``optimizer`` is given.

    Returns:
        ``(mean per-batch loss, accuracy fraction)``. Accuracy is correct
        predictions over total samples, so a smaller final batch is weighted
        by its real size. Both values are ``0.0`` for an empty loader.
    """
    loss_sum = 0.0
    num_batches = 0
    num_correct = 0
    num_samples = 0

    for inputs, labels in loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        if optimizer is not None:
            # Clear gradients left over from the previous step.
            optimizer.zero_grad()

        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)

        if optimizer is not None:
            loss.backward()
            optimizer.step()

        # Statistics as plain Python numbers.
        loss_sum += loss.item()
        num_batches += 1
        num_correct += int((preds == labels).sum().item())
        num_samples += labels.size(0)

    mean_loss = loss_sum / num_batches if num_batches else 0.0
    accuracy = num_correct / num_samples if num_samples else 0.0
    return mean_loss, accuracy

In [None]:
# Train for num_epochs epochs; model_fit is the same model object, loaded with
# the weights that scored best on the validation loader.
model_fit = train_model(model, criterion, train_dataloader,val_dataloader, optimizer, exp_lr_scheduler,
                       num_epochs)