In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
#1. import dsml base module
from dsml_s8e.module import DSMLModule

In [None]:
#2. specify parameters

# Parameters
# Run-level settings; passed to DSMLModule together with `parameters`.
run_parameters = {
    "env_name": "user",
    "product_name": "cv_example",
    "stand_name": "YOLOX_mmdet",
    "docker_image": "cv-no-gpu:latest",
    "conda_env": "gpu",
    "business_report_repo": "",
    "infra": {},
    "comment": {},
}

# Module parameters; they are merged into CONFIG further down and saved to config.json.
parameters = {
    "loggingLevel"       : "INFO",
    "FILTER_EMPTY_GT"    : False,  # forwarded to get_dataset as filter_empty_gt
    "AUGMENTATION_TYPE"  : 1,      # 0: datasets built without resize_pipeline, 1: with resize_pipeline
    "MIN_OBJECT_SIZE"    : 5,      # used as both width and height of min_gt_bbox_wh
    "MAX_SIZE"           : 1024,   # both sides of Resize img_scale; also passed to process_dataset as max_size
    "KEEP_RATIO"         : True,   # Resize keep_ratio
}

In [None]:
import json

# Show the module parameters as indented JSON.
print(json.dumps(parameters, indent=4))

In [None]:
from dsml_s8e.spark import SparkEnvironment

# Stop first, then start: presumably so that re-running this cell does not
# keep working with a previously started session — TODO confirm.
SparkEnvironment.stopSparkSession()
# NOTE(review): the meaning of the argument 0 is not visible in this notebook.
spark = SparkEnvironment.runSparkSession(0)
SparkEnvironment.showSparkUI()

import atexit
# Also stop the session when the interpreter exits.
_=atexit.register(SparkEnvironment.stopSparkSession)

In [None]:
import os

module = DSMLModule(parameters, run_parameters)

# Cache locations; later cells read `cache_urls.aug_dataset` and `cache_urls.cache_data`.
cache_urls = module.make_cache_urls(
    new_cache_entity_names=["aug_dataset"],
    last_cache_entity_names=["cache_data"]
)

# Resource published by the "1_data_import" component; read below as `resource_urls.split_config`.
resource_urls = module.make_component_resource_urls(
    "1_data_import", 
    entity_names=["split_config"]
)

# Output locations of this notebook: the two parquet datasets and the saved config.json.
a7s_urls = module.make_artifacts_urls(
    entity_names=[
        "train_coco_data",
        "eval_coco_data",
        "cache_config",
    ]
)

module.print_urls()

In [None]:
from dsml_s8e.store import DSMLStore

# Bring split_config.json from the "1_data_import" resource into the cache_data
# location, where the next cell opens it with the built-in open().
DSMLStore.copy_file_to_cache(
    os.path.join(resource_urls.split_config, 'split_config.json'),
    os.path.join(cache_urls.cache_data, 'split_config.json')
)

In [None]:
import json

split_config_path = os.path.join(cache_urls.cache_data, 'split_config.json')

# SPLIT_CONFIG is read below for its 'train_datasets' and 'eval_datasets' entries.
with open(split_config_path) as f:
    SPLIT_CONFIG = json.load(f)

In [None]:
# Resolve the dataset names listed in SPLIT_CONFIG to directories under cache_data.
CONFIG = dict(
    TRAIN_DATASET_DIRS=[os.path.join(cache_urls.cache_data, d) for d in SPLIT_CONFIG['train_datasets']],
    EVAL_DATASET_DIRS=[os.path.join(cache_urls.cache_data, d) for d in SPLIT_CONFIG['eval_datasets']],
)

cache_config_path = os.path.join(cache_urls.cache_data, 'config.json')

# First save of config.json; the same file is rewritten near the end of the
# notebook once CONFIG has been extended.
with open(cache_config_path, 'w') as f:
    json.dump(CONFIG, f, indent=4)
    
!cat {cache_config_path}

In [None]:
# Pull the values used by the dataset-building cells out of the two config dicts.
train_file_folders = CONFIG.get('TRAIN_DATASET_DIRS')
eval_file_folders  = CONFIG.get('EVAL_DATASET_DIRS')
filter_empty_gt    = parameters['FILTER_EMPTY_GT']
max_size           = parameters['MAX_SIZE']
# One threshold, applied to both width and height.
min_gt_bbox_wh     = (parameters.get('MIN_OBJECT_SIZE'),) * 2

