In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import  transforms, datasets
import pandas as pd
from tqdm import tqdm
from torch.utils.data import random_split, DataLoader, Subset, Dataset
from PIL import Image
# from torchvision.transforms import InterpolationMode
from sklearn.metrics import accuracy_score, f1_score
import random
import numpy as np
import matplotlib.pyplot as plt
from transformers import ViTForImageClassification, ViTImageProcessor
# from transformers import SwinForImageClassification, AutoImageProcessor, AutoModelForZeroShotImageClassification
import torch.nn as nn
import torch
from torchvision import models
import os


# Define environment variables

In [None]:
# Single seed shared by every random number generator used in the notebook.
SEED = 2504
random.seed(SEED)        # Python's built-in RNG (used by random.choice below)
np.random.seed(SEED)     # NumPy's global RNG, previously left unseeded
torch.manual_seed(SEED)  # PyTorch RNG (data split, shuffling, augmentations)

In [None]:
# Dataset locations, relative to the notebook's working directory.
TRAIN_FOLDER_PATH = './data/train'  # read with ImageFolder: one sub-folder per class
TEST_FOLDER_PATH = './data/test'    # flat folder of unlabelled images

In [None]:
# Submission label (as a string) assigned to each class folder name.
label_map = dict((
    ("đông cô", "1"),
    ("tai mèo", "2"),
    ("tuyết khô", "0"),
))

In [None]:
def imshow(img_tensor):
    """Display a channel-first image tensor with matplotlib, axes hidden.

    Args:
        img_tensor: Image tensor laid out as (channels, height, width). It may
            live on any device and may be attached to the autograd graph.
    """
    # .numpy() only works on CPU tensors that do not require grad, so detach
    # and move to the CPU before converting.
    img = img_tensor.detach().cpu().permute(1, 2, 0)  # (C, H, W) -> (H, W, C)
    img = img.numpy()
    plt.imshow(img)
    plt.axis('off')

# Transform

In [None]:
# Per-channel statistics used by Normalize (presumably the ImageNet values
# expected by the pretrained backbone -- confirm).
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Training-time pipeline: resize, random augmentation, tensor conversion,
# then normalisation. Order matters: the first two groups run on the PIL
# image, the last group on the tensor.
transform = transforms.Compose(
    [
        # --- geometry ---
        transforms.Resize((300, 300)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomVerticalFlip(p=0.2),
        transforms.RandomRotation(degrees=15),
        # --- colour / contrast ---
        transforms.ColorJitter(
            brightness=0.6,
            contrast=0.2,
            saturation=0.2,
            hue=0.1,
        ),
        transforms.RandomAdjustSharpness(sharpness_factor=4, p=0.5),
        transforms.RandomAutocontrast(p=0.4),
        transforms.RandomEqualize(p=0.2),
        # --- tensor-space operations ---
        transforms.ToTensor(),
        transforms.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 5)),
        transforms.RandomErasing(
            p=0.3,
            scale=(0.02, 0.10),
            ratio=(0.3, 3.3),
            value='random',
        ),
        transforms.Normalize(mean=mean, std=std),
    ]
)

# Alternative training pipeline (disabled; kept for reference).
# train_transform = transforms.Compose([
#     # Crop with padding so the model also learns the border regions
#     transforms.RandomCrop(32, padding=4),
#     # Horizontal flip (and vertical flip if the mushrooms have no top/bottom orientation)
#     transforms.RandomHorizontalFlip(p=0.5),
#     # Slight rotation of +/-20 degrees
#     transforms.RandomRotation(20),
#     # Apply the CIFAR10 policy (AutoAugment)
#     transforms.AutoAugment(transforms.AutoAugmentPolicy.CIFAR10),
#     # Slightly adjust brightness/contrast
#     transforms.ColorJitter(
#         brightness=0.2,
#         contrast=0.2,
#         saturation=0.2,
#         hue=0.05
#     ),
#     transforms.ToTensor(),
#     # Randomly occlude one region (cutout-like)
#     transforms.RandomErasing(
#         p=0.3,
#         scale=(0.02, 0.1),
#         ratio=(0.3, 3.3),
#         value=0
#     ),
#     transforms.Normalize(mean=mean, std=std),
# ])

# Deterministic preprocessing for validation / test images: resize, tensor
# conversion and normalisation only, with no random augmentation.
test_transform = transforms.Compose(
    [
        transforms.Resize(size=(300, 300)),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ]
)

