In [None]:
# Reload edited project modules (e.g. utils.coco, imported further below) before each cell runs.
%load_ext autoreload
%autoreload 2

In [None]:
#2. specify parameters
# Parameter dicts handed to NotebookSubstep in the interface cell below.
pipeline_params = {}
step_params = {}
substep_params = {
    # Base value for the training random seed (the seeding cell squares it).
    "SEED": 42,
}

In [None]:
#3 define substep interface
from sinara.substep import NotebookSubstep, default_param_values, ENV_NAME, PIPELINE_NAME, ZONE_NAME, STEP_NAME, RUN_ID, ENTITY_NAME, ENTITY_PATH, SUBSTEP_NAME

substep = NotebookSubstep(
    pipeline_params,
    step_params,
    substep_params,
    **default_param_values("params/step_params.json"),
)

# Substep contract: three entities taken from one fixed run of the
# "2_model_train" step are consumed; a single "model" entity is produced.
substep.interface(
    inputs=[
        {STEP_NAME: "2_model_train", ENTITY_NAME: entity_name, RUN_ID: "run-23-10-12-231808"}
        for entity_name in ("aug_dataset", "config", "cache_data")
    ],
    outputs=[{ENTITY_NAME: "model"}],
)

substep.print_interface_info()

# Presumably ends execution here when the notebook is run only to visualize the interface.
substep.exit_in_visualize_mode()

In [None]:
#4 get substep.interface
inputs = substep.inputs(step_name="2_model_train", run_id="run-23-10-12-231808")
outputs = substep.outputs()

# Show the resolved location of every entity (same text as the f"{expr=}" form).
for _entity in ("aug_dataset", "config", "cache_data"):
    print(f"inputs.{_entity}={getattr(inputs, _entity)!r}")

print(f"outputs.model={outputs.model!r}")


In [None]:
import logging
# Root logger level comes from substep params ("loggingLevel"), defaulting to INFO.
logging.getLogger().setLevel(substep_params.get("loggingLevel", "INFO"))
# Probe message ("Record.") at DEBUG level; only emitted when the level allows it.
logging.debug('Запись.')

In [None]:
#5 run spark
from sinara.spark import SinaraSpark

# Start a Spark session through Sinara (stopped in the last cell).
# NOTE(review): meaning of the `0` argument is not visible here — confirm in SinaraSpark docs.
spark = SinaraSpark.run_session(0)
# Last expression: the Spark UI URL is shown as the cell output.
SinaraSpark.ui_url()

In [None]:
import os.path as osp
import os

### Обучение модели

#### Инициализация модулей из mmdetection, mmcv  

In [None]:
import torch, torchvision

# Environment report: library versions first, then GPU details when CUDA is usable.
print(f"torch.__version__={torch.__version__!r}")
print(f"torch.cuda.is_available()={torch.cuda.is_available()!r}")
print(f"torchvision.__version__={torchvision.__version__!r}")

if torch.cuda.is_available():
    device_id = torch.cuda.current_device()
    device_name = torch.cuda.get_device_name(device_id)
    print(f"device_name={device_name!r}")
    print(f"torch.cuda.device_count()={torch.cuda.device_count()!r}")

In [None]:
from mmdet.datasets import PIPELINES

# Register a custom pipeline transform. `force=True` replaces any earlier
# registration, so re-running this cell works and picks up edits. Without it,
# a re-run raises "already registered" and the stale class would stay in use.
@PIPELINES.register_module(force=True)
class DataAsList:
    """Pipeline transform that wraps every value of ``results`` in a one-element list.

    Args:
        results (dict): the results dict passed along the data pipeline.

    Returns:
        dict: a new dict with the same keys, each value replaced by ``[value]``.
    """

    def __call__(self, results):
        return {key: [val] for key, val in results.items()}

In [None]:
import copy
import os
import time
import warnings
import math

import mmcv
from mmcv import Config, ConfigDict

import mmdet
from mmdet.apis import init_random_seed, set_random_seed, train_detector
from mmdet.datasets import build_dataset
from mmdet.models import build_detector
from mmdet.utils import get_root_logger

# Report the OpenMMLab library versions in use.
for _module in (mmcv, mmdet):
    print(f"{_module.__name__}.__version__={_module.__version__!r}")

In [None]:
import json
# Load config.json from the `config` input entity (produced by the "2_model_train" step).
config_fn = osp.join(inputs.config, "config.json")
with open(config_fn) as config_fh:
    CONFIG = json.load(config_fh)

