In [1]:
!pip install pytorch-lightning



In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder
from torchvision.datasets import INaturalist
import pytorch_lightning as pl
import wandb
import matplotlib.pyplot as plt
import numpy as np
import random
import torchvision.models as models

In [3]:
# Select the computation device once for the whole notebook (GPU when CUDA is present, CPU otherwise)
def set_device():
    """Return a ``torch.device`` for CUDA when it is available, otherwise for the CPU."""
    backend = "cuda" if torch.cuda.is_available() else "cpu"
    return torch.device(backend)

device = set_device()  # Module-level device handle used by the later cells
print("Currently Using :: ", device)  # Report which device was selected

Currently Using ::  cuda


In [4]:
!pip install split-folders
import splitfolders
# Adjust the paths below to match where the dataset lives in your environment.
data_path = '/kaggle/input/nature-12k/inaturalist_12K/train'  # directory holding the original training images that will be split
output_path="train_val" # directory that receives the new train + validation split

# Randomly split the training images into 80% train / 20% validation.
# `seed=42` fixes the random assignment so re-running produces the same split.
# NOTE(review): only `seed` and `ratio` are passed; no class-balancing option is set here.
# Presumably the split is done per class folder — confirm against the split-folders docs.
splitfolders.ratio(input=data_path, output=output_path, seed=42, ratio=(0.8, 0.2) )

In [5]:
import torch
from torchvision import transforms
from torch.utils.data import DataLoader
from torchvision.datasets import ImageFolder

def configure_loaders(augment_data):
    """Build the train, validation and test DataLoaders.

    The training pipeline is ``RandomResizedCrop -> [augmentations] -> ToTensor
    -> Normalize``. Validation and test images use the deterministic pipeline
    ``Resize(256) -> CenterCrop -> ToTensor -> Normalize``. Normalization is
    always applied to all three splits so that the training inputs have the
    same distribution as the evaluation inputs regardless of the augmentation
    setting.

    Args:
        augment_data: Enables training-time augmentation (random horizontal
            flip and random rotation of up to 30 degrees) when it is ``True``
            or the string ``"true"`` in any letter case. Any other value
            disables augmentation.

    Returns:
        tuple: ``(train_loader, val_loader, test_loader)``. Only the training
        loader shuffles its samples.
    """
    input_size = 224                  # Side length of the square crop fed to the network
    scale_range = (0.08, 1.0)         # Area fraction range sampled by RandomResizedCrop
    norm_mean = [0.485, 0.456, 0.406]  # Imagenet statistics (per channel)
    norm_std = [0.229, 0.224, 0.225]
    loader_params = {
        'batch_size': 64,
        'num_workers': 4,
        'pin_memory': True,
        'persistent_workers': True,   # Keep worker processes alive between epochs
    }

    # Dataset locations: the train/validation split produced earlier plus the held-out test set
    data_paths = {
        'train': '/kaggle/working/train_val/train',
        'validation': '/kaggle/working/train_val/val',
        'test': '/kaggle/input/nature-12k/inaturalist_12K/val',
    }

    # Optional augmentations operate on PIL images, so they sit before ToTensor
    augmentation_modules = [
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=30),
    ] if str(augment_data).lower() == "true" else []

    # Normalize unconditionally: previously it was skipped whenever augmentation
    # was disabled, which left training inputs unnormalized while validation and
    # test inputs were normalized.
    train_pipe = transforms.Compose([
        transforms.RandomResizedCrop(input_size, scale=scale_range),
        *augmentation_modules,
        transforms.ToTensor(),
        transforms.Normalize(norm_mean, norm_std),
    ])

    # Deterministic evaluation pipeline shared by the validation and test sets
    eval_pipe = transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(input_size),
        transforms.ToTensor(),
        transforms.Normalize(norm_mean, norm_std),
    ])

    train_ds = ImageFolder(data_paths['train'], train_pipe)
    val_ds = ImageFolder(data_paths['validation'], eval_pipe)
    test_ds = ImageFolder(data_paths['test'], eval_pipe)

    def create_loader(dataset, shuffle=False):
        """Wrap a dataset in a DataLoader using the shared loader settings."""
        return DataLoader(dataset, shuffle=shuffle, **loader_params)

    return (
        create_loader(train_ds, shuffle=True),  # Reshuffled every epoch
        create_loader(val_ds),                  # Fixed order
        create_loader(test_ds),                 # Fixed order
    )

