In [1]:
# Enable IPython's autoreload extension in mode 2, so edited modules are
# re-imported before each cell runs and a kernel restart is not needed.
%load_ext autoreload
%autoreload 2

In [2]:
# Parameters
# Pipeline-wide settings (environment, pipeline name, zone).
pipeline_params = {
    "env_name": "user",
    "pipeline_name": "yolox_mmdet",
    "zone_name": "test",
}
# Identifies the step this notebook belongs to.
step_params = {"step_name": "model_train"}
# Parameters of this substep; none are set for it.
substep_params = {}

# Complete description of the run: the pipeline and step settings plus one
# entry per substep notebook. The nested dicts are independent copies, not
# aliases of the flat variables defined above.
params = dict(
    pipeline_params=dict(pipeline_params),
    step_params=dict(step_params),
    substeps_params=[
        dict(substep_name="0_cache_datasets.ipynb", substep_params={}),
        dict(
            substep_name="1_configure_train.ipynb",
            substep_params=dict(
                MAX_SIZE=640,
                BATCH=8,
                WORKERS=0,
                SEED=42,
                EPOCH_COUNT=5,
                MODEL_NAME="yolox_s",
                optimizer_lr=0.001,
                pretrain_weights="https://download.openmmlab.com/mmdetection/v2.0/yolox/yolox_s_8x8_300e_coco/yolox_s_8x8_300e_coco_20211121_095711-4592a793.pth",
            ),
        ),
        dict(substep_name="2_train.ipynb", substep_params=dict(SEED=42)),
    ],
)


In [3]:
#3 define substep interface
from sinara.substep import (
    NotebookSubstep,
    default_param_values,
    ENV_NAME,
    PIPELINE_NAME,
    ZONE_NAME,
    STEP_NAME,
    RUN_ID,
    ENTITY_NAME,
    ENTITY_PATH,
    SUBSTEP_NAME,
)

# Build the substep from the injected parameter dicts; keyword arguments
# come from default_param_values("params/step_params.json").
substep = NotebookSubstep(
    pipeline_params,
    step_params,
    substep_params,
    **default_param_values("params/step_params.json"),
)

# Declare what this notebook consumes and produces:
#   inputs      - three entities published by the "data_prep" step
#   tmp_outputs - four temporary entities written by this substep
substep.interface(
    inputs=[
        {STEP_NAME: "data_prep", ENTITY_NAME: entity}
        for entity in ("train_data", "eval_data", "train_eval_config")
    ],
    tmp_outputs=[
        {ENTITY_NAME: entity}
        for entity in (
            "load_train_data",
            "load_eval_data",
            "aug_dataset",
            "train_eval_config",
        )
    ],
)

# Show the declared entities in the cell output.
substep.print_interface_info()

# NOTE(review): presumably stops execution here when the notebook is run
# only to visualize its interface - confirm against the sinara docs.
substep.exit_in_visualize_mode()

**INPUTS:**


[{'user.yolox_mmdet.test.data_prep.train_data': '/data/home/jovyan/yolox_mmdet/test/data_prep/run-23-10-26-211120/train_data'},
 {'user.yolox_mmdet.test.data_prep.eval_data': '/data/home/jovyan/yolox_mmdet/test/data_prep/run-23-10-26-211120/eval_data'},
 {'user.yolox_mmdet.test.data_prep.train_eval_config': '/data/home/jovyan/yolox_mmdet/test/data_prep/run-23-10-26-211120/train_eval_config'}]




**TMP OUTPUTS:**


[{'tmp:user.yolox_mmdet.test.model_train.load_train_data': '/data/tmp/user/yolox_mmdet/test/model_train/run-23-10-26-215221/load_train_data'},
 {'tmp:user.yolox_mmdet.test.model_train.load_eval_data': '/data/tmp/user/yolox_mmdet/test/model_train/run-23-10-26-215221/load_eval_data'},
 {'tmp:user.yolox_mmdet.test.model_train.aug_dataset': '/data/tmp/user/yolox_mmdet/test/model_train/run-23-10-26-215221/aug_dataset'},
 {'tmp:user.yolox_mmdet.test.model_train.train_eval_config': '/data/tmp/user/yolox_mmdet/test/model_train/run-23-10-26-215221/train_eval_config'}]




![interface 0_cache_datasets](./imgs/0_cache_datasets.drawio.png)

In [4]:
#4 get substep.interface
# Fetch accessor objects for the declared entities; their attributes
# (e.g. inputs.train_data) evaluate to the path strings printed below.
# Only inputs published by the "data_prep" step are requested.
inputs = substep.inputs(step_name = "data_prep")
# NOTE(review): the interface above declares no `outputs`, and `outputs` is
# not referenced in the visible cells - confirm whether this call is needed.
outputs = substep.outputs()
tmp_outputs = substep.tmp_outputs()

# The `=` f-string specifier prints the expression text together with its
# repr, so each line reads `name='path'`.
print(f"{inputs.train_data=}")
print(f"{inputs.eval_data=}")
print(f"{inputs.train_eval_config=}")

print(f"{tmp_outputs.load_train_data=}")
print(f"{tmp_outputs.load_eval_data=}")
print(f"{tmp_outputs.aug_dataset=}")
print(f"{tmp_outputs.train_eval_config=}")

