### Installations and Imports

In [None]:
#installations 
# !pip install -q datasets transformers evaluate timm albumentations
# !pip3 install -q accelerate


In [None]:
#imports 
import os

import PIL
from PIL import Image, ImageDraw
from datasets import Dataset, load_dataset
import json

import albumentations
import numpy as np
import torch

from transformers import AutoImageProcessor, AutoModelForObjectDetection, Trainer, TrainingArguments
import torchvision
import evaluate
from tqdm import tqdm

import accelerate

### Raw Data - Loading and Inspection

In [None]:
#load in the datasets
#JSON text is UTF-8, so the encoding is pinned rather than left to the platform's locale default.
with open("data/ds2_dense/deepscores_train.json", encoding="utf-8") as f:
    training_set = json.load(f)
print(training_set.keys())
with open("data/ds2_dense/deepscores_test.json", encoding="utf-8") as f:
    test_set = json.load(f)
print(test_set.keys())

In [None]:
#for speed, v1 of this notebook will just run the test set (treating it as a training set, since both have the same structure)

#Based on inspection of the dataset (not shown/since deleted), here is the overall structure of our dataset:

'''
test_set (dict): {

    'info': publishing/title/author information about the dataset. WILL NOT BE USED.
    'annotation_sets': denotes that the data came from two data sources (deepscores and muscima++). WILL NOT BE USED.

    'categories' (dict): contains all label information. key = integer id (in string format). Looks like the below:
    {
        '1': {'name': 'brace', 'annotation_set': 'deepscores', 'color': 1},
        '2': {'name': 'ledgerLine', 'annotation_set': 'deepscores', 'color': 2},
        ...
        ...
    } 

    'images' (list): list of dictionaries containing overall image information. Looks like the below:
    [
        {'id': 1, 'filename': 'lg-75827152-aug-lilyjazz-.png', 'width': 1960, 'height': 2772, 'ann_ids': ['160131', '160132', '160133', ...]},
        ...
        ...
    ]

    'annotations' (dict): dictionary of dictionaries, keyed by annotation id (in string format). Contains annotation and object information. Looks like the below:
    {
        '1': {'a_bbox': [93.0, 161.0, 1866.0, 228.0], 'o_bbox': [1866.0, 228.0, 1866.0, 161.0, 93.0, 161.0, 93.0, 228.0], \
              'cat_id': ['135', '208'], 'area': 13641, 'img_id': '1180', 'comments': 'instance:#000015;'},
        ...
        ...

    }
}
'''

### Pre-processing - Creating the Dataset

In [None]:
#Based on example notebooks and documentation from huggingface, we'll be selecting for, modifying, and aggregating select 
#features from the data above.

In [None]:
def select_and_aggregate(images: list, annotation_set: dict) -> object:
  '''
  Select the features needed downstream and aggregate them into a single huggingface Dataset.

  Params:
    images (list): A list of dictionaries, one per image, with the keys 'id', 'filename', 'width',
                   'height' and 'ann_ids' (the ids of the annotations that belong to the image).
    annotation_set (dict): A dictionary of dictionaries keyed by annotation id (in string format).
                   Each entry must provide 'a_bbox' ([x0, y0, x1, y1] corner coordinates) and
                   'cat_id' (a list of category ids; only the first one is used).
  Returns:
    Dataset: a huggingface dataset object with the columns 'image_id', 'width', 'height', 'image'
             and 'objects'. 'objects' holds four parallel lists -- 'id', 'area', 'bbox'
             ([x, y, width, height]) and 'category' (integer ids) -- with one entry per kept annotation.

  Images whose file cannot be opened are skipped, and so are annotations whose box has no positive area.
  '''
  processed_images = []

  for image in images:

    #create the PIL image first: the dataset appears to have 1 mistake, where 1 png image is missing
    #though it is listed in the records. Such records are skipped entirely.
    try:
      pil_image = Image.open(f"data/ds2_dense/images/{image['filename']}")
    except OSError: #FileNotFoundError is an OSError; anything unrelated to reading the file still surfaces
      continue

    #we can keep much of the original
    processed_image = {
      'image_id': image['id'],
      'width': image['width'],
      'height': image['height'],
      'image': pil_image,
    }

    ## now let's deal with the annotation info ##
    ann_ids = []
    areas = []
    bboxes = []
    categories = []

    #filter and modify annotation information formats
    for ann in image['ann_ids']:
      annotation = annotation_set[ann]

      x0, y0, x1, y1 = annotation['a_bbox']
      if x1 <= x0 or y1 <= y0: #there appear to be occasional annotation mistakes in the data where coordinates do not produce boxes whose area > 0
        continue

      x0, y0, width, height, area = get_dims(annotation['a_bbox'])

      #the id is recorded only after the validity check so that the four lists stay the same length
      #and index i always refers to the same annotation in each of them
      ann_ids.append(int(ann))
      areas.append(area)
      bboxes.append([x0, y0, width, height])

      #select the first label if there are multiple viable labels per object;
      #stored as int so it matches the integer keys produced by id_maps
      categories.append(int(annotation['cat_id'][0]))

    processed_image['objects'] = {'id': ann_ids, 'area': areas, 'bbox': bboxes, 'category': categories}

    processed_images.append(processed_image)

  #turn the dataset into a Dataset object
  return Dataset.from_list(processed_images)