In [6]:
def freeze_layers(model, options, k):
    """
    Freeze specified layers of a neural network model.

    A "layer" here is a direct child module of ``model`` (as yielded by
    ``model.children()``). Freezing sets ``requires_grad = False`` on every
    parameter of the selected children; nothing is ever un-frozen.

    Args:
    - model (torch.nn.Module): The neural network model.
    - options (str): Specifies which layers to freeze:
        - "start": the first ``k`` children.
        - "middle": the children whose 1-based position ``p`` satisfies
          ``total // 2 - k <= p < total // 2 + k`` (at most ``2 * k`` children).
        - "end": the last ``k`` children.
        - "freeze_all": every child except the last one (``k`` is not used
          for the selection but is still range-checked).
      Any other value freezes nothing and prints a notice.
    - k (int): Number of layers that controls the selection, see ``options``.

    Raises:
    - ValueError: If k is not within the valid range ``0 <= k < number of children``.

    Returns:
    - None
    """
    children = list(model.children())
    total_layers = len(children)

    # Check if k is within the valid range
    if k < 0 or k >= total_layers:
        raise ValueError(f"Invalid value of k. Choose between 0 and {total_layers - 1}")

    def _freeze(layers):
        """Disable gradient tracking for every parameter of the given modules."""
        for layer in layers:
            for param in layer.parameters():
                param.requires_grad = False

    # Freeze first k layers
    if options == "start":
        _freeze(children[:k])
        print(f"Freezed First {k} Layer")

    # Freeze layers around the middle of the model
    elif options == "middle":
        middle_layer = total_layers // 2
        start_layer = middle_layer - k
        end_layer = middle_layer + k
        # Positions are 1-based; a start below 1 simply selects fewer layers
        _freeze(
            layer
            for layer_num, layer in enumerate(children, 1)
            if start_layer <= layer_num < end_layer
        )
        print(f"Freeze middle layers from layer {start_layer} to {end_layer} and Train rest of the layers")

    # Freeze last k layers
    elif options == "end":
        # Slice from an explicit non-negative index so that k == 0 selects
        # nothing. The previous 1-based inclusive range froze k + 1 layers.
        _freeze(children[total_layers - k:])
        print(f"Freeze last {k} layers and Train rest of the layers")

    # Freeze all layers (train only last layer)
    elif options == "freeze_all":
        _freeze(children[:-1])
        print("Train only last layer and freeze all other layers")

    else:
        # Keep the historical no-op behaviour but make it visible instead of silent
        print(f"Unknown freezing option {options!r}: no layers were frozen")
            

In [7]:
def _activation_registry(activation_name):
    """Non-linear response function selector"""
    registry = {
        "ReLU": nn.ReLU(),        # Standard rectification
        "GELU": nn.GELU(),        # Gaussian error linear unit
        "SiLU": nn.SiLU(),        # Sigmoid-weighted linear unit
        "Mish": nn.Mish(),        # Self-regularized non-linearity
        "LeakyReLU": nn.LeakyReLU() # Negative slope preservation
    }

    # Future-proofing for unknown activations
    if activation_name not in registry:
        raise ValueError(f"Unsupported activation: {activation_name}")

    return registry[activation_name]

