In [None]:
!pip uninstall opencv-python-headless -y 

!pip install opencv-python --upgrade

In [None]:
!pip install pycocotools

In [None]:
import torchvision
import torchvision.datasets as dset
import torchvision.transforms as transforms
import os
import torch
import torch.utils.data
from PIL import Image
from pycocotools.coco import COCO
import matplotlib.pyplot as plt
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import FasterRCNN_ResNet50_FPN_Weights
import pprint
import random
# Pretty-printer instance available for debugging output
pp = pprint.PrettyPrinter(indent=1)
# Directory holding the images; used as the dataset root below
path2data="/kaggle/input/levi9-hack9-2023/train" # Add correct path
# COCO-format annotation file; used as the dataset annotation file below
path2json="/kaggle/input/train-test/mata-train.json" # Add correct path

## Reading the data 

In [None]:
# Load the data with torchvision's CocoDetection wrapper, converting every
# image to a tensor. This object is only used for the quick inspection cells
# that follow; training uses the custom myCocoDataset class instead.
coco_train = dset.CocoDetection(root = path2data,
                                annFile = path2json,
                                transform = transforms.ToTensor())

In [None]:
# Sanity check: how many samples the annotation file exposes
print('Number of samples: ', len(coco_train))

In [None]:
# Inspect the first sample. The image is a tensor (ToTensor was applied when
# the dataset was built), so its dimensions are read from `.shape`; on a tensor
# `.size` is a method and printing it only shows the bound-method repr.
img, target = coco_train[0]
print(img.shape)
print(target)

In [None]:
class myCocoDataset(torch.utils.data.Dataset):
    """Single-class detection dataset backed by a COCO-format annotation file.

    Every item is an ``(image, target)`` pair. ``target`` is a dict with the
    keys ``boxes`` (N x 4, ``[xmin, ymin, xmax, ymax]``), ``labels`` (N, all
    ones), ``image_id`` (1 element), ``area`` (N, one value per box) and
    ``iscrowd`` (N, all zeros), where N is the number of annotations of the
    image (possibly 0).

    Args:
        root: Directory that contains the image files.
        annotation: Path to the COCO-format JSON annotation file.
        transforms: Optional callable applied to the PIL image only (the
            target is not transformed).
    """

    def __init__(self, root, annotation, transforms=None):
        self.root = root
        self.transforms = transforms
        self.coco = COCO(annotation)
        # Sorted so that a given index always maps to the same image id.
        self.ids = list(sorted(self.coco.imgs.keys()))

    def __getitem__(self, index):
        """Return the image at ``index`` and its annotation dict."""
        coco = self.coco
        img_id = self.ids[index]
        # All annotations that belong to this image.
        coco_annotation = coco.loadAnns(coco.getAnnIds(imgIds=img_id))
        path = coco.loadImgs(img_id)[0]["file_name"]
        # Force three channels so grayscale / palette / RGBA files produce the
        # same tensor layout as ordinary RGB images.
        img = Image.open(os.path.join(self.root, path)).convert("RGB")
        num_objs = len(coco_annotation)

        # COCO stores boxes as [xmin, ymin, width, height]; the detection
        # model expects [xmin, ymin, xmax, ymax].
        boxes = []
        areas = []
        for ann in coco_annotation:
            xmin = ann["bbox"][0]
            ymin = ann["bbox"][1]
            xmax = xmin + ann["bbox"][2]
            ymax = ymin + ann["bbox"][3]
            boxes.append([xmin, ymin, xmax, ymax])
            # One area per box (not a single sum over the image).
            areas.append((xmax - xmin) * (ymax - ymin))

        # reshape keeps the (0, 4) shape for images without annotations.
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        area = torch.as_tensor(areas, dtype=torch.float32)
        # Only one foreground class: every object gets label 1.
        labels = torch.ones((num_objs,), dtype=torch.int64)
        iscrowd = torch.zeros((num_objs,), dtype=torch.int64)

        my_annotation = {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([img_id]),
            "area": area,
            "iscrowd": iscrowd,
        }

        if self.transforms is not None:
            img = self.transforms(img)

        return img, my_annotation

    def __len__(self):
        """Number of images listed in the annotation file."""
        return len(self.ids)


