In [1]:

import os
import random

import matplotlib.patches as patches
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn.functional as F
import tqdm
from torch.utils.data import DataLoader

from model.dataset import YOLODataset, collateFunction
from model.kbynet import YOLO
from model.metrics import AverageMeter, ComputeLoss, psnr,ssim
from model.utils import scale,box_iou,non_max_suppression, compute_ap_per_class



In [None]:


def setup_seed(seed=0):
    """
    Seed every random number generator this notebook relies on.

    Python's ``random`` module, NumPy's global generator and PyTorch are all
    seeded with the same value. cuDNN benchmarking is switched off and its
    deterministic mode switched on.

    Args:
        seed: Value handed to all three generators. Defaults to ``0``, so a
            bare ``setup_seed()`` call keeps its previous meaning.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True


def setup_multi_processes():
    """
    Configure the process start method and thread limits.

    Requests the ``fork`` start method on every platform except Windows,
    passes ``0`` to OpenCV's thread setting, and gives ``OMP_NUM_THREADS`` /
    ``MKL_NUM_THREADS`` a default of ``'1'`` when they are not already set.
    """
    from os import environ
    from platform import system

    import cv2

    # `fork` is requested to speed up the training; Windows is skipped.
    running_on_windows = system() == 'Windows'
    if not running_on_windows:
        torch.multiprocessing.set_start_method('fork', force=True)

    # Turn off OpenCV multithreading so the system is not overloaded.
    cv2.setNumThreads(0)

    # Limit OMP and MKL to one thread each, keeping any value already exported.
    for thread_var in ('OMP_NUM_THREADS', 'MKL_NUM_THREADS'):
        environ.setdefault(thread_var, '1')




In [2]:







@torch.no_grad()
def inference(args, model=None, save_dir='./kitti'):
    """
    Run the model over the test split and save one annotated image per sample.

    For every image the restored output is cropped by its padding, the
    detections that survive NMS are drawn on top of it, and the figure is
    written to ``<save_dir>/<running index>.png``.

    Args:
        args: Hyper-parameter dictionary; ``args['input_size']`` and the
            dictionary itself are forwarded to ``YOLODataset``.
        model: Model to evaluate. When ``None`` the ``'model'`` entry of the
            checkpoint at ``/kaggle/input/cleanweights/adamw.pt`` is loaded.
            The model is switched to half precision and eval mode.
        save_dir: Directory for the PNG files; created if it does not exist.
            Defaults to ``'./kitti'``.
    """
    val_dir = '/kaggle/input/citydata/citydata/test'
    val_data = YOLODataset(val_dir, args['input_size'], args, False)
    val_loader = DataLoader(val_data, batch_size=1, num_workers=4, shuffle=False,
                            collate_fn=collateFunction)
    if model is None:
        model = torch.load('/kaggle/input/cleanweights/adamw.pt', map_location='cuda')['model'].float()
    model.half()
    model.eval()

    # savefig does not create missing directories.
    os.makedirs(save_dir, exist_ok=True)

    class_names = ['person', 'car', 'bicycle', 'motorcycle', 'bus']
    colors = ['r', 'g', 'b', 'y', 'm', 'c']
    img_no = 0
    for samples, _norain, _targets, shapes in tqdm.tqdm(val_loader):
        samples = samples.cuda().half()
        _, _, height, width = samples.shape  # batch size, channels, height, width

        outputs = model(samples)
        restoration = outputs['Restoration'].float().cpu().numpy()
        det_outputs = non_max_suppression(outputs['Detection'][1], 0.5, 0.7)

        for i, output in enumerate(det_outputs):
            # Per-image padding and resized (unpadded) size.
            pad_w, pad_h = shapes[i][1][1]
            pad_w, pad_h = int(pad_w), int(pad_h)
            nh = shapes[i][0][0] * shapes[i][1][0][0]
            nw = shapes[i][0][1] * shapes[i][1][0][1]

            fig, ax = plt.subplots(figsize=(12, 8))

            # Strip the padding and move channels last for imshow.
            res_img = restoration[i][:, pad_h:height - pad_h, pad_w:width - pad_w]
            ax.imshow(res_img.transpose((1, 2, 0)))

            # `scale` is relied on to rewrite the box columns in place; its
            # return value is not used.
            detections = output.clone()
            scale(detections[:, :4], samples[i].shape[1:], (nh, nw))

            for box in detections.cpu().numpy():
                x1, y1, x2, y2, class_conf, class_pred = box
                class_idx = int(class_pred)
                # Skip predictions that have no entry in class_names.
                if class_idx >= len(class_names):
                    continue
                color = colors[class_idx % len(colors)]
                ax.add_patch(patches.Rectangle((x1, y1), x2 - x1, y2 - y1, linewidth=1,
                                               edgecolor=color, facecolor='none'))
                ax.text(x1, y1, s=class_names[class_idx], color='white', verticalalignment='bottom',
                        bbox={'color': color, 'pad': 0}, fontsize=8)

            fig.tight_layout()
            ax.axis('off')  # hide the axes
            fig.subplots_adjust(0, 0, 1, 1, 0, 0)  # remove the margin around the image
            fig.savefig(os.path.join(save_dir, f'{img_no}.png'), bbox_inches='tight')
            # Release the figure; otherwise every image stays alive in pyplot.
            plt.close(fig)
            img_no += 1


def _match_detections(detections, labels, tbox, iou_v):
    """
    Flag which detections match a ground-truth box at each IoU threshold.

    Args:
        detections: Detection rows for one image; columns 0-3 are passed to
            ``box_iou`` as boxes and column 5 is compared against the class.
        labels: Ground-truth rows for the same image; column 0 is the class.
        tbox: Ground-truth boxes in corner form, row-aligned with ``labels``.
        iou_v: 1-D tensor of IoU thresholds.

    Returns:
        Bool tensor of shape ``(len(detections), len(iou_v))`` on the device
        of ``iou_v``.
    """
    correct = np.zeros((detections.shape[0], iou_v.shape[0])).astype(bool)

    iou = box_iou(tbox, detections[:, :4])
    correct_class = labels[:, 0:1] == detections[:, 5]
    for j in range(len(iou_v)):
        x = torch.where((iou >= iou_v[j]) & correct_class)
        if x[0].shape[0]:
            # Columns: label index, detection index, IoU.
            matches = torch.cat((torch.stack(x, 1), iou[x[0], x[1]][:, None]), 1)
            matches = matches.cpu().numpy()
            if x[0].shape[0] > 1:
                # Highest IoU first, then keep one row per detection and one
                # row per label.
                matches = matches[matches[:, 2].argsort()[::-1]]
                matches = matches[np.unique(matches[:, 1], return_index=True)[1]]
                matches = matches[np.unique(matches[:, 0], return_index=True)[1]]
            correct[matches[:, 1].astype(int), j] = True
    return torch.tensor(correct, dtype=torch.bool, device=iou_v.device)


@torch.no_grad()
def _evaluate(args, val_dir, model, criterion):
    """
    Run ``model`` over the dataset in ``val_dir`` and collect raw statistics.

    The model is switched to half precision and eval mode. For every batch the
    detection and restoration losses are accumulated, PSNR/SSIM are summed
    over the cropped restored images, and per-detection match flags are
    gathered for the AP computation.

    Args:
        args: Hyper-parameter dictionary (``input_size``, ``batch_size``,
            ``det`` and ``res`` are read here; the whole dict goes to the
            dataset and, if built here, to ``ComputeLoss``).
        val_dir: Dataset directory handed to ``YOLODataset``.
        model: Model to evaluate.
        criterion: Detection loss; ``ComputeLoss(model, args)`` when ``None``.

    Returns:
        Dict with ``metrics`` (list of NumPy arrays: match flags, confidences,
        predicted classes, target classes; empty list if nothing was
        collected), ``psnr_total``, ``ssim_total``, ``num_samples`` and the
        ``m_loss`` / ``d_loss`` / ``r_loss`` meters.
    """
    val_data = YOLODataset(val_dir, args['input_size'], args, False)
    # Evaluation does not need shuffling; a fixed order keeps runs comparable.
    val_loader = DataLoader(val_data, batch_size=args['batch_size'], num_workers=4,
                            shuffle=False, collate_fn=collateFunction)

    if criterion is None:
        criterion = ComputeLoss(model, args)

    model.half()
    model.eval()

    iou_v = torch.linspace(0.5, 0.95, 10).cuda()  # iou vector for mAP@0.5:0.95
    n_iou = iou_v.numel()
    psnr_total = 0.
    ssim_total = 0.
    num_samples = 0

    metrics = []
    m_loss = AverageMeter()
    r_loss = AverageMeter()
    d_loss = AverageMeter()

    p_bar = tqdm.tqdm(val_loader, desc=('%10s' * 3) % ('precision', 'recall', 'mAP'))
    for samples, norain, targets, shapes in p_bar:
        samples = samples.cuda().half()
        targets = targets.cuda()
        norain = norain.cuda()
        _, _, height, width = samples.shape  # batch size, channels, height, width

        # Inference and losses
        outputs = model(samples)
        det_loss = criterion(outputs['Detection'][0], targets)
        res_loss = F.l1_loss(outputs['Restoration'], norain)
        loss = (args['det'] * det_loss) + (args['res'] * res_loss)

        m_loss.update(loss.item(), samples.size(0))
        d_loss.update(det_loss.item(), samples.size(0))
        r_loss.update(res_loss.item(), samples.size(0))

        # Restoration quality, one image at a time
        restoration = outputs['Restoration']
        for i in range(samples.shape[0]):
            # NOTE(review): `inference` unpacks this same field as
            # (pad_w, pad_h); here it is unpacked as (pad_h, pad_w). One of
            # the two is swapped -- confirm against YOLODataset before
            # trusting PSNR/SSIM on non-square inputs.
            pad_h, pad_w = shapes[i][1][1]
            pad_h, pad_w = int(pad_h), int(pad_w)

            restored_img = restoration[i, :, pad_h:height - pad_h, pad_w:width - pad_w]
            norain_img = norain[i, :, pad_h:height - pad_h, pad_w:width - pad_w]

            # Scale by 255, clamp and convert to bytes, then add a batch axis.
            restored_img = torch.clamp(restored_img.mul(255), 0, 255).byte().unsqueeze(0)
            norain_img = torch.clamp(norain_img.mul(255), 0, 255).byte().unsqueeze(0)

            psnr_total += psnr(norain_img, restored_img)
            ssim_total += ssim(norain_img, restored_img)
            num_samples += 1

        # Detection: columns 2+ of targets are the box values, scaled to pixels.
        targets[:, 2:] *= torch.tensor((width, height, width, height)).cuda()
        det_outputs = non_max_suppression(outputs['Detection'][1], 0.001, 0.65)
        for i, output in enumerate(det_outputs):
            # Column 0 of targets is the image index within the batch.
            labels = targets[targets[:, 0] == i, 1:]
            correct = torch.zeros(output.shape[0], n_iou, dtype=torch.bool).cuda()
            if output.shape[0] == 0:
                if labels.shape[0]:
                    metrics.append((correct, *torch.zeros((3, 0)).cuda()))
                continue

            detections = output.clone()
            scale(detections[:, :4], samples[i].shape[1:], shapes[i][0], shapes[i][1])

            if labels.shape[0]:
                # Centre/size -> corner coordinates.
                tbox = labels[:, 1:5].clone()
                tbox[:, 0] = labels[:, 1] - labels[:, 3] / 2  # top left x
                tbox[:, 1] = labels[:, 2] - labels[:, 4] / 2  # top left y
                tbox[:, 2] = labels[:, 1] + labels[:, 3] / 2  # bottom right x
                tbox[:, 3] = labels[:, 2] + labels[:, 4] / 2  # bottom right y
                scale(tbox, samples[i].shape[1:], shapes[i][0], shapes[i][1])
                correct = _match_detections(detections, labels, tbox, iou_v)

            metrics.append((correct, output[:, 4], output[:, 5], labels[:, 0]))

    metrics = [torch.cat(x, 0).cpu().numpy() for x in zip(*metrics)]  # to numpy
    return {
        'metrics': metrics,
        'psnr_total': psnr_total,
        'ssim_total': ssim_total,
        'num_samples': num_samples,
        'm_loss': m_loss,
        'd_loss': d_loss,
        'r_loss': r_loss,
    }


@torch.no_grad()
def test_per_class(args, model=None,criterion=None,weight_opt=None):
    """
    Evaluate on the rain-KITTI test split and print AP@0.5 for every class.

    Args:
        args: Hyper-parameter dictionary; ``args['nc']`` is the class count.
        model: Model to evaluate. When ``None`` the ``'model'`` entry of
            ``/kaggle/input/cleanweights/6-4.pt`` is loaded.
        criterion: Detection loss; built from ``ComputeLoss`` when ``None``.
        weight_opt: Accepted for interface compatibility; not used.
    """
    val_dir = '/kaggle/input/rain-kitti/rain_kitti_split/test'

    if model is None:
        model = torch.load('/kaggle/input/cleanweights/6-4.pt', map_location='cuda')['model'].float()

    class_mapping = {
        0:"person",
        1:"car",
        2:"bicycle",
        3:"motorcycle",
        4:"bus"
    }

    stats = _evaluate(args, val_dir, model, criterion)
    metrics = stats['metrics']

    # Start from zeros so the report below still works when nothing matched.
    ap50_per_class = np.zeros(args['nc'])
    if len(metrics) and metrics[0].any():
        ap50_per_class = compute_ap_per_class(*metrics,args['nc'])

    for i in range(args['nc']):
        # Fall back to the numeric id for classes without a known name.
        print(f"AP@0.5 for class {class_mapping.get(i, i)}: {ap50_per_class[i]:.4f}")

    overall_map50 = np.mean(ap50_per_class)
    print(f"Overall mAP@0.5: {overall_map50}")

    model.float()  # undo the half() applied for evaluation, as `test` does


@torch.no_grad()
def test(args, model=None,criterion=None,weight_opt=None):
    """
    Evaluate on the VOC-fog test split and report detection and restoration scores.

    Args:
        args: Hyper-parameter dictionary.
        model: Model to evaluate. When ``None`` the ``'ema'`` entry of
            ``/kaggle/input/weights/best (1).pt`` is loaded.
        criterion: Detection loss; built from ``ComputeLoss`` when ``None``.
        weight_opt: Accepted for interface compatibility; not used.

    Returns:
        Tuple ``(map50, mean_ap, avg_psnr, avg_ssim, total_loss_avg,
        det_loss_avg, res_loss_avg)``.
    """
    # `compute_ap` is not part of the notebook's import cell.
    # NOTE(review): assumed to live next to compute_ap_per_class in
    # model.utils -- confirm, then move this to the import cell.
    from model.utils import compute_ap

    val_dir = '/kaggle/input/vocfog/vocfog/test'

    if model is None:
        model = torch.load('/kaggle/input/weights/best (1).pt', map_location='cuda')['ema'].float()

    stats = _evaluate(args, val_dir, model, criterion)
    metrics = stats['metrics']

    m_pre = 0.
    m_rec = 0.
    map50 = 0.
    mean_ap = 0.
    if len(metrics) and metrics[0].any():
        tp, fp, m_pre, m_rec, map50, mean_ap = compute_ap(*metrics)

    # Print results
    print('%10.4g' * 4 % (m_pre, m_rec, map50, mean_ap))
    # Guard against an empty dataset (totals are 0 in that case).
    sample_count = max(stats['num_samples'], 1)
    avg_psnr = stats['psnr_total'] / sample_count
    avg_ssim = stats['ssim_total'] / sample_count

    print('Average PSNR: %.4f' % avg_psnr)
    print('Average SSIM: %.4f' % avg_ssim)
    print('Val Det Loss: %.4f' % stats['d_loss'].avg)

    model.float()  # for training
    return (map50, mean_ap, avg_psnr, avg_ssim,
            stats['m_loss'].avg, stats['d_loss'].avg, stats['r_loss'].avg)

def main():
    """
    Assemble the hyper-parameter dictionary, fix seeds and thread settings,
    and run `test`.
    """
    args = dict(
        input_size=640,
        batch_size=16,
        local_rank=0,
        epochs=100,
        cosine_epochs=100,
        world_size=1,
        lr0=1e-4,
        lr1=1e-2,
        lr2=1e-4,
        lrf=1e-2,
        lrf2=1e-2,
        weight_decay=5e-4,
        warmup_bias_lr=0,
        Freeze_Epoch=0,
        Freeze_Train=False,
        warmup_epochs=7,
        warmup_momentum=0.8,
        momentum=0.937,
        nc=5,
        # loss gains
        box=7.5,         # box loss gain
        cls=0.5,         # cls loss gain
        dfl=1.5,
        # augmentation settings
        hsv_h=0.015,     # image HSV-Hue augmentation (fraction)
        hsv_s=0.7,       # image HSV-Saturation augmentation (fraction)
        hsv_v=0.4,       # image HSV-Value augmentation (fraction)
        degrees=0.0,     # image rotation (+/- deg)
        translate=0.1,   # image translation (+/- fraction)
        scale=0.5,       # image scale (+/- gain)
        shear=0.0,       # image shear (+/- deg)
        flip_ud=0.0,     # image flip up-down (probability)
        flip_lr=0.5,     # image flip left-right (probability)
        mosaic=0.0,      # image mosaic (probability)
        mix_up=0.0,      # image mix-up (probability)
        # weights applied to the detection and restoration losses
        det=0.6,
        res=0.4,
    )

    setup_seed()
    setup_multi_processes()

    # Only `test` is run here; `test_per_class(args)` and `inference(args)`
    # take the same dictionary and can be called in its place.
    test(args)





In [None]:
# Run the entry point defined above: seeds the RNGs, configures threading
# and calls `test` with the hyper-parameters built inside `main`.
main()