# Cześć!

## Jak poprawnie odpalić ten notebook na datasecie testowym?

1. Przygotuj folder z obrazkami testowymi za pomocą schematu:
```
| - submission.ipynb
|
└───test_data
    ├───barszcz
    ├───bigos
    ├───grzybowa
    ├───Kutia
    ├───makowiec
    ├───piernik
    ├───pierogi
    └───sernik
└───models
    ├───resnet_18_best_f1.pth
    ├───mobilenet_v3_large_best_f1.pth
    ├───efficientnet_b0_best_f1.pth
    ├───efficientnet_b1_best_f1.pth
    ├───shufflenet_v2_x2_0_best_f1.pth
    ├───regnet_y_800mf_best_f1.pth
```

Proszę, pilnuj tego, aby nazwy folderów z klasami były takie same jak w strukturze wyżej!

2. Umieść obrazki każdej klasy w odpowiadającym jej folderze.

3. Aby uruchomić poniższy kod, będziesz potrzebował bibliotek z requirements.txt.

Możesz je zainstalować komendą: `pip install -r requirements.txt`

4. Potrzebujesz też CUDA 12.1

5. Puść poniższy kod!

In [1]:
import random
import torch.nn as nn
import torch
from torchvision import datasets, transforms, models
from sklearn.metrics import f1_score
from torch.utils.data import DataLoader
import torch
import torch.nn as nn
from sklearn.metrics import f1_score
import torchvision.models as models
import timm
# Run on the GPU when PyTorch reports one, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
import numpy as np
# Show which device was selected.
print(device)

  from .autonotebook import tqdm as notebook_tqdm


cuda