# The pipeline currently consists of ToTensor only
def get_transform():
    """Build the image transform pipeline (a Compose holding one ToTensor)."""
    to_tensor = torchvision.transforms.ToTensor()
    return torchvision.transforms.Compose([to_tensor])


# Custom collate function passed to the DataLoader
def collate_fn(batch):
    """Transpose a batch of samples into one tuple per sample field.

    A list of ``(image, target)`` pairs becomes ``(images, targets)``, each a
    tuple, without stacking anything.
    """
    per_field = zip(*batch)
    return tuple(per_field)


def get_model_instance_segmentation(num_classes):
    """Create a Faster R-CNN detector with a fresh box-prediction head.

    Despite the function name, the model built here is an object detector
    (``fasterrcnn_resnet50_fpn``), loaded with the ``DEFAULT`` pre-trained
    weights.

    Args:
        num_classes: Number of output classes of the new head.

    Returns:
        The model with ``roi_heads.box_predictor`` replaced by a new
        ``FastRCNNPredictor``.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(
        weights=FasterRCNN_ResNet50_FPN_Weights.DEFAULT,
        progress=True,
    )
    # The new head must accept the same feature size as the one it replaces.
    old_head = detector.roi_heads.box_predictor
    head_in_features = old_head.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)
    return detector

## Config parameters

In [None]:
# Batch size for the training DataLoader, plus the SGD momentum / weight decay
train_batch_size = 2
momentum = 0.9
weight_decay = 0.005

# Params for dataloader
train_shuffle_dl = True
num_workers_dl = 2

# Params for training
# Two classes: the target class and background
num_classes = 2
num_epochs = 2

# Learning rate passed to the SGD optimizer
lr = 0.007


## Training 

In [None]:
print("Torch version:", torch.__version__)

# Dataset yielding (image tensor, target dict) pairs
my_dataset = myCocoDataset(
    root=path2data, annotation=path2json, transforms=get_transform()
)

# DataLoader; collate_fn keeps images and targets as tuples instead of
# stacking them
data_loader = torch.utils.data.DataLoader(
    my_dataset,
    batch_size=train_batch_size,
    shuffle=train_shuffle_dl,
    num_workers=num_workers_dl,
    collate_fn=collate_fn
)

# Select device (GPU when available, otherwise CPU). Batches are moved to the
# device inside the training loop, one batch at a time.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

model = get_model_instance_segmentation(num_classes)

# Move the model to the selected device
model.to(device)

# Optimise only the parameters that require gradients
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(
    params, lr=lr, momentum=momentum, weight_decay=weight_decay
)

len_dataloader = len(data_loader)

# Training
for epoch in range(num_epochs):
    # 1-based, consistent with the iteration counter printed below
    print(f"Epoch: {epoch + 1}/{num_epochs}")
    model.train()
    for i, (imgs, annotations) in enumerate(data_loader, start=1):
        imgs = [img.to(device) for img in imgs]
        annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]
        # Called with targets in train mode, the model returns a dict of losses
        loss_dict = model(imgs, annotations)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        # .item() prints the plain number instead of the tensor repr
        print(f"Iteration: {i}/{len_dataloader}, Loss: {losses.item():.4f}")

In [None]:
# Switch the model to evaluation mode before running predictions
model.eval()

## Their evaluation code (I don't need this)

In [None]:
# from PIL import Image, ImageDraw
# sample_image_path = '/kaggle/input/levi9-hack9-2023/train/046.jpg'
# sample_image = Image.open(sample_image_path)
# sample_image

In [None]:
# transformed_img = torchvision.transforms.transforms.ToTensor()(sample_image)

# result = model([transformed_img.to(device)])
# result

In [None]:
# logo_id = 1
# logo_boxes = [x.cpu().detach().numpy().tolist() for i, x in enumerate(result[0]['boxes']) if result[0]['labels'][i] == logo_id]
# logo_boxes

# # obrisi ovo smanjivanje
# # logo_boxes = [logo_boxes[0],logo_boxes[1],logo_boxes[2]]
# # logo_boxes

In [None]:
# sample_image_annotated = sample_image.copy()

# img_bbox = ImageDraw.Draw(sample_image_annotated)


 
# for bbox in logo_boxes:
#     x1, x2, x3, x4 = map(int, bbox)
#     print(x1, x2, x3, x4)
#     img_bbox.rectangle([x1, x2, x3, x4], outline="red") 

# sample_image_annotated


# Their helper functions, needed later for the submission

In [None]:
import numpy as np
from itertools import groupby
import pycocotools._mask as _mask
def mask_to_rle(mask):
    """
    params:  mask - numpy array
    returns: run-length encoding string (space-separated pairs of
             1-based start pixel and run length)
    """
    # View the n-dimensional mask as one row-major series of pixels, e.g.
    #     [[1. 1. 0.]
    #      [0. 0. 0.]   --> [1. 1. 0. 0. 0. 0. 1. 0. 0.]
    #      [1. 0. 0.]]
    pixels = mask.ravel()

    # Zero-pad both ends so runs touching the first or last pixel still have
    # a detectable beginning and end.
    padded = np.concatenate(([0], pixels, [0]))

    # Positions where a pixel differs from its predecessor; the +1 converts
    # 0-based indexes to 1-based pixel numbers.
    boundaries = np.flatnonzero(padded[1:] != padded[:-1]) + 1

    # Boundaries alternate run start / run stop; turn every stop into a length.
    boundaries[1::2] -= boundaries[0::2]

    return ' '.join(map(str, boundaries))

In [None]:
def rle_to_mask(lre, shape=(1181, 1772)):
    '''
    Decode a run-length encoding string into a binary mask.

    params:  lre   - run-length encoding string: whitespace-separated pairs of
                     1-based start pixel and run length. An empty string or
                     None (the encoding of "nothing detected") yields an
                     all-zero mask.
             shape - (height, width) of the numpy array to return

    returns: uint8 numpy array with dimensions of the shape parameter,
             255 inside the encoded runs and 0 elsewhere
    '''
    h, w = shape
    mask = np.zeros(h * w, dtype=np.uint8)

    # split() without an argument tolerates repeated or trailing whitespace
    tokens = lre.split() if lre is not None else []
    if tokens:
        runs = np.asarray([int(token) for token in tokens])
        # pixel numbers start at 1, indexes start at 0
        run_starts = runs[0::2] - 1
        # each run ends `length` pixels after its start (end is exclusive)
        run_ends = run_starts + runs[1::2]
        for start, end in zip(run_starts, run_ends):
            mask[start:end] = 255

    # transform the numpy array from flat to the requested image shape
    return mask.reshape(shape)

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
def prediction_to_rle(image_path, model, score_thresh=0.65):
    """Run the detector on one image and encode the detected boxes as RLE.

    Args:
        image_path: Path of the image file to run the model on.
        model: Detection model; called with a list holding one image tensor
            and expected to return a list with one dict containing
            ``boxes``, ``labels`` and ``scores``.
        score_thresh: Detections are kept only when their score is strictly
            greater than this value.

    Returns:
        The run-length encoding string of a mask in which every kept box is
        filled, or ``None`` when no detection passes the filter.
    """
    # Force three channels so every file yields the same tensor layout
    real_image = Image.open(image_path).convert("RGB")
    w, h = real_image.size
    transformed_img = torchvision.transforms.transforms.ToTensor()(real_image)

    # Inference only: no autograd graph is needed
    with torch.no_grad():
        prediction = model([transformed_img.to(device)])[0]

    logo_id = 1
    boxes = prediction['boxes'].cpu().tolist()
    labels = prediction['labels'].cpu().tolist()
    scores = prediction['scores'].cpu().tolist()

    # Filter on label and score together so each box is compared with its
    # own score, whatever the other detections are labelled.
    logo_boxes = [
        box
        for box, label, score in zip(boxes, labels, scores)
        if label == logo_id and score > score_thresh
    ]

    if not logo_boxes:
        return None

    mask = np.zeros((h, w), np.uint8)
    for bbox in logo_boxes:
        # Boxes are [xmin, ymin, xmax, ymax]; clamp at 0 so a negative
        # coordinate cannot wrap around in the slice.
        x_min, y_min, x_max, y_max = (max(0, int(value)) for value in bbox)
        mask[y_min:y_max, x_min:x_max] = 255

    return mask_to_rle(mask)

In [None]:
def image_to_binary(image_path, bounding_boxes):
    """Rasterise bounding boxes into a binary mask sized like an image file.

    NOTE: a later cell defines another ``image_to_binary`` that takes an image
    tensor; once that cell has run, it replaces this path-based version.

    Args:
        image_path: Path of the image whose size determines the mask size.
        bounding_boxes: Iterable of ``[xmin, ymin, xmax, ymax]`` boxes.

    Returns:
        uint8 numpy array of shape (height, width), 255 inside any box and 0
        elsewhere.
    """
    # Only the image size is needed; the context manager closes the file.
    with Image.open(image_path) as real_image:
        w, h = real_image.size

    mask = np.zeros((h, w), np.uint8)
    for bbox in bounding_boxes:
        # Clamp at 0 so a negative coordinate cannot wrap around in the slice
        x_min, y_min, x_max, y_max = (max(0, int(value)) for value in bbox)
        mask[y_min:y_max, x_min:x_max] = 255

    return mask

In [None]:
def image_to_binary(image_tensor, bounding_boxes):
    """Show an image tensor and rasterise bounding boxes into a binary mask.

    This definition replaces the path-based ``image_to_binary`` defined in the
    previous cell.

    Args:
        image_tensor: Image tensor; converted to a PIL image with
            ``ToPILImage`` and displayed with ``plt.imshow``.
        bounding_boxes: Iterable of ``[xmin, ymin, xmax, ymax]`` boxes.

    Returns:
        uint8 numpy array of shape (height, width), 255 inside any box and 0
        elsewhere.
    """
    real_image = transforms.ToPILImage()(image_tensor)
    plt.imshow(real_image)
    w, h = real_image.size

    mask = np.zeros((h, w), np.uint8)
    for bbox in bounding_boxes:
        # Clamp at 0 so a negative coordinate cannot wrap around in the slice
        x_min, y_min, x_max, y_max = (max(0, int(value)) for value in bbox)
        mask[y_min:y_max, x_min:x_max] = 255

    return mask

# This is the submission

In [None]:
# import pandas as pd
# import os
# test_images_path = '/kaggle/input/levi9-hack9-2023/test'
# solution = []
# for image_path in os.listdir(test_images_path):
#     rle = prediction_to_rle(test_images_path+'/'+image_path, model)
#     solution.append({'img': image_path, 'pixels': rle})
    

# df = pd.DataFrame(solution)
# df.to_csv('sub-novi2.csv', index=False)

# Mata's function (I don't think I need this)

In [None]:
# from collections import namedtuple
# Rectangle = namedtuple('Rectangle', 'xmin ymin xmax ymax')

# ra = Rectangle(3., 3., 5., 5.)
# rb = Rectangle(1., 1., 4., 3.5)
# # intersection here is (3, 3, 4, 3.5), or an area of 1*.5=.5

# # from: 
# # https://stackoverflow.com/questions/27152904/calculate-overlapped-area-between-two-rectangles
# def area(a, b):  # returns 0 if rectangles don't intersect
#     dx = min(a.xmax, b.xmax) - max(a.xmin, b.xmin)
#     dy = min(a.ymax, b.ymax) - max(a.ymin, b.ymin)
    
#     if (dx>=0) and (dy>=0):
#         return dx*dy
    
#     return 0

# print(area(ra, rb))

# My functions for calling the model

In [None]:
def bboxToCocoBbox(bbox):
    """Convert ``[xmin, ymin, xmax, ymax]`` to ``[xmin, ymin, width, height]``.

    The conversion is done in place on ``bbox``; nothing is returned.
    """
    x_min, y_min, x_max, y_max = bbox[0], bbox[1], bbox[2], bbox[3]
    bbox[2], bbox[3] = x_max - x_min, y_max - y_min

def model_predict(image_path, model):
    """Run the detector on one image and return its logo boxes with scores.

    Args:
        image_path: Path of the image file to run the model on.
        model: Detection model; called with a list holding one image tensor
            and expected to return a list with one dict containing
            ``boxes``, ``labels`` and ``scores``.

    Returns:
        tuple: ``(imageBboxes, scores)``. ``imageBboxes`` is a list of
        ``[xmin, ymin, width, height]`` boxes for the detections labelled 1,
        and ``scores`` holds the score of each of those boxes at the same
        index.
    """
    # Force three channels so every file yields the same tensor layout
    real_image = Image.open(image_path).convert("RGB")
    transformed_img = torchvision.transforms.transforms.ToTensor()(real_image)

    # Inference only: no autograd graph is needed
    with torch.no_grad():
        prediction = model([transformed_img.to(device)])[0]

    logo_id = 1
    imageBboxes = []
    scores = []
    detections = zip(
        prediction['boxes'].cpu().tolist(),
        prediction['labels'].cpu().tolist(),
        prediction['scores'].cpu().tolist(),
    )
    for oneBbox, label, score in detections:
        if label != logo_id:
            continue
        # Convert in place from corner format to COCO [x, y, w, h]
        bboxToCocoBbox(oneBbox)
        imageBboxes.append(oneBbox)
        # Keep the score next to its box so both lists stay index-aligned
        scores.append(score)

    return imageBboxes, scores

In [None]:
# slikaputanja = '/kaggle/input/levi9-hack9-2023/train/041.jpg'

# bbbboxess  = model_predict(slikaputanja, model)

# print(bbbboxess)


# Functions for computing the metrics

In [None]:
def calculate_metrics_simple(ground_truth, predictions, iou_threshold):
    """
    Calculate metrics from COCO format annotations using IoU.

    A prediction is a true positive when it matches a ground-truth annotation
    that has the same ``category_id``, belongs to the same image (checked
    only when both annotations carry an ``image_id``), overlaps with
    IoU >= ``iou_threshold`` and has not already been matched by an earlier
    prediction. Every other prediction is a false positive; ground-truth
    annotations left unmatched are false negatives.

    Args:
        ground_truth (list): Ground truth annotations in COCO format.
        predictions (list): Predicted annotations in COCO format.
        iou_threshold (float): IoU threshold for matching predictions with ground truth.

    Returns:
        float: Precision value (0 when there are no predictions).
        float: Recall value (0 when there is no ground truth).
        float: F1-score value (0 when precision and recall are both 0).
    """
    true_positives = 0
    false_positives = 0
    # Indexes of ground-truth annotations already claimed by a prediction, so
    # that one object is never counted as several true positives.
    matched_gt = set()

    for pred in predictions:
        pred_bbox = pred["bbox"]
        pred_category = pred["category_id"]

        pred_matched = False

        for gt_index, gt in enumerate(ground_truth):
            if gt_index in matched_gt:
                continue
            if gt["category_id"] != pred_category:
                continue
            # Boxes from different images must never be compared
            if "image_id" in pred and "image_id" in gt and pred["image_id"] != gt["image_id"]:
                continue

            if calculate_iou(pred_bbox, gt["bbox"]) >= iou_threshold:
                matched_gt.add(gt_index)
                pred_matched = True
                break

        if pred_matched:
            true_positives += 1
        else:
            false_positives += 1

    false_negatives = len(ground_truth) - true_positives

    # Guard the divisions: either list may be empty
    num_predicted = true_positives + false_positives
    num_actual = true_positives + false_negatives
    precision = true_positives / num_predicted if num_predicted else 0.0
    recall = true_positives / num_actual if num_actual else 0.0

    if precision == 0 and recall == 0:
        f1_score = 0
    else:
        f1_score = 2 * (precision * recall) / (precision + recall)

    print('TP')
    print(true_positives)
    print('FP')
    print(false_positives)
    print('FN')
    print(false_negatives)

    return precision, recall, f1_score

def calculate_iou(bbox1, bbox2):
    """
    Calculate Intersection over Union (IoU) between two bounding boxes.

    Args:
        bbox1 (list): Bounding box coordinates [x, y, width, height].
        bbox2 (list): Bounding box coordinates [x, y, width, height].

    Returns:
        float: IoU value; 0.0 when the union has no area (for example two
        zero-sized boxes), instead of dividing by zero.
    """
    x1, y1, w1, h1 = bbox1
    x2, y2, w2, h2 = bbox2

    area1 = w1 * h1
    area2 = w2 * h2

    # Overlap along each axis, clamped at 0 for boxes that do not intersect
    intersection_x = max(0, min(x1 + w1, x2 + w2) - max(x1, x2))
    intersection_y = max(0, min(y1 + h1, y2 + h2) - max(y1, y2))
    intersection_area = intersection_x * intersection_y

    union_area = area1 + area2 - intersection_area
    if union_area <= 0:
        return 0.0

    return intersection_area / union_area


# Check that the metric functions work

In [None]:
# import json

# truth_file = '/kaggle/input/coco-jsons/coco-truth.json'
# pred_file = '/kaggle/input/coco-jsons/coco-pred.json'
# iou_thresh = 0.5

# # Load predictions JSON file into a Python object
# with open(pred_file, 'r') as f:
#     predd = json.load(f)

# with open(truth_file, 'r') as f:
#     truthh = json.load(f)
    

# precision, recall, f1_score = calculate_metrics_simple(truthh['annotations'], predd['annotations'], iou_thresh)

# print()
# print()
# print(precision)
# print(recall)
# print(f1_score)


# Remember

## TP - number of logos we detected correctly
## FN - number of logos that exist but that we missed
## FP - number of logos we made up (detections with no real logo)

# Number of images with a logo

In [None]:
# inputt_path = '/kaggle/input/levi9-hack9-2023/train.json'

# with open(inputt_path, 'r') as f:
#     inputt = json.load(f)
    
# numOfImages = len(inputt['images'])

# print("Broj ukupnih slika")
# print(numOfImages)



In [None]:
# imagesWithLogo = []

# for image in inputt['images']:
#     logosInThisImage = sum(annotation['image_id'] == image['id']   for annotation in inputt['annotations'])
    
#     if logosInThisImage > 0:
#         imagesWithLogo.append(image)
        
# print("Broj slika sa logom")
# print(len(imagesWithLogo))

In [None]:
# imagesNoLogo = []

# for image in inputt['images']:
#     logosInThisImage = sum(annotation['image_id'] == image['id']   for annotation in inputt['annotations'])
    
#     if logosInThisImage == 0:
#         imagesNoLogo.append(image)
        
# print("Broj slika bez logom")
# print(len(imagesNoLogo))


In [None]:
def diffArray(arr1, arr2):
    """Return the items of ``arr1`` whose ``'id'`` does not occur in ``arr2``.

    Args:
        arr1: Iterable of dicts that each have an ``'id'`` key.
        arr2: Iterable of dicts that each have an ``'id'`` key; the ids must
            be hashable (COCO image ids are integers).

    Returns:
        list: The matching items of ``arr1``, in their original order.
    """
    # One pass over arr2 instead of rescanning it for every item of arr1
    ids_in_arr2 = {image2['id'] for image2 in arr2}
    return [image1 for image1 in arr1 if image1['id'] not in ids_in_arr2]

In [None]:

# print("With logo")
# print(len(imagesWithLogo))
# print("No logo")
# print(len(imagesNoLogo))
# print()


# numTrainWithLogos = int(0.75 * len(imagesWithLogo))
# numTrainNoLogos = int(0.75 * len(imagesNoLogo))


# trainImagesWithLogo = random.sample(imagesWithLogo, numTrainWithLogos)
# trainImagesNoLogo = random.sample(imagesNoLogo, numTrainNoLogos)

# testImagesWithLogo = diffArray(imagesWithLogo, trainImagesWithLogo)
# testImagesNoLogo = diffArray(imagesNoLogo, trainImagesNoLogo)


# print("Train logo")
# print(len(trainImagesWithLogo))
# print("Train no logo")
# print(len(trainImagesNoLogo))
# print()

# print("Test logo")
# print(len(testImagesWithLogo))
# print("Test no logo")
# print(len(testImagesNoLogo))
# print()



# trainImages = trainImagesWithLogo + trainImagesNoLogo
# testImages = testImagesWithLogo + testImagesNoLogo

# print("Train")
# print(len(trainImages))
# print("test")
# print(len(testImages))

In [None]:
def idExistsInImageArray(id, imageArray):
    """Tell whether any image dict in ``imageArray`` has the given ``'id'``.

    Args:
        id: Image id to look for.
        imageArray: Iterable of dicts that each have an ``'id'`` key.

    Returns:
        bool: True when at least one image has that id, False otherwise.
    """
    # any() stops at the first hit instead of counting every match
    return any(image['id'] == id for image in imageArray)

In [None]:
# allAnnotations = inputt['annotations']

# print("All annotations")
# print(len(allAnnotations))
# print()

# trainAnnotations = [annotation for annotation in allAnnotations if idExistsInImageArray(annotation['image_id'],trainImages)]
# testAnnotations = [annotation for annotation in allAnnotations if idExistsInImageArray(annotation['image_id'],testImages)]

# print("Train annotations")
# print(len(trainAnnotations))
# print()

# print("Test annotations")
# print(len(testAnnotations))
# print()


In [None]:
# import copy

# trainInputt = copy.deepcopy(inputt)
# testInputt = copy.deepcopy(inputt)

# trainInputt['images'] = trainImages;
# trainInputt['annotations'] = trainAnnotations;

# testInputt['images'] = testImages;
# testInputt['annotations'] = testAnnotations;


In [None]:
# print(len(trainInputt['images']))
# print(len(testInputt['images']))

# print(len(trainInputt['annotations']))
# print(len(testInputt['annotations']))

# trainInputtJson = json.dumps(trainInputt)
# testInputtJson = json.dumps(testInputt)

# # jsonFile1 = open("mata-train.json", "w")
# # jsonFile1.write(trainInputtJson)
# # jsonFile1.close()

# # jsonFile1 = open("mata-test.json", "w")
# # jsonFile1.write(testInputtJson)
# # jsonFile1.close()


In [None]:
# myTrainPath = '/kaggle/input/train-test/mata-train.json'
# myTestPath = '/kaggle/input/train-test/mata-test.json'

# with open(myTrainPath, 'r') as f:
#     myTrain = json.load(f)

# with open(myTestPath, 'r') as f:
#     myTest = json.load(f)

# print(len(myTrain['images']))
# print(len(myTrain['annotations']))

# print()
# print(len(myTest['images']))
# print(len(myTest['annotations']))



# Run the model

In [None]:
# json is needed here and is not imported by any earlier (active) cell
import json

# COCO-format annotation file used as ground truth for the evaluation
realTruthPath = '/kaggle/input/train-test/mata-test.json'

with open(realTruthPath, 'r') as f:
    realTruth = json.load(f)

# Image records and annotation records of the ground-truth file
truthImages = realTruth['images']
truthAnnotations = realTruth['annotations']


In [None]:
# Run the model on every ground-truth image and collect the detections as
# COCO-style annotation dicts.
predAnnotations = []

for image in truthImages:
    # The evaluation images live in the same directory as the training images
    imagePath = os.path.join(path2data, image['file_name'])
    # Boxes come back in COCO [x, y, w, h] form, each with its score
    imageBboxes, scores = model_predict(imagePath, model)

    for oneBbox, score in zip(imageBboxes, scores):
        predAnnotations.append({
            # Running 1-based id over all predictions
            "id": len(predAnnotations) + 1,
            "image_id": image['id'],
            "category_id": 1,
            "bbox": oneBbox,
            "score": score
        })

print("Done")

In [None]:
# remove all annotations with score less than scoreThresh
scoreThresh = 0.65

scoredPredAnnotations = [annotation for annotation in predAnnotations if annotation['score'] >= scoreThresh]


# Compute the metrics

In [None]:
# Score the filtered predictions against the ground truth at IoU 0.5 and
# print precision, recall and F1, one per line.
iou_thresh = 0.5
metric_values = calculate_metrics_simple(truthAnnotations, scoredPredAnnotations, iou_thresh)
precision, recall, f1_score = metric_values

print()
for metric_value in metric_values:
    print(metric_value)

In [None]:
# import itertools

# batch = [2, 3, 4]
# momentum = [0.8, 0.9, 0.95]
# wDecay = [0.01, 0.005, 0.001]

# allParams = [batch, momentum, wDecay]

# # Generate all combinations of parameters
# combinations = list(itertools.product(*allParams))

# # Run function for each combination

# for params in combinations:
#     print(f"{params}")