# Fold the module parameters into CONFIG; on a key clash the value from
# `parameters` wins.
CONFIG = {**CONFIG, **parameters}
CONFIG

In [None]:
from utils.coco import join_coco_files, load as load_coco
from utils.coco import show_item
from utils.coco import get_dataset
from tqdm import tqdm
import concurrent.futures

In [None]:
os.makedirs(cache_urls.aug_dataset, exist_ok=True)

# Merge the per-folder COCO files into one eval.json / train.json.
# The value returned by join_coco_files is kept as *_max_object_size; the
# dataset cells below only build datasets when it is > 0.
# NOTE(review): eval_coco_file / train_coco_file are defined only when the
# corresponding folder list is non-empty.
eval_max_object_size = 0
if eval_file_folders:
    eval_coco_file = os.path.join(cache_urls.aug_dataset, 'eval.json')
    eval_max_object_size = join_coco_files(eval_file_folders, output_file=eval_coco_file)

print()

train_max_object_size = 0
if train_file_folders:
    train_coco_file = os.path.join(cache_urls.aug_dataset, 'train.json')
    train_max_object_size = join_coco_files(train_file_folders, output_file=train_coco_file)

In [None]:
# Notebook-level global: process_dataset reads it and forwards it to
# item_to_coco as the `overlap` keyword argument.
overlap = 0.5

print(f"{overlap=}")

In [None]:
# Pipeline fragments (lists of transform config dicts). The dataset cells
# concatenate them in the order they need, e.g.
# loader_pipeline + train_pipeline + resize_pipeline + collect_pipeline.
loader_pipeline      = [dict(type="LoadImageFromFile")]
post_loader_pipeline = [dict(type="LoadImageFromFile")]

collect_pipeline = [
    dict(
        type="Collect",
        keys=["img", "gt_bboxes", "gt_labels", "gt_masks"],
        meta_keys=('filename', 'img_shape'),
    ),
]

resize_pipeline = [
    dict(type='Resize', img_scale=(CONFIG['MAX_SIZE'], CONFIG['MAX_SIZE']), keep_ratio=CONFIG['KEEP_RATIO']),
    # Filter again after resizing. Uses the configured MIN_OBJECT_SIZE (via
    # min_gt_bbox_wh) instead of a hard-coded (5, 5), so changing the
    # parameter affects every filtering step consistently.
    dict(type='FilterAnnotationsBugFix', min_gt_bbox_wh=min_gt_bbox_wh),
]

# Train and eval currently use the same annotation loading and filtering steps.
train_pipeline = [
    dict(type="LoadAnnotations", with_bbox=True, with_mask=True, poly2mask=False),
    dict(type='FilterAnnotationsBugFix', min_gt_bbox_wh=min_gt_bbox_wh),
]

eval_pipeline = [
    dict(type="LoadAnnotations", with_bbox=True, with_mask=True, poly2mask=False),
    dict(type='FilterAnnotationsBugFix', min_gt_bbox_wh=min_gt_bbox_wh),
]

In [None]:
from mmdet_utils.pipelines.train_pipelines import *
from mmdet_utils.pipelines.test_pipelines import *

In [None]:
import mmdet
import mmcv
import torch

print(mmdet.__version__)
print(mmcv.__version__)
print(torch.__version__)

!python3 --version

In [None]:
# mmdet_fp = "https://files.pythonhosted.org/packages/33/da/c979cca457c732e598131a3939c15d7d57b14b7ba6b79cba52a0cd604d99/mmdet-2.25.1-py3-none-any.whl"
# mmcv_fp  = "https://download.openmmlab.com/mmcv/dist/cu113/torch1.10.0/mmcv_full-1.6.0-cp39-cp39-manylinux1_x86_64.whl"

# assert mmdet.__version__ == '2.25.1', f'{mmdet.__version__} is not (2.25.1) download from "{mmdet_fp}"'
# assert mmcv.__version__ == '1.6.0', f'{mmcv.__version__} is not (1.6.0) download from "{mmcv_fp}"'