In [8]:
def train(num_cycles, network, train_loader, val_loader, logging_mode, strategy, k):
    """Fine-tune ``network`` and report per-epoch training/validation metrics.

    Layers are first frozen via ``freeze_layers(network, strategy, k)``; the
    remaining trainable parameters are optimized with Adam (lr 1e-4) on a
    cross-entropy loss, with the gradient norm clipped to 5.0. After every
    epoch the model is evaluated on ``val_loader``.

    Args:
        num_cycles: Number of epochs to run.
        network: Model to train. Batches are moved to the device of its
            parameters.
        train_loader: Iterable of ``(inputs, targets)`` training batches; must
            support ``len()``.
        val_loader: Iterable of ``(inputs, targets)`` validation batches; must
            support ``len()``.
        logging_mode: When equal to ``"wandb"`` the epoch metrics are also
            sent to Weights & Biases; any other value only prints them.
        strategy: Freezing option forwarded to ``freeze_layers``.
        k: Layer count forwarded to ``freeze_layers``.

    Returns:
        float: Validation accuracy (percent) of the last epoch, or ``0.0``
        when ``num_cycles`` is 0.
    """
    freeze_layers(network, strategy, k)

    loss_metric = nn.CrossEntropyLoss()
    learning_rate = 1e-4
    adam_betas = (0.9, 0.999)
    grad_clip = 5.0  # Upper bound on the total gradient norm

    # Only parameters left trainable by freeze_layers need optimizing / clipping
    trainable_params = [p for p in network.parameters() if p.requires_grad]
    optim = torch.optim.Adam(trainable_params, lr=learning_rate, betas=adam_betas)

    # Follow the model's own device instead of relying on a notebook-level global
    compute_device = next(network.parameters()).device

    # Defined up front so that a zero-epoch run still returns a value
    val_acc = 0.0

    for cycle in range(num_cycles):
        # Phase 1: parameter update
        # NOTE(review): train() also switches frozen children to training mode,
        # so any BatchNorm layers inside them keep updating running statistics.
        network.train()
        train_loss, train_correct, train_total = 0.0, 0, 0

        for batch_idx, (inputs, targets) in enumerate(train_loader):
            inputs, targets = inputs.to(compute_device), targets.to(compute_device)

            predictions = network(inputs)
            batch_loss = loss_metric(predictions, targets)

            optim.zero_grad()
            batch_loss.backward()
            torch.nn.utils.clip_grad_norm_(trainable_params, grad_clip)
            optim.step()

            train_loss += batch_loss.item()
            _, predicted_labels = torch.max(predictions, 1)
            train_correct += (predicted_labels == targets).sum().item()
            train_total += targets.size(0)

            # Progress report every 25 batches
            if (batch_idx+1) % 25 == 0:
                print(f'Epoch [{cycle+1}/{num_cycles}], Batch [{batch_idx+1}/{len(train_loader)}]')

        # max(..., 1) keeps an empty loader from raising ZeroDivisionError
        train_acc = 100.0 * train_correct / max(train_total, 1)
        avg_train_loss = train_loss / max(len(train_loader), 1)
        print(f'Epoch {cycle+1}, Train Accuracy: {train_acc:.2f}%, Avg Loss: {avg_train_loss:.4f}')

        # Phase 2: validation
        network.eval()
        val_loss_total, val_correct, val_total = 0.0, 0, 0

        with torch.no_grad():
            for val_inputs, val_targets in val_loader:
                val_inputs = val_inputs.to(compute_device)
                val_targets = val_targets.to(compute_device)
                val_predictions = network(val_inputs)
                val_loss = loss_metric(val_predictions, val_targets)

                _, val_predicted = torch.max(val_predictions, 1)
                val_correct += (val_predicted == val_targets).sum().item()
                val_total += val_targets.size(0)
                val_loss_total += val_loss.item()

        val_acc = 100.0 * val_correct / max(val_total, 1)
        avg_val_loss = val_loss_total / max(len(val_loader), 1)
        print(f'Epoch {cycle+1}, Validation Accuracy: {val_acc:.2f}%, Avg Loss: {avg_val_loss:.4f}')

        if logging_mode == "wandb":
            _log_training_artifacts(
                cycle+1, avg_train_loss, train_acc, avg_val_loss, val_acc
            )

    return val_acc

