In [None]:
# Enable IPython's autoreload extension; mode 2 reloads all imported modules
# before each cell execution, so edits to local helper modules are picked up.
%load_ext autoreload
%autoreload 2

In [None]:
#2. specify parameters
# The three dicts below are passed, in this order, to NotebookSubstep in the interface cell.
pipeline_params={
}
step_params={
}
# substep_params is also copied verbatim into config.json when the config is saved.
# NOTE(review): neither key is read by other code in the visible notebook; their meaning
# is presumably defined by a downstream consumer of config.json — confirm.
substep_params={
    "FILTER_EMPTY_GT"    : False,
    "MIN_OBJECT_SIZE"    : 5
}

In [None]:
#3 define substep interface
from sinara.substep import NotebookSubstep, default_param_values, ENV_NAME, PIPELINE_NAME, ZONE_NAME, STEP_NAME, RUN_ID, ENTITY_NAME, ENTITY_PATH, SUBSTEP_NAME

# Build the substep from the three parameter dicts plus the keyword defaults
# returned by default_param_values for params/step_params.json.
substep = NotebookSubstep(pipeline_params, step_params, substep_params, **default_param_values("params/step_params.json"))

# Declare the entities of this substep: four outputs (three image datasets and a config)
# and two temporary inputs (cache_data, cache_config).
substep.interface(
    outputs = 
    [
        { ENTITY_NAME: "train_coco_data"},
        { ENTITY_NAME: "eval_coco_data"},
        { ENTITY_NAME: "test_coco_data"},
        { ENTITY_NAME: "config"}
    ],
    tmp_inputs =
    [
        {ENTITY_NAME: "cache_data"},
        {ENTITY_NAME: "cache_config"}
    ]
)

substep.print_interface_info()

# NOTE(review): presumably stops notebook execution here when the substep is run
# only to render its interface — confirm against the Sinara documentation.
substep.exit_in_visualize_mode()

![interface 1_data_prep](./imgs/1_data_prep_inteface.drawio.png)

In [None]:
#4 substep.interface to variables
# The returned objects expose one attribute per declared entity name
# (e.g. outputs.train_coco_data, tmp_inputs.cache_data), as used in the cells below.
outputs = substep.outputs()
tmp_inputs = substep.tmp_inputs()

In [None]:
# view urls interface of outputs
# The f-string "=" specifier prints each expression text followed by the repr of its value.
print(f"{outputs.train_coco_data=}")
print(f"{outputs.eval_coco_data=}")
print(f"{outputs.test_coco_data=}")
print(f"{outputs.config=}")

In [None]:
# view urls interface of tmp_inputs
# The f-string "=" specifier prints each expression text followed by the repr of its value.
print(f"{tmp_inputs.cache_data=}")
print(f"{tmp_inputs.cache_config=}")

In [None]:
#5 run spark
from sinara.spark import SinaraSpark

# Start the Spark session that is later used to build and write the image DataFrames.
# NOTE(review): the meaning of the 0 argument is not visible here — confirm against SinaraSpark.run_session.
spark = SinaraSpark.run_session(0)
# Left as the last expression so the notebook displays whatever ui_url() returns.
SinaraSpark.ui_url()

### Processing annotations

Checking annotations for empty objects, selecting object categories

In [None]:
from utils.coco import join_coco_files, load as load_coco
from utils.coco import preview_coco_file
from utils.coco import show_item
from utils.coco import get_dataset
from sklearn.model_selection import train_test_split
from tqdm import tqdm
import numpy as np
import os.path as osp
import os

In [None]:
# Load annotation from json
# Reads annotations/instances_val2017.json from the cache_data temporary entity.
# Later cells index the result by the keys "categories", "annotations" and "images".
coco_annotation = load_coco(osp.join(tmp_inputs.cache_data, "annotations", "instances_val2017.json"))

In [None]:
# Selection of object types for subsequent neural network training
select_object_names = ["person", "bicycle", "car", "motorcycle", "bus", "truck"]

# Keep only the requested categories. Each entry is copied so the renumbering
# below does not touch the dicts of the loaded annotation.
select_categories = [
    cat_info.copy()
    for cat_info in coco_annotation["categories"]
    if cat_info["name"] in select_object_names
]