In [2]:
def set_seed(seed_value):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (CPU and, when CUDA is available, every CUDA device).  When CUDA is
    available, cuDNN is also switched to deterministic mode and its
    autotuner (``benchmark``) is turned off.

    Args:
        seed_value: Integer seed applied to all generators.
    """
    random.seed(seed_value)
    # NumPy keeps its own global generator that neither `random` nor torch
    # touches.  Its legacy seeding API only accepts values in [0, 2**32).
    np.random.seed(seed_value % (2 ** 32))
    torch.manual_seed(seed_value)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed_value)
        torch.cuda.manual_seed_all(seed_value)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

In [3]:
def load_test_data(root_dir = 'test_data'):
    """Build a shuffled DataLoader over an ImageFolder-style test directory.

    Every image is resized to 224x224, converted to a tensor and normalised
    with the mean/std values given below.  Batches contain 32 images.

    Args:
        root_dir: Directory holding one sub-folder per class.

    Returns:
        A ``DataLoader`` (batch size 32, ``shuffle=True``) over the dataset.

    Raises:
        KeyError: If a class sub-folder name is not one of the eight names
            listed in the mapping below.
    """
    # Expected class folder names and the number assigned to each of them.
    folder_to_label = {
        'barszcz': 1, 'bigos': 2, 'Kutia': 3, 'makowiec': 4,
        'piernik': 5, 'pierogi': 6, 'sernik': 7, 'grzybowa': 8,
    }

    preprocessing = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ])
    image_folder = datasets.ImageFolder(root=root_dir, transform=preprocessing)

    # NOTE(review): the dataset has already been constructed at this point,
    # so swapping `class_to_idx` presumably does not change the labels the
    # loader yields (the reports printed later in this notebook show labels
    # 0-7, not 1-8).  In effect this step only checks the folder names --
    # confirm whether relabelling was intended.
    remapped = {}
    for folder_name in image_folder.class_to_idx:
        remapped[folder_name] = folder_to_label[folder_name]
    image_folder.class_to_idx = remapped

    return DataLoader(image_folder, batch_size=32, shuffle=True)

In [4]:
from collections import defaultdict
from sklearn.metrics import f1_score, classification_report
from torch.utils.data import Subset
class EnsembleTrainer:
    """Weighted soft-voting ensemble of fine-tuned torchvision classifiers.

    Four architectures (EfficientNet-B0/B1, MobileNetV3-Large and
    ShuffleNetV2 x2.0) are rebuilt with a ``num_classes``-way head and
    restored from ``models/<model_name>_best_f1.pth``.  The ``best_val_f1``
    entry of each checkpoint is stored in ``self.model_weights`` and later
    turned into that model's voting weight.

    Args:
        num_classes: Output size of every member's classification head.
        device: Device the checkpoints are mapped onto and the models are
            moved to (e.g. ``"cuda"`` or ``"cpu"``).
    """

    def __init__(self, num_classes, device):
        self.device = device
        # model name -> validation F1 read from that model's checkpoint.
        self.model_weights = dict()

        self.models = {
            'efficientnet_b0': self._prepare_efficientnet_b0(num_classes, 'efficientnet_b0'),
            'efficientnet_b1': self._prepare_efficientnet_b1(num_classes, 'efficientnet_b1'),
            'mobilenet_v3_large': self._prepare_mobilenet_v3_large(num_classes, 'mobilenet_v3_large'),
            'shufflenet_v2_x2_0': self._prepare_shufflenet_v2_x2_0(num_classes, 'shufflenet_v2_x2_0'),
        }

    def _restore_checkpoint(self, model, model_name):
        """Load ``models/<model_name>_best_f1.pth`` into ``model``.

        Records the checkpoint's ``best_val_f1`` in ``self.model_weights``,
        switches the model to eval mode and returns it on ``self.device``.
        """
        # map_location lets a checkpoint saved on a GPU be restored on a
        # CPU-only machine (the notebook falls back to "cpu" without CUDA).
        checkpoint = torch.load(f'models/{model_name}_best_f1.pth',
                                map_location=self.device)
        model.load_state_dict(checkpoint['model_state_dict'])
        # Inference only: eval mode is required for dropout / batch-norm layers.
        model.eval()
        self.model_weights[model_name] = checkpoint['best_val_f1']
        return model.to(self.device)

    # The builders below pass weights=None: load_state_dict is strict by
    # default, so every parameter and buffer comes from the checkpoint and
    # downloading ImageNet weights first would be wasted work.

    def _prepare_efficientnet_b0(self, num_classes, model_name):
        """Build EfficientNet-B0 with a new head and restore its checkpoint."""
        model = models.efficientnet_b0(weights=None)
        model.classifier[1] = nn.Linear(model.classifier[1].in_features, num_classes)
        return self._restore_checkpoint(model, model_name)

    def _prepare_efficientnet_b1(self, num_classes, model_name):
        """Build EfficientNet-B1 with a new head and restore its checkpoint."""
        model = models.efficientnet_b1(weights=None)
        model.classifier[1] = nn.Linear(model.classifier[1].in_features, num_classes)
        return self._restore_checkpoint(model, model_name)

    def _prepare_mobilenet_v3_large(self, num_classes, model_name):
        """Build MobileNetV3-Large with a new head and restore its checkpoint."""
        model = models.mobilenet_v3_large(weights=None)
        model.classifier[3] = nn.Linear(model.classifier[3].in_features, num_classes)
        return self._restore_checkpoint(model, model_name)

    def _prepare_shufflenet_v2_x2_0(self, num_classes, model_name):
        """Build ShuffleNetV2 x2.0 with a new head and restore its checkpoint."""
        model = models.shufflenet_v2_x2_0(weights=None)
        model.fc = nn.Linear(model.fc.in_features, num_classes)
        return self._restore_checkpoint(model, model_name)

    @staticmethod
    def _indices_by_class(targets, num_classes):
        """Group dataset indices by label.

        Returns a list of ``num_classes`` lists; entry ``c`` holds, in
        ascending order, the positions in ``targets`` whose label is ``c``.
        """
        grouped = [[] for _ in range(num_classes)]
        for index, target in enumerate(targets):
            grouped[int(target)].append(index)
        return grouped

    def _normalized_weights(self, power=5):
        """Return per-model voting weights that sum to one.

        Each stored validation F1 is raised to ``power`` (which sharpens the
        differences between models) and divided by the total.  The powered
        values are also kept in ``self.model_weights_now``.
        """
        self.model_weights_now = {k: v ** power for k, v in self.model_weights.items()}
        total_weight = sum(self.model_weights_now.values())
        return {name: weight / total_weight
                for name, weight in self.model_weights_now.items()}

    def sequential_batch_evaluation(self, dataloader, batch_size=20):
        """Evaluate the ensemble on consecutive, class-balanced batch groups.

        Batch group ``g`` consists of images ``g * batch_size`` up to
        ``(g + 1) * batch_size`` of every class, so each group holds
        ``batch_size`` images per class.  Only full groups are evaluated;
        their number is limited by the smallest class.  For every group the
        weighted F1 score and a classification report are printed.

        Args:
            dataloader: Loader whose ``dataset`` exposes ``classes`` and is
                indexable.  When the dataset has a ``targets`` list (as
                ``ImageFolder`` does), it is used to locate each class's
                images; otherwise classes are assumed to be stored as
                equally sized contiguous blocks.
            batch_size: Number of images taken from each class per group.

        Returns:
            List with the weighted F1 score of every batch group.
        """
        dataset = dataloader.dataset
        num_classes = len(dataset.classes)

        targets = getattr(dataset, 'targets', None)
        if targets is None:
            # No label list: keep the equal-sized contiguous-block assumption.
            images_per_class = len(dataset) // num_classes
            class_indices = [
                list(range(c * images_per_class, (c + 1) * images_per_class))
                for c in range(num_classes)
            ]
        else:
            # Exact per-class indices, so unequal class sizes cannot make a
            # slice spill over into a neighbouring class.
            class_indices = self._indices_by_class(targets, num_classes)

        smallest_class = min((len(indices) for indices in class_indices), default=0)
        batches_per_class = smallest_class // batch_size

        # The voting weights do not depend on the data: compute them once.
        model_weights_normalized = self._normalized_weights()
        active_models = []
        for model_name, model in self.models.items():
            weight = model_weights_normalized.get(model_name, 0.0)
            if weight == 0.0:
                # A zero weight contributes nothing; skip only this model
                # instead of dropping every model that follows it.
                print(f"Warning: No weight for model {model_name}")
                continue
            active_models.append((model, weight))

        sequential_f1_scores = []

        for batch_group in range(batches_per_class):
            start = batch_group * batch_size
            all_labels = []
            all_ensemble_preds = []

            # Take `batch_size` images from each class for this group.
            for indices in class_indices:
                subset_loader = torch.utils.data.DataLoader(
                    Subset(dataset, indices[start:start + batch_size]),
                    batch_size=batch_size,
                    shuffle=False,
                )

                # Pure inference: no autograd graph is needed.
                with torch.no_grad():
                    for images, labels in subset_loader:
                        images = images.to(self.device)

                        # Weighted sum of the members' softmax outputs.
                        weighted_probs = [
                            torch.softmax(model(images), dim=1).cpu() * weight
                            for model, weight in active_models
                        ]
                        ensemble_probs = torch.sum(torch.stack(weighted_probs), dim=0)
                        _, ensemble_pred = torch.max(ensemble_probs, 1)

                        all_ensemble_preds.extend(ensemble_pred.numpy())
                        all_labels.extend(labels.cpu().numpy())

            # Calculate F1 score for this batch group
            batch_f1 = f1_score(all_labels, all_ensemble_preds, average='weighted')
            sequential_f1_scores.append(batch_f1)

            # Print detailed report for this batch group
            print(f"\nBatch Group {batch_group + 1} Results:")
            print(f"Weighted Ensemble F1 Score: {batch_f1}")
            print("\nDetailed Classification Report:")
            print(classification_report(all_labels, all_ensemble_preds,
                                        digits=4,
                                        zero_division=0))

        # Print overall summary of F1 scores
        print("\nSequential Batch F1 Scores:")
        print(sequential_f1_scores)
        if sequential_f1_scores:
            print(f"Average F1 Score: {np.mean(sequential_f1_scores)}")
            print(f"F1 Score Standard Deviation: {np.std(sequential_f1_scores)}")
        else:
            # Mean/std of an empty list would only produce NaN and a warning.
            print("No full batch group could be formed; nothing to average.")

        return sequential_f1_scores

In [5]:
# Load the test images our ensemble is evaluated on (one sub-folder per class).
test_loader = load_test_data("test_data2secuantional_our")

# Constructing the trainer restores all four fine-tuned models from models/.
ensembler = EnsembleTrainer(num_classes=8, device=device)
# One weighted F1 score per batch group; compared with `their_results` further down.
our_results = ensembler.sequential_batch_evaluation(test_loader)

  checkpoint = torch.load(f'models/{model_name}_best_f1.pth')
  checkpoint = torch.load(f'models/{model_name}_best_f1.pth')
  checkpoint = torch.load(f'models/{model_name}_best_f1.pth')
  checkpoint = torch.load(f'models/{model_name}_best_f1.pth')



Batch Group 1 Results:
Weighted Ensemble F1 Score: 1.0

Detailed Classification Report:
              precision    recall  f1-score   support

           0     1.0000    1.0000    1.0000        20
           1     1.0000    1.0000    1.0000        20
           2     1.0000    1.0000    1.0000        20
           3     1.0000    1.0000    1.0000        20
           4     1.0000    1.0000    1.0000        20
           5     1.0000    1.0000    1.0000        20
           6     1.0000    1.0000    1.0000        20
           7     1.0000    1.0000    1.0000        20

    accuracy                         1.0000       160
   macro avg     1.0000    1.0000    1.0000       160
weighted avg     1.0000    1.0000    1.0000       160


Batch Group 2 Results:
Weighted Ensemble F1 Score: 1.0

Detailed Classification Report:
              precision    recall  f1-score   support

           0     1.0000    1.0000    1.0000        20
           1     1.0000    1.0000    1.0000        20
        

In [6]:
def load_test_data(root_dir = 'test_data'):
    """Build a shuffled DataLoader over an ImageFolder-style test directory.

    Variant of the loader for a dataset whose class folders use the longer
    names listed below ('barszcz czerwony', 'kutia', 'pierniki',
    'zupa grzybowa', ...).  Images are resized to 224x224 and converted to
    tensors; batches contain 32 images.

    Args:
        root_dir: Directory holding one sub-folder per class.

    Returns:
        A ``DataLoader`` (batch size 32, ``shuffle=True``) over the dataset.

    Raises:
        KeyError: If a class sub-folder name is not one of the eight names
            listed in the mapping below.
    """
    # Expected class folder names and the number assigned to each of them.
    folder_to_label = {
        'barszcz czerwony': 1, 'bigos': 2, 'kutia': 3, 'makowiec': 4,
        'pierniki': 5, 'pierogi': 6, 'sernik': 7, 'zupa grzybowa': 8,
    }

    # NOTE(review): ImageNet mean/std normalisation is deliberately left out
    # of this pipeline (it was commented out) -- presumably to match how the
    # model evaluated with this loader was trained; confirm.
    preprocessing = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
    ])
    image_folder = datasets.ImageFolder(root=root_dir, transform=preprocessing)

    # NOTE(review): the dataset has already been constructed at this point,
    # so swapping `class_to_idx` presumably does not change the labels the
    # loader yields; in effect this step only checks the folder names.
    remapped = {}
    for folder_name in image_folder.class_to_idx:
        remapped[folder_name] = folder_to_label[folder_name]
    image_folder.class_to_idx = remapped

    return DataLoader(image_folder, batch_size=32, shuffle=True)

In [7]:
from collections import defaultdict
from sklearn.metrics import f1_score, classification_report
class EnsembleTrainer:
    """Evaluator for a single ResNet-50 restored from ``model_weights.pth``.

    Keeps the ``models`` / ``model_weights`` dictionaries of an ensemble
    trainer, but only ever holds one model.

    Args:
        num_classes: Output size of the replaced ``fc`` layer.
        device: Device the weights are mapped onto and the model and input
            batches are moved to.
    """

    def __init__(self, num_classes, device):
        self.device = device
        self.model_weights = {'resnet50': 1}

        self.models = {
            'resnet50': self._prepare_resnet50(num_classes, 'resnet50'),
        }

    def _prepare_resnet50(self, num_classes, model_name):
        """Build a ResNet-50 with a new head and load ``model_weights.pth``.

        ``model_name`` is accepted for interface symmetry but not used; the
        weight file name is fixed.
        """
        # Load the architecture without pretrained weights; the state dict
        # below supplies all of them.
        model_ft = models.resnet50(weights=None)
        model_ft.fc = nn.Linear(model_ft.fc.in_features, num_classes)

        # Use the device given to the constructor, so the model ends up on
        # the same device the evaluation loop moves the images to.
        model_ft.load_state_dict(torch.load("model_weights.pth", map_location=self.device))

        print("Model loaded successfully!")

        return model_ft.to(self.device)

    @staticmethod
    def _indices_by_class(targets, num_classes):
        """Group dataset indices by label.

        Returns a list of ``num_classes`` lists; entry ``c`` holds, in
        ascending order, the positions in ``targets`` whose label is ``c``.
        """
        grouped = [[] for _ in range(num_classes)]
        for index, target in enumerate(targets):
            grouped[int(target)].append(index)
        return grouped

    def sequential_batch_f1_score(self, dataloader, batch_size=20, average='weighted'):
        """Evaluate the model on consecutive, class-balanced batch groups.

        Batch group ``g`` consists of images ``g * batch_size`` up to
        ``(g + 1) * batch_size`` of every class.  Only full groups are
        evaluated; their number is limited by the smallest class.  For every
        group the F1 score and a classification report are printed.

        Args:
            dataloader: Loader whose ``dataset`` exposes ``classes`` and is
                indexable.  When the dataset has a ``targets`` list (as
                ``ImageFolder`` does), it is used to locate each class's
                images; otherwise classes are assumed to be stored as
                equally sized contiguous blocks.
            batch_size: Number of images taken from each class per group.
            average: Averaging mode passed to ``f1_score``.

        Returns:
            List with the F1 score of every batch group.
        """
        dataset = dataloader.dataset
        num_classes = len(dataset.classes)

        targets = getattr(dataset, 'targets', None)
        if targets is None:
            # No label list: keep the equal-sized contiguous-block assumption.
            images_per_class = len(dataset) // num_classes
            class_indices = [
                list(range(c * images_per_class, (c + 1) * images_per_class))
                for c in range(num_classes)
            ]
        else:
            # Exact per-class indices, so unequal class sizes cannot make a
            # slice spill over into a neighbouring class.
            class_indices = self._indices_by_class(targets, num_classes)

        smallest_class = min((len(indices) for indices in class_indices), default=0)
        batches_per_class = smallest_class // batch_size

        # Assuming only one model in self.models
        model = list(self.models.values())[0]
        model.eval()  # Set the model to evaluation mode

        sequential_f1_scores = []

        for batch_group in range(batches_per_class):
            start = batch_group * batch_size
            all_labels = []
            all_preds = []

            # Take `batch_size` images from each class for this group.
            for indices in class_indices:
                # Fully qualified, so this cell does not rely on a `Subset`
                # import made in a different cell.
                subset_loader = torch.utils.data.DataLoader(
                    torch.utils.data.Subset(dataset, indices[start:start + batch_size]),
                    batch_size=batch_size,
                    shuffle=False,
                )

                with torch.no_grad():
                    for images, labels in subset_loader:
                        images = images.to(self.device)

                        # Predicted class = index of the largest logit.
                        outputs = model(images)
                        _, preds = torch.max(outputs, 1)

                        all_preds.extend(preds.cpu().numpy())
                        all_labels.extend(labels.cpu().numpy())

            # Calculate F1 score for this batch group
            batch_f1 = f1_score(all_labels, all_preds, average=average)
            sequential_f1_scores.append(batch_f1)

            # Print detailed report for this batch group
            print(f"\nBatch Group {batch_group + 1} Results:")
            print(f"Model F1 Score ({average}): {batch_f1}")
            print("\nDetailed Classification Report:")
            print(classification_report(all_labels, all_preds,
                                        digits=4,
                                        zero_division=0))

        # Print overall summary of F1 scores
        print("\nSequential Batch F1 Scores:")
        print(sequential_f1_scores)
        if sequential_f1_scores:
            print(f"Average F1 Score: {np.mean(sequential_f1_scores)}")
            print(f"F1 Score Standard Deviation: {np.std(sequential_f1_scores)}")
        else:
            # Mean/std of an empty list would only produce NaN and a warning.
            print("No full batch group could be formed; nothing to average.")

        return sequential_f1_scores

In [8]:
# Load the test images used for the ResNet-50 model under comparison.
test_loader = load_test_data("test_data2secuantional")

# Constructing the trainer loads the ResNet-50 weights from model_weights.pth.
ensembler = EnsembleTrainer(num_classes=8, device=device)
# One weighted F1 score per batch group; compared with `our_results` below.
their_results = ensembler.sequential_batch_f1_score(test_loader)

  model_ft.load_state_dict(torch.load("model_weights.pth", map_location=torch.device(device)))


Model loaded successfully!

Batch Group 1 Results:
Model F1 Score (weighted): 0.917666102455339

Detailed Classification Report:
              precision    recall  f1-score   support

           0     0.7692    1.0000    0.8696        20
           1     0.9524    1.0000    0.9756        20
           2     1.0000    1.0000    1.0000        20
           3     1.0000    0.9500    0.9744        20
           4     0.9000    0.9000    0.9000        20
           5     0.9333    0.7000    0.8000        20
           6     0.8947    0.8500    0.8718        20
           7     0.9500    0.9500    0.9500        20

    accuracy                         0.9187       160
   macro avg     0.9250    0.9187    0.9177       160
weighted avg     0.9250    0.9187    0.9177       160


Batch Group 2 Results:
Model F1 Score (weighted): 0.943201343400242

Detailed Classification Report:
              precision    recall  f1-score   support

           0     0.8696    1.0000    0.9302        20
         

In [9]:
# Number of batch groups that were scored for the ResNet-50 model.
len(their_results)

10

In [10]:
# Count the batch groups in which our ensemble's F1 is strictly higher than
# the other model's.  zip pairs the two score lists position by position and
# stops at the shorter one, so a length mismatch cannot raise IndexError.
sum(1 for ours, theirs in zip(our_results, their_results) if ours > theirs)

8

In [11]:
class CustomResNet18(nn.Module):
    """ResNet-18 whose final ``fc`` layer is replaced by a new linear head.

    The backbone is stored under ``self.model`` (so state-dict keys carry a
    ``model.`` prefix) and starts from ImageNet-pretrained weights.

    Args:
        num_classes: Output size of the new ``fc`` layer.
    """

    def __init__(self, num_classes):
        super().__init__()
        # `pretrained=True` is deprecated in torchvision; "IMAGENET1K_V1" is
        # the checkpoint it resolved to, and `weights=` matches the API used
        # by the other model builders in this notebook.
        self.model = models.resnet18(weights="IMAGENET1K_V1")
        num_ftrs = self.model.fc.in_features
        self.model.fc = nn.Linear(num_ftrs, num_classes)

    def forward(self, x):
        """Return the backbone's output (class logits) for batch ``x``."""
        return self.model(x)

