In [21]:
import pandas as pd
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchmetrics.functional as metrics
from torch.utils.data import Dataset, DataLoader, random_split
import cv2
import matplotlib.pyplot as plt
from torchvision import transforms
from torchvision.datasets import ImageFolder
import tqdm
from image_sequence_dataset import ImageSequenceDataset



In [22]:
# Root folder of the image data; passed to ImageFolder when the dataset is built.
# It is a relative path, so it is resolved against the notebook's working directory.
image_path = 'Image/'


In [23]:
# Per-image preprocessing: resize to 64x32, convert to a tensor, then normalise
# every channel with mean 0.5 and std 0.5.
trans = transforms.Compose(
    [
        transforms.Resize(size=(64, 32)),
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
    ]
)

In [24]:
class ImageSequenceDataset(Dataset):
    """Sliding-window view that turns a per-image dataset into fixed-length sequences.

    Item ``idx`` is the stack of the ``seq_length`` consecutive images
    ``dataset[idx] .. dataset[idx + seq_length - 1]`` together with the label
    of the last image in that window.

    NOTE: this definition shadows the ``ImageSequenceDataset`` imported from
    ``image_sequence_dataset`` at the top of the notebook.

    Args:
        dataset: Indexable dataset with ``__len__`` whose items are
            ``(image_tensor, label)`` pairs.
        seq_length: Number of consecutive items per sequence (must be >= 1).

    Raises:
        ValueError: If ``seq_length`` is smaller than 1.
    """

    def __init__(self, dataset, seq_length=5):
        if seq_length < 1:
            raise ValueError(f"seq_length must be >= 1, got {seq_length}")
        self.dataset = dataset
        self.seq_length = seq_length

    def __len__(self):
        # N items contain N - seq_length + 1 full windows. Clamp at zero so a
        # dataset shorter than one window reports an empty dataset instead of
        # a negative length (which makes len() raise).
        return max(0, len(self.dataset) - self.seq_length + 1)

    def __getitem__(self, idx):
        """Return ``(images, label)`` with ``images`` stacked along a new first dim.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``.
        """
        window_count = len(self)
        if idx < 0:
            idx += window_count
        if not 0 <= idx < window_count:
            raise IndexError(f"sequence index out of range for {window_count} sequences")

        # Fetch every item once; the last one supplies both an image and the label.
        items = [self.dataset[idx + offset] for offset in range(self.seq_length)]
        images = [item[0] for item in items]
        label = items[-1][1]  # Label of the last image in the sequence
        return torch.stack(images), label

In [25]:
# Build the image dataset rooted at image_path; `trans` is applied to every image on load.
datasets = ImageFolder(root=image_path, transform = trans)
# Bare expression so the notebook displays the dataset summary.
datasets

Dataset ImageFolder
    Number of datapoints: 756977
    Root location: Image/
    StandardTransform
Transform: Compose(
               Resize(size=(64, 32), interpolation=bilinear, max_size=None, antialias=True)
               ToTensor()
               Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
           )

# CNN 일 때 DS / DL


In [26]:
# seed_gen=torch.Generator().manual_seed(42)

# trainDS, validDS, testDS = random_split(datasets,
#                                         [0.7, 0.1, 0.2],
#                                         generator=seed_gen)

In [27]:
# len(trainDS), len(validDS), len(testDS)


In [28]:
# TRAIN_DL = DataLoader(trainDS, batch_size=9)
# VALID_DL = DataLoader(validDS, batch_size=9)
# TEST_DL = DataLoader(testDS, batch_size=9)


# CRNN일 때 DS / DL


In [29]:
seq_length = 5  # number of frames per sequence
total_count = len(datasets)

# Round the 70% / 10% shares down to a multiple of seq_length; the test split
# takes whatever remains.
train_count = int(total_count * 0.7) // seq_length * seq_length
valid_count = int(total_count * 0.1) // seq_length * seq_length
test_count = total_count - (train_count + valid_count)