# Renumber the kept categories 1..N in annotation-file order, remembering the
# original id so the annotations can be remapped afterwards.
for new_id, cat_info in enumerate(select_categories, 1):
    cat_info["old_id"] = cat_info["id"]
    cat_info["id"] = new_id

# Class names are derived from the renumbered categories, not from
# select_object_names: this guarantees CLASSES[i] is the category with id i + 1
# even if the requested names are listed in a different order than the file,
# or if a requested name is absent from the file.
CLASSES = [cat_info["name"] for cat_info in select_categories]

# Last expression of the cell: display the selected categories.
select_categories

In [None]:
# Select annotation object by select_categories
# Lookup table: original COCO category id -> renumbered id
reid_categories_ids = {
    category["old_id"]: category["id"]
    for category in select_categories
}

# Keep only annotations of the selected categories; every kept annotation is a
# shallow copy of the original with its category_id replaced by the new id.
new_coco_annotations = [
    {**source_annot, "category_id": reid_categories_ids[source_annot["category_id"]]}
    for source_annot in coco_annotation["annotations"]
    if source_annot["category_id"] in reid_categories_ids
]

In [None]:
# apply new annotation
# Replace the category and annotation lists of the loaded annotation in place.
# .copy() makes a shallow copy of each list, so the stored lists are distinct
# objects from select_categories / new_coco_annotations (the dicts inside are shared).
coco_annotation["categories"] = select_categories.copy()
coco_annotation["annotations"] = new_coco_annotations.copy()

In [None]:
# checking segmentation in annotations (list in segmentation)
# An image is dropped entirely, together with all of its annotations, as soon as
# one of its annotations carries a segmentation that is not a list.
bad_image_ids = [annot["image_id"] for annot in coco_annotation["annotations"] if not isinstance(annot["segmentation"], list)]
# Set for constant-time membership tests; scanning the list once per image and
# per annotation would be quadratic.
bad_image_id_set = set(bad_image_ids)

true_coco_images = [img_info for img_info in coco_annotation["images"] if img_info["id"] not in bad_image_id_set]
true_coco_annotations = [annot for annot in coco_annotation["annotations"] if annot["image_id"] not in bad_image_id_set]
coco_annotation["images"] = true_coco_images.copy()
coco_annotation["annotations"] = true_coco_annotations.copy()

In [None]:
# checking the existence of annotations for an image
# Images left without any annotation (e.g. after the category selection) are removed.

coco_image_ids = [img_info["id"] for img_info in coco_annotation["images"]]
coco_annotations_image_ids = [annot["image_id"] for annot in coco_annotation["annotations"]]

# The set difference already deduplicates, so no np.unique pass is needed, and the
# resulting set gives constant-time membership tests in the filters below.
bad_image_id_set = set(coco_image_ids) - set(coco_annotations_image_ids)
bad_image_ids = list(bad_image_id_set)

true_coco_images = [img_info for img_info in coco_annotation["images"] if img_info["id"] not in bad_image_id_set]
# By construction no annotation references a bad image; the filter is kept so
# the annotation list is rebuilt the same way as the image list.
true_coco_annotations = [annot for annot in coco_annotation["annotations"] if annot["image_id"] not in bad_image_id_set]

coco_annotation["images"] = true_coco_images.copy()
coco_annotation["annotations"] = true_coco_annotations.copy()

In [None]:
# Number of images currently kept in the annotation (displayed as the cell output)
len(coco_annotation["images"])

### Split Dataset into Train, Validation and Test

In [None]:
# split to train, valid and test parts
# Stage 1: hold out 33% of the images; stage 2: carve 10% of that hold-out off as
# the test part. The fixed random_state makes both splits reproducible.
train_coco_images, val_coco_images = train_test_split(
    coco_annotation["images"],
    test_size=0.33,
    random_state=42,
)
val_coco_images, test_coco_images = train_test_split(
    val_coco_images.copy(),
    test_size=0.1,
    random_state=42,
)

# Image ids and file names, grouped per split
train_images_ids = [entry["id"] for entry in train_coco_images]
train_images_names = [entry["file_name"] for entry in train_coco_images]

val_images_ids = [entry["id"] for entry in val_coco_images]
val_images_names = [entry["file_name"] for entry in val_coco_images]

