# Logging

In [None]:
import logging
import os

from tqdm import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm

! rm -rf logs

log_dir = 'logs'
if not os.path.exists(log_dir):
    os.makedirs(log_dir)

LOGGER = logging.getLogger('notebook')
LOGGER.setLevel(logging.DEBUG)

log_file_path = os.path.join(log_dir, 'run.log')
file_handler = logging.FileHandler(log_file_path)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

file_handler.setFormatter(formatter)

LOGGER.addHandler(file_handler)

console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
LOGGER.addHandler(console_handler)

logging_redirect_tqdm()

LOGGER.debug("This is a debug message")
LOGGER.info("This is an info message")
LOGGER.warning("This is a warning message")
LOGGER.error("This is an error message")
LOGGER.critical("This is a critical message")

In [None]:
import time
def timeit(func):
    """Decorator that logs the wall-clock duration of every call to *func*.

    The wrapped function's return value is passed through unchanged. The
    duration is written to ``LOGGER`` at INFO level after the call returns;
    if *func* raises, the exception propagates and nothing is logged.

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same signature behaviour and metadata as *func*.
    """
    # Local import so this definition does not depend on another cell's imports.
    import functools

    @functools.wraps(func)  # keep __name__/__doc__ of the decorated function
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the measurement cannot go negative or
        # jump when the system clock is adjusted.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed_time = time.perf_counter() - start_time
        LOGGER.info(f"Function '{func.__name__}' executed in {elapsed_time:.4f} seconds")
        return result
    return wrapper

# Download Dataset

In [None]:
import sys

from huggingface_hub import snapshot_download

dataset_id = "RayanAi/Noisy_birds"
local_dataset_dir = "./Noisy_birds"

os.makedirs(local_dataset_dir, exist_ok=True)

# Point sys.stdout at the null device while the snapshot is fetched so that
# anything the download prints to stdout is discarded; the real stdout is
# always put back, even if the download raises.
original_stdout = sys.stdout
with open(os.devnull, 'w') as fnull:
    sys.stdout = fnull
    try:
        snapshot_download(repo_id=dataset_id, local_dir=local_dataset_dir, repo_type="dataset")
    finally:
        sys.stdout = original_stdout

LOGGER.info("Dataset downloaded completely.")

# Add up the size in bytes of every file below the dataset directory.
total_size = sum(
    os.path.getsize(os.path.join(dirpath, filename))
    for dirpath, _, filenames in os.walk(local_dataset_dir)
    for filename in filenames
)

LOGGER.info(f"Total size of downloaded files: {total_size / (1024 * 1024):.2f} MB")

dataset_abs_path = os.path.abspath(local_dataset_dir)
LOGGER.info(f"Dataset has been saved at: [{dataset_abs_path}]")

In [None]:
# Extract the downloaded archive into the dataset directory
# (-q: quiet, -o: overwrite existing files without prompting).
!unzip -qo ./Noisy_birds/Noisy_birds.zip -d ./Noisy_birds/

# Dataset

In [1]:
from PIL import Image

def load_image(image_path: str) -> Image.Image:
    """Open *image_path* and return it as a fully loaded PIL image.

    ``Image.open`` is lazy: it only reads the header and keeps the file open
    until the pixel data is needed. Callers in this notebook open every image
    of a dataset before using any of them, so leaving the files open can run
    into the per-process open-file limit. The pixel data is therefore read
    eagerly and the file is closed before returning.

    Args:
        image_path: Path of the image file to read.

    Returns:
        The decoded image; its underlying file handle is already closed.
    """
    with Image.open(image_path) as image:
        # Force decoding now; the image stays usable after the file is closed.
        image.load()
    return image

def get_label_from_path(image_path: str) -> str:
    """Return the name of the directory that directly contains *image_path*.

    The dataset is laid out as ``<label>/<file>``, so the parent directory
    name is used as the label.
    """
    parent_dir, _file_name = os.path.split(image_path)
    _, label = os.path.split(parent_dir)
    return label

In [None]:
from typing import List, Tuple
import random
import torch
from torch.utils.data import Dataset
from torchvision import transforms
from tqdm import tqdm

