#### 필요한 라이브러리 import하기

In [None]:
import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
import matplotlib.pyplot as plt
import matplotlib.image as img

from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision import datasets, transforms

#### 데이터 불러오기

In [None]:
import kagglehub

# Download the latest version of the dataset; the returned value is the local
# location of the downloaded files and is reused by later cells as `path`.
_DATASET_HANDLE = "akash2sharma/tiny-imagenet"
path = kagglehub.dataset_download(_DATASET_HANDLE)

print(f"Path to dataset files: {path}")

In [None]:
# Notebook shell shortcut (presumably IPython automagic for `%ls`): lists the
# current working directory. Not valid outside an interactive IPython session.
ls

In [None]:

# Number of samples per mini-batch used by the data loaders.
BATCH_SIZE = 128

# Preprocessing applied to every image: convert it to a tensor, then resize it
# to 224x224. The custom MeanSubstractionTransform defined further down in this
# file is intentionally not part of the pipeline.
_preprocess_steps = [
    transforms.ToTensor(),
    transforms.Resize(size=(224, 224)),
]
transform = transforms.Compose(_preprocess_steps)

# Train / test image directories inside the downloaded dataset location.
_dataset_root = path + '/tiny-imagenet-200/tiny-imagenet-200'
train_dir = _dataset_root + '/train'
test_dir = _dataset_root + '/test'


In [None]:
# Training split: samples are reshuffled on every pass over the loader.
train_dataset = datasets.ImageFolder(root=train_dir, transform=transform)
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
)

# Evaluation split: iterated in a fixed order.
# NOTE(review): ImageFolder derives labels from the sub-folder names; confirm
# that `test_dir` is laid out as one folder per class, otherwise the labels
# used for the validation loss are not meaningful.
test_dataset = datasets.ImageFolder(root=test_dir, transform=transform)
test_dataloader = DataLoader(
    dataset=test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
)

In [None]:
# Inspect the first training sample and preview it as an image.
train_dataset[0][0].shape # torch.Size([3, 224, 224])
plt.imshow(train_dataset[0][0].permute(1,2,0)) # reorder the axes from C,H,W to H,W,C for imshow

In [None]:
# Number of samples in the dataset behind the training loader (recorded output: 100000).
len(train_dataloader.dataset) # 100000

In [None]:
def show_img(img):
    """Render ``img`` with ``plt.imshow``.

    Args:
        img: Image data in a layout accepted by ``plt.imshow``.
    """
    plt.imshow(img)

In [None]:
train_dataset[0][0].shape # torch.Size([3, 224, 224]); the resize was applied as expected.

#### 채널별 평균값을 구하고, mean subtraction 수행하기

In [None]:
# 사용자 정의 transform 만들기

class MeanSubstractionTransform:
    """Callable transform that subtracts each channel's own mean from an image.

    Intended for use as a step in a transform pipeline operating on tensors
    laid out as (C, H, W).
    """

    def __call__(self, img):
        """Return ``img`` with every channel centred at zero.

        Args:
            img: Floating-point tensor of shape (C, H, W). Any number of
                channels is supported.

        Returns:
            A new tensor with the same shape as ``img``. The input tensor is
            left untouched, so a sample shared with other code is not
            modified behind the caller's back.
        """
        # keepdim=True yields shape (C, 1, 1), which broadcasts over H and W
        # without hard-coding the channel count.
        channel_mean = img.mean(dim=(1, 2), keepdim=True)
        return img - channel_mean

In [None]:
# Sanity check of the per-channel mean on a random 3x224x224 tensor.
# The tensor is deliberately not named `img`, so the `matplotlib.image as img`
# module alias imported at the top of the file is not overwritten.
sample_img = torch.randn(3, 224, 224)
mean = sample_img.mean(dim=[1,2])  # shape: [3]
print(mean)
# Reshaped to (3, 1, 1) so it can broadcast against a (3, H, W) image.
mean.view(3,1,1)

#### 신경망 정의하기 (Alexnet)