test_images_ids = [entry["id"] for entry in test_coco_images]
test_images_names = [entry["file_name"] for entry in test_coco_images]

In [None]:
# print count images from train, valid and test datasets
# Each count is the length of the per-split image id list built in the split cell.
print("Count images for train datasets: ", len(train_images_ids))
print("Count images for valid datasets: ", len(val_images_ids))
print("Count images for test datasets: ", len(test_images_ids))

In [None]:
# Assign every annotation to the split of its image. Sets give constant-time
# lookups; testing membership in the id lists would rescan a list of thousands
# of ids for each annotation.
train_image_id_set = set(train_images_ids)
val_image_id_set = set(val_images_ids)
test_image_id_set = set(test_images_ids)

# Annotations are copied so each split owns its own annotation dicts.
train_coco_annotations = [annot.copy() for annot in coco_annotation["annotations"] if annot["image_id"] in train_image_id_set]
val_coco_annotations = [annot.copy() for annot in coco_annotation["annotations"] if annot["image_id"] in val_image_id_set]
test_coco_annotations = [annot.copy() for annot in coco_annotation["annotations"] if annot["image_id"] in test_image_id_set]

In [None]:
# One COCO dict per split: a shallow copy of the filtered annotation whose
# "images" and "annotations" entries are replaced by the split's own lists.
# All other top-level entries (e.g. "categories") are shared between the splits.
train_coco = {
    **coco_annotation,
    "images": train_coco_images,
    "annotations": train_coco_annotations,
}

val_coco = {
    **coco_annotation,
    "images": val_coco_images,
    "annotations": val_coco_annotations,
}

test_coco = {
    **coco_annotation,
    "images": test_coco_images,
    "annotations": test_coco_annotations,
}

In [None]:
# preview data for train, valid and test dataset
# All three splits come from the same annotation file, so they share the val2017 image folder.
# NOTE(review): count=2 presumably limits the preview to two samples per split — confirm in utils.coco.
preview_coco_file(train_coco, img_folder=osp.join(tmp_inputs.cache_data, "val2017"), count=2)
preview_coco_file(val_coco, img_folder=osp.join(tmp_inputs.cache_data, "val2017"), count=2)
preview_coco_file(test_coco, img_folder=osp.join(tmp_inputs.cache_data, "val2017"), count=2)

In [None]:
import matplotlib.pyplot as plt

# Per-dataset statistics used by the histograms:
#   counts - number of annotations per image (one entry per image)
#   areas  - "area" field of every annotation, in image order
areas  = []
counts = []
categories = []
categories_annotation = []

anns = coco_annotation.get('annotations', [])

# Group the annotations by image id once. Filtering the full annotation list for
# every image would cost O(images * annotations).
anns_by_image = {}
for ann in anns:
    anns_by_image.setdefault(ann['image_id'], []).append(ann)

for image in coco_annotation.get('images', []):
    image_anns = anns_by_image.get(image['id'], [])
    counts.append(len(image_anns))

    for ann in image_anns:
        areas.append(ann.get('area'))
        categories.append(ann.get('category_id'))

    # NOTE(review): this appends the full category list once per image and the
    # result is not used by the visible cells; kept as-is to preserve behavior.
    categories_annotation += coco_annotation['categories']

counts = np.array(counts)
areas  = np.array(areas)

In [None]:
import plotly.express as px

# Histogram of annotation areas: x is the object area, y is the number of objects.
# Title typo fixed ("Площать" -> "Площадь").
fig = px.histogram(areas, title='Площадь объектов на фотографиях датасета')
fig.layout.yaxis.title = 'Кол-во объектов'
fig.layout.xaxis.title = 'Площадь'
fig.show()

In [None]:
# Histogram of objects per photo: the binned value on the x axis is the number
# of objects in a photo, and the bar height on the y axis is the number of
# photos. The axis titles were swapped in the original.
fig = px.histogram(counts, title='Кол-во объектов на фотографиях датасета')
fig.layout.yaxis.title = 'Кол-во фотографий'
fig.layout.xaxis.title = 'Кол-во объектов'
fig.show()

## Save train, validation and test datasets to outputs

In [None]:
### Save train, validation and test datasets to parquet
import json
from pathlib import Path
from tqdm import tqdm
import pandas as pd