In [12]:
from collections import defaultdict
from sklearn.metrics import f1_score, classification_report
class EnsembleTrainer:
    """Evaluator for a single ``CustomResNet18`` restored from disk.

    Keeps the ``models`` / ``model_weights`` dictionaries of an ensemble
    trainer, but only ever holds one model, loaded from
    ``CustomResNet18_epoch_20.pth``.

    Args:
        num_classes: Number of output classes of the model head.
        device: Device the weights are mapped onto and the model and input
            batches are moved to.
    """

    def __init__(self, num_classes, device):
        self.device = device
        self.model_weights = {'resnet18': 1}

        self.models = {
            'resnet18': self._prepare_resnet18(num_classes, 'resnet18'),
        }

    def _prepare_resnet18(self, num_classes, model_name):
        """Build ``CustomResNet18`` and load ``CustomResNet18_epoch_20.pth``.

        ``model_name`` is accepted for interface symmetry but not used; the
        weight file name is fixed.
        """
        # Honour the requested head size instead of a hard-coded 8.
        model_ft = CustomResNet18(num_classes)

        # Use the device given to the constructor, so the model ends up on
        # the same device the evaluation loop moves the images to.
        model_ft.load_state_dict(
            torch.load("CustomResNet18_epoch_20.pth", map_location=self.device)
        )

        print("Model loaded successfully!")

        return model_ft.to(self.device)

    def calculate_f1_score(self, dataloader, average='weighted'):
        """Run the model over ``dataloader`` and report its F1 score.

        Prints the F1 score and a per-class classification report.

        Args:
            dataloader: Iterable yielding ``(images, labels)`` batches.
            average: Averaging mode passed to ``f1_score``.

        Returns:
            The F1 score over all batches.
        """
        all_labels = []
        all_preds = []

        # Assuming only one model in self.models.  Looking it up and
        # switching to eval mode once is enough; neither depends on the batch.
        model = list(self.models.values())[0]
        model.eval()

        with torch.no_grad():
            for images, labels in dataloader:
                images = images.to(self.device)

                # Predicted class = index of the largest logit.
                outputs = model(images)
                _, preds = torch.max(outputs, 1)

                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        # Calculate F1 score and print classification report
        f1 = f1_score(all_labels, all_preds, average=average)
        print(f"Model F1 Score ({average}): {f1}")

        # Print detailed classification report
        print("\nDetailed Classification Report:")
        print(classification_report(all_labels, all_preds,
                                    digits=4,  # 4 decimal places for precision/recall/f1
                                    zero_division=0))  # handles classes with zero samples

        return f1

In [13]:
# Load the test images used to evaluate the CustomResNet18 model.
test_loader = load_test_data("test_data_dominik")

# Constructing the trainer loads the weights from CustomResNet18_epoch_20.pth.
ensembler = EnsembleTrainer(num_classes=8, device=device)
# Prints the weighted F1 score and a per-class report; the score is the cell's value.
ensembler.calculate_f1_score(test_loader)

  model_ft.load_state_dict(torch.load("CustomResNet18_epoch_20.pth", map_location=torch.device(device)))


Model loaded successfully!
Model F1 Score (weighted): 0.9292446719595162

Detailed Classification Report:
              precision    recall  f1-score   support

           0     0.9370    0.9754    0.9558       122
           1     0.9718    0.9247    0.9477       186
           2     0.9091    0.9353    0.9220       139
           3     0.9409    0.9215    0.9311       242
           4     0.8832    0.9098    0.8963       133
           5     0.8656    0.9877    0.9226       163
           6     0.9517    0.8914    0.9206       221
           7     0.9586    0.9153    0.9364       177

    accuracy                         0.9291      1383
   macro avg     0.9272    0.9326    0.9291      1383
weighted avg     0.9311    0.9291    0.9292      1383



0.9292446719595162