In [15]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import time
from PIL import Image

# DenseNet_model14 brightness_contrast augmentation architecture

In [30]:
class DenseLayer_model14(nn.Module):
    def __init__(self, in_channels, growth_rate, bn_size, drop_rate):
        super(DenseLayer_model14, self).__init__()
        # BN-ReLU-Conv(1x1)
        self.bn1 = nn.BatchNorm2d(in_channels)
        self.relu1 = nn.ReLU(inplace=True)
        self.conv1 = nn.Conv2d(in_channels, bn_size * growth_rate, kernel_size=1, stride=1, bias=False)
        
        # BN-ReLU-Conv(3x3)
        self.bn2 = nn.BatchNorm2d(bn_size * growth_rate)
        self.relu2 = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(bn_size * growth_rate, growth_rate, kernel_size=3, stride=1, padding=1, bias=False)
        
        self.drop_rate = drop_rate
        
    def forward(self, x):
        new_features = self.conv1(self.relu1(self.bn1(x)))
        new_features = self.conv2(self.relu2(self.bn2(new_features)))
        if self.drop_rate > 0:
            new_features = F.dropout(new_features, p=self.drop_rate, training=self.training)
        return torch.cat([x, new_features], 1)

class DenseBlock_model14(nn.Module):
    def __init__(self, num_layers, in_channels, growth_rate, bn_size, drop_rate):
        super(DenseBlock_model14, self).__init__()
        self.layers = nn.ModuleList()
        for i in range(num_layers):
            self.layers.add_module('DenseLayer_model14%d' % (i + 1),
                                  DenseLayer_model14(in_channels + i * growth_rate, growth_rate, bn_size, drop_rate))
            
    def forward(self, x):
        features = x
        for layer in self.layers:
            features = layer(features)
        return features

class Transition_model14(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(Transition_model14, self).__init__()
        self.bn = nn.BatchNorm2d(in_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, bias=False)
        self.pool = nn.AvgPool2d(kernel_size=2, stride=2)
        
    def forward(self, x):
        x = self.bn(x)
        x = self.relu(x)
        x = self.conv(x)
        x = self.pool(x)
        return x

class DenseNet_model14(nn.Module):
    def __init__(self, growth_rate=12, block_config=(6, 12, 8), 
                 num_init_features=32, bn_size=4, drop_rate=0.3, num_classes=10):
        super(DenseNet_model14, self).__init__()
        
        # First convolution
        self.features = nn.Sequential(
            nn.Conv2d(3, num_init_features, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(num_init_features),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        # Each DenseBlock_model14
        num_features = num_init_features
        for i, num_layers in enumerate(block_config):
            # Add a dense block
            block = DenseBlock_model14(
                num_layers=num_layers,
                in_channels=num_features,
                growth_rate=growth_rate,
                bn_size=bn_size,
                drop_rate=drop_rate
            )
            self.features.add_module('DenseBlock_model14%d' % (i + 1), block)
            num_features = num_features + num_layers * growth_rate
            
            # Add a Transition_model14 layer between dense blocks (except after the last block)
            if i != len(block_config) - 1:
                trans = Transition_model14(in_channels=num_features, out_channels=num_features // 2)
                self.features.add_module('Transition_model14%d' % (i + 1), trans)
                num_features = num_features // 2
        
        # Final batch norm
        self.features.add_module('norm5', nn.BatchNorm2d(num_features))
        self.features.add_module('relu5', nn.ReLU(inplace=True))
        
        # Linear layer
        self.classifier = nn.Linear(num_features, num_classes)
        
        # Initialize weights
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.constant_(m.bias, 0)
    
    def forward(self, x):
        features = self.features(x)
        out = F.adaptive_avg_pool2d(features, (1, 1))
        out = torch.flatten(out, 1)
        out = self.classifier(out)
        return out

In [32]:
# Load the entire model
model_brightness_contrast = torch.load("../DenseNet/model_id_14_pelny_chyba.pth", weights_only=False)

# Set the model to evaluation mode
model_brightness_contrast.eval()

# Define preprocessing transformations (adjust based on how your model was trained)
preprocess = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])  # ImageNet normalization
])

