In [1]:
import pandas as pd
import json


In [2]:
train_df = pd.read_csv('train.csv')
test_df = pd.read_csv('test.csv')

In [None]:
train_df = train_df[train_df["annotation_type"] == "Rectangle"][ train_df["label"] == "skalstock_ok"]
test_df = test_df[test_df["annotation_type"] == "Rectangle"][ test_df["label"] == "skalstock_ok"]

In [None]:
train_df

In [None]:
test_df

In [6]:
#only keep one person for each task_id
train_df = train_df.groupby("task_id").first()
test_df = test_df.groupby("task_id").first()


In [7]:
from PIL import Image
import matplotlib.pyplot as plt
from matplotlib import patches
import numpy as np

def visualize_image_with_bounding_boxes_colored(df):
    """
    Visualizes an image with bounding boxes, each colored according to the rectanglelabels.
    Adds labels and colors to the legend.
    
    Parameters:
    - image_path: Path to the image file.
    - bounding_boxes: A list of bounding boxes, where each bounding box is represented as a dictionary
      with keys 'x', 'y', 'width', 'height', and optionally 'label'. Coordinates are normalized to [0, 1].
    """
    # Open the image file
    img = Image.open(df["image"])
    fig, ax = plt.subplots(1)
    fig.set_size_inches(10, 10)
    ax.imshow(img)
    
    # Image dimensions
    img_width, img_height = img.size
    
    # Label to color mapping
    label_color_map = {
        'riktning_text': 'r',
        'another_label': 'g',
          # Example additional label
        # Add more labels and colors as needed
    }
    

    # Denormalize coordinates
    x = df['x'] * img_width //100
    y = df['y'] * img_height //100
    width = df['width'] * img_width //100
    height = df['height'] * img_height //100
    
    # Get color for the label
    label = df['label']
    color = label_color_map.get(label, 'b')  # Default to blue if label not in map
    
    # Create a Rectangle patch
    rect = patches.Rectangle((x, y), width, height, linewidth=1, edgecolor=color, facecolor='none', label=label)
    
    # Add the patch to the Axes
    ax.add_patch(rect)
    
    # Create legend from unique labels
    handles, labels = ax.get_legend_handles_labels()
    by_label = dict(zip(labels, handles))  # Removing duplicates
    ax.legend(by_label.values(), by_label.keys())
    
    return plt

In [None]:
visualize_image_with_bounding_boxes_colored(train_df.iloc[0]).show()

In [9]:
#data loader for the dataset
import torch
from PIL import Image
from torchvision.transforms import functional as F
import os
import cv2