def get_dims(a_bbox: list) -> tuple:
  '''
  Convert a corner-style orthogonal bounding box into origin/size form.

  Params:
    a_bbox (list[float]): four values [x0, y0, x1, y1] -- the x-y coordinates of two opposite corners of the box

  Returns:
    (tuple): (x0, y0, width, height, area). Width and height are the absolute differences between
             the two corners, and area is their product.
  '''
  first_x, first_y, second_x, second_y = a_bbox
  span_x, span_y = abs(second_x - first_x), abs(second_y - first_y)

  return first_x, first_y, span_x, span_y, span_x * span_y

def id_maps(categories: dict) -> tuple:
  '''
  Build the two lookup tables between category ids and category names.

  Params:
    categories (dict): dictionary of dictionaries keyed by category id (in string format); each entry
                       carries a 'name'. Should be the same as the dataset['categories'] dict.
  Returns:
    (tuple): (id2label, label2id) -- integer id to label name, and label name to integer id.
             Entries with an empty key or an empty name are left out of both maps.
  '''
  id2label = {}
  label2id = {}

  for key, info in categories.items():
    #skip entries that have no usable key or no usable name
    if not key or not info['name']:
      continue
    numeric_id = int(key)
    id2label[numeric_id] = info['name']
    label2id[info['name']] = numeric_id

  return id2label, label2id


In [None]:
#get a single dataset with the specific features we want
#(v1 runs the test split's images and annotations through the pipeline; the training split is not used here)
data = select_and_aggregate(test_set['images'], test_set['annotations'])

In [None]:
#check: show the aggregated dataset as the cell output
data

In [None]:
#get the category mappings (integer id -> label name, and label name -> integer id)
id2label, label2id = id_maps(test_set['categories'])

### Preprocessing - Transforming the Data

In [None]:
#load the image processor that belongs to the pretrained DETR checkpoint
#(the model itself is loaded from the same checkpoint name in the finetuning section)
checkpoint = "facebook/detr-resnet-50"
image_processor = AutoImageProcessor.from_pretrained(checkpoint)

In [None]:
# For a baseline run, some boilerplate/standard transformations from the huggingface example documentation are used.
# Every image is resized to 480x480; boxes are declared to be in "coco" format and "category" is the label field that travels with them.
# NOTE(review): HorizontalFlip and RandomBrightnessContrast both use p=1.0, so every sample that goes
# through this pipeline is mirrored -- confirm that always-mirrored scores are intended for this data.

transform = albumentations.Compose(
    [
        albumentations.Resize(480, 480),
        albumentations.HorizontalFlip(p=1.0),
        albumentations.RandomBrightnessContrast(p=1.0),
    ],
    bbox_params=albumentations.BboxParams(format="coco", label_fields=["category"]),
)

In [None]:
def formatted_anns(image_id, category, area, bbox):
    """Build the COCO-style annotation dicts for one image.

    Args:
        image_id: id of the image the annotations belong to.
        category: sequence of category ids, one per object.
        area: sequence of box areas, parallel to ``category``.
        bbox: sequence of boxes, parallel to ``category``; each box is
            copied into a plain list.

    Returns:
        list[dict]: one dict per object with the keys ``image_id``,
        ``category_id``, ``iscrowd``, ``area`` and ``bbox``.

    Raises:
        IndexError: if ``area`` or ``bbox`` is shorter than ``category``.
    """
    return [
        {
            "image_id": image_id,
            "category_id": category_id,
            # COCO spells this key in lowercase, as val_formatted_anns does too.
            "iscrowd": 0,
            "area": area[i],
            "bbox": list(bbox[i]),
        }
        for i, category_id in enumerate(category)
    ]