class BirdsDataset(Dataset):
    """In-memory image dataset built in three explicit steps.

    1. ``load``      - read images from disk into ``loaded_data``.
    2. ``augment``   - (optional) append augmented copies to ``loaded_data``.
    3. ``transform`` - convert every entry of ``loaded_data`` into the
       ``X`` / ``y`` lists that ``__len__`` and ``__getitem__`` serve.

    Until ``transform`` has been called the dataset is empty from the point of
    view of a ``DataLoader``.
    """

    def __init__(self):
        # (image, label) pairs exactly as produced by load()/augment().
        self.loaded_data: List[Tuple[object, str]] = []
        # Transformed samples and their labels, index-aligned.
        self.X: List[torch.Tensor] = []
        self.y: List[str] = []

    @timeit
    def load(self, data: List[Tuple[str, str]]):
        """Read every ``(image_path, label)`` pair in *data* from disk.

        Entries are appended to ``loaded_data``; calling this twice
        accumulates rather than replaces.
        """
        for image_path, label in data:
            image = load_image(image_path)
            self.loaded_data.append((image, label))

    @timeit
    def augment(self, generation_per_label: dict, augmentation: transforms.Compose):
        """Append augmented variants of the currently loaded images.

        Args:
            generation_per_label: Maps each label to the number of augmented
                copies to generate per image carrying that label.
            augmentation: Callable applied to an image to produce one variant.

        The originals are kept, and the combined list is shuffled afterwards.
        """
        LOGGER.info("Augmenting data ...")
        # Iterate over a snapshot: loaded_data grows inside the loop.
        for image, label in tqdm(self.loaded_data.copy()):
            for _ in range(generation_per_label[label]):
                augmented_image = augmentation(image)
                self.loaded_data.append((augmented_image, label))
        # Shuffle with a dedicated, fixed-seed generator. This yields the same
        # permutation as seeding the global `random` module with 68, but does
        # not reset the global generator, so other users of `random` (for
        # example the random checkpoint-name suffixes) are not forced to
        # repeat the same sequence after every augmentation.
        random.Random(68).shuffle(self.loaded_data)

    @timeit
    def transform(self, transformation: transforms.Compose):
        """Apply *transformation* to every loaded image and fill ``X`` / ``y``.

        Results are appended, so calling this more than once duplicates the
        samples.
        """
        for image, label in self.loaded_data:
            self.X.append(transformation(image))
            self.y.append(label)

    def __len__(self):
        """Number of transformed samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(transformed_image, label)`` pair at position *idx*."""
        return self.X[idx], self.y[idx]

In [None]:
# Class names in index order: position in this list is the class index.
IDX_TO_CLASS = ["budgie", "canary", "duckling", "rubber duck"]
# Reverse lookup from class name to its index.
CLASS_TO_IDX = {class_name: class_idx for class_idx, class_name in enumerate(IDX_TO_CLASS)}

# Models

In [None]:
import copy

import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torchvision import models


class EfficientNetClassifier(nn.Module):
    """EfficientNet-B0 (ImageNet weights) with a new ``num_classes`` head.

    ``forward`` returns softmax probabilities, not raw logits.

    NOTE(review): elsewhere in this notebook the output is fed to
    ``nn.CrossEntropyLoss`` and to a second ``nn.Softmax``; both normally
    expect logits. The confidence thresholds used for pseudo-labelling were
    chosen against this behaviour, so it is documented here rather than
    changed.
    """

    def __init__(self, num_classes=4):
        super().__init__()
        self.model = models.efficientnet_b0(weights='EfficientNet_B0_Weights.IMAGENET1K_V1')

        # Swap the final linear layer of the stock classifier for one that
        # outputs `num_classes` scores.
        in_features = self.model.classifier[1].in_features
        self.model.classifier[1] = nn.Linear(in_features, num_classes)

        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return per-class probabilities (softmax over dim 1) for batch *x*."""
        x = self.model(x)
        x = self.softmax(x)
        return x

    def save(self, path: str):
        """Write this module's ``state_dict`` to *path*."""
        # torch.save serialises the tensors itself; no intermediate deep copy
        # of the whole state dict is needed.
        torch.save(self.state_dict(), path)

    def init(self, path: str = "model.pth"):
        """Load weights previously written by ``save`` from *path*.

        The checkpoint is mapped to the CPU while reading so that a file saved
        on a GPU machine can also be loaded on a CPU-only one;
        ``load_state_dict`` then copies the values into this module's
        parameters on whatever device they currently live.
        """
        self.load_state_dict(torch.load(path, map_location="cpu"))