In [None]:
# Display the loaded run configuration.
CONFIG

In [None]:
# Parse the mmdet/mmcv config file referenced by config.json.
cfg = Config.fromfile(filename=CONFIG["config_file"])

In [None]:
# Save the run configuration into the training work dir.
# Nothing earlier in the notebook creates the work dir. Create it here so that
# this write, and the log file configured in the next cell (which also lives
# there), do not fail on a fresh run.
os.makedirs(cfg.work_dir, exist_ok=True)
with open(osp.join(cfg.work_dir, 'config.json'), 'w') as f_id:
    json.dump(CONFIG, f_id, indent=4)

In [None]:
# Initialize the root logger (log file: `latest.log` in the work dir) before the other steps.
timestamp = time.strftime('%Y%m%d_%H%M%S')  # local time (strftime's default); passed to train_detector
log_file = osp.join(cfg.work_dir, 'latest.log')
logger = get_root_logger(log_file=log_file, log_level=cfg.log_level)

In [None]:
# Seeding: the configured SEED is squared to form the base seed.
base_seed = substep_params['SEED'] ** 2
deterministic = False

# The config text is captured *before* the seed is written into cfg.
meta = {'config': cfg.pretty_text}

seed = init_random_seed(base_seed)
set_random_seed(seed, deterministic=deterministic)
cfg['seed'] = seed
meta.update(seed=seed, exp_name=CONFIG['config_file'])

#### Инициализация модели на основе предобученных (pretrained) весов

In [None]:
# Build the detector from the model section of the config and run its weight initialization.
# NOTE(review): whether pretrained weights are loaded here depends on the model's
# init_cfg in the config file; cfg.load_from (if set) is presumably applied later by the runner.
model = build_detector(cfg.model)

model.init_weights()

In [None]:
# Build the dataset(s) for the training workflow.
datasets = [build_dataset(cfg.data.train)]

# Val-split config with the *train* pipeline (mirrors mmdet's reference training
# script). It is prepared unconditionally because later cells read
# `val_dataset.ann_file` / `val_dataset.img_prefix`, which previously raised
# NameError whenever the workflow had a single phase.
val_dataset = copy.deepcopy(cfg.data.val)
val_dataset.pipeline = cfg.data.train.pipeline
if len(cfg.workflow) == 2:
    datasets.append(build_dataset(val_dataset))

# Give every dataset an `update_skip_type_keys` method; a no-op stub is attached
# where the dataset class lacks one (presumably a training hook calls it).
for _ds in datasets:
    if not hasattr(_ds, 'update_skip_type_keys'):
        _ds.update_skip_type_keys = lambda x: x

In [None]:
# Attach the training dataset's class names to the model and display them.
model.CLASSES = datasets[0].CLASSES
model.CLASSES

#### Запуск обучения 

In [None]:
 train_detector(
    model,
    datasets,
    cfg,
    validate=True,
    timestamp=timestamp,
    meta=meta)

#### Копирование изображения из валидационного датасета

In [None]:
# Annotation file of the validation split. It is read from cfg.data.val so this
# cell does not depend on `val_dataset` being defined; `val_dataset` is a copy
# of cfg.data.val with only the pipeline replaced, so ann_file is identical.
cfg.data.val.ann_file

In [None]:
# Display the val dataset config from the dataset cell (val split, train pipeline).
# NOTE(review): requires `val_dataset` from that cell to be defined.
val_dataset

In [None]:
# save one example image from eval_dataset
from utils.coco import load as load_coco
from pathlib import Path
import shutil 

# Take the first image listed in the validation COCO annotations and copy it
# into the training work dir as `test<ext>`; the weights-collection cell later
# copies it from cfg.work_dir alongside the model.
# - cfg.data.val is used instead of `val_dataset` (a copy of it with only the
#   pipeline replaced), so this cell works regardless of the workflow length.
# - The destination is cfg.work_dir (previously CONFIG['work_dir']) so it matches
#   the path the weights-collection cell reads from.
val_cfg = cfg.data.val
val_coco = load_coco(val_cfg.ann_file)
assert val_coco, f"empty or unreadable annotations: {val_cfg.ann_file}"
assert val_coco["images"], f"no images listed in {val_cfg.ann_file}"
select_file = osp.join(val_cfg.img_prefix, val_coco["images"][0]["file_name"])
assert osp.isfile(select_file), f"sample image not found: {select_file}"

shutil.copy(select_file, osp.join(cfg.work_dir, f"test{Path(select_file).suffix}"))