In [None]:
# Build the evaluation dataset for the configured augmentation type and
# preview its first item. Nothing is built when the merged eval file reported
# no objects (eval_max_object_size == 0).
eval_datasets = []
if eval_max_object_size > 0:
    if CONFIG['AUGMENTATION_TYPE'] == 0:
        # Type 0: load + filter annotations, no resize.
        eval_datasets.append(dict(
            aug_type=0,
            dataset=get_dataset(
                eval_coco_file,
                loader_pipeline + eval_pipeline + collect_pipeline,
                filter_empty_gt,
            ),
        ))
    elif CONFIG['AUGMENTATION_TYPE'] == 1:
        # Type 1: same as type 0 plus the resize pipeline.
        eval_datasets.append(dict(
            aug_type=1,
            dataset=get_dataset(
                eval_coco_file,
                loader_pipeline + eval_pipeline + resize_pipeline + collect_pipeline,
                filter_empty_gt,
            ),
        ))

    for eval_dataset in eval_datasets:
        print(eval_dataset['dataset'])

        show_item(eval_dataset['dataset'], 0)

In [None]:
# Build the training dataset for the configured augmentation type and preview
# its first item. Nothing is built when the merged train file reported no
# objects (train_max_object_size == 0).
train_datasets = []

if train_max_object_size > 0:
    if CONFIG['AUGMENTATION_TYPE'] == 0:
        # Type 0: load + filter annotations, no resize.
        train_datasets.append(dict(
            aug_type=0,
            dataset=get_dataset(
                train_coco_file,
                loader_pipeline + train_pipeline + collect_pipeline,
                filter_empty_gt,
            ),
        ))
    elif CONFIG['AUGMENTATION_TYPE'] == 1:
        # Type 1: same as type 0 plus the resize pipeline.
        train_datasets.append(dict(
            aug_type=1,
            dataset=get_dataset(
                train_coco_file,
                loader_pipeline + train_pipeline + resize_pipeline + collect_pipeline,
                filter_empty_gt,
            ),
        ))

    for train_dataset in train_datasets:
        print(train_dataset['dataset'])

        show_item(train_dataset['dataset'], 0)

In [None]:
from utils.coco.item import item_to_coco

In [None]:
def process_dataset(dataset, out_folder, max_size, save_all_image, max_workers=8):
    """Run ``item_to_coco`` over every item of ``dataset`` in a process pool.

    One task per index ``i`` in ``range(len(dataset))`` is submitted as
    ``item_to_coco(dataset, i, out_folder, max_size, dataset.cat_ids,
    dataset.CLASSES, overlap=overlap, save_all_image=save_all_image)``.
    ``overlap`` is read from the notebook-level global of that name.

    The progress bar advances by 0.5 when a task is submitted and by another
    0.5 when it completes, so it reaches ``len(dataset)`` once all are done.

    Args:
        dataset: object providing ``__len__``, ``ann_file``, ``cat_ids`` and
            ``CLASSES``.
        out_folder: forwarded unchanged to ``item_to_coco``.
        max_size: forwarded unchanged to ``item_to_coco``.
        save_all_image: forwarded unchanged to ``item_to_coco``.
        max_workers: number of worker processes. Defaults to 8, the value
            that used to be hard-coded.

    Raises:
        Any exception raised inside a worker; ``future.result()`` re-raises
        it in this process.

    If a task *returns* an exception instance (of any ``Exception``
    subclass), the error is reported and processing stops early. In every
    case tasks that have not started yet are cancelled before the pool is
    shut down, and the progress bar is always closed.
    """
    total = len(dataset)
    pbar = tqdm(total=total)
    pbar.set_description(f"Processing [{dataset.ann_file}]")

    try:
        with concurrent.futures.ProcessPoolExecutor(max_workers) as executor:
            futures = []
            for i in range(total):
                futures.append(
                    executor.submit(
                        item_to_coco, dataset, i, out_folder, max_size, dataset.cat_ids, dataset.CLASSES,
                        overlap=overlap, save_all_image=save_all_image
                    )
                )
                pbar.update(0.5)

            try:
                for future in concurrent.futures.as_completed(futures):
                    _result = future.result()
                    # isinstance, not an exact type check: workers may hand
                    # back any Exception subclass.
                    if isinstance(_result, Exception):
                        tqdm.write(f"Stopping [{dataset.ann_file}]: worker returned {_result!r}")
                        break

                    pbar.update(0.5)
            finally:
                # Drop whatever is still queued so leaving the `with` block
                # does not wait for work nobody needs. cancel() is a no-op
                # for futures that are already running or finished.
                for pending in futures:
                    pending.cancel()
    finally:
        pbar.close()