e_net_classifier = EfficientNetClassifier()

In [None]:
class ResNet18Classifier(nn.Module):
    """ResNet-18 (ImageNet weights) adapted for ``num_classes`` outputs.

    Changes to the stock network:
    * ``conv1`` is replaced by a newly created 3x3, stride-1 convolution, so
      the pretrained weights of that first layer are not used;
    * the max-pool after it is replaced by an identity;
    * ``fc`` becomes ``Linear -> Softmax``, so ``forward`` returns
      probabilities rather than logits.

    NOTE(review): the training code in this notebook uses
    ``nn.CrossEntropyLoss``, which normally expects logits; the softmax is
    documented here rather than changed.
    """

    def __init__(self, num_classes=4):
        super().__init__()
        self.model = models.resnet18(weights='ResNet18_Weights.IMAGENET1K_V1')

        self.model.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.model.maxpool = nn.Identity()

        num_features = self.model.fc.in_features
        self.model.fc = nn.Sequential(
            nn.Linear(num_features, num_classes),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
        """Return per-class probabilities for batch *x*."""
        return self.model(x)

    def save(self, path: str):
        """Write this module's ``state_dict`` to *path*."""
        # torch.save serialises the tensors itself; no intermediate deep copy
        # of the whole state dict is needed.
        torch.save(self.state_dict(), path)

    def init(self, path: str = "model.pth"):
        """Load weights previously written by ``save`` from *path*.

        The checkpoint is mapped to the CPU while reading so that a file saved
        on a GPU machine can also be loaded on a CPU-only one;
        ``load_state_dict`` then copies the values into this module's
        parameters on whatever device they currently live.
        """
        self.load_state_dict(torch.load(path, map_location="cpu"))

res_net_18_classifier = ResNet18Classifier()


In [None]:
# Run on the GPU when PyTorch reports CUDA as available, otherwise on the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
else:
    DEVICE = torch.device('cpu')
LOGGER.info(f'Device is {DEVICE}')

# Training Utilities

In [None]:
import random
import string

def generate_random_string(length=8, use_digits=True, use_lowercase=True, use_uppercase=True, use_special=False):
    """Return *length* characters drawn at random from the enabled groups.

    Args:
        length: Number of characters to generate.
        use_digits: Include ``0-9``.
        use_lowercase: Include ``a-z``.
        use_uppercase: Include ``A-Z``.
        use_special: Include ASCII punctuation.

    Raises:
        ValueError: If every character group is disabled.

    Uses the global ``random`` module, so the result is not suitable for
    anything security-sensitive.
    """
    # The order of the groups fixes the order of characters in the pool.
    candidate_groups = (
        (use_digits, string.digits),
        (use_lowercase, string.ascii_lowercase),
        (use_uppercase, string.ascii_uppercase),
        (use_special, string.punctuation),
    )
    char_pool = ''.join(chars for enabled, chars in candidate_groups if enabled)

    if char_pool == '':
        raise ValueError("At least one character type must be enabled.")

    picked = [random.choice(char_pool) for _ in range(length)]
    return ''.join(picked)

In [None]:
from typing import Tuple

from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score

@timeit
def train_epoch(model: torch.nn.Module, train_loader: DataLoader, optimizer, criterion) -> Tuple[float, float]:
    """Run one optimisation pass over *train_loader*.

    Each batch is expected to yield ``(inputs, labels)`` where the labels are
    class-name strings; they are converted to indices through
    ``CLASS_TO_IDX``.

    Returns:
        ``(accuracy, loss)`` for the epoch. The loss is the sum of
        ``batch_loss * batch_size`` divided by the dataset size (a per-sample
        average, presuming *criterion* returns a batch mean).
    """
    model.train()
    loss_sum = 0.0
    all_preds, all_targets = [], []

    for inputs, labels in tqdm(train_loader):
        inputs = inputs.to(DEVICE)
        label_ids = [CLASS_TO_IDX[label] for label in labels]
        targets = torch.tensor(label_ids, dtype=torch.long).to(DEVICE)

        optimizer.zero_grad()
        outputs = model(inputs)

        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

        # Weight the batch loss by its size so a smaller final batch does not
        # skew the epoch average.
        loss_sum += loss.item() * inputs.size(0)

        # Index of the highest score per sample is the predicted class.
        predicted = torch.max(outputs, 1)[1]
        all_preds.extend(predicted.cpu().numpy())
        all_targets.extend(targets.cpu().numpy())

    train_accuracy = accuracy_score(all_targets, all_preds)
    epoch_train_loss = loss_sum / len(train_loader.dataset)
    return train_accuracy, epoch_train_loss

@timeit
def evaluate_epoch(model: torch.nn.Module, val_loader: DataLoader, criterion) -> Tuple[float, float]:
    """Evaluate *model* on *val_loader* without updating any weights.

    Labels are class-name strings and are mapped to indices through
    ``CLASS_TO_IDX``.

    Returns:
        ``(accuracy, loss)`` where the loss is the sum of
        ``batch_loss * batch_size`` divided by the dataset size.
    """
    model.eval()
    loss_sum = 0.0
    all_preds, all_targets = [], []

    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.to(DEVICE)
            label_ids = [CLASS_TO_IDX[label] for label in labels]
            targets = torch.tensor(label_ids, dtype=torch.long).to(DEVICE)

            outputs = model(inputs)
            loss_sum += criterion(outputs, targets).item() * inputs.size(0)

            # Index of the highest score per sample is the predicted class.
            predicted = torch.max(outputs, 1)[1]
            all_preds.extend(predicted.cpu().numpy())
            all_targets.extend(targets.cpu().numpy())

    val_accuracy = accuracy_score(all_targets, all_preds)
    epoch_val_loss = loss_sum / len(val_loader.dataset)
    return val_accuracy, epoch_val_loss

In [None]:
@timeit
def train_with_validation(model: torch.nn.Module, train_dataset: Dataset, val_dataset: Dataset,
                          epochs: int, stability_epochs: int, max_stability_distance: float,
                         best_models_dir: str, stable_models_dir: str):
    """Train *model* and checkpoint both its best and its most stable state.

    Two kinds of checkpoints are written through ``model.save``:

    * **best** - whenever the validation accuracy beats every earlier epoch;
    * **stable** - from epoch index 50 on, whenever the last
      ``stability_epochs`` validation accuracies lie within
      ``max_stability_distance`` of each other. The model from the middle of
      that window is saved if its accuracy is at least as good as the best
      stable accuracy so far.

    Args:
        model: Module to train; must provide ``save(path)``.
        train_dataset: Dataset used for optimisation (batch size 32, shuffled).
        val_dataset: Dataset used for validation (batch size 32, in order).
        epochs: Number of epochs to run.
        stability_epochs: Length of the window used for the stability test.
        max_stability_distance: Largest allowed spread of validation accuracy
            inside the window.
        best_models_dir: Directory for "best" checkpoints.
        stable_models_dir: Directory for "stable" checkpoints.

    Returns:
        ``(best_model_path, best_stable_model_path, train_losses, val_losses,
        train_accuracies, val_accuracies)``. A path is ``""`` when no
        checkpoint of that kind was written.
    """
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters())

    model = model.to(DEVICE)

    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []

    best_val_accuracy = 0
    best_stable_accuracy = 0

    best_model_path = ""
    best_stable_model_path = ""

    # models_history_in_memory[k] holds a snapshot of the model after epoch k,
    # or None once that snapshot has left the retention window. Keeping the
    # list index-aligned with the epoch number lets the stability branch look
    # up "the model from epoch idx" directly.
    models_history_in_memory = []
    # Keep the 50 most recent snapshots, and never fewer than the stability
    # branch needs to reach back (stability_epochs // 2 epochs).
    history_window = max(50, stability_epochs // 2 + 1)

    for epoch in range(epochs):
        train_accuracy, epoch_train_loss = train_epoch(model, train_loader, optimizer, criterion)

        train_accuracies.append(train_accuracy)
        train_losses.append(epoch_train_loss)

        val_accuracy, epoch_val_loss = evaluate_epoch(model, val_loader, criterion)

        val_accuracies.append(val_accuracy)
        val_losses.append(epoch_val_loss)

        models_history_in_memory.append(copy.deepcopy(model))
        # Release the snapshot that just slid out of the window. (Deleting
        # element 0 and re-inserting None only ever frees the very first
        # snapshot, so every later one would stay in memory.)
        expired_idx = epoch - history_window
        if expired_idx >= 0:
            models_history_in_memory[expired_idx] = None

        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            model_path = os.path.join(best_models_dir,
                                      f'acc_{round(val_accuracy, 4)}_with_data_epoch_{epoch}_{generate_random_string(3)}.pth')
            model.save(model_path)
            best_model_path = model_path

            LOGGER.info(f'model {model_path} has been saved.')

        # All pairwise differences are within the limit exactly when the
        # spread (max - min) of the window is.
        recent_accuracies = val_accuracies[-stability_epochs:]
        if epoch >= 50 and max(recent_accuracies) - min(recent_accuracies) <= max_stability_distance:
            # Take the model from the middle of the stable window.
            idx = epoch - stability_epochs // 2
            if val_accuracies[idx] >= best_stable_accuracy:
                best_stable_accuracy = val_accuracies[idx]

                model_path = os.path.join(stable_models_dir,
                                          f'acc_{round(val_accuracies[idx], 4)}_epoch_{idx + 1}_{generate_random_string(3)}.pth')
                models_history_in_memory[idx].save(model_path)
                best_stable_model_path = model_path

                LOGGER.info(f'model {model_path} has been saved.')

        LOGGER.info(f'\nEpoch {epoch+1}/{epochs}, '
              f'Training Loss: {epoch_train_loss:.4f}, Validation Loss: {epoch_val_loss:.4f}, '
              f'Training Accuracy: {train_accuracy:.4f}, Validation Accuracy: {val_accuracy:.4f}\n')

    return (
        best_model_path,
        best_stable_model_path,
        train_losses,
        val_losses,
        train_accuracies,
        val_accuracies
    )

# Plot utils

In [None]:
import matplotlib.pyplot as plt

def plot_metrics(train_metric, val_metric, save_path, metric_name="Loss", color_train='b', color_val='r'):
    """Plot a training and a validation curve over epochs, save and show it.

    Args:
        train_metric: Per-epoch training values; its length sets the x axis.
        val_metric: Per-epoch validation values.
        save_path: File the figure is written to.
        metric_name: Name used in the labels and the title.
        color_train: Line colour of the training curve.
        color_val: Line colour of the validation curve.
    """
    # Epochs are numbered from 1 on the x axis.
    epoch_axis = range(1, len(train_metric) + 1)

    plt.figure()
    curves = (
        (train_metric, color_train, f'Training {metric_name}'),
        (val_metric, color_val, f'Validation {metric_name}'),
    )
    for values, line_color, line_label in curves:
        plt.plot(epoch_axis, values, color=line_color, label=line_label)

    plt.title(f'Training and Validation {metric_name} Over Epochs')
    plt.xlabel('Epochs')
    plt.ylabel(metric_name)
    plt.grid(True)
    plt.legend()

    # Save before show so the file is written from the populated figure.
    plt.savefig(save_path)
    plt.show()

# Gradual Learning

In [None]:
from typing import List

def get_learned_images(other_images: List[Tuple[str, str]],
                       model: torch.nn.Module,
                       confidence_threshold: float,
                       confidence_distance_threshold: float,
                       transformation: transforms.Compose) -> List[Tuple[str, str]]:
    """Pseudo-label the images the model is sufficiently sure about.

    Every image is classified on its own. It is accepted when the top
    probability is at least ``confidence_threshold`` and exceeds the
    runner-up by more than ``confidence_distance_threshold``.

    Args:
        other_images: ``(image_path, label)`` pairs; the label is ignored.
        model: Classifier used for prediction (switched to eval mode and
            moved to ``DEVICE``).
        confidence_threshold: Minimum top-1 probability.
        confidence_distance_threshold: Minimum margin between top-1 and top-2.
        transformation: Preprocessing applied to each loaded image.

    Returns:
        ``(image_path, predicted_class_name)`` for every accepted image, in
        input order.

    NOTE(review): a softmax is applied to the model output here; the
    classifiers defined in this notebook already end in a softmax, so the
    probabilities compared against the thresholds are a softmax of a softmax.
    """
    model.eval()
    model = model.to(DEVICE)
    to_probs = nn.Softmax(dim=1)
    learned_images = []

    for image_path, _ignored_label in other_images:
        # Build a batch of one: transform, add the batch axis, move to device.
        batch = transformation(load_image(image_path)).unsqueeze(0).to(DEVICE)

        with torch.no_grad():
            probs = to_probs(model(batch))
            top2_probs, top2_preds = torch.topk(probs, 2, dim=1)

        best_prob = top2_probs[0, 0].item()
        runner_up_prob = top2_probs[0, 1].item()

        is_confident = best_prob >= confidence_threshold
        is_separated = (best_prob - runner_up_prob) > confidence_distance_threshold
        if is_confident and is_separated:
            predicted_name = IDX_TO_CLASS[top2_preds[0, 0].item()]
            learned_images.append((image_path, predicted_name))

    return learned_images


# Load Dataset

In [None]:
import glob

# Every .jpg exactly one directory below the dataset root, paired with the
# name of that directory as its label.
all_images_path = [
    (path, get_label_from_path(path))
    for path in glob.glob(os.path.join(local_dataset_dir, '*/*.jpg'))
]

# Separate the images in the 'unlabeled' directory from the labelled ones.
images_path = [(path, label) for path, label in all_images_path if label != 'unlabeled']
unlabeled_images_path = [(path, label) for path, label in all_images_path if label == 'unlabeled']
# Notebook display: peek at the first few unlabelled entries.
unlabeled_images_path[:5]

In [None]:
# Per-channel mean / std used for normalisation (the values commonly used
# with ImageNet-pretrained torchvision models).
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Preprocessing for validation / inference: tensor conversion + normalisation.
TEST_TRANSFORMATION = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
])

