In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
from torchvision import transforms
import matplotlib.pyplot as plt
from tqdm import tqdm
import torch.nn.functional as F
import random
from sklearn.model_selection import train_test_split
%matplotlib inline

# Select the compute device: CUDA when it is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

  from .autonotebook import tqdm as notebook_tqdm


Using device: cuda


In [2]:
class QuickDrawDataset(Dataset):
    """Dataset of sketch images with optional on-the-fly augmentation.

    Every item of ``data`` is converted to a float tensor and given a leading
    channel dimension, so a 2-D ``(H, W)`` item becomes a ``(1, H, W)`` tensor.

    Args:
        data: Indexable collection of images (e.g. a NumPy array whose items
            are 2-D; the augmentations unpack ``shape[1:]`` as ``(H, W)``).
        labels: Indexable collection of integer class ids aligned with ``data``.
        transform: Optional callable applied to the image tensor after the
            augmentations.
        augment: If True, noise, occlusion, elastic deformation and rotation
            are each applied independently with probability 0.3 per access.
    """

    def __init__(self, data, labels, transform=None, augment=False):
        self.data = data
        self.labels = labels
        self.transform = transform
        self.augment = augment

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def add_noise(self, image, noise_factor=0.1):
        """Add zero-mean Gaussian noise scaled by ``noise_factor``.

        The result is clamped to ``[0, 1]``, so the input is expected to be in
        that range as well. Returns a new tensor; ``image`` is not modified.
        """
        noise = torch.randn_like(image) * noise_factor
        noisy_image = image + noise
        return torch.clamp(noisy_image, 0., 1.)

    def add_occlusion(self, image, block_size=4, num_blocks=4):
        """Zero out ``num_blocks`` randomly placed square patches.

        Args:
            image: Tensor of shape ``(C, H, W)``.
            block_size: Side length of each patch; clipped to the image size so
                an oversized block blanks the image instead of raising.
            num_blocks: Number of patches to erase (patches may overlap).

        Returns:
            A modified copy of ``image``.
        """
        img = image.clone()
        h, w = img.shape[1:]

        # Clip the patch to the image so randint never receives an empty range.
        block_h = min(block_size, h)
        block_w = min(block_size, w)

        for _ in range(num_blocks):
            # Draw x before y: keeps the random stream identical to earlier runs.
            x = random.randint(0, w - block_w)
            y = random.randint(0, h - block_h)

            # Erase the patch in every channel.
            img[:, y:y + block_h, x:x + block_w] = 0

        return img

    def elastic_transform(self, image, alpha=500, sigma=20, random_state=None):
        """Apply a random, Gaussian-smoothed displacement field to the image.

        Args:
            image: Tensor of shape ``(C, H, W)``.
            alpha: Scale of the displacement field.
            sigma: Standard deviation of the Gaussian used to smooth the field.
            random_state: Optional ``np.random.RandomState`` for reproducible
                fields; a freshly seeded one is created when omitted.

        Returns:
            A new tensor with the same shape and dtype as ``image`` in which
            every channel is resampled with the same displacement.
        """
        if random_state is None:
            random_state = np.random.RandomState(None)

        height, width = image.shape[1:]

        # Uniform noise in [-1, 1); dx is drawn before dy.
        dx = torch.tensor(random_state.rand(height, width) * 2 - 1)
        dy = torch.tensor(random_state.rand(height, width) * 2 - 1)

        # gaussian_blur lives in torchvision.transforms.functional, not in
        # torch.nn.functional (the module-level ``F``).
        blur = transforms.functional.gaussian_blur
        # Index with [0, 0] rather than squeeze() so size-1 axes survive.
        dx = blur(dx[None, None], kernel_size=7, sigma=sigma)[0, 0]
        dy = blur(dy[None, None], kernel_size=7, sigma=sigma)[0, 0]

        # Normalize and scale the displacement.
        dx = dx * alpha / (sigma * height)
        dy = dy * alpha / (sigma * width)

        # Row / column index grids via broadcasting (same values as an
        # 'ij'-indexed meshgrid).
        rows = torch.arange(height).unsqueeze(1)
        cols = torch.arange(width).unsqueeze(0)

        # Displace, keep inside the image, and truncate to integer indices.
        indices_x = torch.clamp(rows + dx, 0, height - 1).long()
        indices_y = torch.clamp(cols + dy, 0, width - 1).long()

        # Advanced indexing returns a new tensor and warps all channels alike.
        return image[:, indices_x, indices_y]

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        ``image`` is a float tensor with a leading channel dimension, after
        the optional augmentations and ``transform``; ``label`` is a 0-dim
        long tensor.
        """
        image = self.data[idx]
        label = self.labels[idx]

        # Convert to a float tensor and add the channel dimension.
        image = torch.FloatTensor(image).unsqueeze(0)

        if self.augment:
            # Each augmentation is applied independently with probability 0.3.
            if random.random() < 0.3:
                image = self.add_noise(image, noise_factor=0.1)

            if random.random() < 0.3:
                image = self.add_occlusion(image, block_size=4, num_blocks=random.randint(1, 3))

            if random.random() < 0.3:
                image = self.elastic_transform(image, alpha=random.randint(300, 700))

            # Random rotation between -15 and 15 degrees.
            if random.random() < 0.3:
                angle = random.uniform(-15, 15)
                # rotate is a torchvision.transforms.functional op as well.
                image = transforms.functional.rotate(image, angle)

        # Apply the user-supplied transform, if any.
        if self.transform is not None:
            image = self.transform(image)

        # Convert the label to a long (int64) scalar tensor.
        label = torch.LongTensor([label])[0]

        return image, label

def create_dataloaders(npz_path, batch_size=64, test_size=0.2):
    """Build train/test DataLoaders from an ``.npz`` archive.

    The archive must contain a ``'data'`` array (images, scaled here from
    0-255 to 0-1) and a ``'labels'`` array. Samples are shuffled with
    ``np.random.permutation`` and the first ``floor(test_size * N)`` shuffled
    indices become the test split.

    Args:
        npz_path: Path to the ``.npz`` file.
        batch_size: Batch size used for both loaders.
        test_size: Fraction of the samples assigned to the test split.

    Returns:
        ``(train_loader, test_loader)``. The training dataset is created with
        ``augment=True`` and its loader shuffles; the test dataset is not
        augmented and its loader keeps a fixed order.
    """
    # An .npz archive keeps a file handle open until it is closed, so read the
    # arrays inside a with-block.
    with np.load(npz_path) as data:
        images = data['data'].astype(np.float32) / 255.0  # scale to [0, 1]
        labels = data['labels']

    # Random train/test split.
    num_samples = len(images)
    indices = np.random.permutation(num_samples)
    split = int(np.floor(test_size * num_samples))

    train_idx, test_idx = indices[split:], indices[:split]

    # Augmentation is applied to the training set only.
    train_dataset = QuickDrawDataset(
        images[train_idx],
        labels[train_idx],
        augment=True
    )

    test_dataset = QuickDrawDataset(
        images[test_idx],
        labels[test_idx],
        augment=False
    )

    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True
    )

    test_loader = torch.utils.data.DataLoader(
        test_dataset,
        batch_size=batch_size,
        shuffle=False
    )

    return train_loader, test_loader

# Data loading and preprocessing function
def prepare_data(npz_path, test_size=0.2):
    """Load an ``.npz`` dataset and split it into train/test sets class by class.

    The archive must contain ``'data'``, ``'labels'`` and ``'classes'`` arrays.
    For every class index in ``range(len(classes))`` the matching samples are
    split separately with ``train_test_split(..., random_state=42)``, and the
    per-class parts are concatenated in class order. Samples whose label is
    outside that range are not included. Set sizes and per-class counts are
    printed.

    Args:
        npz_path: Path to the ``.npz`` file.
        test_size: Fraction of each class assigned to the test split.

    Returns:
        ``(X_train, X_test, y_train, y_test, classes)``.
    """
    # An .npz archive keeps a file handle open until it is closed, so read the
    # arrays inside a with-block.
    with np.load(npz_path) as data:
        # NOTE(review): values are returned exactly as stored; unlike
        # create_dataloaders, no /255 scaling is applied here. Confirm that the
        # downstream Normalize((0.5,), (0.5,)) is meant to see this range.
        images = data['data']
        labels = data['labels']
        classes = data['classes']

    # Per-class accumulators.
    X_train, X_test = [], []
    y_train, y_test = [], []

    for class_idx in range(len(classes)):
        # Select the samples that belong to the current class.
        class_indices = np.where(labels == class_idx)[0]
        class_images = images[class_indices]
        class_labels = labels[class_indices]

        # Split this class on its own so both sets keep the class proportions.
        X_train_class, X_test_class, y_train_class, y_test_class = train_test_split(
            class_images, class_labels, test_size=test_size, random_state=42
        )

        X_train.append(X_train_class)
        X_test.append(X_test_class)
        y_train.append(y_train_class)
        y_test.append(y_test_class)

    # Merge the per-class pieces.
    X_train = np.concatenate(X_train, axis=0)
    X_test = np.concatenate(X_test, axis=0)
    y_train = np.concatenate(y_train, axis=0)
    y_test = np.concatenate(y_test, axis=0)

    print(f"Training set size: {len(X_train)}")
    print(f"Test set size: {len(X_test)}")

    # Report the class distribution of both sets.
    for i in range(len(classes)):
        train_count = np.sum(y_train == i)
        test_count = np.sum(y_test == i)
        print(f"Class {classes[i]}: Train={train_count}, Test={test_count}")

    return X_train, X_test, y_train, y_test, classes

# Prepare the data (per-class train/test split)
X_train, X_test, y_train, y_test, classes = prepare_data('quickdraw_dataset.npz')

# Define the transforms.
# Training: random affine jitter and random erasing, then resize to 32x32 and normalize.
train_transform = transforms.Compose([
    transforms.RandomAffine(degrees=10, translate=(0.1, 0.1), scale=(0.9, 1.1)),
    transforms.RandomErasing(p=0.2),
    transforms.Resize((32, 32)),
    transforms.Normalize((0.5,), (0.5,))
])

# Test: deterministic resize and normalize only.
test_transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.Normalize((0.5,), (0.5,))
])

# Create the datasets.
# `augment` is left at its default (False), so only the transforms above
# perturb the training images.
train_dataset = QuickDrawDataset(X_train, y_train, transform=train_transform)
test_dataset = QuickDrawDataset(X_test, y_test, transform=test_transform)

# Create the data loaders (only the training loader shuffles)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64)

# Sanity check: print the shapes and labels of the first training batch only
for images, labels in train_loader:
    print(f"Batch images shape: {images.shape}")
    print(f"Batch labels shape: {labels.shape}")
    print(f"Labels in batch: {labels}")
    break

Training set size: 40000
Test set size: 10000
Class circle: Train=4000, Test=1000
Class triangle: Train=4000, Test=1000
Class square: Train=4000, Test=1000
Class donut: Train=4000, Test=1000
Class house: Train=4000, Test=1000
Class cloud: Train=4000, Test=1000
Class lightning: Train=4000, Test=1000
Class star: Train=4000, Test=1000
Class diamond: Train=4000, Test=1000
Class banana: Train=4000, Test=1000
Batch images shape: torch.Size([64, 1, 32, 32])
Batch labels shape: torch.Size([64])
Labels in batch: tensor([0, 5, 7, 2, 2, 6, 3, 3, 2, 6, 0, 1, 0, 1, 4, 1, 7, 4, 2, 4, 1, 7, 3, 7,
        4, 7, 4, 9, 5, 8, 6, 7, 8, 7, 6, 2, 8, 5, 5, 6, 2, 9, 2, 9, 0, 3, 9, 8,
        2, 0, 1, 1, 4, 2, 3, 3, 7, 8, 7, 4, 2, 8, 1, 1])


In [3]:
class TeacherVGG(nn.Module):
    """VGG-style convolutional classifier for single-channel images.

    Three stages of (Conv-BN-ReLU) x 2 followed by 2x2 max pooling, with 64,
    128 and 256 channels, feed an adaptive-pooled MLP head that emits
    ``num_classes`` logits.
    """

    def __init__(self, num_classes=10):
        super().__init__()

        # (in_channels, out_channels) of each stage; the first stage takes the
        # 1-channel (grayscale) input.
        stage_channels = [(1, 64), (64, 128), (128, 256)]

        feature_layers = []
        for in_ch, out_ch in stage_channels:
            feature_layers += [
                nn.Conv2d(in_ch, out_ch, 3, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
                nn.Conv2d(out_ch, out_ch, 3, padding=1),
                nn.BatchNorm2d(out_ch),
                nn.ReLU(inplace=True),
                nn.MaxPool2d(2, 2),
            ]
        self.features = nn.Sequential(*feature_layers)

        # Classifier head; adaptive pooling to 4x4 lets it accept variable
        # input sizes.
        head_layers = [
            nn.AdaptiveAvgPool2d((4, 4)),
            nn.Flatten(),
            nn.Linear(256 * 4 * 4, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(512, num_classes),
        ]
        self.classifier = nn.Sequential(*head_layers)

        # Replace the default parameter initialization.
        self._initialize_weights()

    def forward(self, x):
        """Return class logits for a batch of images ``x``."""
        return self.classifier(self.features(x))

    def _initialize_weights(self):
        """Initialize conv (Kaiming normal), batch-norm (1/0) and linear (N(0, 0.01)) layers."""
        for module in self.modules():
            if isinstance(module, nn.BatchNorm2d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)
                continue

            if isinstance(module, nn.Conv2d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.Linear):
                nn.init.normal_(module.weight, 0, 0.01)
                nn.init.constant_(module.bias, 0)

# Create the model: one output logit per class, placed on the selected device
model = TeacherVGG(num_classes=len(classes)).to(device)

In [4]:
def _evaluate_accuracy(model, data_loader):
    """Return the top-1 accuracy (percent) of ``model`` on ``data_loader``; 0.0 if it yields no samples."""
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in data_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, predicted = outputs.max(1)
            total += labels.size(0)
            correct += predicted.eq(labels).sum().item()
    # Guard against an empty loader instead of dividing by zero.
    return 100. * correct / total if total else 0.0


def train_model(model, train_loader, test_loader, criterion, optimizer, scheduler, num_epochs=10,
                checkpoint_path='best_teacher_model.pth'):
    """Train ``model`` and evaluate it on ``test_loader`` after every epoch.

    Batches are moved to the module-level ``device``. After each epoch the
    mean training loss and the test accuracy are printed and recorded, and the
    scheduler is stepped with the accuracy, so it must accept a metric (e.g.
    ``ReduceLROnPlateau`` in ``mode='max'``).

    Whenever the accuracy strictly exceeds the best value seen so far, a
    checkpoint (epoch, model and optimizer state, accuracy) is written to
    ``checkpoint_path``. Note that the best checkpoint is therefore selected
    on ``test_loader``.

    Args:
        model: Network to train; expected to already be on ``device``.
        train_loader: Iterable of ``(inputs, labels)`` batches that supports ``len()``.
        test_loader: Iterable of ``(inputs, labels)`` batches used for evaluation.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer over the parameters of ``model``.
        scheduler: LR scheduler stepped once per epoch with the accuracy.
        num_epochs: Number of passes over ``train_loader``.
        checkpoint_path: File the best checkpoint is saved to.

    Returns:
        ``(train_losses, test_accs)``: per-epoch mean training loss and
        per-epoch test accuracy in percent.
    """
    best_acc = 0.0
    train_losses = []
    test_accs = []

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0

        # Training pass
        with tqdm(train_loader, desc=f'Epoch {epoch+1}/{num_epochs}') as pbar:
            for inputs, labels in pbar:
                inputs, labels = inputs.to(device), labels.to(device)

                optimizer.zero_grad()
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                running_loss += loss.item()
                pbar.set_postfix({'loss': f'{loss.item():.4f}'})

        # Mean loss per batch; max(..., 1) avoids dividing by zero on an empty loader.
        epoch_loss = running_loss / max(len(train_loader), 1)

        # Evaluation pass
        accuracy = _evaluate_accuracy(model, test_loader)

        train_losses.append(epoch_loss)
        test_accs.append(accuracy)

        print(f'Epoch {epoch+1}: Loss = {epoch_loss:.4f}, '
              f'Accuracy = {accuracy:.2f}%')

        # Save the model only when it improves on the best accuracy so far.
        if accuracy > best_acc:
            best_acc = accuracy
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'accuracy': accuracy,
            }, checkpoint_path)

        scheduler.step(accuracy)

    return train_losses, test_accs

# Define the loss function, optimizer and learning-rate scheduler
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# mode='max' because train_model steps this scheduler with the test accuracy
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=5)

# Train the model (num_epochs is left at train_model's default of 10)
train_losses, test_accs = train_model(model, train_loader, test_loader, 
                                    criterion, optimizer, scheduler)

Epoch 1/10: 100%|██████████| 625/625 [00:38<00:00, 16.44it/s, loss=0.2676]


Epoch 1: Loss = 0.4575, Accuracy = 94.76%


Epoch 2/10: 100%|██████████| 625/625 [00:34<00:00, 18.01it/s, loss=0.1956]


Epoch 2: Loss = 0.2927, Accuracy = 95.15%


Epoch 3/10: 100%|██████████| 625/625 [00:33<00:00, 18.39it/s, loss=0.4629]


Epoch 3: Loss = 0.2536, Accuracy = 95.82%


Epoch 4/10: 100%|██████████| 625/625 [00:33<00:00, 18.54it/s, loss=0.1276]


Epoch 4: Loss = 0.2420, Accuracy = 95.23%


Epoch 5/10: 100%|██████████| 625/625 [00:33<00:00, 18.56it/s, loss=0.1175]


Epoch 5: Loss = 0.2264, Accuracy = 95.99%


Epoch 6/10: 100%|██████████| 625/625 [00:33<00:00, 18.54it/s, loss=0.0811]


Epoch 6: Loss = 0.2038, Accuracy = 95.39%


Epoch 7/10: 100%|██████████| 625/625 [00:34<00:00, 17.89it/s, loss=0.3752]


Epoch 7: Loss = 0.1998, Accuracy = 96.49%


Epoch 8/10: 100%|██████████| 625/625 [00:35<00:00, 17.45it/s, loss=0.2341]


Epoch 8: Loss = 0.1921, Accuracy = 96.31%


Epoch 9/10: 100%|██████████| 625/625 [00:34<00:00, 17.90it/s, loss=0.1216]


Epoch 9: Loss = 0.1819, Accuracy = 96.29%


Epoch 10/10: 100%|██████████| 625/625 [00:35<00:00, 17.69it/s, loss=0.4419]


Epoch 10: Loss = 0.1780, Accuracy = 96.53%
