# Dependencies

Third-party packages:
* b3w - boto3 wrapper (with boto3 dependency)
* cvat-sdk - CVAT Python API
* python-dotenv - environment variables from .env file (just like docker-compose)

In [None]:
%%capture
%pip uninstall -y b3w
%pip install -U git+https://github.com/ValV/b3w.git cvat-sdk python-dotenv

# Paths

In [None]:
from os import makedirs, path as osp


# Local data layout: ./data/images and ./data/annotations
PATH_DATA = osp.join('.', 'data')
PATH_IMAGES, PATH_ANNOTATIONS = (
    osp.join(PATH_DATA, leaf) for leaf in ('images', 'annotations')
)

# Create both directories up front; nothing happens if they already exist
for _directory in (PATH_IMAGES, PATH_ANNOTATIONS):
    makedirs(_directory, exist_ok=True)

# Images

Images are stored in VK Cloud S3 storage. Use B3W (boto3 wrapper) to download them locally.

In [None]:
from os import environ

from dotenv import load_dotenv


load_dotenv()  # take variables from .env (existing ones are not overwritten)

# S3-like object storage bucket name (a fallback may be put in place of '')
S3_BUCKET = environ.get('S3_BUCKET', None) or ''
# Explicit raise instead of assert: asserts are stripped under `python -O`
if not S3_BUCKET:
    raise RuntimeError("Provide S3_BUCKET environment variable or set above!")

# S3-like object storage host address (a fallback may be put in place of '')
S3_HOST = environ.get('S3_HOST', None) or ''
if not S3_HOST:
    raise RuntimeError("Provide S3_HOST environment variable or set above!")

In [None]:
from b3w import B3W


# Storage client: the bucket name is also used as the profile name,
# and PATH_IMAGES is handed over as the local path
b3w = B3W(
    S3_BUCKET,
    local_path=PATH_IMAGES,
    profile_name=S3_BUCKET,
    endpoint_url=S3_HOST,
)

# Keep only the *.jpg objects (extension is compared case-insensitively)
filenames = list(filter(
    lambda name: name.upper().endswith('.JPG'), b3w.ls()
))

len(filenames)

Target directory before downloading.

In [None]:
%ls {PATH_IMAGES}

## Downloading

> NOTE: uncomment to download.

In [None]:
# for filename in filenames:
#     b3w.get(filename)

Target directory after downloading.

In [None]:
%ls {PATH_IMAGES}

# Annotations

Annotations are being processed with CVAT. Use CVAT SDK (Python API) to get annotations.

## Annotations retrieval function

CVAT SDK was chosen over Datumaro (and static annotations from GitHub) because it can select annotations by job status: only annotations from 'completed' jobs are required.

In [None]:
from json import loads
from urllib3.exceptions import InsecureRequestWarning
from warnings import catch_warnings, filterwarnings

from cvat_sdk import Client, Config


def get_annotations(host, user, password):
    """Fetch shape annotations of all 'completed' jobs from a CVAT server.

    Every shape dict is enriched in place with the job id, the label name
    and color, and the info of the frame it belongs to (as returned by
    ``job.get_frames_info()``).

    Args:
        host: CVAT server address passed to ``Client``.
        user: CVAT user name to log in with.
        password: CVAT password to log in with.

    Returns:
        A tuple ``(annotations, jobs_completed)``: ``annotations`` is a flat
        list of shape dicts gathered from all completed jobs, and
        ``jobs_completed`` maps a job id to that job's JSON description.

    NOTE: SSL certificate verification is disabled (``verify_ssl=False``)
    and the resulting insecure-request warnings are suppressed.
    """
    with catch_warnings():
        # Suppress insecure SSL warnings
        filterwarnings("ignore", category=InsecureRequestWarning)

        client = Client(host, config=Config(verify_ssl=False))

        # Log in to the CVAT server
        client.login((user, password))
        try:
            jobs = loads(client.jobs.list(return_json=True))
            jobs_completed = {
                job['id']: job for job in jobs if job['status'] == 'completed'
            }

            # Download the annotations for each completed job
            annotations = []
            for job_id in jobs_completed:
                job = client.jobs.retrieve(int(job_id))
                start_frame = job.get_meta().to_dict()['start_frame']
                labels = {
                    label['id']: label
                    for label in (item.to_dict() for item in job.get_labels())
                }
                shapes = job.get_annotations().to_dict()['shapes']
                # NOTE: assumes frame infos come back ordered by frame number,
                # starting at the job's start frame -- not verified here
                frames = job.get_frames_info()
                for shape in shapes:
                    label = labels[shape['label_id']]
                    shape.update({
                        'job_id': job_id,  # job id to rule them all
                        'label': label['name'],  # label name
                        'color': label['color'],  # and original color
                    })
                    # Frame index is relative to the first frame of the job
                    shape.update(
                        frames[shape['frame'] - start_frame].to_dict()
                    )
                annotations.extend(shapes)
        finally:
            # Close the session even if one of the requests above failed
            client.logout()

    return annotations, jobs_completed

## Fetch annotations from CVAT

Set the `CVAT_SERVER`, `CVAT_USER` and `CVAT_PASSWORD` environment variables in the shell, in a .env file, or in the cell below.

In [None]:
from getpass import getpass


# CVAT host address (a fallback may be put in place of '')
CVAT_SERVER = environ.get('CVAT_SERVER', None) or ''
# Explicit raise instead of assert: asserts are stripped under `python -O`
if not CVAT_SERVER:
    raise RuntimeError(
        "Provide CVAT_SERVER environment variable or set above!"
    )

