In [None]:
import boto3
import os
import json
import cv2
import numpy as np

In [None]:
from utils import utils

In [None]:
# S3 connection settings are taken from the environment; a variable that is
# not set resolves to None.
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_S3_BUCKET = os.getenv("AWS_S3_BUCKET")
AWS_S3_ENDPOINT = os.getenv("AWS_S3_ENDPOINT")

# Bucket the annotation file is downloaded from.
TARGET_BUCKET_NAME = "label-studio-sink"

# Local paths: downloaded annotation, helper assets and generated dataset.
TMP_LABEL = "/tmp/ls-label.txt"
UTILS_DIR_PATH = "utils/"
LABELS_INFO = f"{UTILS_DIR_PATH}labels.json"
DATASET_PATH = "dataset/"

# Number of frames requested from each video.
NB_FRAMES_PER_VIDEO = 20

# S3 client bound to the configured endpoint and credentials.
session = boto3.Session(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
)
client = session.client("s3", endpoint_url=AWS_S3_ENDPOINT)

In [None]:
def generate_dirs():
    """Create the dataset root plus a labels/ and images/ folder per split.

    The splits are train, test and val. Directories that already exist are
    left untouched, so the function can be called repeatedly.
    """
    os.makedirs(DATASET_PATH, exist_ok=True)
    for kind in ("labels", "images"):
        for split in ("train", "test", "val"):
            os.makedirs(f"{DATASET_PATH}{kind}/{split}", exist_ok=True)

In [None]:
def download_whatever_annotation(bucket_name):
    """Download the first object listed in ``bucket_name`` to ``TMP_LABEL``.

    The bucket listing is walked page by page and the function stops at the
    first key it encounters, so at most one object is downloaded per call.

    Args:
        bucket_name: Name of the S3 bucket to read from.

    Returns:
        The key of the object that was downloaded, or ``None`` when the
        listing contains no objects. In the ``None`` case nothing is written
        to ``TMP_LABEL``, so any file already at that path is left as-is.
    """
    paginator = client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket_name):
        # A page may carry no 'Contents' entry (e.g. nothing to list); treat
        # that the same as an empty page and keep looking.
        for obj in page.get('Contents', []):
            key = obj['Key']
            client.download_file(bucket_name, key, TMP_LABEL)
            return key
    return None

In [None]:
# Fetch one annotation object from the TARGET_BUCKET_NAME bucket; the function
# writes it to TMP_LABEL, which the next cell reads.
download_whatever_annotation(TARGET_BUCKET_NAME)

In [None]:
# Make sure the dataset directory tree exists before anything is written to it.
generate_dirs()

# Read the downloaded annotation and the table of supported labels.
with open(TMP_LABEL) as f:
    payload = json.load(f)
with open(LABELS_INFO) as f:
    label_info = json.load(f)

# Only the first rectangle label of the first result is used.
first_result = payload['result'][0]
label = first_result['value']['rectanglelabels'][0]
label_accepted = label in label_info
if not label_accepted:
    raise Exception(f"Label {label} not found in {list(label_info.keys())}")

# Resolve the overlay image and its bounding-box size for the chosen label.
label_spec = label_info[label]
image_path = UTILS_DIR_PATH + label_spec["path"]
bbox_size = (int(label_spec["bbox_w"]), int(label_spec["bbox_h"]))
label_id = 0 # One class only

# cv2.imread returns None instead of raising when the file cannot be read.
inserted_image = cv2.imread(image_path, cv2.IMREAD_UNCHANGED)
if inserted_image is None:
    raise Exception("Error: Couldn't load the inserted image file.")
video_path = UTILS_DIR_PATH + "video/video1.mp4"

In [None]:
# Ask the utils helper for NB_FRAMES_PER_VIDEO frames from the video; the
# split logic below uses the number of frames actually returned.
random_frames = utils.extract_random_frames(video_path, NB_FRAMES_PER_VIDEO)
nb_frames = len(random_frames)
# For every frame: insert the label image, then store frame and label in the dataset
for i, frame in enumerate(random_frames):
    # Insert image onto frame
    # frame.shape is unpacked into three values (height, width, one ignored),
    # so each frame must be a 3-dimensional array.
    frame_height, frame_width, _ = frame.shape
    # `frame` is rebound to the helper's returned frame; `coordinates` is the
    # placement info passed on to the YOLO conversion below.
    frame, coordinates = utils.insert_image(frame, inserted_image, frame_width, frame_height, bbox_size)

    # Save labels and images in yolo format
    yolo_format = utils.to_yolo_format(frame_width, frame_height, coordinates)
    # The split is chosen from the frame's index and the total frame count
    # (presumably train/test/val, matching the dataset folders — confirm in utils).
    split = utils.determine_split(i, nb_frames)
    utils.add_to_yolo_dataset(frame, split, label_id, label, yolo_format, DATASET_PATH)

In [None]:
# Pack the `dataset` directory into a gzip-compressed tarball (shell command).
! tar -czf dataset-full.tar.gz dataset