def build_pandas_df(_coco_data, img_folder:str = ""):
    """Read the raw bytes of every image listed in a COCO dict.

    Despite the name, the result is a plain list of records, not a pandas
    DataFrame.

    Args:
        _coco_data: mapping with an "images" list; each entry must provide
            a "file_name" key.
        img_folder: directory the file names are resolved against.

    Returns:
        A list with one dict per image, in input order, with the keys
        "file_names" (the name as stored in the annotation) and
        "files_binary" (the file content as bytes).
    """
    records = []
    for image_entry in tqdm(_coco_data["images"]):
        image_path = osp.join(img_folder, image_entry["file_name"])
        with open(image_path, 'rb') as image_fd:
            payload = image_fd.read()
        records.append({
            "file_names": image_entry["file_name"],
            "files_binary": payload,
        })
    return records

In [None]:
# Every split reads its images from the same val2017 folder of the cached data.
val2017_dir = osp.join(tmp_inputs.cache_data, "val2017")

# For each split: load the image bytes, turn the records into a Spark DataFrame
# and print the resulting schema.
data_train_df = spark.createDataFrame(build_pandas_df(train_coco, img_folder=val2017_dir))
data_train_df.printSchema()

data_val_df = spark.createDataFrame(build_pandas_df(val_coco, img_folder=val2017_dir))
data_val_df.printSchema()

data_test_df = spark.createDataFrame(build_pandas_df(test_coco, img_folder=val2017_dir))
data_test_df.printSchema()

In [None]:
# Write each split as uncompressed parquet to its output entity, replacing
# anything already stored there.
for split_df, split_url in (
    (data_train_df, outputs.train_coco_data),
    (data_val_df, outputs.eval_coco_data),
    (data_test_df, outputs.test_coco_data),
):
    split_df.write.mode("overwrite").option("compression", "none").parquet(split_url)

In [None]:
# Touch _SUCCESS file to outputs
# One marker file per image dataset entity
for dataset_dir in (
    outputs.train_coco_data,
    outputs.eval_coco_data,
    outputs.test_coco_data,
):
    Path(osp.join(dataset_dir, '_SUCCESS')).touch()

In [None]:
# save train, valid and test annotations to json
os.makedirs(outputs.config, exist_ok=True)
# All JSON files below are written into the temporary cache_config directory
# (they are copied to outputs.config afterwards), so that directory must exist too.
os.makedirs(tmp_inputs.cache_config, exist_ok=True)

train_annotation_path = osp.join(tmp_inputs.cache_config, "train_coco_annotations.json")
val_annotation_path = osp.join(tmp_inputs.cache_config, "val_coco_annotations.json")
test_annotation_path = osp.join(tmp_inputs.cache_config, "test_coco_annotations.json")

# One annotation file per split
for annotation_path, coco_split in (
    (train_annotation_path, train_coco),
    (val_annotation_path, val_coco),
    (test_annotation_path, test_coco),
):
    with open(annotation_path, 'w') as f:
        json.dump(coco_split, f, indent=4)

# The config starts from the substep parameters and adds the annotation file
# names, the names of the image dataset entities and the class list. File and
# entity names are stored without directories.
CONFIG = dict(**substep_params)
CONFIG["train_coco_annotation"] = "train_coco_annotations.json"
CONFIG["val_coco_annotation"] = "val_coco_annotations.json"
CONFIG["test_coco_annotation"] = "test_coco_annotations.json"
CONFIG["train_images"] = "train_coco_data"
CONFIG["val_images"] = "eval_coco_data"
CONFIG["test_images"] = "test_coco_data"
CONFIG["CLASSES"] = CLASSES

config_path = osp.join(tmp_inputs.cache_config, "config.json")
with open(config_path, 'w') as f:
    json.dump(CONFIG, f, indent=4)
    


In [None]:
# copies every file in tmp_inputs.cache_config (the annotation and config JSON files
# written above) into the outputs.config store entity
from sinara.store import SinaraStore

SinaraStore.copy_tmp_files_to_store(tmp_dir=tmp_inputs.cache_config, store_dir=outputs.config, file_globs=["*"])
#  _SUCCESS file will be added automatically

In [None]:
#7 stop spark
# Shut down the Spark session started in the "run spark" cell; all writes are done by now.
SinaraSpark.stop_session()