In [None]:
class CustomTestDataset(Dataset):
    """Unlabelled image dataset over the image files directly inside a folder.

    Files are selected by extension (.png, .jpg, .jpeg, case-insensitive) and
    kept in sorted path order. Each item is ``(image, file_name)``, where
    ``file_name`` is the base name of the image file.

    Args:
        test_dir: Folder that contains the image files.
        transform: Optional callable applied to the RGB PIL image.
    """

    def __init__(self, test_dir, transform=None):
        self.test_dir = test_dir
        self.image_paths = sorted([os.path.join(test_dir, fname)
                                   for fname in os.listdir(test_dir)
                                   if fname.lower().endswith(('.png', '.jpg', '.jpeg'))])
        self.transform = transform

    def __len__(self):
        """Return the number of image files found in ``test_dir``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB, apply the transform, return it with its file name."""
        img_path = self.image_paths[idx]
        # Close the file as soon as the pixels are read; convert() returns a
        # new in-memory image, so it stays valid after the file is closed.
        with Image.open(img_path) as opened:
            image = opened.convert("RGB")
        if self.transform:
            image = self.transform(image)
        return image, os.path.basename(img_path)

class SubsetWithTransform(torch.utils.data.Dataset):
    """Wrap an indexable collection of ``(image, label)`` pairs with a transform.

    This lets subsets of one untransformed dataset each use their own
    preprocessing.
    """

    def __init__(self, subset, transform):
        self.subset = subset
        self.transform = transform

    def __len__(self):
        """Return the size of the wrapped subset."""
        return len(self.subset)

    def __getitem__(self, idx):
        """Return ``(image, label)`` with the transform applied when one is set."""
        sample, target = self.subset[idx]
        # A falsy transform (e.g. None) leaves the sample untouched.
        processed = self.transform(sample) if self.transform else sample
        return processed, target

# Define Dataset

In [None]:
# Load the labelled images without a transform; each split gets its own
# transform through SubsetWithTransform further down.
base_dataset = datasets.ImageFolder(root=TRAIN_FOLDER_PATH, transform=None)
dataset_size = len(base_dataset)

# 80/20 train/validation split, reproducible thanks to the seeded generator.
val_size = int(0.2 * dataset_size)
train_size = dataset_size - val_size
train_subset, val_subset = random_split(
    base_dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(SEED),
)

# Augmentation only on the training split; validation uses the plain pipeline.
train_dataset = SubsetWithTransform(train_subset, transform)
val_dataset = SubsetWithTransform(val_subset, test_transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# Unlabelled test images, read in sorted file order.
test_dir = TEST_FOLDER_PATH
test_dataset = CustomTestDataset(test_dir, transform=test_transform)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Show the class names and the index ImageFolder assigned to each of them.
class_names = base_dataset.classes
print("Classes:", class_names)
print("Original class_to_idx mapping:")
print(base_dataset.class_to_idx)

In [None]:

# Pick one random training image of a single class to check the augmentation
# pipeline visually.
folder_path = TRAIN_FOLDER_PATH +"/đông cô"
# os.listdir returns entries in arbitrary order: sort them so the seeded
# random.choice selects the same file on every machine, and skip anything that
# is not an image (same extension filter as CustomTestDataset).
image_files = sorted(
    fname for fname in os.listdir(folder_path)
    if fname.lower().endswith(('.png', '.jpg', '.jpeg'))
)
image_path = os.path.join(folder_path, random.choice(image_files))
# convert() returns an in-memory copy, so the file can be closed right away.
with Image.open(image_path) as opened_image:
    img_original = opened_image.convert('RGB')
img_transformed = transform(img_original)
print(image_path)

def tensor_to_img(img_tensor, mean=None, std=None):
    """Convert a (C, H, W) image tensor into an (H, W, C) NumPy array.

    Args:
        img_tensor: Channel-first image tensor.
        mean: Optional per-channel means that were used to normalise the tensor.
        std: Optional per-channel standard deviations used for normalisation.

    Returns:
        NumPy array with the channel axis last. When both ``mean`` and ``std``
        are given, the normalisation is undone and values are clipped to
        [0, 1] so matplotlib shows the real colours; otherwise the values are
        returned unchanged.
    """
    img = img_tensor.detach().cpu()
    if mean is not None and std is not None:
        # Reshape to (C, 1, 1) so the statistics broadcast over H and W.
        mean_t = torch.tensor(mean, dtype=img.dtype).view(-1, 1, 1)
        std_t = torch.tensor(std, dtype=img.dtype).view(-1, 1, 1)
        img = (img * std_t + mean_t).clamp(0.0, 1.0)
    return img.permute(1, 2, 0).numpy()

# Plot the original image next to the transformed one
plt.figure(figsize=(8, 4))

plt.subplot(1, 2, 1)
plt.imshow(img_original)
plt.title("Ảnh gốc")
plt.axis("off")

plt.subplot(1, 2, 2)
# `transform` ends with Normalize(mean, std); undo it before plotting,
# otherwise matplotlib clips the out-of-range values and distorts the colours.
plt.imshow(tensor_to_img(img_transformed, mean=mean, std=std))
plt.title("Sau transform")
plt.axis("off")

plt.tight_layout()
plt.show()

In [None]:
# Translate each ImageFolder class index into the label string used in the
# submission file, going through the folder name.
idx_to_class = dict(
    (base_dataset.class_to_idx[class_name], label_map[class_name])
    for class_name in base_dataset.classes
)
print("Final class labels (used for submission):")
print(idx_to_class)

# Train/Eval Function

In [None]:
def evaluate_model(model, dataloader, idx_to_class, device):
    """Compute and print the accuracy of ``model`` over ``dataloader``.

    Args:
        model: Network called on each input batch; switched to eval mode.
        dataloader: Iterable yielding ``(inputs, labels)`` batches of tensors.
        idx_to_class: Mapping from class index to normalised label string.
        device: Device the batches are moved to before the forward pass.

    Returns:
        The accuracy computed by ``accuracy_score`` on the mapped labels.
    """
    model.eval()
    predicted, expected = [], []

    with torch.no_grad():
        for batch_inputs, batch_labels in dataloader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)
            batch_preds = torch.max(model(batch_inputs), 1).indices

            # Map predictions and targets to the normalised (string) labels.
            predicted += [idx_to_class[i] for i in batch_preds.tolist()]
            expected += [idx_to_class[i] for i in batch_labels.tolist()]

    acc = accuracy_score(expected, predicted)
    print(f'Accuracy: {acc:.4f}')
    return acc

