# Preprocessing
This script loads the images and crops them down multiple times for each cell, that is contained in the image.

In [None]:
import numpy as np
import os
from pathlib import Path
from PIL import Image
import json
import matplotlib.pyplot as plt
from tqdm import tqdm
from torchvision import transforms as T
import torch
import random
from torch.utils.data import Dataset
from torchvision import datasets, transforms
from torch.utils.data import DataLoader


In [None]:
# ------------  CONFIG  -------------------
base_path = '/scratch/cv-course-group-5/data/dataset_jpg'
TARGET_SIZE = 224
src_root   = Path(base_path + '/dataset')
dst_root   = Path(base_path + '/preprocessed_dataset')
lmdb_path = Path(base_path + '/lmdb')
anno_file  = Path(base_path + '/dataset/annotations.json')

## Crop images

In [None]:
# load annotations
annos_dict = json.loads(anno_file.read_text())

annos = annos_dict.get('annotations', [])
videos = annos_dict.get('videos', [])
images = annos_dict.get('images', [])

video_id2name = {v["id"]: v["name"] for v in videos}
image_by_id = {img["id"]: img for img in images}

In [None]:
# reorder annotations in new dictionary
annos_sorted = {}

for ann in annos:
    video_id = ann["video_id"]
    image_id = ann["image_id"]

    frame_nmbr = image_by_id[image_id]['file_name'][-7:-4]
    video_name = video_id2name[video_id]

    # if there is no entry yet in the dictionary
    if video_name not in annos_sorted:
        annos_sorted[video_name] = {}
    if frame_nmbr not in annos_sorted[video_name]:
        annos_sorted[video_name][frame_nmbr] = []

    # add annotation
    annos_sorted[video_name][frame_nmbr].append(ann)

In [None]:
video_folder_list = os.listdir(src_root)
video_folder_list = [f for f in video_folder_list if f != 'annotations.json']
video_folder_list = sorted(video_folder_list, key=lambda x: int(x))

In [None]:
# missing folder names in annotations (written manually)
missing_folders = ['052', '060', '160', '256', '265']

## Initialize LMDB with croped images

In [None]:
import lmdb
import cv2
from io import BytesIO

map_size = 1 << 40

env = lmdb.open(str(lmdb_path), map_size=map_size)

keys = []

with env.begin(write=True) as txn:
    for _, vid_name in tqdm(enumerate(video_folder_list), total=len(video_folder_list), ncols=100, position=0):

        # define paths for specific video folder
        org_video_folder = src_root / vid_name / 'images'

        try:
            annos_by_frame = annos_sorted[vid_name]
        except KeyError:
            print(vid_name)
            continue

        # iterate over every image file in that folder
        for img_path in sorted(org_video_folder.glob("*.jpg")):
            # frame index is the last 3 digits in “…t086.jpg”
            frame_idx = img_path.stem[-3:]

            # skip if no cells in this frame
            if frame_idx not in annos_by_frame:
                print(img_path)
                continue

            img = Image.open(img_path).convert("RGB")

            for ann in annos_by_frame[frame_idx]:
                cell_id = str(ann['cell_id'])
                x_c, y_c, w, h = ann['bbox']

                # calculate values for cropping
                left = int(x_c - w / 2)
                upper = int(y_c - h / 2)
                right = int(x_c + w / 2)
                lower = int(y_c + h / 2)

                crop = img.crop((left, upper, right, lower))

                # resize image if it is larger than the Target size
                if crop.width > TARGET_SIZE or crop.height > TARGET_SIZE:
                    scale = TARGET_SIZE / max(crop.width, crop.height)
                    new_size = (int(crop.width * scale), int(crop.height * scale))
                    crop = crop.resize(new_size, Image.BICUBIC)

                # put image on black background of Target size
                canvas = Image.new("RGB", (TARGET_SIZE, TARGET_SIZE), (0, 0, 0))  # black background
                x_off = (TARGET_SIZE - crop.width) // 2
                y_off = (TARGET_SIZE - crop.height) // 2
                canvas.paste(crop, (x_off, y_off))

                # plt.imshow(canvas)
                # plt.title('Video: ' + vid_id_test + ', Frame: ' + frame_idx + ', Cell_Id: ' + cell_id)
                # plt.show()

                # save cropped and resized image
                key = f"{vid_name}/{frame_idx}/{cell_id.zfill(3)}.jpg"
                keys.append(key)

                # convert image to bytes and store it in the lmdb
                buffer = BytesIO()
                canvas.save(buffer, format="JPEG")  # or "PNG" if needed
                txn.put(key.encode("utf-8"), buffer.getvalue())

    #store keys and len in the db for easier access
    txn.put(b"__keys__", b"\n".join(key.encode("utf-8") for key in keys))
    txn.put(b"__len__", str(len(keys)).encode("ascii"))