def _log_training_artifacts(cycle, train_loss, train_acc, val_loss, val_acc):
    """Send one epoch's training and validation metrics to Weights & Biases."""
    metric_names = (
        'Epoch',
        'Training Loss',
        'Training Accuracy',
        'Validation Loss',
        'Validation Accuracy',
    )
    metric_values = (cycle, train_loss, train_acc, val_loss, val_acc)
    # Pair each name with its value, preserving the order above
    wandb.log(dict(zip(metric_names, metric_values)))

In [9]:
def load_model(device, num_classes=10):
    """Create an ImageNet-pretrained GoogLeNet with a new classification head.

    Args:
        device: Device the model is moved to before being returned.
        num_classes: Output size of the replacement ``fc`` layer. Defaults to
            10, the value that used to be hard-coded.

    Returns:
        The GoogLeNet model on ``device`` whose ``fc`` layer is a freshly
        initialized ``nn.Linear`` with ``num_classes`` outputs.
    """
    # Prefer the weights enum when this torchvision provides it; fall back to
    # the older boolean flag otherwise.
    if hasattr(models, "GoogLeNet_Weights"):
        model = models.googlenet(weights=models.GoogLeNet_Weights.IMAGENET1K_V1)
    else:
        model = models.googlenet(pretrained=True)

    # Swap the classifier for one sized to the target label set
    model.fc = nn.Linear(model.fc.in_features, num_classes)
    return model.to(device)

In [10]:
# Instantiate the fine-tuning model on the selected device.
# NOTE(review): `model` is assigned again by a later `load_model(device)` call before any
# visible cell trains or evaluates it, so this load looks redundant — confirm before removing.
model = load_model(device)



In [11]:
# Human-readable class names, indexed by the integer label produced by the datasets.
# NOTE(review): presumably this order matches the class indices ImageFolder assigns
# (the list is alphabetical) — verify against the dataset's `classes` attribute.
classes = ['Amphibia', 'Animalia', 'Arachnida', 'Aves', 'Fungi',
           'Insecta', 'Mammalia', 'Mollusca', 'Plantae', 'Reptilia']

In [12]:
def test_model(network, data_loader):
    """Execute model evaluation with diagnostic analytics"""
    # Evaluation protocol configuration
    eval_profile = {
        'loss_function': nn.CrossEntropyLoss(),
        'sample_capture_interval': 200,  # Diagnostic imaging frequency
        'precision_mode': 'fp32',         # Evaluation precision
        'enable_metrics': True            # Comprehensive reporting
    }

    # Performance tracking
    performance_stats = {
        'correct': 0,
        'total': 0,
        'loss': 0.0,
        'diagnostic_images': [],
        'prediction_records': []
    }

    # Hardware optimization
    compute_device = next(network.parameters()).device

    with torch.inference_mode():
        sample_counter = 0
        for batch_inputs, batch_labels in data_loader:
            # Data standardization protocol
            batch_inputs = batch_inputs.to(compute_device)
            batch_labels = batch_labels.to(compute_device)

            # Model inference
            predictions = network(batch_inputs)

            # Loss computation
            batch_loss = eval_profile['loss_function'](predictions, batch_labels)
            performance_stats['loss'] += batch_loss.item()

            # Prediction analysis
            _, predicted_classes = torch.max(predictions, 1)
            performance_stats['correct'] += (predicted_classes == batch_labels).sum().item()
            performance_stats['total'] += batch_labels.size(0)

            # Diagnostic image capture
            if eval_profile['enable_metrics']:
                for idx in range(batch_inputs.size(0)):
                    sample_counter += 1
                    if sample_counter % eval_profile['sample_capture_interval'] in (1, 2, 3):
                        if sample_counter % eval_profile['sample_capture_interval'] in (1, 2, 3):
                            performance_stats['diagnostic_images'].append(batch_inputs[idx])
                            performance_stats['prediction_records'].append((batch_labels[idx].item(), predicted_classes[idx].item()))
                            print(f'Class Verification: Actual: {classes[batch_labels[idx]]}, Predicted: {classes[predicted_classes[idx]]}')


        # Final metric computation
        accuracy = 100.0 * performance_stats['correct'] / performance_stats['total']
        avg_loss = performance_stats['loss'] / len(data_loader)

        # Result certification
        print(f'Model Diagnostics :: Accuracy: {accuracy:.2f}% | Loss: {avg_loss:.4f}')
        print(f'Total Samples Analyzed: {performance_stats["total"]}')

    return performance_stats['diagnostic_images'], performance_stats['prediction_records']