class KBABygglovDataset(torch.utils.data.Dataset):
    def __init__(self, df, scale = 1):
        self.df = df
        self.transforms = None
        self.scale = scale
    
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        image = cv2.imread(row["image"])
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB ) 
        #contour = find_bounding_box(image)
        
        
        # Normalize bounding box coordinates
        width, height = row["original_width"], row["original_height"]
        target = {}

        x1 = row['x'] * width //100 
        y1 = row['y'] * height //100
        width = row['width'] * width //100
        height = row['height'] * height //100
        target["boxes"] = torch.as_tensor([[x1,y1, x1 + width, y1 + height]], 
                                dtype = torch.float32)
        target["labels"]=torch.as_tensor([1],
                        dtype = torch.int64)
        target["image_id"] = torch.as_tensor(idx)
        #scale the image and target to half size
        image = cv2.resize(image, (image.shape[1]// self.scale,  image.shape[0]// self.scale))

        target["boxes"] = target["boxes"]/ self.scale

        image = F.to_tensor(image)
        #cropped_image, target = crop_to_contour(image, target, contour)
        return image, target
        return None



dataset = KBABygglovDataset(train_df)
test_dataset = KBABygglovDataset(test_df)

In [10]:
for data in dataset:
    image, boxes = data
    image.to("cpu")
    #print(boxes)
    #print(image.shape)

In [11]:
# Collate image-target pairs into a tuple.
def collate_fn(batch):
    return tuple(zip(*batch))
# Create the DataLoaders from the Datasets. 
train_dl = torch.utils.data.DataLoader(dataset, 
                                 batch_size = 4, 
                                 shuffle = True, 
                        collate_fn = collate_fn)
'''val_dl = torch.utils.data.DataLoader(val_ds, 
                             batch_size = 4, 
                            shuffle = False, 
                    collate_fn = collate_fn)'''
test_dl = torch.utils.data.DataLoader(test_dataset, 
                               batch_size = 1, 
                              shuffle = False, 
                      collate_fn = collate_fn)

In [12]:
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
def get_object_detection_model(num_classes = 2):

    model = fasterrcnn_resnet50_fpn(pretrained = False)

    # Replace the original 91 class top layer with a new layer
    # tailored for num_classes.
    in_feats = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_feats,
                                                   num_classes)
    return model

In [13]:
def unbatch(batch, device):
    """
    Unbatches a batch of data from the Dataloader.
    Inputs
        batch: tuple
            Tuple containing a batch from the Dataloader.
        device: str
            Indicates which device (CPU/GPU) to use.
    Returns
        X: list
            List of images.
        y: list
            List of dictionaries.
    """
    X, y = batch
    X = [x.to(device) for x in X]
    y = [{k: v.to(device) for k, v in t.items()} for t in y]
    return X, y
def train_batch(batch, model, optimizer, device):
    """
    Uses back propagation to train a model.
    Inputs
        batch: tuple
            Tuple containing a batch from the Dataloader.
        model: torch model
        optimizer: torch optimizer
        device: str
            Indicates which device (CPU/GPU) to use.
    Returns
        loss: float
            Sum of the batch losses.
        losses: dict
            Dictionary containing the individual losses.
    """
    model.train()
    X, y = unbatch(batch, device = device)
    optimizer.zero_grad()
    losses = model(X, y)
    loss = sum(loss for loss in losses.values())
    loss.backward()
    optimizer.step()
    return loss, losses
@torch.no_grad()
def validate_batch(batch, model, optimizer, device):
    """
    Evaluates a model's loss value using validation data.
    Inputs
        batch: tuple
            Tuple containing a batch from the Dataloader.
        model: torch model
        optimizer: torch optimizer
        device: str
            Indicates which device (CPU/GPU) to use.
    Returns
        loss: float
            Sum of the batch losses.
        losses: dict
            Dictionary containing the individual losses.
    """
    model.train()
    X, y = unbatch(batch, device = device)
    optimizer.zero_grad()
    losses = model(X, y)
    loss = sum(loss for loss in losses.values())
    return loss, losses

In [None]:
import torch
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torch.utils.data import DataLoader
from torchvision.transforms import functional as F
from torch_snippets import Report


num_epochs = 20
# Assuming 'dataset' is an instance of 'KBABygglovDataset' and 'data_loader' is an instance of 'DataLoader'
# Also assuming 'device' is defined (e.g., cuda or cpu)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

model = get_object_detection_model(num_classes = 2)

params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, 
                        lr = 0.005, 
                    momentum = 0.9, 
             weight_decay = 0.0005)


log = Report(num_epochs)
# FasterRCNN loss names.
keys = ["loss_classifier", 
            "loss_box_reg", 
        "loss_objectness", 
        "loss_rpn_box_reg"]
model.to(device)
for epoch in range(num_epochs):
    N = len(train_dl)
    for ix, batch in enumerate(train_dl):
        loss, losses = train_batch(batch, model, 
                                optimizer, device)
        # Record the current train loss.
        pos = epoch + (ix + 1) / N
        log.record(pos = pos, trn_loss = loss.item(), 
                    end = "\r")
    if test_dl is not None:
        N = len(test_dl)
        for ix, batch in enumerate(test_dl):
            loss, losses = validate_batch(batch, model, 
                                        optimizer, device)
            
            # Record the current validation loss.
            pos = epoch + (ix + 1) / N
            log.record(pos = pos, val_loss = loss.item(), 
                        end = "\r")
