In [None]:
!pip install --quiet torch torchvision albumentations tqdm codecarbon optuna torchmetrics torchmetrics[detection]

In [1]:
import albumentations.pytorch
import datasets as ds
import utils
import torch as t
import torchvision as tv
import albumentations as A
from tqdm.auto import tqdm
import torchmetrics as tm
import optuna
from codecarbon import EmissionsTracker
import random
import json


def sample_trial_config():
    config = {
        "enable_horizontal_flip": random.choice([True, False]),
        "enable_color_jitter": random.choice([True, False]),
        "enable_affine": random.choice([True, False])
    }

    # Only sample color_jitter params if enabled
    if config["enable_color_jitter"]:
        config.update({
            "color_jitter_brightness": random.choice([0.1, 0.2, 0.3, 0.4, 0.5]),
            "color_jitter_contrast": random.choice([0.1, 0.2, 0.3, 0.4, 0.5]),
            "color_jitter_saturation": random.choice([0.1, 0.2, 0.3, 0.4, 0.5]),
            "color_jitter_hue": random.choice([0.1, 0.15, 0.2])
        })

    # Only sample affine params if enabled
    if config["enable_affine"]:
        config["enable_affine_scale"] = random.choice([True, False])
        config["enable_affine_rotate"] = random.choice([True, False])
        config["enable_affine_shear"] = random.choice([True, False])

        if config.get("enable_affine_scale", False):
            scale_min = random.choice([0.05, 0.1, 0.15, 0.2])
            scale_max = random.choice([0.2, 0.3, 0.4, 0.5])
            config["affine_scale_min"], config["affine_scale_max"] = min(scale_min, scale_max), max(scale_min,
                                                                                                    scale_max)

        if config.get("enable_affine_rotate", False):
            rotate_min = random.choice([-20, -18, -15])
            rotate_max = random.choice([15, 18, 20])
            config["affine_rotate_min"], config["affine_rotate_max"] = min(rotate_min, rotate_max), max(rotate_min,
                                                                                                        rotate_max)

        if config.get("enable_affine_shear", False):
            shear_min = random.choice([-20, -18, -15])
            shear_max = random.choice([15, 18, 20])
            config["affine_shear_min"], config["affine_shear_max"] = min(shear_min, shear_max), max(shear_min,
                                                                                                    shear_max)

    return config