# Preprocessing for training data. Currently the same steps as for testing;
# kept as a separate pipeline object so the two can diverge independently.
TRAIN_TRANSFORMATION = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
])


## Augmentation

In [None]:
# Random perturbations applied, in this order, to produce one augmented copy
# of an image. Saturation and hue jitter are currently not enabled.
_augmentation_steps = [
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=30),
    transforms.ColorJitter(brightness=0.3, contrast=0.3),
    transforms.GaussianBlur(kernel_size=(5, 1), sigma=(0.001, 5)),
]
AUGMENTATION_TRANSFORMATION = transforms.Compose(_augmentation_steps)

In [None]:
from collections import defaultdict
from typing import Dict

# Approximate number of augmented images wanted for each label.
AMOUNT_PER_LABEL = 2000

def get_augment_per_label(data: List[Tuple[str, str]]) -> Dict[str, int]:
    """Return how many augmented copies to generate per image of each label.

    For a label with ``c`` images the result is
    ``AMOUNT_PER_LABEL // c + 1``, so rarer labels get more copies per image
    and every label receives at least ``AMOUNT_PER_LABEL`` augmented images
    in total.

    Args:
        data: ``(image_path, label)`` pairs.

    Returns:
        A ``defaultdict`` mapping label -> copies per image; labels that do
        not occur in *data* map to 0.
    """
    # Count the images available for each label.
    images_per_label = defaultdict(int)
    for _, label in data:
        images_per_label[label] += 1

    # defaultdict(int) keeps the "unknown label -> 0" behaviour without a
    # lambda factory, which would make the result unpicklable.
    generation_per_label = defaultdict(int)
    for label, image_count in images_per_label.items():
        generation_per_label[label] = AMOUNT_PER_LABEL // image_count + 1
    return generation_per_label