log.report_avgs(epoch + 1)
log


In [None]:
!pip list

In [16]:
torch.save(model, 'skalstock.model')

In [17]:
@torch.no_grad()
def predict_batch(batch, model, device):
    model.to(device)
    model.eval()
    X, _ = unbatch(batch, device = device)
    predictions = model(X)
    return [x.cpu() for x in X], predictions
def predict(model, data_loader, device = "cpu"):
    images = []
    predictions = []
    for i, batch in enumerate(data_loader):
        X, p = predict_batch(batch, model, device)
        images.append(X)
        predictions.append(p)
    
    return images, predictions

In [18]:

images, predictions = predict(model, test_dl, device = device)


In [None]:
len(images)

In [None]:
predictions

In [21]:
import torchvision
def decode_prediction(prediction, 
                      score_threshold = 0.8, 
                      nms_iou_threshold = 0.2):
    """
    Inputs
        prediction: dict
        score_threshold: float
        nms_iou_threshold: float
    Returns
        prediction: tuple
    """
    boxes = prediction["boxes"]
    scores = prediction["scores"]
    labels = prediction["labels"]
    # Remove any low-score predictions.
    if score_threshold is not None:
        want = scores > score_threshold
        boxes = boxes[want]
        scores = scores[want]
        labels = labels[want]
    # Remove any overlapping bounding boxes using NMS.
    if nms_iou_threshold is not None:
        want = torchvision.ops.nms(boxes = boxes, scores = scores, 
                                iou_threshold = nms_iou_threshold)
        boxes = boxes[want]
        scores = scores[want]
        labels = labels[want]
    return (boxes.cpu().numpy(), 
            labels.cpu().numpy(), 
            scores.cpu().numpy())

In [22]:
import numpy as np

In [23]:
from Object_detection.object_detection_helper import visualize_image_with_bounding_boxes_colored_img

In [None]:
for i in range(len(images)):
    print(i)
    visualize_image_with_bounding_boxes_colored_img(images[i][0], predictions[i][0])


In [None]:
!pip list


In [None]:
negative_examples = pd.read_csv("test.csv")
negative_examples = negative_examples[negative_examples["annotation_type"] == "Rectangle"][ negative_examples["label"] == "skalstock_saknas"]
negative_examples

In [None]:
#data loader for the dataset
import torch
from PIL import Image
from torchvision.transforms import functional as F
import os

class KBABygglovDataset_false(torch.utils.data.Dataset):
    def __init__(self, df):
        self.df = df
        self.transforms = None
        print(len(self.df))
    
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        image_path = self.df.iloc[idx]["image"]
        image = Image.open(image_path)
        image = F.to_tensor(image)
        
        target = {}
        target["id"] = torch.as_tensor(row["annotation_id"])
        
        return image, target
  

In [None]:
dataset_false = KBABygglovDataset_false(negative_examples)
false_dl = torch.utils.data.DataLoader(dataset_false, 
                                 batch_size =1, 
                                 shuffle = False, 
                        collate_fn = collate_fn)
images, predictions = predict(model, false_dl, device = device)

In [None]:
for i in range(len(images)):
    print(i)
    print(predictions[i][0])
    visualize_image_with_bounding_boxes_colored_img(images[i][0], predictions[i][0])



In [None]:
model.save('model.pt')

In [None]:
df_false["path"]

In [None]:
for i in range(0, len(false_dl)):
    print(i)
    print(false_dl.dataset.df["annotation_id"].iloc[i])
    visualize_image_with_bounding_boxes_colored(false_dl.dataset.df["path"].iloc[i],json.loads(false_dl.dataset.df["label"].iloc[i]))


In [None]:
false_dl.dataset.df

In [None]:
import json

json.loads(df["label"].iloc[0])

In [None]:
import json

with open('../../../data/kba-bygglov/annoteringar/Batch32024-04-17.json') as f:
    data = json.load(f)

In [None]:
data[0]