In [13]:
# test_images , test_labels = test_model(model , test_dl)

In [14]:
import wandb
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from io import BytesIO
from PIL import Image

sns.set_style("white")

def display_images_with_predictions(test_images, test_labels, classes, num_rows=10, num_cols=3, log_to_wandb=False):
    """Plot a grid of images titled with their true and predicted class.

    Args:
        test_images: Sequence of image tensors in channel-first layout; each
            is min-max rescaled to [0, 1] for display.
        test_labels: Sequence of ``(true_label, predicted_label)`` index
            pairs, aligned with ``test_images``.
        classes: Sequence mapping a label index to its display name.
        num_rows: Number of grid rows.
        num_cols: Number of grid columns.
        log_to_wandb: When true, the figure is rendered to PNG, logged to
            Weights & Biases under "Predictions" and closed without being
            shown; otherwise it is shown with ``plt.show()``.

    At most ``num_rows * num_cols`` images are drawn. If fewer are supplied,
    the remaining grid cells are left blank.
    """
    # squeeze=False always yields a 2-D array, so flatten() also works for a 1x1 grid
    fig, axes = plt.subplots(num_rows, num_cols, figsize=(num_cols * 4, num_rows * 3), squeeze=False)
    axes = axes.flatten()
    num_shown = min(len(axes), len(test_images), len(test_labels))

    for i in range(num_shown):
        ax = axes[i]
        img = np.transpose(test_images[i].cpu().numpy(), (1, 2, 0))
        img_min, img_max = img.min(), img.max()
        img = (img - img_min) / (img_max - img_min + 1e-5)  # epsilon avoids 0/0 on flat images

        ax.imshow(img)
        # Hide only the ticks: ax.axis('off') would also hide the spines
        # that are coloured below to flag correct/incorrect predictions.
        ax.set_xticks([])
        ax.set_yticks([])

        true_label = test_labels[i][0]
        pred_label = test_labels[i][1]
        correct = (true_label == pred_label)

        emoji = "✓" if correct else "✗"
        label_color = 'green' if correct else 'red'

        ax.set_title(f"{emoji} True: {classes[true_label]}\nPred: {classes[pred_label]}",
                     fontsize=9, color=label_color, loc='center', pad=10)

        for spine in ax.spines.values():
            spine.set_linewidth(1.5)
            spine.set_color(label_color)
            spine.set_linestyle('--')

    # Blank out grid cells that have no image to show
    for ax in axes[num_shown:]:
        ax.axis('off')

    fig.tight_layout()

    if log_to_wandb:
        with BytesIO() as buf:
            fig.savefig(buf, format='png')
            buf.seek(0)
            image = Image.open(buf)  # Convert BytesIO to PIL Image
            wandb.log({"Predictions": wandb.Image(image)})
        plt.close(fig)
    else:
        plt.show()


In [15]:
# Build a fresh model on the selected device and print its module structure,
# which shows the top-level children that freeze_layers counts as "layers".
model = load_model(device)
print(model)