inputs.train_data='/data/home/jovyan/yolox_mmdet/test/data_prep/run-23-10-26-211120/train_data'
inputs.eval_data='/data/home/jovyan/yolox_mmdet/test/data_prep/run-23-10-26-211120/eval_data'
inputs.train_eval_config='/data/home/jovyan/yolox_mmdet/test/data_prep/run-23-10-26-211120/train_eval_config'
tmp_outputs.load_train_data='/data/tmp/user/yolox_mmdet/test/model_train/run-23-10-26-215221/load_train_data'
tmp_outputs.load_eval_data='/data/tmp/user/yolox_mmdet/test/model_train/run-23-10-26-215221/load_eval_data'
tmp_outputs.aug_dataset='/data/tmp/user/yolox_mmdet/test/model_train/run-23-10-26-215221/aug_dataset'
tmp_outputs.train_eval_config='/data/tmp/user/yolox_mmdet/test/model_train/run-23-10-26-215221/train_eval_config'


In [5]:
#5 run spark
from sinara.spark import SinaraSpark

# Start the Spark session used by the parquet-reading cells below.
# NOTE(review): the meaning of the positional 0 is not visible in this
# notebook - confirm against the SinaraSpark.run_session signature.
spark = SinaraSpark.run_session(0)
# Last expression of the cell, so the returned UI address is displayed as
# the cell result.
SinaraSpark.ui_url()

Session is run


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/26 21:52:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


'http://localhost:4040'

### Loading train and val datasets and configuration files (from the previous component data_prep)

In [6]:
import os.path as osp
import os
from sinara.store import SinaraStore
from pathlib import Path

# Copy the train data, eval data and train/eval config produced by the
# previous step (data_prep) from the store into this substep's tmp outputs.
SinaraStore.copy_store_files_to_tmp(store_dir=inputs.train_data, tmp_dir=tmp_outputs.load_train_data)
SinaraStore.copy_store_files_to_tmp(store_dir=inputs.eval_data, tmp_dir=tmp_outputs.load_eval_data)
SinaraStore.copy_store_files_to_tmp(store_dir=inputs.train_eval_config, tmp_dir=tmp_outputs.train_eval_config)

#### Unpack the parquet data from load_train_data and load_eval_data into individual files in aug_dataset

In [7]:
from functools import partial

### Save dataset from parquet to files
def save_file(row, tmp_dir):
    """Write the binary payload of one dataset row to a file under ``tmp_dir``.

    Parameters
    ----------
    row
        Object exposing ``file_names`` (path of the target file, joined onto
        ``tmp_dir``) and ``files_binary`` (bytes-like content to write).
        Presumably a Spark ``Row`` when called through ``DataFrame.foreach``.
    tmp_dir : str
        Directory the file path is resolved against.

    Returns
    -------
    None
        The function is used for its side effect: the file is created, or
        overwritten if it already exists, and any missing parent directories
        are created first.
    """
    # NOTE(review): os.path.join drops tmp_dir when file_names is an absolute
    # path - this assumes file_names is always relative; confirm upstream.
    total_img_path = osp.join(tmp_dir, row.file_names)
    parent_dir = osp.dirname(total_img_path)
    # os.makedirs("") raises FileNotFoundError, so skip directory creation
    # when the target has no directory component (empty tmp_dir + bare name).
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(total_img_path, 'wb') as f_id:
        f_id.write(row.files_binary)

In [8]:
%%time

# LOAD Train Images
# Read the parquet data copied into tmp_outputs.load_train_data and call
# save_file for every row, writing each row's binary content to a file
# under tmp_outputs.aug_dataset.

print(f"spark read start")
df_spark = spark.read.parquet(tmp_outputs.load_train_data)
# partial pins tmp_dir so foreach only has to supply the row argument.
df_spark.foreach(partial(save_file, tmp_dir=tmp_outputs.aug_dataset))

spark read start




CPU times: user 3.76 ms, sys: 8.46 ms, total: 12.2 ms
Wall time: 21.8 s


                                                                                                                                                         

In [9]:
%%time

# LOAD Valid Images
# Same procedure as for the train images, applied to the parquet data in
# tmp_outputs.load_eval_data.
# NOTE(review): train and eval files are written into the same aug_dataset
# directory and save_file opens with 'wb', so rows with equal file_names
# would overwrite each other - confirm the two sets use distinct names.

print(f"spark read start")
df_spark = spark.read.parquet(tmp_outputs.load_eval_data)
# partial pins tmp_dir so foreach only has to supply the row argument.
df_spark.foreach(partial(save_file, tmp_dir=tmp_outputs.aug_dataset))

spark read start


[Stage 3:>                                                                                                                                   (0 + 4) / 4]

CPU times: user 5.81 ms, sys: 665 µs, total: 6.48 ms
Wall time: 8.54 s


                                                                                                                                                         

In [10]:
# Stop the Spark session opened with SinaraSpark.run_session earlier in this
# notebook; the remaining cell does not use `spark`.
SinaraSpark.stop_session()

In [11]:
# Guard: this cell only makes sense once an earlier cell has created `substep`.
if "substep" not in globals():
    raise Exception('SINARA module must have defined module variable')

# Pass the run description to substep._serialize_run: what look like the
# notebook name, the run-stamped notebook name and the run start timestamp,
# followed by the three parameter dicts.
substep._serialize_run(
    "0_cache_datasets.ipynb",
    "run-23-10-26-215221_0_cache_datasets.ipynb",
    "2023-10-26 21:52:21.364466",
    pipeline_params,
    step_params,
    substep_params,
)