In [None]:
class Alexnet(nn.Module):
    """AlexNet-style CNN: five convolutions followed by a three-layer classifier.

    For 3x224x224 inputs the convolutional stack ends in a 256x4x4 feature
    map, i.e. exactly the 4096 features the first fully connected layer
    expects. Other input resolutions will not match the classifier.

    Args:
        num_classes: Number of output classes (size of the final linear
            layer). Defaults to 1000.
    """

    def __init__(self, num_classes=1000):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=96, kernel_size=(11, 11), stride=4)
        self.conv2 = nn.Conv2d(in_channels=96, out_channels=256, kernel_size=(5, 5))
        self.conv3 = nn.Conv2d(in_channels=256, out_channels=384, kernel_size=(3, 3), padding=1)
        self.conv4 = nn.Conv2d(in_channels=384, out_channels=384, kernel_size=(3, 3), padding=1)
        self.conv5 = nn.Conv2d(in_channels=384, out_channels=256, kernel_size=(3, 3), padding=1)

        # Local response normalization has no learnable parameters or buffers,
        # so it is built once here instead of on every forward pass; it adds
        # no entries to the state_dict.
        self.lrn = nn.LocalResponseNorm(size=5, alpha=10 ** (-4), beta=0.75, k=2)

        self.classifier = nn.Sequential(
            nn.Linear(in_features=4096, out_features=4096),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(in_features=4096, out_features=4096),
            nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(in_features=4096, out_features=num_classes),
        )

    def forward(self, x):
        """Run the network on a batch of images.

        Args:
            x: Batch of images; the spatial sizes in the comments below
                assume shape (N, 3, 224, 224).

        Returns:
            A ``(logits, probs)`` tuple, both of shape (N, num_classes).
            ``logits`` are the raw classifier outputs and ``probs`` is their
            softmax over the class dimension.
        """
        # ---- Conv1: ReLU -> response normalization -> 3x3/2 max pooling ----
        out = F.relu(self.conv1(x))                             # 96 x 54 x 54
        out = self.lrn(out)
        out = F.max_pool2d(input=out, kernel_size=3, stride=2)  # 96 x 26 x 26

        # ---- Conv2: ReLU -> response normalization -> 3x3/2 max pooling ----
        out = F.relu(self.conv2(out))                           # 256 x 22 x 22
        out = self.lrn(out)
        out = F.max_pool2d(input=out, kernel_size=3, stride=2)  # 256 x 10 x 10

        # ---- Conv3: ReLU -> 3x3/2 max pooling ----
        out = F.relu(self.conv3(out))                           # 384 x 10 x 10
        out = F.max_pool2d(input=out, kernel_size=3, stride=2)  # 384 x 4 x 4

        # ---- Conv4 ----
        out = F.relu(self.conv4(out))                           # 384 x 4 x 4

        # ---- Conv5 ----
        out = F.relu(self.conv5(out))                           # 256 x 4 x 4

        # ---- Flatten everything except the batch dimension ----
        out = torch.flatten(out, 1)                             # 4096

        # ---- Fully connected layers ----
        logits = self.classifier(out)

        # ---- Softmax over the class dimension ----
        probs = F.softmax(logits, dim=1)

        return logits, probs


In [None]:
# Build the network and echo it so the notebook displays its layer structure.
model = Alexnet()

model

####  Local Response Normalization 함수 구현하기 -> 라이브러리에서 제공

In [None]:
# Shape walkthrough of the pad + average-pool steps used when implementing
# local response normalization by hand, comparing a singleton axis inserted at
# position 0 against one inserted at position 1.
size = 5
data = torch.randn(3, 4, 2)
print(f'unsqueeze 전 {data.shape}')

data0, data1 = data.unsqueeze(0), data.unsqueeze(1)

print(f'data0 : {data0.shape}')
print(f'data1 : {data1.shape}')

# Zero-pad the second-to-last axis (size // 2 in front, (size - 1) // 2 behind),
# then average over a window of `size` along that axis with stride 1, which
# brings the axis back to its original length.
_front, _back = size // 2, (size - 1) // 2
data0, data1 = (F.pad(t, (0, 0, _front, _back)) for t in (data0, data1))
data0, data1 = (F.avg_pool2d(t, (size, 1), stride=1) for t in (data0, data1))

print(f'avg_pool 후 data0 {data0}')

print(f'avg_pool 후 data1 {data1}')

# Drop the singleton axis that was added at position 0.
data0 = data0.squeeze(0)

print(f'squeeze 후 data0 :{data0}')
print(f'squeeze 후 data0 : {data0.shape}')

In [None]:
# Small demo of view / unsqueeze / squeeze on a random 4x4 tensor.
# The tensor is named `grid` rather than `input`, so the builtin `input()`
# stays available to the rest of the notebook.
grid = torch.randn(4,4)

print(grid)

# Reinterpret the 16 values as 2x8, then add a leading singleton axis -> (1, 2, 8).
grid = grid.view(2,8).unsqueeze(0)

print(grid.shape)

# Remove the leading singleton axis again -> (2, 8).
grid = grid.squeeze(0)
print(grid)
print(grid.shape)


