In [13]:
# Imports 
import os
import random
import numpy as np
import pandas as pd
from PIL import Image
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

import torch
import torch.nn as nn
import torchvision.transforms as T
from torch.utils.data import Dataset, DataLoader

# Setting random seeds for reproducibility
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False  # For true reproducibility

# Device Configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Running on: {device}")

Running on: cuda


In [14]:
# Loading CSV files
BASE = '/kaggle/input/soil-classification/soil_classification-2025'
df_train = pd.read_csv(os.path.join(BASE, 'train_labels.csv'))
df_test  = pd.read_csv(os.path.join(BASE, 'test_ids.csv'))

In [15]:
# Standardize Column Names
# Training CSV has columns ['image_id', 'soil_type']
# Test CSV has ['image_id']
df_train = df_train.rename(columns={'image_id': 'id', 'soil_type': 'label'})
df_test  = df_test.rename(columns={'image_id': 'id'})

In [16]:
# Encoding string labels to integers
classes = sorted(df_train['label'].unique())
cls2idx = {c: i for i, c in enumerate(classes)}
idx2cls = {i: c for c, i in cls2idx.items()}

# Map label column to integer index
df_train['label_idx'] = df_train['label'].map(cls2idx)

print("Label mapping:", cls2idx)
print(df_train[['id', 'label', 'label_idx']].head())

Label mapping: {'Alluvial soil': 0, 'Black Soil': 1, 'Clay soil': 2, 'Red soil': 3}
                 id          label  label_idx
0  img_ed005410.jpg  Alluvial soil          0
1  img_0c5ecd2a.jpg  Alluvial soil          0
2  img_ed713bb5.jpg  Alluvial soil          0
3  img_12c58874.jpg  Alluvial soil          0
4  img_eff357af.jpg  Alluvial soil          0


In [17]:
# PyTorch Dataset
class SoilDataset(Dataset):
    def __init__(self, df, img_dir, transform=None, is_test=False):
        self.df = df.reset_index(drop=True)
        self.img_dir = img_dir
        self.transform = transform
        self.is_test = is_test

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        img_path = os.path.join(self.img_dir, row['id'])

        # Load image safely
        try:
            img = Image.open(img_path).convert('RGB')
        except Exception as e:
            raise RuntimeError(f"Failed to load image: {img_path} | Error: {e}")

        # Apply transforms (if any)
        if self.transform:
            img = self.transform(img)

        # Return differently for test/train
        if self.is_test:
            return img, row['id']
        else:
            return img, row['label_idx']


In [18]:
# Image Transforms
IMG_SIZE = 224
transform_train = T.Compose([
    T.Resize((IMG_SIZE, IMG_SIZE)),
    T.RandomHorizontalFlip(),
    T.RandomRotation(15),
    T.ToTensor(),
])

transform_val = T.Compose([
    T.Resize((IMG_SIZE, IMG_SIZE)),
    T.ToTensor(),
])

# Stratified Train/Validation Split
df_tr, df_val = train_test_split(
    df_train,
    test_size=0.2,
    stratify=df_train['label_idx'],
    random_state=42
)

# Datasets
train_ds = SoilDataset(df_tr, os.path.join(BASE, 'train'), transform_train)
val_ds   = SoilDataset(df_val, os.path.join(BASE, 'train'), transform_val)
test_ds  = SoilDataset(df_test, os.path.join(BASE, 'test'), transform_val, is_test=True)

# Dataloaders
BATCH_SIZE = 32
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
val_loader   = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)
test_loader  = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

print(f"Train: {len(train_loader)} batches")
print(f"Val:   {len(val_loader)} batches")
print(f"Test:  {len(test_loader)} batches")


Train: 31 batches
Val:   8 batches
Test:  11 batches


In [19]:
import torchvision.models as models

# Partially fine-tuned ResNet18 Classifier with Dropout
class ResNet18Classifier(nn.Module):
    def __init__(self, num_classes):
        super().__init__()
        self.base_model = models.resnet18(weights='IMAGENET1K_V1')

        # Freeze all layers first
        for param in self.base_model.parameters():
            param.requires_grad = False

        # Unfreeze layer4
        for param in self.base_model.layer4.parameters():
            param.requires_grad = True

        # Add Dropout before final FC layer
        in_features = self.base_model.fc.in_features
        self.base_model.fc = nn.Sequential(
            nn.Dropout(p=0.4),
            nn.Linear(in_features, num_classes)
        )

    def forward(self, x):
        return self.base_model(x)

# Instantiate the model
model = ResNet18Classifier(len(classes)).to(device)

# Set up parameter groups for different learning rates
layer4_params = list(model.base_model.layer4.parameters())
fc_params = list(model.base_model.fc.parameters())

optimizer = torch.optim.Adam([
    {'params': layer4_params, 'lr': 1e-4},
    {'params': fc_params, 'lr': 1e-3}
])