# CVAT user to login (a fallback may be put in place of '')
CVAT_USER = environ.get('CVAT_USER', None) or ''
if not CVAT_USER:
    raise RuntimeError("Provide CVAT_USER environment variable or set above!")

# CVAT password to login with; asked interactively when not in environment
CVAT_PASSWORD = (
    environ.get('CVAT_PASSWORD', None) or
    getpass(f"Your password for {CVAT_USER}@{CVAT_SERVER}:")
)
if not CVAT_PASSWORD:
    print("WARNING: login to CVAT server with empty password was not tested!")

Fetch annotations from CVAT.

In [None]:
# Flat list of shape dicts plus the completed jobs they were taken from
annotations, jobs_completed = get_annotations(
    host=CVAT_SERVER,
    user=CVAT_USER,
    password=CVAT_PASSWORD,
)

## Explore annotations with DataFrames

In [None]:
import pandas as pd


# One row per annotation dict; columns are taken from the dict keys
frame_annotations = pd.DataFrame(data=annotations)

frame_annotations

## Draw annotations

Use Matplotlib to draw annotations. Source annotations may be composed of rectangles and polygons, but only rectangles are used for detection.

Rectangles:
* 4-component list [x1, y1, x2, y2] with absolute position values;
* float type.

Polygons:
* 2n-component list [x1, y1, x2, y2, ..., xn, yn] with absolute position values;
* float type.

In [None]:
from random import sample

from matplotlib import pyplot as plt
from matplotlib.patches import Polygon, Rectangle


def _make_patch(row):
    """Return an outline patch for an annotation row, or None if unsupported.

    Rectangles carry [x1, y1, x2, y2] in row['points'], polygons carry a
    flat [x1, y1, ..., xn, yn] list. Any other type is reported and skipped.
    """
    style = dict(fc='none', ec=row['color'], lw=1, label=row['label'])
    points = row['points']
    if row['type'] == 'rectangle':
        x1, y1, x2, y2 = points
        return Rectangle((x1, y1), x2 - x1, y2 - y1, **style)
    if row['type'] == 'polygon':
        # Un-flatten [x1, y1, ..., xn, yn] to [[x1, y1], ..., [xn, yn]]
        vertices = tuple(zip(points[0::2], points[1::2]))
        return Polygon(vertices, closed=True, **style)
    print(f"Unimplemented {row['type']=}!")
    return None


NUM_SAMPLES = 8
groups = frame_annotations.groupby('name')
# Cap the sample size: sample() raises if k exceeds the number of images
index = set(sample(range(len(groups)), k=min(NUM_SAMPLES, len(groups))))

# enumerate() keeps the position in step with the groups even when a sampled
# image is skipped because its file is missing
for count, (filename, group) in enumerate(groups):
    if count not in index:
        continue
    path = osp.join(PATH_IMAGES, filename)
    if not osp.isfile(path):
        print(f"File {path} does not exist!")
        continue
    print(f"File {path}")
    figure, axes = plt.subplots()
    axes.axis('off')
    image = plt.imread(path)
    axes.imshow(image)
    legends = {}
    for _, row in group.iterrows():
        patch = _make_patch(row)
        if patch is None:
            continue  # unsupported shape: nothing to draw or to put in legend
        axes.add_patch(patch)
        legends.setdefault(row['label'], patch)  # group legend by label
    # Image size is read from the last annotation row of the group
    print(f"Size = {(row['width'], row['height'])}")
    if legends:
        axes.legend(
            handles=list(legends.values()), labels=list(legends.keys())
        )
    plt.show()

## Image sizes from annotations

In [None]:
# Image dimensions as stored with every annotation row
image_sizes = frame_annotations[['width', 'height']]
image_sizes.plot.scatter(x='width', y='height')

## Bbox sizes

In [None]:
# Only rectangles have [x1, y1, x2, y2] points to derive a bbox size from
rectangles = frame_annotations[frame_annotations['type'] == 'rectangle']

# Expand the (width, height) tuple of each row into two named columns
frame_annotations_bbox = rectangles.apply(
    lambda row: (
        row['points'][2] - row['points'][0],
        row['points'][3] - row['points'][1],
    ),
    axis=1,
    result_type='expand',
).rename(columns={0: 'width', 1: 'height'})
_ = frame_annotations_bbox.plot.scatter(x='width', y='height')

Sorting bbox sizes by width and height will provide clues on how much the source image may be downsampled to preserve bboxes.

> The source images may be downsampled by a factor of 5.

In [None]:
# Smallest boxes first: they limit how far the images can be downsampled
frame_annotations_bbox.sort_values(by=['width', 'height'])

Source image sizes after downsampling by a factor of 5.

> YOLO detector model input size can be 1024x1024.

In [None]:
# Image dimensions divided by the downsampling factor of 5
downsampled_sizes = frame_annotations[['width', 'height']] / 5
_ = downsampled_sizes.plot.scatter(x='width', y='height')

## Files exist

In [None]:
def file_exists(name):
    """Return True if image `name` is present as a file under PATH_IMAGES.

    Uses isfile() rather than exists() so that a directory (or an empty
    name, which resolves to PATH_IMAGES itself) is not counted as an image.
    """
    return osp.isfile(osp.join(PATH_IMAGES, name))


frame_annotations['exists'] = frame_annotations['name'].apply(file_exists)

# Reduce to one row per image name before counting
frame_exists = frame_annotations[['name', 'exists']].drop_duplicates('name')

# Number of images found locally out of all annotated images
print(frame_exists['exists'].sum(), 'of', frame_exists['name'].count())