#### 손실 함수 & 옵티마이저 정의

In [None]:
# Use the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Training hyper-parameters.
EPOCH = 10
LEARNING_RATE = 1e-5

# Fresh network together with the loss and optimizer used to train it.
model = Alexnet()
loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=LEARNING_RATE,
    momentum=0.9,
    weight_decay=0.0005,
)

# StepLR multiplies the learning rate by `gamma` every `step_size` calls to
# `schedular.step()`.
# NOTE(review): gamma=0.0001 cuts the rate by four orders of magnitude per
# step, and nothing in the visible code calls `schedular.step()`, so the
# scheduler currently has no effect — confirm the intended schedule.
schedular = StepLR(optimizer=optimizer, step_size=1, gamma=0.0001)

In [None]:
# Number of samples in the dataset behind the training loader (recorded output: 100000).
len(train_dataloader.dataset) # 100000

#### 학습하기

In [None]:
def train_model(model, dataloader):
    """Train ``model`` for one pass over ``dataloader``.

    Uses the module-level ``device``, ``optimizer`` and ``loss_fn``. The model
    is expected to return a ``(logits, probs)`` pair; only the logits are fed
    to the loss.

    Args:
        model: Network to train; it is moved to ``device`` and switched to
            training mode.
        dataloader: Iterable yielding ``(data, label)`` batches.

    Returns:
        The mean loss of the most recently reported window of batches
        (0.0 when the dataloader yields no batches).
    """
    model.to(device)
    model.train()
    window_loss = 0.0     # summed loss since the last report
    window_batches = 0    # number of batches contributing to window_loss
    last_loss = 0.0

    for batch, (data, label) in enumerate(dataloader):
        print(f'train batch :{batch}')

        data = data.to(device)
        label = label.to(device)

        optimizer.zero_grad()

        logits, _ = model(data)
        loss = loss_fn(logits, label)

        loss.backward()
        optimizer.step()

        # loss is a tensor; .item() extracts the plain Python number.
        window_loss += loss.item()
        window_batches += 1

        if batch % 10 == 0:  # report on every 10th batch
            # Average over the batches actually accumulated in this window.
            # Dividing by the running batch index instead would shrink the
            # reported loss as training progresses, because the accumulator
            # is reset after every report.
            last_loss = window_loss / window_batches
            print(f'train batch {batch} loss {last_loss}')
            window_loss = 0.0
            window_batches = 0

    return last_loss



In [None]:
def validate(model, dataloader):
    """Evaluate ``model`` on ``dataloader`` and print the windowed mean loss.

    Uses the module-level ``device`` and ``loss_fn``. The model is expected
    to return a ``(logits, probs)`` pair; only the logits are fed to the loss.
    Gradients are disabled for the whole pass.

    Args:
        model: Network to evaluate; it is moved to ``device`` and switched to
            evaluation mode.
        dataloader: Iterable yielding ``(data, label)`` batches.

    Returns:
        None. The losses are only printed.
    """
    model.to(device)
    model.eval()
    window_loss = 0.0     # summed loss since the last report
    window_batches = 0    # number of batches contributing to window_loss
    with torch.no_grad():
        for batch, (data, label) in enumerate(dataloader):
            data = data.to(device)
            label = label.to(device)
            logits, _ = model(data)
            loss = loss_fn(logits, label)
            window_loss += loss.item()
            window_batches += 1

            if (batch % 10) == 0:
                # Average over the batches accumulated in this window only;
                # the accumulator is reset after every report, so dividing by
                # the running batch index would under-report the loss.
                print(f'validation batch {batch}, loss {window_loss / window_batches}')
                window_loss = 0.0
                window_batches = 0

In [None]:
# Main loop: one training pass followed by one evaluation pass per epoch.
# NOTE(review): the StepLR scheduler created in the setup cell is never stepped
# here, so the learning rate stays constant. Before adding `schedular.step()`,
# revisit its gamma=0.0001, which would collapse the rate after one epoch.
for epoch in range(EPOCH):
    print(f'Epoch : {epoch}')

    train_model(model, train_dataloader)
    validate(model, test_dataloader)

In [None]:
# Save only the model's state_dict (not the whole module object) to ./model.
torch.save(model.state_dict(), './model')

In [None]:
# Rebuild the architecture, restore the saved state_dict into it, and switch
# to evaluation mode. weights_only=True is passed to torch.load when reading
# the file.
model =  Alexnet()
model.load_state_dict(torch.load('./model', weights_only=True))
model.eval()