# Split the dataset with a fixed seed so the partition is reproducible.
# NOTE(review): random_split assigns individual images to the splits in random
# order, so windows of consecutive subset indices built on top of these splits
# contain shuffled, not consecutive, frames - confirm this is intended.
trainDS, validDS, testDS = random_split(
    datasets,
    [train_count, valid_count, test_count],
    generator=torch.Generator().manual_seed(42),
)


In [30]:


# Sequence dataset and loader over the full, unsplit `datasets` (default seq_length=5).
# NOTE(review): the training, validation and test cells in this notebook iterate
# TRAIN_DL / VALID_DL / TEST_DL, not `train_loader`; this loader also covers the
# images that end up in the validation and test splits.
seq_dataset = ImageSequenceDataset(datasets)
train_loader = DataLoader(seq_dataset, batch_size=9, shuffle=True)


In [31]:
# Wrap each split so that every item is a window of `seq_length` images.
train_seqDS, valid_seqDS, test_seqDS = (
    ImageSequenceDataset(split, seq_length=seq_length)
    for split in (trainDS, validDS, testDS)
)


In [32]:
# Batches of 9 sequences; only the training loader shuffles.
TRAIN_DL = DataLoader(dataset=train_seqDS, batch_size=9, shuffle=True)
VALID_DL = DataLoader(dataset=valid_seqDS, batch_size=9, shuffle=False)
TEST_DL = DataLoader(dataset=test_seqDS, batch_size=9, shuffle=False)


In [33]:
# Pick the best available backend instead of hard-coding 'mps', so the notebook
# also runs on machines without Apple's Metal backend: MPS, then CUDA, then CPU.
if torch.backends.mps.is_available():
    device = 'mps'
elif torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'


# 원본 사이즈 모델

In [34]:


# class CNN(nn.Module):
#     def __init__(self, num_classes):
#         super(CNN, self).__init__()
#         self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
#         self.relu1 = nn.ReLU()
#         self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)  # Output size: (32, 36, 64)

#         self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
#         self.relu2 = nn.ReLU()
#         self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)  # Output size: (64, 18, 32)

#         self.fc1 = nn.Linear(64 * 18 * 32, 128)  #After pool2: torch.Size([9, 64, 18, 32]) 64, 18, 32 꼴 하기
#         self.relu3 = nn.ReLU()
#         self.fc2 = nn.Linear(128, num_classes)

#     def forward(self, x):
#         x = self.conv1(x)
#         x = self.relu1(x)
#         x = self.pool1(x)
#         print("After pool1:", x.shape)  # Debug: print the shape


#         x = self.conv2(x)
#         x = self.relu2(x)
#         x = self.pool2(x)
#         print("After pool2:", x.shape)  # Debug: print the shape

#         x = x.view(x.size(0), -1)
#         print("Before FC:", x.shape)  # Debug: print the shape

#         x = self.fc1(x)
#         x = self.relu3(x)
#         x = self.fc2(x)
#         return x


# resize 64*32

In [35]:


# class CNN(nn.Module):
#     def __init__(self, num_classes):
#         super(CNN, self).__init__()
#         self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
#         self.relu1 = nn.ReLU()
#         self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)  # Output size: (32, 32, 16)

#         self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
#         self.relu2 = nn.ReLU()
#         self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)  # Output size: (64, 16, 8)

#         self.fc1 = nn.Linear(64 * 16 * 8, 128)  #After pool2: torch.Size([9, 64, 16, 8]) 64 * 16* 8 꼴 하기
#         self.relu3 = nn.ReLU()
#         self.fc2 = nn.Linear(128, num_classes)

#     def forward(self, x):
#         x = self.conv1(x)
#         x = self.relu1(x)
#         x = self.pool1(x)
#         # print("After pool1:", x.shape)  # Debug: print the shape


#         x = self.conv2(x)
#         x = self.relu2(x)
#         x = self.pool2(x)
#         # print("After pool2:", x.shape)  # Debug: print the shape

#         x = x.view(x.size(0), -1)
#         # print("Before FC:", x.shape)  # Debug: print the shape

#         x = self.fc1(x)
#         x = self.relu3(x)
#         x = self.fc2(x)
#         return x