In [None]:
%%time

eval_out_folder = os.path.join(cache_urls.aug_dataset, 'eval_data')
# Empty the output folder before regenerating it.
# NOTE(review): the path is interpolated unquoted into a shell command.
!rm -rf {eval_out_folder}/*

for eval_dataset in eval_datasets:
    # Passed to process_dataset as save_all_image; True only for aug_type 1.
    save_all_image = eval_dataset['aug_type'] == 1
    process_dataset(eval_dataset['dataset'], eval_out_folder, max_size, save_all_image)

In [None]:
# Merge the generated COCO files into a single all_coco.json inside the same folder.
# NOTE(review): a single directory path is passed here, while the earlier
# join_coco_files calls pass a list of directories — confirm both are accepted.
eval_output_file = os.path.join(eval_out_folder, 'all_coco.json')
join_coco_files(eval_out_folder, output_file=eval_output_file)

In [None]:
# Preview the first item of the processed eval data, with the filter steps
# removed from the pipeline.
preview_pipeline = [pp for pp in eval_pipeline if "Filter" not in pp['type']]

# NOTE: `eval_dataset` is rebound here from the loop variable (a dict) to the dataset object.
eval_dataset = get_dataset(eval_output_file, post_loader_pipeline + preview_pipeline + collect_pipeline)
if len(eval_dataset):
    show_item(eval_dataset, 0)

In [None]:
%%time

train_out_folder = osp.join(cache_urls.aug_dataset, 'train_data')
!rm -rf {train_out_folder}/*

for train_dataset in train_datasets:
    save_all_image = train_dataset['aug_type'] == 1
    process_dataset(train_dataset['dataset'], train_out_folder, max_size, save_all_image)

In [None]:
# Merge the generated COCO files into a single all_coco.json inside the same folder.
# NOTE(review): a single directory path is passed here, while the earlier
# join_coco_files calls pass a list of directories — confirm both are accepted.
train_output_file = os.path.join(train_out_folder, 'all_coco.json')
join_coco_files(train_out_folder, output_file=train_output_file)

In [None]:
# Preview the first item of the processed train data, with the filter steps
# removed from the pipeline.
preview_pipeline = [pp for pp in train_pipeline if "Filter" not in pp['type']]

# NOTE: `train_dataset` is rebound here from the loop variable (a dict) to the
# dataset object; its CLASSES attribute is stored into CONFIG further down.
train_dataset = get_dataset(train_output_file, post_loader_pipeline + preview_pipeline + collect_pipeline)
if len(train_dataset):
    show_item(train_dataset, 0)

In [None]:
import numpy as np

# Average the per-image `mean` / `std` entries of each merged COCO file.
# `mean` and `std` are reassigned on every iteration, so after the loop they
# hold the values of the last file (train_output_file); a later cell stores
# them in CONFIG['Normalize'].
for file in (eval_output_file, train_output_file):
    all_coco_data = load_coco(file)

    mean, std = [], []
    for image in all_coco_data['images']:
        # Only images carrying a 3-element `mean` contribute.
        if len(image.get('mean', [])) != 3:
            continue
        mean.append(image['mean'])
        std.append(image['std'])

    mean = np.mean(np.array(mean), axis=0).round(2).tolist()
    std = np.mean(np.array(std), axis=0).round(2).tolist()

    print(f"{file = }")
    print(f"{mean = }")
    print(f"{std  = }")
    print()

In [None]:
!ls {cache_urls.aug_dataset}/eval_data | grep json | wc -l

In [None]:
!ls {cache_urls.aug_dataset}/train_data | grep json | wc -l

In [None]:
from utils import get_files
from utils.coco import load as load_coco

from tqdm import tqdm
import pandas as pd

def build_pandas_df(_dir):
    """Collect one record per single-image COCO json file found for ``_dir``.

    Despite the name, this returns a plain ``list`` of ``dict`` records, not
    a pandas DataFrame; the callers pass the list to ``spark.createDataFrame``.

    For every file returned by ``get_files(_dir, "*.json")``:

    * files whose ``images`` list does not hold exactly one entry are skipped
      (presumably this is what excludes the merged ``all_coco.json`` written
      into the same folder — verify against ``get_files``);
    * the image named by ``images[0]['file_name']`` is read as raw bytes;
    * ``images[0]['file_name']`` is replaced by its base name, so the
      serialized json no longer carries the original directory;
    * the modified COCO dict is serialized to UTF-8 encoded JSON.

    Args:
        _dir: directory handed to ``get_files``.

    Returns:
        list[dict]: records with the keys ``file_base_name``,
        ``image_file_bytes``, ``coco_json_file_bytes`` and
        ``dataset_folder_meta`` (name of the folder containing the json file).
    """
    pack = []
    for file in tqdm(get_files(_dir, "*.json")):
        coco_data = load_coco(file)

        # Check first, so no record is built for files that get skipped.
        if len(coco_data['images']) != 1:
            continue

        image_path = coco_data['images'][0]['file_name']
        with open(image_path, 'rb') as img_fd:
            image_file_bytes = img_fd.read()

        # os.path instead of `osp`: `osp` is never imported in this notebook.
        file_base_name = os.path.basename(image_path)
        coco_data['images'][0]['file_name'] = file_base_name

        pack.append({
            "file_base_name"       : file_base_name,
            "image_file_bytes"     : image_file_bytes,
            "coco_json_file_bytes" : json.dumps(coco_data).encode('utf-8'),
            "dataset_folder_meta"  : os.path.basename(os.path.dirname(file)),
        })

    return pack

Конвертируем аугментированные данные в формат `parquet` и загружаем их на *HDFS*, используя *Spark*.

Поскольку мы создаем и загружаем их с помощью библиотеки `dsml_s8e.spark`, добавляя опцию `option("compression", "none")`, мы можем сразу загружать на *HDFS* "правильные" `parquet` файлы, которые уже не нужно после этого "нормализовывать".

In [None]:
# build_pandas_df returns a list of dict records; the name eval_data_df is
# then rebound to the Spark DataFrame created from that list.
eval_data_df = build_pandas_df(os.path.join(cache_urls.aug_dataset, "eval_data"))
eval_data_df = spark.createDataFrame(eval_data_df)
eval_data_df.printSchema()
print(f"{a7s_urls.eval_coco_data=}")
# Overwrite any previous output and write parquet with compression disabled.
eval_data_df.write.mode("overwrite").option("compression", "none").parquet(a7s_urls.eval_coco_data)

In [None]:
# build_pandas_df returns a list of dict records; the name train_data_df is
# then rebound to the Spark DataFrame created from that list.
# os.path is used explicitly because `osp` is never imported in this notebook.
train_data_df = build_pandas_df(os.path.join(cache_urls.aug_dataset, "train_data"))
train_data_df = spark.createDataFrame(train_data_df)
train_data_df.printSchema()
print(f"{a7s_urls.train_coco_data=}")
# Overwrite any previous output and write parquet with compression disabled.
train_data_df.write.mode("overwrite").option("compression", "none").parquet(a7s_urls.train_coco_data)

In [None]:
# `mean` / `std` are the values left by the statistics loop above, i.e. those
# of the last file it processed (train_output_file).
CONFIG['Normalize'] = {'type': 'Normalize', 'mean': mean, 'std': std, 'to_rgb': False}
# `train_dataset` is the dataset object built in the train preview cell.
CONFIG['CLASSES'] = train_dataset.CLASSES

In [None]:
# The dataset directory lists point into the cache_data location; drop them
# (if present) before the config is saved again.
CONFIG.pop("TRAIN_DATASET_DIRS", None)
CONFIG.pop("EVAL_DATASET_DIRS", None)
CONFIG

In [None]:
# Rewrite config.json, now including the module parameters, Normalize and CLASSES.
with open(cache_config_path, 'w') as f:
    json.dump(CONFIG, f, indent=4)
    
!cat {cache_config_path}

In [None]:
from dsml_s8e.store import DSMLStore

# Publish the saved config.json to the cache_config artifact location.
DSMLStore.copy_file_to_store(os.path.join(cache_urls.cache_data, 'config.json'), os.path.join(a7s_urls.cache_config, 'config.json'))

In [None]:
#11. stop the Spark session started at the top of the notebook

SparkEnvironment.stopSparkSession()