In [None]:
def train(model, dataloader, val_loader, epochs=30):
    """Train ``model`` with per-epoch validation and early stopping.

    Relies on the notebook-level globals ``device``, ``criterion``,
    ``optimizer``, ``scheduler`` and ``idx_to_class``, which must be defined
    before this function is called.

    Args:
        model: Network to optimise; moved to ``device`` in place.
        dataloader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        epochs: Maximum number of epochs to run.

    Side effects:
        * Saves the weights with the lowest validation loss to
          ``best_model.pth`` and loads them back into ``model`` when training
          ends, so later cells use the best checkpoint, not the last epoch.
        * Steps ``scheduler`` once per completed epoch.
        * Prints per-epoch metrics and shows the loss curves.
    """
    model.to(device)
    best_val_loss = float('inf')  # Lowest validation loss seen so far
    patience = 10  # Epochs without improvement tolerated before stopping
    patience_counter = 0
    checkpoint_path = 'best_model.pth'
    checkpoint_saved = False  # Only reload a checkpoint written by this run

    # Per-epoch loss history for the final plot
    train_losses = []
    val_losses = []
    for epoch in range(epochs):
        # ---- training phase ----
        model.train()
        running_loss = 0.0
        for inputs, labels in tqdm(dataloader):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        # Mean of the per-batch training losses
        epoch_train_loss = running_loss / len(dataloader)
        train_losses.append(epoch_train_loss)
        print(f"Epoch {epoch+1}, Loss: {epoch_train_loss:.4f}", end=', ')

        # ---- validation phase ----
        model.eval()
        val_loss = 0.0
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)

                # Map predictions and targets to the normalised (string) labels
                pred_labels = [idx_to_class[p.item()] for p in preds]
                true_labels = [idx_to_class[l.item()] for l in labels]

                loss = criterion(outputs, labels)
                val_loss += loss.item()

                all_preds.extend(pred_labels)
                all_labels.extend(true_labels)

        # Mean of the per-batch validation losses
        epoch_val_loss = val_loss / len(val_loader)
        val_losses.append(epoch_val_loss)

        acc = accuracy_score(all_labels, all_preds)
        print(f'Loss Valid: {epoch_val_loss:.4f}, Accuracy Valid: {acc:.4f}')

        # The schedule is defined in epochs, so it advances once per epoch
        # (previously it was stepped a single time after the whole loop).
        scheduler.step()

        # ---- early stopping / checkpointing ----
        if epoch_val_loss < best_val_loss:
            best_val_loss = epoch_val_loss
            patience_counter = 0
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print("Early stopping triggered!")
                break

    # Restore the best weights; otherwise the model is left with the weights
    # of the final epoch, which can be up to `patience` epochs past the best.
    if checkpoint_saved:
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))

    # Plot the training and validation loss curves
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