In [36]:
class CRNN(nn.Module):
    """Per-frame CNN feature extractor followed by an LSTM over the frame sequence.

    Input:  tensor of shape (batch, seq_len, 3, H, W).
    Output: logits of shape (batch, num_classes), computed from the LSTM output
            at the last time step.

    Args:
        num_classes: Number of output classes.
        input_size: (height, width) of each frame. It fixes the LSTM input
            width; the default (64, 32) gives 64 * 16 * 8 features per frame.
        hidden_size: Size of the LSTM hidden state (default 256).

    Raises:
        ValueError: If ``input_size`` is too small to survive the two 2x2 poolings.
    """

    def __init__(self, num_classes, input_size=(64, 32), hidden_size=256):
        super(CRNN, self).__init__()
        self.num_classes = num_classes
        self.input_size = tuple(input_size)

        # Each 2x2 max-pool halves (floors) both spatial dimensions; the 3x3
        # convolutions use padding=1 and therefore keep them unchanged.
        height, width = self.input_size
        feat_h, feat_w = height // 2 // 2, width // 2 // 2
        if feat_h < 1 or feat_w < 1:
            raise ValueError(f"input_size {self.input_size} is too small for two 2x2 poolings")

        # CNN Layers
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, padding=1)
        self.relu1 = nn.ReLU()
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.relu2 = nn.ReLU()
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        # LSTM Layers: one flattened CNN feature map per time step
        self.lstm = nn.LSTM(input_size=64 * feat_h * feat_w, hidden_size=hidden_size,
                            num_layers=1, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Classify a batch of frame sequences.

        Args:
            x: Tensor of shape (batch, seq_len, C, H, W).

        Returns:
            Tensor of shape (batch, num_classes) with unnormalised class scores.

        Raises:
            ValueError: If ``x`` is not 5-dimensional.
        """
        if x.dim() != 5:
            raise ValueError(
                f"expected input of shape (batch, seq_len, C, H, W), got {tuple(x.shape)}"
            )
        batch_size, seq_len, C, H, W = x.shape

        # Fold the sequence dimension into the batch so the CNN sees single frames.
        # reshape (unlike view) also accepts non-contiguous input.
        frames = x.reshape(batch_size * seq_len, C, H, W)
        feats = self.pool1(self.relu1(self.conv1(frames)))
        feats = self.pool2(self.relu2(self.conv2(feats)))

        # Unfold back to (batch, seq_len, features) for the batch_first LSTM.
        feats = feats.reshape(batch_size, seq_len, -1)
        lstm_out, _ = self.lstm(feats)

        # Classify from the LSTM output at the last time step.
        return self.fc(lstm_out[:, -1, :])


In [37]:
# Training hyper-parameters.
learning_rate = 0.001
num_epochs = 20

# The classifier is built with 5 output classes.
# NOTE(review): confirm that 5 matches the number of classes in `datasets`.
model = CRNN(num_classes=5)
model = model.to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=learning_rate)

In [38]:
from tqdm import tqdm  # rebinds `tqdm` from the module imported at the top to the progress-bar class


def _run_epoch(model, loader, criterion, device, optimizer=None, desc=""):
    """Run one pass over `loader` and return per-sample (mean loss, accuracy).

    The model is trained when `optimizer` is given; otherwise it is put in eval
    mode and run without gradient tracking.

    Args:
        model: Module mapping a batch of inputs to class scores of shape (batch, classes).
        loader: Iterable of (inputs, labels) batches.
        criterion: Loss returning the batch mean (as nn.CrossEntropyLoss does by default).
        device: Device the batches are moved to.
        optimizer: Optimizer to step after every batch, or None to evaluate only.
        desc: Progress-bar description.

    Returns:
        (mean_loss, accuracy) over all samples seen; (nan, nan) for an empty loader.
    """
    training = optimizer is not None
    model.train(training)  # train(False) is equivalent to eval()

    loss_sum = 0.0
    correct = 0
    seen = 0
    progress_bar = tqdm(loader, desc=desc, leave=False)

    with torch.set_grad_enabled(training):
        for inputs, labels in progress_bar:
            inputs = inputs.to(device)
            labels = labels.to(device)

            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            batch_size = labels.size(0)
            batch_loss = loss.item()
            batch_correct = (outputs.argmax(dim=1) == labels).sum().item()

            # Weight by batch size so a smaller final batch does not count as
            # much as a full one in the epoch averages.
            loss_sum += batch_loss * batch_size
            correct += batch_correct
            seen += batch_size

            progress_bar.set_postfix(loss=batch_loss, acc=batch_correct / batch_size)

    if seen == 0:
        return float('nan'), float('nan')
    return loss_sum / seen, correct / seen


for epoch in range(num_epochs):
    train_loss, train_accuracy = _run_epoch(
        model, TRAIN_DL, criterion, device,
        optimizer=optimizer,
        desc=f"Training Epoch {epoch+1}/{num_epochs}",
    )
    valid_loss, valid_accuracy = _run_epoch(
        model, VALID_DL, criterion, device,
        desc=f"Validation Epoch {epoch+1}/{num_epochs}",
    )

    # Checkpoint once per epoch; writing the full state dict after every batch
    # dominates the run time and risks a truncated file if interrupted mid-write.
    torch.save(model.state_dict(), 'dancer.pth')

    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}, Valid Loss: {valid_loss:.4f}, Valid Accuracy: {valid_accuracy:.4f}')


                                                                                                    

Epoch [1/20], Train Loss: 0.6385, Train Accuracy: 0.7537, Valid Loss: 0.3680, Valid Accuracy: 0.8667


                                                                                                     

Epoch [2/20], Train Loss: 0.2778, Train Accuracy: 0.9006, Valid Loss: 0.2569, Valid Accuracy: 0.9089


                                                                                                    

Epoch [3/20], Train Loss: 0.2086, Train Accuracy: 0.9264, Valid Loss: 0.1988, Valid Accuracy: 0.9307


                                                                                                    

Epoch [4/20], Train Loss: 0.1804, Train Accuracy: 0.9365, Valid Loss: 0.1790, Valid Accuracy: 0.9376


                                                                                                    

Epoch [5/20], Train Loss: 0.1661, Train Accuracy: 0.9420, Valid Loss: 0.1703, Valid Accuracy: 0.9416


                                                                                                         

Epoch [6/20], Train Loss: 0.1541, Train Accuracy: 0.9462, Valid Loss: 0.1562, Valid Accuracy: 0.9459


                                                                                                      

Epoch [7/20], Train Loss: 0.1471, Train Accuracy: 0.9487, Valid Loss: 0.1581, Valid Accuracy: 0.9440


                                                                                                        

Epoch [8/20], Train Loss: 0.1413, Train Accuracy: 0.9512, Valid Loss: 0.1592, Valid Accuracy: 0.9431


                                                                                                    

Epoch [9/20], Train Loss: 0.1360, Train Accuracy: 0.9532, Valid Loss: 0.1457, Valid Accuracy: 0.9502


                                                                                                     

Epoch [10/20], Train Loss: 0.1320, Train Accuracy: 0.9543, Valid Loss: 0.1441, Valid Accuracy: 0.9502


                                                                                                    

KeyboardInterrupt: 

In [39]:
# Evaluate the current in-memory model on the held-out test loader.
model.eval()
test_loss_sum = 0.0
test_correct = 0
test_seen = 0

with torch.no_grad():
    for inputs, labels in TEST_DL:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)

        batch_samples = labels.size(0)
        # criterion returns the batch mean; scale by batch size so a smaller
        # final batch is not over-weighted in the overall averages.
        test_loss_sum += criterion(outputs, labels).item() * batch_samples
        test_correct += (outputs.argmax(dim=1) == labels).sum().item()
        test_seen += batch_samples

# Per-sample averages; NaN instead of ZeroDivisionError when the loader is empty.
test_loss = test_loss_sum / test_seen if test_seen else float('nan')
test_accuracy = test_correct / test_seen if test_seen else float('nan')

print(f'Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

Test Loss: 0.1548, Test Accuracy: 0.9460