print(model)

ResNet18Classifier(
  (base_model): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=T

In [20]:
from sklearn.metrics import f1_score

# Compute class weights
class_counts = df_train['label_idx'].value_counts().sort_index().values
weights = 1.0 / torch.tensor(class_counts, dtype=torch.float)
weights = weights / weights.sum()
criterion = nn.CrossEntropyLoss(weight=weights.to(device))

# Scheduler for new optimizer (already defined earlier)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.5)

EPOCHS = 30
best_f1 = 0

for epoch in range(1, EPOCHS + 1):
    model.train()
    total_loss = 0
    for imgs, labels in train_loader:
        imgs, labels = imgs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(imgs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    avg_loss = total_loss / len(train_loader)

    # Validation
    model.eval()
    all_preds, all_labels = [], []
    with torch.no_grad():
        for imgs, labels in val_loader:
            imgs = imgs.to(device)
            outputs = model(imgs)
            _, preds = outputs.max(1)
            all_preds += preds.cpu().tolist()
            all_labels += labels.tolist()

    val_f1 = f1_score(all_labels, all_preds, average='macro')
    print(f"Epoch {epoch}/{EPOCHS} - Loss: {avg_loss:.4f}, Val F1: {val_f1:.4f}")
    scheduler.step()

    # Save best model
    if val_f1 > best_f1:
        best_f1 = val_f1
        torch.save(model.state_dict(), 'best_model.pth')
        print(f"New best model saved with F1: {best_f1:.4f}")

# Save final model
torch.save(model.state_dict(), 'last_model.pth')
print("Saved last_model.pth")

Epoch 1/30 - Loss: 0.5612, Val F1: 0.9270
New best model saved with F1: 0.9270
Epoch 2/30 - Loss: 0.1751, Val F1: 0.9577
New best model saved with F1: 0.9577
Epoch 3/30 - Loss: 0.1149, Val F1: 0.9542
Epoch 4/30 - Loss: 0.0718, Val F1: 0.9622
New best model saved with F1: 0.9622
Epoch 5/30 - Loss: 0.0702, Val F1: 0.9592
Epoch 6/30 - Loss: 0.0743, Val F1: 0.9794
New best model saved with F1: 0.9794
Epoch 7/30 - Loss: 0.0415, Val F1: 0.9671
Epoch 8/30 - Loss: 0.0341, Val F1: 0.9714
Epoch 9/30 - Loss: 0.0459, Val F1: 0.9753
Epoch 10/30 - Loss: 0.0357, Val F1: 0.9755
Epoch 11/30 - Loss: 0.0268, Val F1: 0.9755
Epoch 12/30 - Loss: 0.0360, Val F1: 0.9755
Epoch 13/30 - Loss: 0.0254, Val F1: 0.9755
Epoch 14/30 - Loss: 0.0313, Val F1: 0.9755
Epoch 15/30 - Loss: 0.0278, Val F1: 0.9753
Epoch 16/30 - Loss: 0.0228, Val F1: 0.9753
Epoch 17/30 - Loss: 0.0180, Val F1: 0.9755
Epoch 18/30 - Loss: 0.0210, Val F1: 0.9755
Epoch 19/30 - Loss: 0.0285, Val F1: 0.9755
Epoch 20/30 - Loss: 0.0272, Val F1: 0.9796
N

In [25]:
# Load best model before inference
model.load_state_dict(torch.load('best_model.pth'))
model.eval()

ids, preds = [], []
with torch.no_grad():
    for imgs, img_ids in test_loader:
        imgs = imgs.to(device)
        outputs = model(imgs)
        _, p = outputs.max(1)
        ids += img_ids
        preds += p.cpu().tolist()

# Map back to class names
labels = [idx2cls[i] for i in preds]
submission = pd.DataFrame({'image_id': ids, 'soil_type': labels})
submission.to_csv('submission.csv', index=False)
print("submission.csv created using best_model.pth!")

submission.csv created using best_model.pth!


In [26]:
# Load best model before evaluation
model.load_state_dict(torch.load('best_model.pth'))
model.eval()

all_trues, all_preds = [], []

with torch.no_grad():
    for imgs, labels in val_loader:  
        imgs = imgs.to(device)
        outputs = model(imgs)
        _, preds = outputs.max(1)
        all_preds += preds.cpu().tolist()
        all_trues += labels.tolist()

# Compute per-class F1 and take minimum
per_class_f1 = f1_score(all_trues, all_preds, average=None)
min_f1 = per_class_f1.min()

print(f"Per-class F1 scores: {per_class_f1}")
print(f"Final metric (minimum F1): {min_f1:.4f}")

Per-class F1 scores: [0.97584541 0.9787234  0.96385542 1.        ]
Final metric (minimum F1): 0.9639