def train_model(config: dict, num_epochs=3):
    # for train
    x2, y2 = ds.extract_all('./datasets/30/annotations.xml')
    x3, y3 = ds.extract_all('./datasets/60/annotations.xml')
    x4, y4 = ds.extract_all('./datasets/90/annotations.xml')
    x6, y6 = ds.extract_all('./datasets/clear2/annotations.xml')
    x7, y7 = ds.extract_all('./datasets/clear3/annotations.xml')
    # for val
    x0, y0 = ds.extract_all('./datasets/090/annotations.xml')
    x1, y1 = ds.extract_all('./datasets/190/annotations.xml')
    x5, y5 = ds.extract_all('./datasets/clear1/annotations.xml')

    augmentations = []

    if config.get("enable_horizontal_flip", False):
        augmentations.append(A.HorizontalFlip(p=0.5))
    if config.get("enable_color_jitter", False):
        augmentations.append(A.ColorJitter(
            brightness=config["color_jitter_brightness"],
            contrast=config["color_jitter_contrast"],
            saturation=config["color_jitter_saturation"],
            hue=config["color_jitter_hue"]
        ))
    if config.get("enable_affine", False):
        kwords = {}
        if config.get("enable_affine_scale", False):
            kwords["scale"] = (config["affine_scale_min"], config["affine_scale_max"])
        if config.get("enable_affine_rotate", False):
            kwords["rotate"] = (config["affine_rotate_min"], config["affine_rotate_max"])
        if config.get("enable_affine_shear", False):
            kwords["shear"] = (config["affine_shear_min"], config["affine_shear_max"])
        augmentations.append(A.Affine(**kwords, p=0.5))

    train_ds = ds.CustomImageDataset([*x2, *x3, *x4, *x6, *x7], [*y2, *y3, *y4, *y6, *y7],
                                     transform=A.Compose([*augmentations, albumentations.pytorch.ToTensorV2()],
                                                         bbox_params=ds.albumentations_params))
    val_ds = ds.CustomImageDataset([*x0, *x1, *x5], [*y0, *y1, *y5])

    dl_train = t.utils.data.DataLoader(train_ds, batch_size=4, shuffle=True, collate_fn=utils.unroller)
    dl_val = t.utils.data.DataLoader(val_ds, batch_size=4, shuffle=True, collate_fn=utils.unroller)

    device = t.device('cuda') if t.cuda.is_available() else t.device('cpu')

    model = tv.models.detection.fasterrcnn_resnet50_fpn(weights='COCO_V1', backbone_weights='IMAGENET1K_V2').to(device)
    model.roi_heads.box_predictor = tv.models.detection.faster_rcnn.FastRCNNPredictor(
        in_channels=model.roi_heads.box_predictor.cls_score.in_features,
        num_classes=2
    ).to(device)

    optimizer = t.optim.SGD([p for p in model.parameters() if p.requires_grad],
                            lr=0.005, momentum=0.9, weight_decay=0.0005)
    metric = tm.detection.mean_ap.MeanAveragePrecision(box_format="xyxy", iou_thresholds=[0.5])
    val_map = 0.0

    for epoch in range(num_epochs):
        # ---- TRAIN ----
        model.train()
        train_loss = 0
        for images, targets in tqdm(dl_train, desc=f"Train Epoch {epoch + 1}", leave=True):
            images = [img.to(device) for img in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
            loss_dict = model(images, targets)
            total_loss = sum(loss for loss in loss_dict.values())
            optimizer.zero_grad()
            total_loss.backward()
            optimizer.step()
            train_loss += total_loss.item()
        train_loss /= len(dl_train)

        # ---- VALIDATION ----
        val_loss = 0
        metric.reset()
        with t.no_grad():
            for images, targets in dl_val:
                images = [img.to(device) for img in images]
                targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
                # loss
                model.train()
                loss_dict = model(images, targets)
                val_loss += sum(loss for loss in loss_dict.values())
                # predictions
                model.eval()
                pred = model(images)
                metric.update(pred, targets)
        val_loss /= len(dl_val)
        val_metrics = metric.compute()
        val_map = val_metrics["map"].item()
        print(f"epoch={epoch + 1}; train_loss={train_loss:.4f}; val_loss={val_loss:.4f}; val_map={val_map:.4f}")

    return val_map


# optuna objective
def objective(trial):
    config = {}

    config["enable_horizontal_flip"] = trial.suggest_categorical("enable_horizontal_flip", [True, False])

    config["enable_color_jitter"] = trial.suggest_categorical("enable_color_jitter", [True, False])
    if config["enable_color_jitter"]:
        config["color_jitter_brightness"] = trial.suggest_float("color_jitter_brightness", 0.1, 0.5)
        config["color_jitter_contrast"] = trial.suggest_float("color_jitter_contrast", 0.1, 0.5)
        config["color_jitter_saturation"] = trial.suggest_float("color_jitter_saturation", 0.1, 0.5)
        config["color_jitter_hue"] = trial.suggest_float("color_jitter_hue", 0.1, 0.2)

    config["enable_affine"] = trial.suggest_categorical("enable_affine", [True, False])
    if config["enable_affine"]:
        config["enable_affine_scale"] = trial.suggest_categorical("enable_affine_scale", [True, False])
        if config["enable_affine_scale"]:
            config["affine_scale_min"] = trial.suggest_float("affine_scale_min", 0.05, 0.2)
            config["affine_scale_max"] = trial.suggest_float("affine_scale_max", config["affine_scale_min"], 0.5)

        config["enable_affine_rotate"] = trial.suggest_categorical("enable_affine_rotate", [True, False])
        if config["enable_affine_rotate"]:
            config["affine_rotate_min"] = trial.suggest_float("affine_rotate_min", -20, -15)
            config["affine_rotate_max"] = trial.suggest_float("affine_rotate_max", 15, 20)

        config["enable_affine_shear"] = trial.suggest_categorical("enable_affine_shear", [True, False])
        if config["enable_affine_shear"]:
            config["affine_shear_min"] = trial.suggest_float("affine_shear_min", -20, -15)
            config["affine_shear_max"] = trial.suggest_float("affine_shear_max", 15, 20)

    return train_model(config)


In [None]:
tracker = EmissionsTracker(log_level="info")
tracker.start()

num_random_models = 30
random_results = []
for i in range(num_random_models):
    config = sample_trial_config()
    val_map = train_model(config)
    random_results.append({"model": i + 1, "val_map": val_map, "config": config})

emissions = tracker.stop()
print(f"Estimated CO2 emissions: {emissions:.6f} kg")

In [None]:
study = optuna.create_study(direction="maximize", study_name="optuna_random_fusion")
study.optimize(objective, n_trials=30)  # adjust number of trials