In [None]:
# transforming a batch
def transform_aug_ann(examples):
    """Augment a batch of examples and encode it with the image processor.

    Args:
        examples: batch dict with the columns "image_id", "image" (PIL
            images) and "objects" (dicts holding the "bbox" and "category"
            lists of each image).

    Returns:
        The result of calling ``image_processor`` on the augmented images and
        their COCO-style annotations with ``return_tensors="pt"``.
    """
    image_ids = examples["image_id"]
    images, bboxes, area, categories = [], [], [], []
    for image, objects in zip(examples["image"], examples["objects"]):
        # Keep the RGB channel order: the processor normalises per channel, so
        # reversing to BGR here would hand it channels in the wrong order.
        image = np.array(image.convert("RGB"))
        out = transform(image=image, bboxes=objects["bbox"], category=objects["category"])

        images.append(out["image"])
        bboxes.append(out["bboxes"])
        categories.append(out["category"])
        # Areas are recomputed from the transformed [x, y, w, h] boxes: the
        # stored areas describe the image before resizing, and would no longer
        # line up one-to-one if the transform dropped a box.
        area.append([box[2] * box[3] for box in out["bboxes"]])

    targets = [
        {"image_id": id_, "annotations": formatted_anns(id_, cat_, ar_, box_)}
        for id_, cat_, ar_, box_ in zip(image_ids, categories, area, bboxes)
    ]

    return image_processor(images=images, annotations=targets, return_tensors="pt")

In [None]:
#transform the data
#(with_transform registers transform_aug_ann on the dataset; per the datasets documentation it is
# applied on the fly when rows are accessed -- confirm for the installed version)
transformed_data = data.with_transform(transform_aug_ann)
transformed_data

In [None]:
#collate samples into batches in a format that detr model will accept
def collate_fn(batch):
    """Merge individual samples into one batch dict.

    Args:
        batch: list of samples, each a mapping with "pixel_values" and "labels".

    Returns:
        dict with "pixel_values" and "pixel_mask" taken from
        ``image_processor.pad`` and "labels" as the list of per-sample labels.
    """
    images = [sample["pixel_values"] for sample in batch]
    padded = image_processor.pad(images, return_tensors="pt")
    targets = [sample["labels"] for sample in batch]
    return {
        "pixel_values": padded["pixel_values"],
        "pixel_mask": padded["pixel_mask"],
        "labels": targets,
    }

### Finetuning the DETR model

The code below is adapted from the Hugging Face documentation.

In [None]:

# Load the pretrained DETR checkpoint with this dataset's label maps attached.
# NOTE(review): ignore_mismatched_sizes=True presumably lets the pretrained classification head be
# re-created for this label set instead of failing on the size difference -- confirm in the transformers docs.
model = AutoModelForObjectDetection.from_pretrained(
    checkpoint,
    id2label=id2label,
    label2id=label2id,
    ignore_mismatched_sizes=True,
)

In [None]:


# Hyperparameters for the baseline finetuning run.
# NOTE(review): output_dir still carries the name "cppe5", which looks like a leftover from the
# example this was adapted from -- confirm whether it should be renamed for this dataset.
# NOTE(review): remove_unused_columns=False presumably keeps the "image"/"objects" columns that
# the on-the-fly transform reads -- confirm.
training_args = TrainingArguments(
    output_dir="detr-resnet-50_finetuned_cppe5",
    per_device_train_batch_size=8,
    num_train_epochs=10,
    # fp16=True, #need GPU for this
    save_steps=200,
    logging_steps=50,
    learning_rate=1e-5,
    weight_decay=1e-4,
    save_total_limit=2,
    remove_unused_columns=False,
)

In [None]:
# Finetune the model on the transformed dataset; collate_fn assembles each batch.
# NOTE(review): the image processor is handed over through the `tokenizer` argument, presumably so
# that it is saved together with the model -- confirm against the installed transformers version.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=collate_fn,
    train_dataset=transformed_data,
    tokenizer=image_processor,
)

trainer.train()

In [None]:
# Save the finetuned model to ./baseline/; the evaluation section loads the model and the image processor from this directory.
trainer.save_model('./baseline/')

### Evaluate

Note -- the code below is taken from the Hugging Face example documentation. It will be customized as needed once the baseline run works and any errors are understood.

In [None]:
# format annotations the same as for training, no need for data augmentation
def val_formatted_anns(image_id, objects):
    """Turn one image's ``objects`` record into a list of COCO annotation dicts.

    Args:
        image_id: id of the image the objects belong to.
        objects: mapping with the parallel lists "id", "category", "area"
            and "bbox".

    Returns:
        list[dict]: one dict per entry of ``objects["id"]`` with the keys
        "id", "category_id", "iscrowd", "image_id", "area" and "bbox".
    """
    return [
        {
            "id": ann_id,
            "category_id": objects["category"][position],
            "iscrowd": 0,
            "image_id": image_id,
            "area": objects["area"][position],
            "bbox": objects["bbox"][position],
        }
        for position, ann_id in enumerate(objects["id"])
    ]