## Store Images as jpg instead to database (not used in training)

In [None]:
for _, vid_name in tqdm(enumerate(video_folder_list), total=len(video_folder_list), ncols=100, position=0):

    # define paths for specific video folder
    org_video_folder = src_root / vid_name / 'images'
    dst_video_folder = dst_root / vid_name / 'images'

    if os.path.exists(dst_video_folder):
        continue
    else:
        dst_video_folder.mkdir(parents=True, exist_ok=True)

    try:
        annos_by_frame = annos_sorted[vid_name]
    except KeyError:
        print(vid_name)
        continue

    # iterate over every image file in that folder
    for img_path in sorted(org_video_folder.glob("*.jpg")):
        # frame index is the last 3 digits in “…t086.jpg”
        frame_idx = img_path.stem[-3:]

        # skip if no cells in this frame
        if frame_idx not in annos_by_frame:
            print(img_path)
            continue

        img = Image.open(img_path).convert("RGB")

        for ann in annos_by_frame[frame_idx]:
            cell_id = str(ann['cell_id'])
            x_c, y_c, w, h = ann['bbox']

            # calculate values for cropping
            left   = int(x_c - w / 2)
            upper  = int(y_c - h / 2)
            right  = int(x_c + w / 2)
            lower  = int(y_c + h / 2)

            crop = img.crop((left, upper, right, lower))

            # resize image if it is larger than the Target size
            if crop.width > TARGET_SIZE or crop.height > TARGET_SIZE:
                scale = TARGET_SIZE / max(crop.width, crop.height)
                new_size = (int(crop.width  * scale), int(crop.height * scale))
                crop = crop.resize(new_size, Image.BICUBIC)

            # put image on black background of Target size
            canvas = Image.new("RGB", (TARGET_SIZE, TARGET_SIZE), (0, 0, 0))  # black background
            x_off = (TARGET_SIZE - crop.width)  // 2
            y_off = (TARGET_SIZE - crop.height) // 2
            canvas.paste(crop, (x_off, y_off))

            # plt.imshow(canvas)
            # plt.title('Video: ' + vid_id_test + ', Frame: ' + frame_idx + ', Cell_Id: ' + cell_id)
            # plt.show()

            # save cropped and resized image
            save_name  = f"v{vid_name}_f{frame_idx}_c{cell_id.zfill(3)}.jpg"
            canvas.save(dst_video_folder / save_name)

## Train Test Split

In [None]:
videos_split = [{k: v[k] for k in ['id', 'name']} for v in videos if v['name'] not in missing_folders]

test_size = int(len(videos_split) * 0.2) # 20 % Testdata
val_size = int(len(videos_split) * 0.8 * 0.2) # 20 % of remaining validation data

test_videos = []
val_videos = []

random.seed(42)

# add random videos to val / test
for i in range(test_size):
    test_videos.append(videos.pop(random.randint(0, len(videos) - 1)))

for i in range(val_size):
    val_videos.append(videos.pop(random.randint(0, len(videos) - 1)))

def sort(video):
    return video['id']

# sort videos by id
train_videos = videos
test_videos.sort(key=sort)
val_videos.sort(key=sort)

print(f"Train ({len(train_videos)}): {train_videos}")
print(f"Test ({len(test_videos)}):   {test_videos}")
print(f"Validation ({len(val_videos)}):   {val_videos}")

# store split into file
train_test_split = {'train': train_videos, 'test': test_videos, 'val': val_videos}
json.dump(train_test_split, open('video_lists/train_test_split.json', 'w'))

In [None]:
# Path to your JSON file
json_path = Path(base_path + '/train_test_split.json')

# Load the file
with open(json_path, 'r') as f:
    split_data = json.load(f)

# Access the train and test entries
train_list = split_data.get("train", [])
train_list