In [None]:

random.shuffle(images_path)

# After shuffling, the first 120 labelled images seed the training pool and
# the remaining labelled images are held out for validation. All unlabelled
# images start out as candidates for pseudo-labelling. Each name is bound to
# its own list so later in-place edits do not affect the source lists.
known_images_path = images_path[:120]
validation_images_path = images_path[120:]
unknown_images_path = list(unlabeled_images_path)

# The validation set is fixed for the whole run: load it once, no augmentation.
validation_dataset = BirdsDataset()
validation_dataset.load(validation_images_path)
validation_dataset.transform(TEST_TRANSFORMATION)


# Train

In [None]:
BEST_MODELS_DIR = "./best_models"
STABLE_MODELS_DIR = "./stable_models"
PLOTS_DIR = "./plots"

! rm -rf "./best_models"
! rm -rf "./stable_models"
! rm -rf "./plots"

os.makedirs(BEST_MODELS_DIR, exist_ok=True)
os.makedirs(STABLE_MODELS_DIR, exist_ok=True)
os.makedirs(PLOTS_DIR, exist_ok=True)

In [None]:

EPOCHS = 200

# Gradual (self-training) loop:
#   1. build an augmented training set from the currently known images,
#   2. train a fresh classifier with validation,
#   3. pseudo-label the unlabelled images the best checkpoint is confident
#      about and move them into the known pool,
#   4. repeat until a round adds no new images.
while True:
    known_dataset = BirdsDataset()
    known_dataset.load(known_images_path)
    aug_amount = get_augment_per_label(known_images_path)
    known_dataset.augment(aug_amount, AUGMENTATION_TRANSFORMATION)
    known_dataset.transform(TRAIN_TRANSFORMATION)

    model = EfficientNetClassifier()

    (best_model_path, stable_model_path,
     train_losses, val_losses,
     train_accuracies, val_accuracies) = train_with_validation(
        model=model, train_dataset=known_dataset, val_dataset=validation_dataset,
        epochs=EPOCHS, stability_epochs=10, max_stability_distance=0.1,
        best_models_dir=BEST_MODELS_DIR, stable_models_dir=STABLE_MODELS_DIR)

    plot_metrics(train_losses, val_losses,
                 os.path.join(PLOTS_DIR, f'loss_with_{len(known_dataset)}_data.png'), 'Loss')
    plot_metrics(train_accuracies, val_accuracies,
                 os.path.join(PLOTS_DIR, f'accuracy_with_{len(known_dataset)}_data.png'), 'Accuracy')

    stable_model = EfficientNetClassifier()
    best_model = EfficientNetClassifier()

    # train_with_validation returns "" when no stable checkpoint was written;
    # loading "" would abort the whole loop although the stable model is not
    # used for pseudo-labelling below.
    if stable_model_path:
        stable_model.init(stable_model_path)
    else:
        LOGGER.warning('No stable checkpoint was produced in this round; stable_model keeps its initial weights.')
    best_model.init(best_model_path)

    # NOTE(review): the classifier already outputs softmax probabilities and
    # get_learned_images applies softmax again. A softmax over four values in
    # [0, 1] cannot exceed e / (e + 3) ~= 0.475, which is presumably why the
    # threshold sits just below that - confirm before changing either side.
    conf_threshold = 0.47
    # NOTE(review): `model` is rebound to the stable model here, but the
    # pseudo-labelling below uses `best_model` - confirm which is intended.
    model = stable_model

    learned_images = get_learned_images(other_images = unknown_images_path,
                           model=best_model,
                           confidence_threshold = conf_threshold,
                           confidence_distance_threshold = 0.2,
                           transformation = TEST_TRANSFORMATION)

    LOGGER.info(f'{len(learned_images)} has been found successfuly!')

    if not learned_images:
        break

    known_images_path.extend(learned_images)
    # Drop the newly labelled images from the unlabelled pool in one pass
    # (set lookup instead of one list.remove scan per image). Entries are
    # matched on (path, directory label), the form in which they were listed.
    learned_entries = {(image_path, get_label_from_path(image_path)) for image_path, _ in learned_images}
    unknown_images_path[:] = [entry for entry in unknown_images_path if entry not in learned_entries]



# Save

In [None]:
# Export one specific checkpoint from ./best_models as the submission weights
# (instead of saving `stable_model` directly). The file name is hard-coded
# and has to match a checkpoint that actually exists from the training run.
checkpoint_path = './best_models/acc_1.0_with_data_epoch_7_to4.pth'
stable_conf = torch.load(checkpoint_path)

model = EfficientNetClassifier()
model.load_state_dict(stable_conf)
model.save('model.pth')

# Submit

In [None]:
import zipfile

# Bundle the weights and the model definition into the submission archive.
# Both files are read from the current working directory.
with zipfile.ZipFile('submission.zip', 'w') as zipf:
    for member_name in ('model.pth', 'model.py'):
        zipf.write(member_name)

In [None]:
# Mount Google Drive at /content/drive (requires the google.colab package,
# i.e. a Colab runtime).
from google.colab import drive
drive.mount('/content/drive')