# Load and preprocess your image
image_path = "../../data/raw/valid/airplane/cifar10-train-10031.png"
image = Image.open(image_path).convert('RGB')
input_tensor = preprocess(image)
input_batch = input_tensor.unsqueeze(0)  # Add batch dimension

# Handle device compatibility (CPU or GPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_brightness_contrast = model_brightness_contrast.to(device)
input_batch = input_batch.to(device)

# Make prediction
with torch.no_grad():  # Disable gradient calculation
    output = model_brightness_contrast(input_batch)

# Process the output based on your model type
# For classification:
_, predicted_class = torch.max(output, 1)
print(f"Predicted class: {predicted_class.item()}")

Predicted class: 0


In [62]:
criterion = nn.CrossEntropyLoss()
trainset_raw = torchvision.datasets.ImageFolder('../../data/raw/train/', transform=transform)
trainloader_raw = torch.utils.data.DataLoader(trainset_raw, batch_size=32,
                                          shuffle=True, num_workers=2)
valset = torchvision.datasets.ImageFolder('../../data/raw/valid/', transform=preprocess)
valloader = torch.utils.data.DataLoader(valset, batch_size=32, shuffle=False, num_workers=2)

In [49]:
val_error = 0
correct = 0
with torch.no_grad():
    model_brightness_contrast.eval()
    for images, labels in valloader:
        images, labels = images.to(device), labels.to(device)
        outputs = model_brightness_contrast(images)
        val_error = val_error + criterion(outputs, labels) * images.size(0)
        correct += (torch.argmax(outputs, 1) == labels).float().sum().item()
        val_error = val_error / len(valloader.dataset)
    print(f'epoch NONE val error: {val_error}, acc: {correct/len(valloader.dataset)}')

Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x00000270D33A84A0>
Traceback (most recent call last):
  File "C:\Users\micha\Envs\DeepLearning\Lib\site-packages\torch\utils\data\dataloader.py", line 1618, in __del__
    self._shutdown_workers()
  File "C:\Users\micha\Envs\DeepLearning\Lib\site-packages\torch\utils\data\dataloader.py", line 1582, in _shutdown_workers
    w.join(timeout=_utils.MP_STATUS_CHECK_INTERVAL)
  File "C:\Users\micha\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\process.py", line 149, in join
    res = self._popen.wait(timeout)
          ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\micha\AppData\Local\Programs\Python\Python311\Lib\multiprocessing\popen_spawn_win32.py", line 109, in wait
    res = _winapi.WaitForSingleObject(int(self._handle), msecs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt: 

KeyboardInterrupt



# DenseNet rotation augmentation architecture (model 12)

In [38]:
class DenseLayer(nn.Module):
    def __init__(self, in_channels, growth_rate, bn_size, drop_rate):
        super(DenseLayer, self).__init__()
        # BN-ReLU-Conv(1x1)
        self.bn1 = nn.BatchNorm2d(in_channels)
        self.relu1 = nn.ReLU(inplace=True)
        self.conv1 = nn.Conv2d(in_channels, bn_size * growth_rate, kernel_size=1, stride=1, bias=False)
        
        # BN-ReLU-Conv(3x3)
        self.bn2 = nn.BatchNorm2d(bn_size * growth_rate)
        self.relu2 = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(bn_size * growth_rate, growth_rate, kernel_size=3, stride=1, padding=1, bias=False)
        
        self.drop_rate = drop_rate
        
    def forward(self, x):
        new_features = self.conv1(self.relu1(self.bn1(x)))
        new_features = self.conv2(self.relu2(self.bn2(new_features)))
        if self.drop_rate > 0:
            new_features = F.dropout(new_features, p=self.drop_rate, training=self.training)
        return torch.cat([x, new_features], 1)

class DenseBlock(nn.Module):
    def __init__(self, num_layers, in_channels, growth_rate, bn_size, drop_rate):
        super(DenseBlock, self).__init__()
        self.layers = nn.ModuleList()
        for i in range(num_layers):
            self.layers.add_module('denselayer%d' % (i + 1),
                                  DenseLayer(in_channels + i * growth_rate, growth_rate, bn_size, drop_rate))
            
    def forward(self, x):
        features = x
        for layer in self.layers:
            features = layer(features)
        return features

class Transition(nn.Module):
    def __init__(self, in_channels, out_channels):
        super(Transition, self).__init__()
        self.bn = nn.BatchNorm2d(in_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, bias=False)
        self.pool = nn.AvgPool2d(kernel_size=2, stride=2)
        
    def forward(self, x):
        x = self.bn(x)
        x = self.relu(x)
        x = self.conv(x)
        x = self.pool(x)
        return x

class DenseNet(nn.Module):
    def __init__(self, growth_rate=12, block_config=(6, 12, 8), 
                 num_init_features=32, bn_size=4, drop_rate=0.2, num_classes=10):
        super(DenseNet, self).__init__()
        
        # First convolution
        self.features = nn.Sequential(
            nn.Conv2d(3, num_init_features, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(num_init_features),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        
        # Each denseblock
        num_features = num_init_features
        for i, num_layers in enumerate(block_config):
            # Add a dense block
            block = DenseBlock(
                num_layers=num_layers,
                in_channels=num_features,
                growth_rate=growth_rate,
                bn_size=bn_size,
                drop_rate=drop_rate
            )
            self.features.add_module('denseblock%d' % (i + 1), block)
            num_features = num_features + num_layers * growth_rate
            
            # Add a transition layer between dense blocks (except after the last block)
            if i != len(block_config) - 1:
                trans = Transition(in_channels=num_features, out_channels=num_features // 2)
                self.features.add_module('transition%d' % (i + 1), trans)
                num_features = num_features // 2
        
        # Final batch norm
        self.features.add_module('norm5', nn.BatchNorm2d(num_features))
        self.features.add_module('relu5', nn.ReLU(inplace=True))
        
        # Linear layer
        self.classifier = nn.Linear(num_features, num_classes)
        
        # Initialize weights
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.constant_(m.bias, 0)
    
    def forward(self, x):
        features = self.features(x)
        out = F.adaptive_avg_pool2d(features, (1, 1))
        out = torch.flatten(out, 1)
        out = self.classifier(out)
        return out

In [39]:
# Load the entire model
model_rotation = DenseNet(growth_rate=12, block_config=(6, 12, 8), num_classes=10)
state_dict  = torch.load("../basic_CNN/model_id_12_DenseNET.pth")
model_rotation.load_state_dict(state_dict)

# Set the model to evaluation mode
model_rotation.eval()

image = Image.open(image_path).convert('RGB')
input_tensor = preprocess(image)
input_batch = input_tensor.unsqueeze(0)  # Add batch dimension

model_rotation = model_rotation.to(device)
input_batch = input_batch.to(device)

# Make prediction
with torch.no_grad():  # Disable gradient calculation
    output = model_rotation(input_batch)

# Process the output based on your model type
# For classification:
_, predicted_class = torch.max(output, 1)
print(f"Predicted class: {predicted_class.item()}")

Predicted class: 0


# Soft voting

In [47]:
def ensemble_predict(image, model1, model2):
    # Get predictions from both models
    with torch.no_grad():
        output1 = model1(image)
        output2 = model2(image)
    
    # For classification (voting)
    if output1.shape[1] > 1:  # Multi-class
        # Average the probabilities
        avg_output = (output1 + output2) / 2
        # Or use hard voting
        # pred1 = output1.argmax(dim=1)
        # pred2 = output2.argmax(dim=1)
        # Use most common prediction
    
    # For regression
    else:
        avg_output = (output1 + output2) / 2
        
    return avg_output

In [48]:
image = Image.open(image_path).convert('RGB')
input_tensor = preprocess(image)
input_batch = input_tensor.unsqueeze(0)  # Add batch dimension

input_batch = input_batch.to(device)

# Make prediction
with torch.no_grad():  # Disable gradient calculation
    output = ensemble_predict(input_batch, model_brightness_contrast, model_rotation)

# Process the output based on your model type
# For classification:
_, predicted_class = torch.max(output, 1)
print(f"Predicted class: {predicted_class.item()}")

Predicted class: 0


In [50]:
val_error = 0
correct = 0
with torch.no_grad():
    for images, labels in valloader:
        images, labels = images.to(device), labels.to(device)
        outputs = ensemble_predict(images, model_brightness_contrast, model_rotation)
        val_error = val_error + criterion(outputs, labels) * images.size(0)
        correct += (torch.argmax(outputs, 1) == labels).float().sum().item()
        val_error = val_error / len(valloader.dataset)
    print(f'epoch NONE val error: {val_error}, acc: {correct/len(valloader.dataset)}')

epoch NONE val error: 9.175079321721569e-05, acc: 0.7689333333333334


In [75]:
def ensemble_predict_hard_voting(image, model1, model2):
    # Get predictions from both models
    with torch.no_grad():
        output1 = model1(image)
        output2 = model2(image)
    
    # Get the predicted class from each model
    pred1 = output1.argmax(dim=1)
    pred2 = output2.argmax(dim=1)
    
    batch_size = image.shape[0]
    num_classes = output1.shape[1]
    
    # Initialize output tensor with zeros
    ensemble_output = torch.zeros((batch_size, num_classes), device=image.device)
    
    # Implement hard voting for each sample in the batch
    for i in range(batch_size):
        # Get confidence scores for both models
        conf1 = output1[i, pred1[i]]
        conf2 = output2[i, pred2[i]]
        
        # If models agree, use their prediction
        if pred1[i] == pred2[i]:
            chosen_class = pred1[i]
        else:
            # If models disagree, choose the one with higher confidence
            chosen_class = pred1[i] if conf1 > conf2 else pred2[i]
        
        # Use the probabilities from the model with higher confidence
        ensemble_output[i] = output1[i] if conf1 > conf2 else output2[i]
    
    return ensemble_output

In [76]:
val_error = 0
correct = 0
with torch.no_grad():
    for images, labels in valloader:
        images, labels = images.to(device), labels.to(device)
        outputs = ensemble_predict_hard_voting(images, model_brightness_contrast, model_rotation)
        val_error = val_error + criterion(outputs, labels) * images.size(0)
        correct += (torch.argmax(outputs, 1) == labels).float().sum().item()
        val_error = val_error / len(valloader.dataset)
    print(f'epoch NONE val error: {val_error}, acc: {correct/len(valloader.dataset)}')

epoch NONE val error: 0.00011848946451209486, acc: 0.7558555555555555


# Hard voting

# Weighted ensemble

In [51]:
def weighted_ensemble(image, model1, model2, weight1=0.6, weight2=0.4):
    with torch.no_grad():
        output1 = model1(image)
        output2 = model2(image)
    
    # Weight the outputs based on model performance
    weighted_output = weight1 * output1 + weight2 * output2
    return weighted_output

In [53]:
val_error = 0
correct = 0
with torch.no_grad():
    for images, labels in valloader:
        images, labels = images.to(device), labels.to(device)
        outputs = weighted_ensemble(images, model_brightness_contrast, model_rotation, weight1=0.55, weight2=0.45)
        val_error = val_error + criterion(outputs, labels) * images.size(0)
        correct += (torch.argmax(outputs, 1) == labels).float().sum().item()
        val_error = val_error / len(valloader.dataset)
    print(f'epoch NONE val error: {val_error}, acc: {correct/len(valloader.dataset)}')

epoch NONE val error: 9.493881952948868e-05, acc: 0.7687111111111111


# Ensemble Model

In [68]:

import os

class EarlyStopper:
    def __init__(self, patience=2, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.min_error = float('inf')
    
    def check(self, val_error, model):
        if val_error < self.min_error - self.min_delta:
            self.min_error = val_error
            # Save state_dict instead of entire model
            torch.save(model.state_dict(), 'best_ensemble_model.pth')
            self.counter = 0
            return (False, True)  # Not stopping, but saved new best model
        else:
            self.counter += 1
            if self.counter >= self.patience:
                return (True, False)  # Stopping, no new best model
            return (False, False)  # Not stopping, no new best model

class EnsembleModel(nn.Module):
    def __init__(self, model1, model2, meta_layer_size=64):
        super(EnsembleModel, self).__init__()
        self.model1 = model1
        self.model2 = model2
        
        # Freeze base models
        for param in self.model1.parameters():
            param.requires_grad = False
        for param in self.model2.parameters():
            param.requires_grad = False
            
        # Determine output size of the models
        try:
            # Try to get output size from the final layer
            last_layer1 = list(model_brightness_contrast.modules())[-1]
            while not isinstance(last_layer1, nn.Linear) and len(list(last_layer1.children())) == 0:
                last_layer1 = list(model_brightness_contrast.modules())[-2]
            
            last_layer2 = list(model_rotation.modules())[-1]
            while not isinstance(last_layer2, nn.Linear) and len(list(last_layer2.children())) == 0:
                last_layer2 = list(model_rotation.modules())[-2]
                
            output_size1 = last_layer1.out_features
            output_size2 = last_layer2.out_features
            
            # Check if outputs have same dimensions
            if output_size1 != output_size2:
                raise ValueError("Models have different output dimensions")
            
            output_size = output_size1
        except:
            # Fallback - you'll need to specify this manually if automatic detection fails
            output_size = 1000  # replace with your actual output size
            print("Couldn't detect output size automatically. Using default:", output_size)
        
        # Create meta-learning layers
        self.meta_layer = nn.Sequential(
            nn.Linear(output_size * 2, meta_layer_size),
            nn.ReLU(),
            nn.Dropout(0.3),  # Add dropout for regularization
            nn.Linear(meta_layer_size, output_size)
        )
        
    def forward(self, x):
        # Get outputs from both models
        with torch.no_grad():  # Ensure base models aren't updated
            out1 = self.model1(x)
            out2 = self.model2(x)
        
        # Concatenate outputs
        combined = torch.cat((out1, out2), dim=1)
        
        # Pass through meta-layer
        final_output = self.meta_layer(combined)
        return final_output

# Now you need to train the meta-learner on a validation set
def train_ensemble(ensemble_model, train_loader, val_loader, num_epochs=10, patience=5):
    criterion = nn.CrossEntropyLoss()  # Change to appropriate loss function
    # Only train the meta-layers
    optimizer = optim.Adam(filter(lambda p: p.requires_grad, ensemble_model.parameters()), lr=0.001)
    
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    ensemble_model = ensemble_model.to(device)
    
    # Initialize early stopper
    early_stopper = EarlyStopper(patience=patience, min_delta=0.001)
    
    # Track best metrics
    best_val_loss = float('inf')
    best_val_acc = 0.0
    
    print(f"Starting training for {num_epochs} epochs on {device}")
    print(f"Early stopping patience: {patience}")
    print("-" * 80)
    print("| Epoch | Train Loss | Val Loss  | Val Acc | Time (s) | Best Model |")
    print("-" * 80)
    
    for epoch in range(num_epochs):
        epoch_start_time = time.time()
        
        # Training phase
        ensemble_model.train()
        running_loss = 0.0
        batch_count = 0
        correct_train = 0
        total_train = 0
        
        for i, (inputs, labels) in enumerate(train_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            optimizer.zero_grad()
            outputs = ensemble_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            batch_count += 1
            
            # Calculate training accuracy
            _, predicted = torch.max(outputs, 1)
            total_train += labels.size(0)
            correct_train += (predicted == labels).sum().item()
            
            # Print batch progress every 10 batches
            if (i+1) % 10 == 0:
                batch_loss = running_loss / batch_count
                batch_acc = 100 * correct_train / total_train
                print(f"Epoch {epoch+1}/{num_epochs} - Batch {i+1}/{len(train_loader)}: Loss: {batch_loss:.4f}, Acc: {batch_acc:.2f}%", end='\r')
        
        # Calculate average training loss and accuracy
        epoch_loss = running_loss / batch_count
        train_acc = 100 * correct_train / total_train
        
        # Validation phase
        ensemble_model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                outputs = ensemble_model(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                
                _, predicted = torch.max(outputs, 1)
                
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
        
        val_epoch_loss = val_loss / len(val_loader)
        val_acc = 100 * correct / total
        
        # Check if this is the best model
        is_best = ""
        if val_epoch_loss < best_val_loss:
            best_val_loss = val_epoch_loss
            is_best = "✓ (loss)"
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            if is_best == "":
                is_best = "✓ (acc)"
            else:
                is_best = "✓ (both)"
        
        # Calculate epoch time
        epoch_time = time.time() - epoch_start_time
        
        # Print progress
        print(f"| {epoch+1:5d} | {epoch_loss:.6f} | {val_epoch_loss:.6f} | {val_acc:6.2f}% | {epoch_time:7.2f} | {is_best:10s} |")
        
        # Check early stopping
        should_stop, new_best = early_stopper.check(val_epoch_loss, ensemble_model)
        if new_best:
            print(f"New best model saved with validation loss: {val_epoch_loss:.6f}")
        
        if should_stop:
            print(f'Early stopping triggered at epoch {epoch+1}. No improvement for {patience} epochs.')
            break
    
    print("-" * 80)
    print(f"Training completed. Best validation accuracy: {best_val_acc:.2f}%, Best validation loss: {best_val_loss:.6f}")
    
    # Load the best model
    if os.path.exists('best_ensemble_model.pth'):
        print("Loading best model weights...")
        ensemble_model.load_state_dict(torch.load('best_ensemble_model.pth'))
    
    return ensemble_model

def predict_with_ensemble(ensemble_model, image):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    ensemble_model = ensemble_model.to(device)
    ensemble_model.eval()
    
    with torch.no_grad():
        image = image.to(device)
        output = ensemble_model(image)
        
        # For classification
        _, predicted = torch.max(output, 1)
        # For regression
        # predicted = output
        
    return predicted

In [69]:
ensemble = EnsembleModel(model_brightness_contrast, model_rotation)

In [70]:
ensemle_model = train_ensemble(ensemble, train_loader=trainloader_raw, val_loader=valloader, num_epochs=100)

Starting training for 100 epochs on cuda
Early stopping patience: 5
--------------------------------------------------------------------------------
| Epoch | Train Loss | Val Loss  | Val Acc | Time (s) | Best Model |
--------------------------------------------------------------------------------
|     1 | 0.642540 | 0.706942 |  76.85% |  144.92 | ✓ (both)   |
New best model saved with validation loss: 0.706942
|     2 | 0.575266 | 0.698058 |  77.00% |  147.08 | ✓ (both)   |
New best model saved with validation loss: 0.698058
|     3 | 0.568090 | 0.709668 |  76.78% |  144.86 |            |
|     4 | 0.565016 | 0.711150 |  76.84% |  147.74 |            |
|     5 | 0.564080 | 0.699611 |  77.01% |  148.42 | ✓ (acc)    |
|     6 | 0.562507 | 0.716086 |  76.87% |  145.89 |            |
|     7 | 0.560734 | 0.704827 |  77.05% |  146.23 | ✓ (acc)    |
Early stopping triggered at epoch 7. No improvement for 5 epochs.
----------------------------------------------------------------------------

# Hierarchical stacking

In [79]:
# First, let's define the augmentation functions
def apply_brightness_contrast(images, brightness_factor=1.2, contrast_factor=1.2):
    """Apply brightness and contrast augmentation to the images"""
    # Create a copy to avoid modifying the original
    augmented_images = images.clone()
    # Apply brightness (multiply)
    augmented_images = augmented_images * brightness_factor
    # Apply contrast (subtract mean, multiply by contrast factor, add mean back)
    mean = torch.mean(augmented_images, dim=[1, 2, 3], keepdim=True)
    augmented_images = (augmented_images - mean) * contrast_factor + mean
    # Clip values to be in valid range [0, 1]
    augmented_images = torch.clamp(augmented_images, 0, 1)
    return augmented_images

# Define the weighted ensemble function
def weighted_ensemble(images, model1, model2, weight1=0.5, weight2=0.5):
    """Perform weighted soft voting between two models"""
    outputs1 = model1(images)
    outputs2 = model2(images)
    return outputs1 * weight1 + outputs2 * weight2

# Define the hierarchical ensemble function
def hierarchical_ensemble(images, model_brightness_contrast, model_rotation, 
                         bc_weight1=0.55, bc_weight2=0.45,
                         final_weight_normal=0.5, final_weight_augmented=0.5):
    """
    Implement hierarchical ensembling:
    1. Soft voting for normal image
    2. Soft voting for brightness/contrast-augmented image
    3. Soft voting between results from steps 1 and 2
    """
    # Step 1: Soft voting for normal image
    normal_ensemble = weighted_ensemble(
        images, 
        model_brightness_contrast, 
        model_rotation, 
        bc_weight1, 
        bc_weight2
    )
    
    # Step 2: Apply brightness/contrast augmentation and perform soft voting
    augmented_images = apply_brightness_contrast(images)
    augmented_ensemble = weighted_ensemble(
        augmented_images, 
        model_brightness_contrast, 
        model_rotation, 
        bc_weight1, 
        bc_weight2
    )
    
    # Step 3: Combine predictions from normal and augmented images
    final_ensemble = normal_ensemble * final_weight_normal + augmented_ensemble * final_weight_augmented
    
    return final_ensemble

# Modified evaluation code
val_error = 0
correct = 0
with torch.no_grad():
    for images, labels in valloader:
        images, labels = images.to(device), labels.to(device)
        
        # Use the hierarchical ensemble for predictions
        outputs = hierarchical_ensemble(
            images, 
            model_brightness_contrast, 
            model_rotation, 
            bc_weight1=0.5, 
            bc_weight2=0.5,
            final_weight_normal=0.5,  
            final_weight_augmented=0.5
        )
        
        val_error = val_error + criterion(outputs, labels) * images.size(0)
        correct += (torch.argmax(outputs, 1) == labels).float().sum().item()
    
    val_error = val_error / len(valloader.dataset)
    print(f'epoch HIER val error: {val_error}, acc: {correct/len(valloader.dataset)}')

epoch HIER val error: 0.7796308994293213, acc: 0.7232888888888889


In [None]:
# You might want to experiment with different weights for the final ensemble
# Here's code to find optimal weights on the validation set
def find_optimal_weights(valloader, model_brightness_contrast, model_rotation, device, criterion,
                        bc_weight1=0.55, bc_weight2=0.45):
    best_acc = 0
    best_weights = (0.5, 0.5)
    
    for final_weight_normal in [0.3, 0.4, 0.5, 0.6, 0.7]:
        final_weight_augmented = 1.0 - final_weight_normal
        val_error = 0
        correct = 0
        
        with torch.no_grad():
            for images, labels in valloader:
                images, labels = images.to(device), labels.to(device)
                
                outputs = hierarchical_ensemble(
                    images, 
                    model_brightness_contrast, 
                    model_rotation, 
                    bc_weight1, 
                    bc_weight2,
                    final_weight_normal,  
                    final_weight_augmented
                )
                
                val_error = val_error + criterion(outputs, labels) * images.size(0)
                correct += (torch.argmax(outputs, 1) == labels).float().sum().item()
            
            val_error = val_error / len(valloader.dataset)
            accuracy = correct / len(valloader.dataset)
            
            print(f'Weights normal={final_weight_normal:.1f}, aug={final_weight_augmented:.1f} - '
                  f'val error: {val_error:.4f}, acc: {accuracy:.4f}')
            
            if accuracy > best_acc:
                best_acc = accuracy
                best_weights = (final_weight_normal, final_weight_augmented)
    
    print(f'Best weights: normal={best_weights[0]:.1f}, aug={best_weights[1]:.1f}, accuracy: {best_acc:.4f}')
    return best_weights