# Save images and annotations into the files torchvision.datasets.CocoDetection expects
def save_test_annotation_file_images(music_data):
    """Write a dataset to disk as an image folder plus one COCO annotation file.

    Every example's image is saved as ``<image_id>.png`` inside a
    ``music_data`` directory under the current working directory, and the
    annotations of all examples are written to ``msr_ann.json`` in that same
    directory. Categories are taken from the module-level ``id2label`` map.

    Args:
        music_data: iterable of examples, each with "image_id", "image"
            (an object with ``width``, ``height`` and ``save``) and "objects".

    Returns:
        tuple: (path of the output directory, path of the annotation file).
    """
    path_output_msr = f"{os.getcwd()}/music_data/"
    os.makedirs(path_output_msr, exist_ok=True)
    path_anno = os.path.join(path_output_msr, "msr_ann.json")

    output_json = {"images": [], "annotations": []}
    for example in music_data:
        image_id = example["image_id"]
        image = example["image"]
        output_json["images"].append(
            {
                "id": image_id,
                "width": image.width,
                "height": image.height,
                "file_name": f"{image_id}.png",
            }
        )
        output_json["annotations"].extend(val_formatted_anns(image_id, example["objects"]))
        # Save the image while it is at hand, so no second pass over the whole
        # image column is needed.
        image.save(os.path.join(path_output_msr, f"{image_id}.png"))
    output_json["categories"] = [
        {"supercategory": "none", "id": label_id, "name": label_name}
        for label_id, label_name in id2label.items()
    ]

    # ensure_ascii=False can emit non-ASCII characters, so the file encoding is
    # fixed explicitly instead of depending on the platform default.
    with open(path_anno, "w", encoding="utf-8") as file:
        json.dump(output_json, file, ensure_ascii=False, indent=4)

    return path_output_msr, path_anno

In [None]:
class CocoDetection(torchvision.datasets.CocoDetection):
    """COCO-format dataset whose items are already encoded by an image processor.

    Args:
        img_folder: directory that holds the images.
        feature_extractor: callable applied to each (image, annotations) pair;
            it must accept ``images``, ``annotations`` and ``return_tensors``
            and return a mapping with "pixel_values" and "labels".
        ann_file: path of the COCO annotation json file.
    """

    def __init__(self, img_folder, feature_extractor, ann_file):
        super().__init__(img_folder, ann_file)
        self.feature_extractor = feature_extractor

    def __getitem__(self, idx):
        """Return ``{"pixel_values": ..., "labels": ...}`` for the item at ``idx``."""
        # read in PIL image and target in COCO format
        img, target = super().__getitem__(idx)

        # preprocess image and target: converting target to DETR format,
        # resizing + normalization of both image and target)
        image_id = self.ids[idx]
        target = {"image_id": image_id, "annotations": target}
        encoding = self.feature_extractor(images=img, annotations=target, return_tensors="pt")
        # Drop only the leading batch axis; a bare squeeze() would also remove
        # any other axis that happens to have size 1.
        pixel_values = encoding["pixel_values"].squeeze(0)
        target = encoding["labels"][0]  # remove batch dimension

        return {"pixel_values": pixel_values, "labels": target}


# Load the image processor from the ./baseline directory written by trainer.save_model.
im_processor = AutoImageProcessor.from_pretrained("./baseline")

# NOTE(review): `data` is the same dataset the model was finetuned on, so the metrics computed from it
# measure fit to the training data rather than performance on unseen images.
path_output, path_anno = save_test_annotation_file_images(data)
test_ds_coco_format = CocoDetection(path_output, im_processor, path_anno)

In [None]:
# Reload the finetuned model and score its predictions against the COCO-format dataset.
model = AutoModelForObjectDetection.from_pretrained("./baseline/")
module = evaluate.load("ybelkada/cocoevaluate", coco=test_ds_coco_format.coco) #this space appears to be down...will try to see if it works.
val_dataloader = torch.utils.data.DataLoader(
    test_ds_coco_format, batch_size=8, shuffle=False, num_workers=4, collate_fn=collate_fn
)

with torch.no_grad():
    for batch in tqdm(val_dataloader):
        pixel_values = batch["pixel_values"]
        pixel_mask = batch["pixel_mask"]

        labels = [
            {k: v for k, v in t.items()} for t in batch["labels"]
        ]  # these are in DETR format, resized + normalized

        # forward pass
        outputs = model(pixel_values=pixel_values, pixel_mask=pixel_mask)

        orig_target_sizes = torch.stack([target["orig_size"] for target in labels], dim=0)
        # post_process is deprecated; post_process_object_detection with threshold=0
        # is its documented equivalent and keeps every prediction for the metric.
        predictions = im_processor.post_process_object_detection(
            outputs, threshold=0, target_sizes=orig_target_sizes
        )  # convert outputs of model to COCO api

        module.add(prediction=predictions, reference=labels)
        del batch

results = module.compute()
print(results)