# Model

In [None]:
class EfficientNetClassifier(nn.Module):
    """EfficientNet-B3 feature extractor with a dropout + linear head.

    Args:
        num_classes: Number of output logits produced by the final layer.
    """

    def __init__(self, num_classes=4):
        super().__init__()
        # `pretrained=True` is deprecated in recent torchvision releases in
        # favour of `weights=`. Use the weights enum when it exists and fall
        # back to the legacy flag on older versions. IMAGENET1K_V1 is the
        # checkpoint that `pretrained=True` selects.
        weights_enum = getattr(models, "EfficientNet_B3_Weights", None)
        if weights_enum is not None:
            self.model = models.efficientnet_b3(weights=weights_enum.IMAGENET1K_V1)
        else:
            self.model = models.efficientnet_b3(pretrained=True)

        # Reuse only the convolutional trunk; the original classifier is
        # bypassed in forward().
        # NOTE(review): `self.model` still registers the original classifier
        # head, so its unused parameters show up in `parameters()` and in the
        # state dict. Kept as is to stay compatible with existing checkpoints.
        self.features = self.model.features

        self.adaptive_pool = nn.AdaptiveAvgPool2d((1, 1))
        in_features = self.model.classifier[1].in_features
        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(in_features, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes) for an image batch."""
        x = self.features(x)        # convolutional feature maps
        x = self.adaptive_pool(x)   # global average pool to 1x1
        x = torch.flatten(x, 1)     # (batch, channels)
        x = self.dropout(x)
        x = self.fc(x)
        return x

# Size the output layer from the dataset rather than a hard-coded 4:
# idx_to_class only has entries for the classes found by ImageFolder, so a
# spare logit could win the argmax and raise KeyError when mapped to a label.
model = EfficientNetClassifier(num_classes=len(class_names))

In [None]:
# Run on the GPU when CUDA is available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Loss, optimiser and learning-rate schedule read as globals by train().
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=2e-5, weight_decay=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=100, gamma=0.01)

# Count the parameters the optimiser is allowed to update.
trainable_params = sum(
    map(torch.Tensor.numel, filter(lambda t: t.requires_grad, model.parameters()))
)
print(f"Số lượng tham số được huấn luyện: {trainable_params}")

device

In [None]:
# Train for at most 100 epochs (early stopping inside train() may end sooner).
train(model, train_loader, val_loader, epochs=100)

In [None]:
# evaluate_model iterates over batches and calls .to(device) on both inputs and
# labels, so it needs the DataLoader. Iterating the raw dataset yields single
# images with plain int labels, and the call fails.
results = evaluate_model(model, val_loader, idx_to_class, device)
results

# Submit

In [None]:
def create_submission(model, dataloader, idx_to_class, device, output_path="submission.csv"):
    """Predict a label for every test image and write the submission CSV.

    Args:
        model: Network called on each input batch; switched to eval mode.
        dataloader: Iterable yielding ``(inputs, filenames)`` batches.
        idx_to_class: Mapping from predicted class index to submission label.
        device: Device the inputs are moved to before the forward pass.
        output_path: Destination CSV file (defaults to ``submission.csv``).

    Returns:
        DataFrame with columns ``id`` (file name without its extension) and
        ``type`` (predicted label), in the order the batches were read.
    """
    model.eval()
    results = []
    file_names = []

    with torch.no_grad():
        for inputs, filenames in dataloader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            results.extend(idx_to_class[p] for p in preds.tolist())
            file_names.extend(filenames)

    df = pd.DataFrame({
        # splitext strips only the final extension, so names that contain
        # extra dots (e.g. "img.001.jpg") keep their full stem.
        "id": [os.path.splitext(name)[0] for name in file_names],
        "type": results,
    })
    df.to_csv(output_path, index=False)
    print(f"Saved {output_path}")
    return df
# Run inference on the test set and write the submission file.
predcits = create_submission(
    model=model,
    dataloader=test_loader,
    idx_to_class=idx_to_class,
    device=device,
)