GoogLeNet(
  (conv1): BasicConv2d(
    (conv): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn): BatchNorm2d(64, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
  )
  (maxpool1): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=True)
  (conv2): BasicConv2d(
    (conv): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
    (bn): BatchNorm2d(64, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
  )
  (conv3): BasicConv2d(
    (conv): Conv2d(64, 192, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (bn): BatchNorm2d(192, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
  )
  (maxpool2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=True)
  (inception3a): Inception(
    (branch1): BasicConv2d(
      (conv): Conv2d(192, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn): BatchNorm2d(64, eps=0.001, momentum=0.1, affine=True, track



In [16]:
# Build the train / validation / test loaders with training-time augmentation enabled.
train_loader , val_loader , test_loader = configure_loaders(True)

In [None]:
# Experiment: freeze the first k top-level modules and fine-tune the rest.
# Uses the `model` and loaders created in the preceding cells.
epochs = 6
strategy = 'start'
k = 5
train(
    num_cycles=epochs,
    network=model,
    train_loader=train_loader,
    val_loader=val_loader,
    logging_mode="print_on",
    strategy=strategy,
    k=k,
)
test_images, test_labels = test_model(network=model, data_loader=test_loader)
display_images_with_predictions(test_images, test_labels, classes, log_to_wandb=False)

Freezed First 5 Layer
Epoch [1/6], Batch [25/125]
Epoch [1/6], Batch [50/125]
Epoch [1/6], Batch [75/125]
Epoch [1/6], Batch [100/125]
Epoch [1/6], Batch [125/125]
Epoch 1, Train Accuracy: 53.39%, Avg Loss: 1.5109
Epoch 1, Validation Accuracy: 70.15%, Avg Loss: 0.9186
Epoch [2/6], Batch [25/125]
Epoch [2/6], Batch [50/125]
Epoch [2/6], Batch [75/125]
Epoch [2/6], Batch [100/125]
Epoch [2/6], Batch [125/125]
Epoch 2, Train Accuracy: 67.55%, Avg Loss: 0.9920
Epoch 2, Validation Accuracy: 74.95%, Avg Loss: 0.7647
Epoch [3/6], Batch [25/125]
Epoch [3/6], Batch [50/125]
Epoch [3/6], Batch [75/125]
Epoch [3/6], Batch [100/125]
Epoch [3/6], Batch [125/125]
Epoch 3, Train Accuracy: 71.28%, Avg Loss: 0.8637
Epoch 3, Validation Accuracy: 77.20%, Avg Loss: 0.6784
Epoch [4/6], Batch [25/125]
Epoch [4/6], Batch [50/125]
Epoch [4/6], Batch [75/125]
Epoch [4/6], Batch [100/125]
Epoch [4/6], Batch [125/125]
Epoch 4, Train Accuracy: 73.85%, Avg Loss: 0.7832
Epoch 4, Validation Accuracy: 77.80%, Avg Los

In [None]:
# Experiment: freeze layers around the middle of the network and fine-tune the rest.
model = load_model(device)
train_loader , val_loader , test_loader = configure_loaders(True)
# `epochs` had no value here (a SyntaxError); 5 matches the "end" and "freeze_all" experiments.
epochs=5
strategy='middle'
k=5
train(epochs,model,train_loader,val_loader,"print_on", strategy, k)
test_images, test_labels = test_model(model, test_loader)
display_images_with_predictions(test_images, test_labels, classes, log_to_wandb=False)

In [None]:
# Experiment: freeze layers at the end of the network and fine-tune the rest.
# A fresh model and fresh loaders are created so earlier runs do not leak into this one.
model = load_model(device)
train_loader, val_loader, test_loader = configure_loaders(True)
epochs = 5
strategy = 'end'
k = 5
train(
    num_cycles=epochs,
    network=model,
    train_loader=train_loader,
    val_loader=val_loader,
    logging_mode="print_on",
    strategy=strategy,
    k=k,
)
test_images, test_labels = test_model(network=model, data_loader=test_loader)
display_images_with_predictions(test_images, test_labels, classes, log_to_wandb=False)

In [None]:
# Experiment: freeze every top-level module except the last one and train only that.
# A fresh model and fresh loaders are created so earlier runs do not leak into this one.
model = load_model(device)
train_loader, val_loader, test_loader = configure_loaders(True)
epochs = 5
strategy = 'freeze_all'
k = 5  # Not used to pick layers for 'freeze_all', but it must still pass freeze_layers' range check
train(
    num_cycles=epochs,
    network=model,
    train_loader=train_loader,
    val_loader=val_loader,
    logging_mode="print_on",
    strategy=strategy,
    k=k,
)
test_images, test_labels = test_model(network=model, data_loader=test_loader)
display_images_with_predictions(test_images, test_labels, classes, log_to_wandb=False)