### Копирование обученной модели 
(весов, конфига, тестового изображения) в HDFS для последующей передачи в другие компоненты

Так как в процессе обучения могут создаваться промежуточные веса нейросети (например, для эпох 10, 20, 30 и т. д.),  
то копировать все промежуточные файлы в другую компоненту не имеет особого смысла.  
Поэтому скопируем веса и необходимые конфиги в отдельную директорию, и уже эти файлы будем копировать в HDFS.

In [None]:
# List the training work dir (checkpoints, log, config.json, sample image).
!ls -lah {cfg.work_dir}

In [None]:
# copy selected files to another dir
import shutil 
import os.path as osp

# Collect only what other components need into `<work_dir>/weights`:
# config.json, the sample test image, the model config file, and the
# latest/best checkpoints. Intermediate epoch checkpoints are left behind.
weights_dir = osp.join(cfg.work_dir, "weights")
os.makedirs(weights_dir, exist_ok=True)

model_path = cfg.work_dir
# Match on the file *name*, not the full path. Substring tests on the full path
# also fire on directory components (e.g. a work dir whose path contains "best").
checkpoint_names = [
    name for name in os.listdir(model_path)
    if name.endswith('.pth') and osp.isfile(osp.join(model_path, name))
]
best_models = [name for name in checkpoint_names if 'best' in name]
latest_models = [name for name in checkpoint_names if 'latest' in name]
# An exported model without weights is useless; fail loudly instead of silently.
assert latest_models or best_models, f"no latest/best checkpoint found in {model_path}"

test_image_name = f"test{Path(select_file).suffix}"
for name in ("config.json", test_image_name):
    shutil.copy(osp.join(cfg.work_dir, name), osp.join(weights_dir, name))
shutil.copy(cfg.filename, osp.join(weights_dir, osp.basename(cfg.filename)))

# shutil.copy follows symlinks, so a symlinked checkpoint is exported as a real file.
for name in latest_models + best_models:
    shutil.copy(osp.join(model_path, name), osp.join(weights_dir, name))

In [None]:
# Check the files collected for export.
!ls -lah {weights_dir}

#### Remove run-specific paths (checkpoint to load, annotation files, work dir) from the exported config

In [None]:
# Sanitize the exported model config: blank out fields that point at this run's
# local files (checkpoint to load, annotation files, work dir), then write it
# back to weights_dir.
# A dedicated name is used instead of reassigning `cfg`. The training config
# stays intact, so re-running earlier cells (e.g. the one deriving `weights_dir`
# from `cfg.work_dir`) still sees the real work dir instead of "".
# NOTE(review): assumes the copied model config is named `last_cfg.py` and
# defines top-level `train_dataset` / `test_dataset` sections; confirm against
# the config produced by the previous step.
export_cfg = Config.fromfile(osp.join(weights_dir, "last_cfg.py"))
export_cfg.load_from = ""
export_cfg.train_dataset.ann_file = ""
export_cfg.test_dataset.ann_file = ""
export_cfg.data.train.ann_file = ""
export_cfg.data.val.ann_file = ""
export_cfg.data.test.ann_file = ""
export_cfg.work_dir = ""
export_cfg.dump(file=osp.join(weights_dir, "last_cfg.py"))

In [None]:
# Drop the dataset-list, output-file and work_dir entries from the exported
# config.json, and keep only the base name of the model config file (which
# is copied next to it into weights_dir).
exported_config_path = osp.join(weights_dir, "config.json")
with open(exported_config_path) as config_fh:
    exported_config = json.load(config_fh)

for stale_key in ("eval_datasets", "train_datasets", "train_output_file",
                  "eval_output_file", "work_dir"):
    exported_config.pop(stale_key, None)  # no-op when the key is absent
exported_config["config_file"] = osp.basename(exported_config["config_file"])

with open(exported_config_path, 'w') as config_fh:
    json.dump(exported_config, config_fh, indent=4)

Copy the model files to the `model` output entity

In [None]:
# !rm -r {outputs.model}

In [None]:
# Publish the collected files as the substep's `model` output.
# dirs_exist_ok=True (Python 3.8+) makes this cell re-runnable. Without it,
# copytree raises FileExistsError once the output dir exists. Files already in
# the output dir that are not in weights_dir are left untouched.
shutil.copytree(weights_dir, outputs.model, dirs_exist_ok=True)

In [None]:
# Stop the Spark session started earlier with SinaraSpark.run_session.
SinaraSpark.stop_session()