In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import os
import numpy as np
from PIL import Image
from torch.utils.data import Dataset
import torch
import torch.nn.functional as F
import pandas as pd
from torchvision import transforms
from torch.utils.data import random_split, DataLoader
import torch.nn as nn
import torchvision.models as models
import torch.optim as optim

In [None]:
class ImageDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None, is_test=False):
        self.data = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform
        self.is_test = is_test

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        if not self.is_test:
            # Fetch training images using paths from train.csv
            img_path = os.path.join(self.root_dir, self.data.iloc[idx, 1])
            image = Image.open(img_path).convert('RGB')
            label = int(self.data.iloc[idx, 0].split('_')[1]) - 1  # Labeling
            label = torch.tensor(label)
            if self.transform:
                image = self.transform(image)
            return image, label
        else:
            # For test data
            img_path = os.path.join(self.root_dir, self.data.iloc[idx, 1])
            image = Image.open(img_path).convert('RGB')
            if self.transform:
                image = self.transform(image)
            return image, self.data.iloc[idx, 0]

In [None]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])


# Create datasets
train_dataset = ImageDataset(
    csv_file='/kaggle/input/dl-63-cw-image-classification/train.csv',  # Correct path to train.csv
    root_dir='/kaggle/input/dl-63-cw-image-classification/train',  # Directory containing images
    transform=transform
)

test_dataset = ImageDataset(
    csv_file='/kaggle/input/dl-63-cw-image-classification/test.csv',  # Correct path to test.csv
    root_dir='/kaggle/input/dl-63-cw-image-classification/test/',  # Directory containing test images
    transform=transform, is_test=True
)

In [None]:
train_size = int(0.8 * len(train_dataset))
valid_size = len(train_dataset) - train_size

trainset, validset = random_split(train_dataset, [train_size, valid_size])

# Create DataLoaders
batch_size = 32
train_loader = DataLoader(trainset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(validset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
class CustomConvNeXt(nn.Module):
    def __init__(self, num_classes=71):
        super(CustomConvNeXt, self).__init__()
        
        # Load pre-trained ConvNeXt model (convnext_large or convnext_base)
        self.base_model = models.convnext_large(pretrained=True)
        
        # Freeze earlier layers (feature extraction layers)
        for param in self.base_model.features.parameters():
            param.requires_grad = False  # Freeze feature extraction layers

        # Replace the classifier layer with a new fully connected layer
        self.base_model.classifier[2] = nn.Sequential(
            nn.Linear(self.base_model.classifier[2].in_features, 256),  # in_features dựa trên ConvNeXt architecture
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, num_classes)  # Final output layer với num_classes outputs
        )

    def forward(self, x):
        return self.base_model(x)

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Instantiate the model
model = CustomConvNeXt(num_classes=71).to(device)

# Optimizer and Loss Function
optimizer = optim.RMSprop(filter(lambda p: p.requires_grad, model.parameters()), lr=0.001)
criterion = nn.CrossEntropyLoss()

In [None]:
def train_and_val_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=10):
    best_val_acc = 0.0
    for epoch in range(num_epochs):
        model.train()  # Set to training mode
        running_loss, running_corrects = 0.0, 0

        # Training loop
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = running_corrects.double() / len(train_loader.dataset)
        print(f'Training - Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}')

        # Validation loop
        model.eval()  # Set to evaluation mode
        val_corrects = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                val_corrects += torch.sum(preds == labels.data)

        val_acc = val_corrects.double() / len(val_loader.dataset)
        print(f'Validation Accuracy: {val_acc:.4f}')

        # Save the best model based on validation accuracy
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), 'best_convnext_model.pth')

In [None]:
train_and_val_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=10)

In [None]:
for param in model.base_model.features[-8:].parameters():
    param.requires_grad = True

# Re-define optimizer with a lower learning rate
optimizer = optim.SGD(filter(lambda p: p.requires_grad, model.parameters()), lr=1e-4, momentum=0.9)

# Learning rate scheduler
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)

# Fine-tune the model for additional epochs
train_and_val_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=5)

In [None]:
def evaluate_model_on_test_set(model, test_loader, device, class_mapping):
    model.eval()
    predictions = []
    ids = []
    
    with torch.no_grad():
        for images, image_ids in test_loader:
            images = images.to(device)
            outputs = model(images)
            _, predicted_classes = torch.max(outputs, 1)
            
            predicted_classes = [class_mapping[p.item()] for p in predicted_classes]
            
        
            predictions.extend(predicted_classes)
            ids.extend(image_ids)  # image_ids chính là các ID từ file test.csv
    
    return ids, predictions

class_mapping = {i: f'class_{i+1}' for i in range(71)}

In [None]:
ids, predictions = evaluate_model_on_test_set(model, test_loader, device, class_mapping)
renamed_ids = list(range(len(ids)))
results = pd.DataFrame({
    'ID': renamed_ids, 
    'TARGET': predictions 
})

results.to_csv('submission34.csv', index=False)