diff --git a/fast_llm/engine/schedule/config.py b/fast_llm/engine/schedule/config.py index 141490ac3..272b7c6ae 100644 --- a/fast_llm/engine/schedule/config.py +++ b/fast_llm/engine/schedule/config.py @@ -1,6 +1,5 @@ import enum import functools -import warnings from fast_llm.config import Config, Field, FieldHint, check_field, config_class, test_field from fast_llm.engine.distributed.config import DistributedConfig @@ -105,11 +104,6 @@ def _validate(self) -> None: if self._distributed.pipeline_parallel > 1 and self.depth_first_micro_batches > 1: raise NotImplementedError("Depth-first pipeline parallelism not yet implemented") - if self.depth_first_micro_batches > 1 and self.breadth_first_micro_batches > 1: - warnings.warn( - "Mixing of breadth-first and depth-first gradient accumulation is not thoroughly tested." - " Use at your own risk." - ) super()._validate() diff --git a/fast_llm/layers/ssm/discrete_mamba2.py b/fast_llm/layers/ssm/discrete_mamba2.py index 31e81e99b..b0aa96805 100644 --- a/fast_llm/layers/ssm/discrete_mamba2.py +++ b/fast_llm/layers/ssm/discrete_mamba2.py @@ -7,6 +7,7 @@ from fast_llm.engine.config_utils.tensor_space import TensorDim, TensorSpace from fast_llm.layers.common.linear import Linear from fast_llm.layers.ssm.config import SSMConfig, SSMDimNames +from fast_llm.layers.transformer.config import TransformerKwargs from fast_llm.tensor import ParameterMeta, init_ones_, init_uniform_, init_zeros_, kaiming_init_ from fast_llm.utils import get_lr_scale @@ -157,6 +158,8 @@ def forward(self, hidden_states, kwargs): outputs["hidden_states"]: (B, L, D). outputs["state"]: inference cache. """ + if kwargs[TransformerKwargs.sequence_first]: + raise NotImplementedError(f"Sequence-first not supported for SSMs.") assert _mamba_available input_ = hidden_states diff --git a/fast_llm/logging.py b/fast_llm/logging.py index 385a8b960..e8334de6e 100644 --- a/fast_llm/logging.py +++ b/fast_llm/logging.py @@ -137,6 +137,7 @@ def log_tensor[ ) -> (T | None): if level < 1: return + tensor = tensor.detach() save_stats = TensorLogs.config.save shape = tuple(tensor.shape) _, dtype = str(tensor.dtype).split("torch.") diff --git a/fast_llm/models/ssm/config.py b/fast_llm/models/ssm/config.py index 3c47ff0b2..ecd8908ee 100644 --- a/fast_llm/models/ssm/config.py +++ b/fast_llm/models/ssm/config.py @@ -197,6 +197,12 @@ def _validate(self): logger.warning( "HybridSSMModelConfig is being instantiated. This model is experimental and may not work as expected." ) + if ( + self.base_model.sequence_first + or self.distributed.sequence_data_parallel > 1 + or self.distributed.sequence_tensor_parallel + ): + raise NotImplementedError(f"Sequence-first not supported for SSMs.") super()._validate() diff --git a/fast_llm/utils.py b/fast_llm/utils.py index 821ec5874..472f5e9b7 100644 --- a/fast_llm/utils.py +++ b/fast_llm/utils.py @@ -145,7 +145,7 @@ def multiple(x, y): @staticmethod def rms_close(x, y, threshold): - rms = rms_diff(x, y).item() + rms = rms_diff(x, y).detach().item() assert rms <= threshold, f"Rms diff too big ({rms:.3e} > {threshold:.3e}) between tensors {x} and {y}" @staticmethod diff --git a/tests/conftest.py b/tests/conftest.py index e9011979a..298117e1d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -95,10 +95,8 @@ def pytest_configure(config): else: worker_id = 0 - # TODO: Remove the whole `TEST_RESULTS_PATH` once `get_test_dataset` is parallel-safe. 
- model_result_path = TEST_RESULTS_PATH / "models" - if model_result_path.exists(): - shutil.rmtree(model_result_path) + if TEST_RESULTS_PATH.exists(): + shutil.rmtree(TEST_RESULTS_PATH) num_gpus = torch.cuda.device_count() if num_gpus > 0 and is_parallel: diff --git a/tests/data/test_concatenated_memmap.py b/tests/data/test_concatenated_memmap.py index 0ab7c7fe4..1cc22250d 100644 --- a/tests/data/test_concatenated_memmap.py +++ b/tests/data/test_concatenated_memmap.py @@ -1,3 +1,5 @@ +import pytest + from fast_llm.data.dataset.gpt.config import GPTConcatenatedMemmapConfig from tests.data.common import ( compare_indexed_dataset, @@ -42,10 +44,11 @@ def test_gpt_concatenated_memmap(): # Make sure dataset splitting works and check for unintended changes in behavior. _get_test_dataset_concatenated_memmap() # samples[9:18] - dataset = get_dataset_config( - {"type": "concatenated_memmap", "path": _DATASET_PREFIX_MIX_CONCATENATED_MEMMAP}, - GPTConcatenatedMemmapConfig, - ).build() + with pytest.warns(DeprecationWarning): + dataset = get_dataset_config( + {"type": "concatenated_memmap", "path": _DATASET_PREFIX_MIX_CONCATENATED_MEMMAP}, + GPTConcatenatedMemmapConfig, + ).build() compare_indexed_dataset( dataset, CONCATENATED_MEMMAP_DATASET_LENGTH, @@ -58,16 +61,17 @@ def test_gpt_concatenated_memmap(): def test_gpt_concatenated_memmap_data(): _get_test_dataset_concatenated_memmap() - get_test_data_and_compare_samples( - { - "datasets": { - "Training": { - "type": "concatenated_memmap", - "path": _DATASET_PREFIX_MIX_CONCATENATED_MEMMAP, + with pytest.warns(DeprecationWarning): + get_test_data_and_compare_samples( + { + "datasets": { + "Training": { + "type": "concatenated_memmap", + "path": _DATASET_PREFIX_MIX_CONCATENATED_MEMMAP, + } } - } - }, - 8, - sequence_length=5, - expected_samples=CONCATENATED_MEMMAP_SAMPLES, - ) + }, + 8, + sequence_length=5, + expected_samples=CONCATENATED_MEMMAP_SAMPLES, + ) diff --git a/tests/models/distributed_test_model.py b/tests/models/distributed_test_model.py index 933b215e7..564920bd5 100644 --- a/tests/models/distributed_test_model.py +++ b/tests/models/distributed_test_model.py @@ -27,6 +27,8 @@ def main(args: list[str] | None = None) -> None: group = pool.get_process_group(range(world_size), rank) for name, config in DISTRIBUTED_TESTING_CONFIGS.items(): + if model_testing_config.should_skip(config): + continue if world_size < config.num_gpus: logger.warning(f"{name} {f"SKIPPED (not enough GPUs: {world_size} < {config.num_gpus})"})") continue diff --git a/tests/models/test_checkpoint.py b/tests/models/test_checkpoint.py index ecd23649f..05acf23dc 100644 --- a/tests/models/test_checkpoint.py +++ b/tests/models/test_checkpoint.py @@ -19,7 +19,7 @@ from fast_llm.engine.checkpoint.convert import ConvertConfig from fast_llm.engine.multi_stage.config import FastLLMModelConfig, ShardName from fast_llm.utils import Assert -from tests.utils.compare_tensor_logs import CompareConfig, compare_logged_tensor +from tests.utils.compare_tensor_logs import CompareConfig from tests.utils.distributed_configs import DistributedTestingConfig from tests.utils.model_configs import ModelTestingConfig, ModelTestingGroup from tests.utils.save_load_configs import DISTRIBUTED_SAVE_LOAD_CONFIGS, DistributedSaveLoadConfig @@ -65,12 +65,15 @@ def do_prepare_resume(distributed_testing_config: DistributedTestingConfig): @pytest.mark.model_testing_group(ModelTestingGroup.checkpoint) def test_resume(run_test_script_for_all_models, compare_results_for_all_models, prepare_resume): 
distributed_testing_config = DistributedTestingConfig( - name="resume", compare="checkpoint_and_eval", config_args=_CHECKPOINT_AND_EVAL_ARGS + name="resume", + compare="checkpoint_and_eval", + config_args=_CHECKPOINT_AND_EVAL_ARGS, + compare_config=CompareConfig(sub_configs={(("init", "train_1"), None): CompareConfig(ignore_tensors=True)}), ) prepare_resume(distributed_testing_config) # Resume from iteration=1 and compare outputs with the baseline run. run_test_script_for_all_models(distributed_testing_config) - compare_results_for_all_models(distributed_testing_config, ("train_2",)) + compare_results_for_all_models(distributed_testing_config) @requires_cuda @@ -304,7 +307,6 @@ def test_huggingface_model(model_testing_config, get_convert_path): ) ) errors = [] - compare = CompareConfig() auto_model = ( transformers.AutoModel if model_testing_config.name in ("diffusion_llama", "dream") @@ -320,13 +322,12 @@ def test_huggingface_model(model_testing_config, get_convert_path): print(name) output = model(test_input) # TODO: Make a generic comparison util. - compare_logged_tensor( + CompareConfig().compare_tensors( {"samples": output_ref.logits, "shape": output_ref.logits.shape, "step": 0}, {"samples": output.logits, "shape": output.logits.shape, "step": 0}, errors, name, "logits", - compare, ) if errors: diff --git a/tests/models/test_match_megatron.py b/tests/models/test_match_megatron.py index be5ddb608..30667cd17 100644 --- a/tests/models/test_match_megatron.py +++ b/tests/models/test_match_megatron.py @@ -3,7 +3,7 @@ import pytest from tests.utils.compare_tensor_logs import CompareConfig -from tests.utils.dataset import DATASET_PREFIX, get_test_dataset +from tests.utils.dataset import MODEL_DATASET_PREFIX, get_model_test_dataset from tests.utils.distributed_configs import DistributedTestingConfig from tests.utils.model_configs import ModelTestingGroup from tests.utils.utils import requires_cuda @@ -17,7 +17,7 @@ def test_megatron(run_distributed_script, model_testing_config, run_test_script_ # Prevent Megatron from complaining. 
env["CUDA_DEVICE_MAX_CONNECTIONS"] = "1" env["NVTE_FLASH_ATTN"] = "0" - get_test_dataset() + get_model_test_dataset() run_distributed_script( [ "Megatron-LM/pretrain_gpt.py", @@ -36,15 +36,15 @@ def test_megatron(run_distributed_script, model_testing_config, run_test_script_ def test_match_megatron(run_test_script_for_all_models, model_testing_config, compare_results_for_all_models): assert model_testing_config.megatron_args is not None - ignore_tensors = [ + ignore_tensors = ( ".self_attn.query_key_value.", ".self_attn.query.", ".self_attn.key_value.", ".mlp.layer_2.weight", ".mlp.experts.", - ] + ) if model_testing_config.name == "mixtral": - ignore_tensors.extend([".mlp.experts.", ".mlp.layer_1.weight"]) + ignore_tensors += (".mlp.experts.", ".mlp.layer_1.weight") distributed_testing_config = DistributedTestingConfig( name="match_megatron", @@ -52,11 +52,11 @@ def test_match_megatron(run_test_script_for_all_models, model_testing_config, co config_args=[ "model.distributed.training_dtype=fp32", "data.datasets={}", - f"data.path={DATASET_PREFIX}", + f"data.path={MODEL_DATASET_PREFIX}", "model.base_model.use_megatron_initialization=True", ], num_gpus=1, - compare_config=CompareConfig(ignore_tensors=ignore_tensors), + compare_config=CompareConfig(sub_configs={(None, ignore_tensors): CompareConfig(ignore_tensors=True)}), ) run_test_script_for_all_models(distributed_testing_config) diff --git a/tests/models/test_model.py b/tests/models/test_model.py index 91670b253..5c4897646 100644 --- a/tests/models/test_model.py +++ b/tests/models/test_model.py @@ -28,10 +28,16 @@ def test_model_simple(run_test_script_for_all_models, run_test_script_base_path) # Parametrize with config name so it shows in test name. @pytest.mark.parametrize("config_name", SINGLE_GPU_TESTING_CONFIGS) def test_and_compare_model( - run_test_script_for_all_models, compare_results_for_all_models, config_name, run_test_script_base_path + run_test_script_for_all_models, + compare_results_for_all_models, + config_name, + run_test_script_base_path, + model_testing_config, ): # We can expect tests to respect the ordering of `SINGLE_GPU_TESTING_CONFIGS`, so compare should have run already. config = SINGLE_GPU_TESTING_CONFIGS[config_name] + if model_testing_config.should_skip(config): + pytest.skip(f"Configuration not supported.") if config.compare is not None: check_subtest_success(run_test_script_base_path / config.compare) # A baseline config (single-gpu, bf16, flash-attn). 
@@ -40,7 +46,7 @@ def test_and_compare_model( set_subtest_success(run_test_script_base_path / config.name) if config.compare is not None: - compare_results_for_all_models(config, ("init", "train_1", "train_2")) + compare_results_for_all_models(config) @requires_cuda @@ -73,12 +79,15 @@ def test_model_distributed( config_name, run_test_script_base_path, report_subtest, + model_testing_config, ): config = DISTRIBUTED_TESTING_CONFIGS[config_name] + if model_testing_config.should_skip(config): + pytest.skip(f"Configuration not supported.") if torch.cuda.device_count() < config.num_gpus: pytest.skip(f"Not enough GPUs: {torch.cuda.device_count()} < {config.num_gpus}") report_subtest(run_test_script_base_path / config.name, config.num_gpus) if config.compare is not None: if not check_subtest_success(run_test_script_base_path / config.compare): pytest.fail(f"Test {config.compare} failed", pytrace=False) - compare_results_for_all_models(config, ("init", "train_1", "train_2")) + compare_results_for_all_models(config) diff --git a/tests/utils/compare_tensor_logs.py b/tests/utils/compare_tensor_logs.py index f22859dfd..51ee66d31 100644 --- a/tests/utils/compare_tensor_logs.py +++ b/tests/utils/compare_tensor_logs.py @@ -2,164 +2,186 @@ import dataclasses import pathlib import typing -import warnings import torch _TENSOR_LOG_PREFIX = "tensor_logs_" +def _compare_pattern(pattern: typing.Iterable[str] | str | None, name: str): + # TODO: Regex? + return ( + True + if pattern is None + else pattern in name if isinstance(pattern, str) else any(pattern_ in name for pattern_ in pattern) + ) + + @dataclasses.dataclass() class CompareConfig: - rms_eps: float = 1e-3 - rms_rel_tolerance: float = 3e-2 - rms_abs_tolerance: float = 5e-3 - max_rel_tolerance: float = 1.5e-1 - max_abs_tolerance: float = 5e-2 + rms_eps: float = 1e-4 + rms_rel_tolerance: float = 3e-3 + rms_abs_tolerance: float = 5e-4 + max_rel_tolerance: float = 1.5e-2 + max_abs_tolerance: float = 5e-3 + # Test tensors are scaled by this amount (ex. gradient scaling). Unscale (divide) them before comparison. + scale: float = 1.0 show_samples: int = 10 - ignore_tensors: list[str] = dataclasses.field(default_factory=list) - ignore_duplicates: list[str] = dataclasses.field(default_factory=list) - - -def extract_tensor_logs( - artifact_path: pathlib.Path, errors, config: CompareConfig, artifacts: typing.Sequence[str] | None = None -): - tensor_logs = {} - ignore_keys = set() - for rank_path in sorted(artifact_path.iterdir()): - for p in rank_path.iterdir(): - if p.name.startswith(_TENSOR_LOG_PREFIX) and p.suffix == ".pt": - step_name = p.stem[len(_TENSOR_LOG_PREFIX) :] - if artifacts is not None and step_name not in artifacts: - continue - step_logs = torch.load(p) - if step_name not in tensor_logs: - tensor_logs[step_name] = {} - for step_log in step_logs: - name = step_log["name"] - if any(ignore_name in name for ignore_name in config.ignore_tensors): - ignore_keys.add(name) - else: - if name in tensor_logs[step_name] and not any( - ignore_name in name for ignore_name in config.ignore_duplicates - ): - errors.append(f"Duplicate tensor log in step {step_name}: {name}") - tensor_logs[step_name][name] = step_log - if ignore_keys: - warnings.warn(f"Ignoring keys in {artifact_path}: {ignore_keys}") - return tensor_logs - - -def compare_dict_keys(dict_ref, dict_test, errors, name): - keys_ref = set(dict_ref) - keys_test = set(dict_test) - if keys_ref != keys_test: - errors.append(f">>>> {name} do not match. 
Missing = {keys_ref-keys_test}, extra = {keys_test-keys_ref}.") - - # Avoid set to preserve ordering. - return [key for key in dict_test if key in dict_ref] - - -def compare_logged_tensor(tensor_ref, tensor_test, errors, step, name, config: CompareConfig): - if tensor_ref["shape"] != tensor_test["shape"]: - errors.append( - "\n".join( - [f">>>> [{step}] Incompatible shape for tensor {name}: {tensor_test['shape']}!={tensor_ref['shape']}"] - ) + ignore_tensors: bool = False + ignore_duplicates: bool = False + # Use a different config for specific step and/or tensor names. First match is used. + sub_configs: dict[tuple[typing.Iterable[str] | str | None, typing.Iterable[str] | str | None], "CompareConfig"] = ( + dataclasses.field(default_factory=dict) + ) + + def rescale(self, factor: float) -> typing.Self: + # Scale all tolerances by this factor. + if factor == 1.0: + return self + return dataclasses.replace( + self, + rms_eps=self.rms_eps * factor, + rms_rel_tolerance=self.rms_rel_tolerance * factor, + rms_abs_tolerance=self.rms_abs_tolerance * factor, + max_rel_tolerance=self.max_rel_tolerance * factor, + max_abs_tolerance=self.max_abs_tolerance * factor, + sub_configs={key: sub_config.rescale(factor) for key, sub_config in self.sub_configs.items()}, ) - return - if tensor_ref["step"] != tensor_test["step"]: - errors.append( - "\n".join( - [ - f">>>> [{step}] Incompatible sampling rate for tensor {name}: {tensor_test['step']}!={tensor_ref['step']}" - ] - ) - ) - return - samples_ref = tensor_ref["samples"].flatten().float() - samples_test = tensor_test["samples"].flatten().float() - scale_unreg = (samples_ref**2).mean() ** 0.5 - rms_scale = (scale_unreg**2 + config.rms_eps**2) ** 0.5 - rms = ((samples_ref - samples_test) ** 2).mean() ** 0.5 - max_diff = (samples_ref - samples_test).abs().max() - - tensor_errors = [] + def _get_sub_config(self, step_name: str, tensor_name: str) -> typing.Self: + for (step_key, name_key), sub_config in self.sub_configs.items(): + if _compare_pattern(step_key, step_name) and _compare_pattern(name_key, tensor_name): + return sub_config._get_sub_config(step_name, tensor_name) + return self + + def _extract_tensor_logs(self, artifact_path: pathlib.Path, errors): + tensor_logs = {} + for rank_path in sorted(artifact_path.iterdir()): + for p in rank_path.iterdir(): + if p.name.startswith(_TENSOR_LOG_PREFIX) and p.suffix == ".pt": + step_name = p.stem[len(_TENSOR_LOG_PREFIX) :] + for step_log in torch.load(p): + tensor_name = step_log["name"] + sub_config = self._get_sub_config(step_name, tensor_name) + if not sub_config.ignore_tensors: + if step_name not in tensor_logs: + tensor_logs[step_name] = {} + if ( + tensor_name in (tensor_step_logs := tensor_logs[step_name]) + and not sub_config.ignore_duplicates + ): + errors.append(f"Duplicate tensor log in step {step_name}: {tensor_name}") + tensor_step_logs[tensor_name] = step_log + return tensor_logs + + def _compare_dict_keys(self, dict_ref, dict_test, errors, name): + keys_ref = set(dict_ref) + keys_test = set(dict_test) + if keys_ref != keys_test: + errors.append( + f">>>> {name} do not match. Missing = {keys_ref - keys_test}, extra = {keys_test - keys_ref}." + ) - if rms > config.rms_abs_tolerance: - tensor_errors.append(f" * RMS diff absolute = {rms} > {config.rms_abs_tolerance}") + # Avoid set to preserve ordering. 
+ return [key for key in dict_test if key in dict_ref] + + def compare_tensors(self, tensor_ref, tensor_test, errors, step_name, tensor_name): + sub_config = self._get_sub_config(step_name, tensor_name) + if tensor_ref["shape"] != tensor_test["shape"]: + errors.append( + "\n".join( + [ + f">>>> [{step_name}] Incompatible shape for tensor {tensor_name}: {tensor_test['shape']}!={tensor_ref['shape']}" + ] + ) + ) + return + if tensor_ref["step"] != tensor_test["step"]: + errors.append( + "\n".join( + [ + f">>>> [{step_name}] Incompatible sampling rate for tensor {tensor_name}: {tensor_test['step']}!={tensor_ref['step']}" + ] + ) + ) + return - if rms / rms_scale > config.rms_rel_tolerance: - tensor_errors.append( - f" * RMS diff scaled = {rms/rms_scale} > {config.rms_rel_tolerance} (scale={rms_scale}, unregularized={scale_unreg})" - ) + samples_ref = tensor_ref["samples"].flatten().float() + samples_test = tensor_test["samples"].flatten().float() + if sub_config.scale != 1.0: + samples_test = samples_test / sub_config.scale + scale_unreg = (samples_ref**2).mean() ** 0.5 + rms_scale = (scale_unreg**2 + sub_config.rms_eps**2) ** 0.5 + rms = ((samples_ref - samples_test) ** 2).mean() ** 0.5 + max_diff = (samples_ref - samples_test).abs().max() - if max_diff > config.max_abs_tolerance: - tensor_errors.append(f" * Max diff absolute = {max_diff} > {config.max_abs_tolerance}") + tensor_errors = [] - if max_diff / rms_scale > config.max_rel_tolerance: - tensor_errors.append( - f" * Max diff scaled = {max_diff/rms_scale} > {config.max_rel_tolerance} (scale={rms_scale}, unregularized={scale_unreg})" - ) + if rms > sub_config.rms_abs_tolerance: + tensor_errors.append(f" * RMS diff absolute = {rms} > {sub_config.rms_abs_tolerance}") - if tensor_errors: - tensor_errors.extend( - [ - f" Test samples: " + "".join(f"{x:12.4e}" for x in samples_test[: config.show_samples].tolist()), - f" Ref samples: " + "".join(f"{x:12.4e}" for x in samples_ref[: config.show_samples].tolist()), - ] - ) - errors.append("\n".join([f">>>> [{step}] Excessive diff for tensor {name}:"] + tensor_errors)) - - -def compare_tensor_logs_base( - artifact_path_ref: pathlib.Path, - artifact_path_test: pathlib.Path, - config: CompareConfig | None = None, - artifacts: typing.Sequence[str] | None = None, -): - errors = [] - - if config is None: - config = CompareConfig() - - logs_ref = extract_tensor_logs(artifact_path_ref, errors, config=config, artifacts=artifacts) - logs_test = extract_tensor_logs(artifact_path_test, errors, config=config, artifacts=artifacts) - - for step_key in sorted(compare_dict_keys(logs_ref, logs_test, errors, "Logged steps")): - step_logs_ref = logs_ref[step_key] - step_logs_test = logs_test[step_key] - - for tensor_key in compare_dict_keys( - step_logs_ref, step_logs_test, errors=errors, name=f"[{step_key}] Tensor keys" - ): - compare_logged_tensor( - step_logs_ref[tensor_key], - step_logs_test[tensor_key], - errors, - step_key, - tensor_key, - config, + if rms / rms_scale > sub_config.rms_rel_tolerance: + tensor_errors.append( + f" * RMS diff scaled = {rms / rms_scale} > {sub_config.rms_rel_tolerance} (scale={rms_scale}, unregularized={scale_unreg})" ) - return errors + if max_diff > sub_config.max_abs_tolerance: + tensor_errors.append(f" * Max diff absolute = {max_diff} > {sub_config.max_abs_tolerance}") + if max_diff / rms_scale > sub_config.max_rel_tolerance: + tensor_errors.append( + f" * Max diff scaled = {max_diff / rms_scale} > {sub_config.max_rel_tolerance} (scale={rms_scale}, 
unregularized={scale_unreg})" + ) -def compare_tensor_logs( - artifact_path_ref: pathlib.Path, - artifact_path_test: pathlib.Path, - config: CompareConfig | None = None, - artifacts: typing.Sequence[str] | None = None, -): - print(f'Comparing tensor logs in "{artifact_path_test}" with reference logs "{artifact_path_ref}"') - errors = compare_tensor_logs_base(artifact_path_ref, artifact_path_test, config, artifacts) - if errors: - for error in errors: - print(error) - raise ValueError(f"Comparison failed ({len(errors)} errors)") - else: - print("Comparison succeeded!") + if tensor_errors: + tensor_errors.extend( + [ + f" Test samples: " + "".join(f"{x:12.4e}" for x in samples_test[: self.show_samples].tolist()), + f" Ref samples: " + "".join(f"{x:12.4e}" for x in samples_ref[: self.show_samples].tolist()), + ] + ) + errors.append("\n".join([f">>>> [{step_name}] Excessive diff for tensor {tensor_name}:"] + tensor_errors)) + + def _compare_tensor_logs( + self, + artifact_path_ref: pathlib.Path, + artifact_path_test: pathlib.Path, + ): + errors = [] + + logs_ref = self._extract_tensor_logs(artifact_path_ref, errors) + logs_test = self._extract_tensor_logs(artifact_path_test, errors) + + for step_key in sorted(self._compare_dict_keys(logs_ref, logs_test, errors, "Logged steps")): + step_logs_ref = logs_ref[step_key] + step_logs_test = logs_test[step_key] + + for tensor_key in self._compare_dict_keys( + step_logs_ref, step_logs_test, errors=errors, name=f"[{step_key}] Tensor keys" + ): + self.compare_tensors( + step_logs_ref[tensor_key], + step_logs_test[tensor_key], + errors, + step_key, + tensor_key, + ) + + return errors + + def compare_tensor_logs( + self, + artifact_path_ref: pathlib.Path, + artifact_path_test: pathlib.Path, + ): + print(f'Comparing tensor logs in "{artifact_path_test}" with reference logs "{artifact_path_ref}"') + errors = self._compare_tensor_logs(artifact_path_ref, artifact_path_test) + if errors: + for error in errors: + print(error) + raise ValueError(f"Comparison failed ({len(errors)} errors)") + else: + print("Comparison succeeded!") if __name__ == "__main__": @@ -167,4 +189,4 @@ def compare_tensor_logs( parser.add_argument("path_ref", type=pathlib.Path) parser.add_argument("path_test", type=pathlib.Path) args = parser.parse_args() - compare_tensor_logs(args.path_ref, args.path_test) + CompareConfig().compare_tensor_logs(args.path_ref, args.path_test) diff --git a/tests/utils/dataset.py b/tests/utils/dataset.py index 2a12c4f7d..a4136c40e 100644 --- a/tests/utils/dataset.py +++ b/tests/utils/dataset.py @@ -7,19 +7,22 @@ from fast_llm.data.dataset.gpt.memmap import GPTMemmapDataset from fast_llm.data.dataset.gpt.sampled import GPTSample -from tests.utils.utils import TEST_RESULTS_PATH +from tests.utils.utils import SHARED_RESULT_PATH, TEST_RESULTS_PATH # TODO: Fixtures -TOKENIZER_PATH = TEST_RESULTS_PATH / "tokenizer" / "common" +TOKENIZER_PATH = SHARED_RESULT_PATH / "tokenizer" TOKENIZER_FILE = TOKENIZER_PATH / "tokenizer.json" -DATASET_CACHE = TEST_RESULTS_PATH / "dataset" -DATASET_PREFIX = DATASET_CACHE / "common" / "dataset" -DATASET_SAMPLING_CACHE = TEST_RESULTS_PATH / "dataset" / "cache" +DATASET_CACHE = SHARED_RESULT_PATH / "dataset" +DATASET_PREFIX = DATASET_CACHE / "common_dataset" +DATASET_SAMPLING_CACHE = TEST_RESULTS_PATH / "dataset_sampling_cache" TEST_VOCAB_SIZE = 8192 # Random lowercase: 80.7% (3.1% each); space: 18.6%; doc end: 0.6% TEST_CHARACTERS = (string.ascii_lowercase) * 5 + " " * 30 + "\n" TEST_DATASET_TOKENS = 1000000 +MODEL_DATASET_PREFIX = 
DATASET_CACHE / "model_dataset"
+MODEL_TEST_VOCAB_SIZE = 384
+
 
 def get_test_dataset(
     prefix: pathlib.Path = DATASET_PREFIX,
@@ -60,6 +63,13 @@ def get_test_dataset(
     )
 
 
+def get_model_test_dataset(
+    prefix: pathlib.Path = MODEL_DATASET_PREFIX,
+    vocab_size: int = MODEL_TEST_VOCAB_SIZE,
+):
+    return get_test_dataset(prefix=prefix, vocab_size=vocab_size)
+
+
 def get_test_concatenated_memmap_dataset(
     path: pathlib.Path,
     num_files: int,
diff --git a/tests/utils/distributed_configs.py b/tests/utils/distributed_configs.py
index c38939eae..c3064d987 100644
--- a/tests/utils/distributed_configs.py
+++ b/tests/utils/distributed_configs.py
@@ -1,3 +1,4 @@
+import copy
 import dataclasses
 import logging
 
@@ -13,6 +14,65 @@ class DistributedTestingConfig:
     config_args: list[str]
     num_gpus: int = 1
     compare_config: CompareConfig | None = None
+    # Scale the comparison thresholds for specific distributed configs.
+    compare_factor: float = 1.0
+
+
+def get_config(relative: float = 0, absolute: float = 0, **kwargs) -> CompareConfig:
+    return CompareConfig(
+        rms_rel_tolerance=relative,
+        max_rel_tolerance=relative * 10,
+        rms_abs_tolerance=absolute,
+        max_abs_tolerance=absolute * 10,
+        rms_eps=absolute / 10,
+        **kwargs,
+    )
+
+
+# TODO: Adjust
+_compare_layer_match = get_config(
+    sub_configs={
+        ("init", None): get_config(),
+        (None, "fw"): get_config(1e-3, 1e-4),
+        (None, "bw"): get_config(3e-3, 1e-5),
+        # Biases have higher absolute error.
+        (None, "bias"): get_config(3e-3, 5e-5),
+        (None, "gradient"): get_config(3e-3, 3e-5),
+    }
+)
+
+_compare_layer_mismatch = copy.deepcopy(_compare_layer_match)
+_pp_tied_weight_compare = copy.deepcopy(_compare_layer_match)
+_z3_accumulation_compare = copy.deepcopy(_compare_layer_match)
+_z3_accumulation_compare.sub_configs[(None, "bias")].ignore_duplicates = True
+_z3_accumulation_compare.sub_configs[(None, "gradient")].ignore_duplicates = True
+_pp_tied_weight_compare.sub_configs[(None, "gradient")].ignore_duplicates = True
+_pp_tied_weight_compare.sub_configs[("init", None)].ignore_duplicates = True
+for tensor in ("fw", "bw"):
+    _compare_layer_mismatch.sub_configs[(None, tensor)].ignore_tensors = True
+    _pp_tied_weight_compare.sub_configs[(None, tensor)].ignore_duplicates = True
+
+
+_bf16_compare = get_config(
+    sub_configs={
+        ("init", None): get_config(),
+        (None, "fw"): get_config(1e-2, 1e-3),
+        (None, "bw"): get_config(1.5e-2, 1e-5),
+        (None, "bias"): get_config(2e-2, 1e-3),
+        (None, "gradient"): get_config(2e-2, 5e-5),
+    }
+)
+
+_fp16_compare = get_config(
+    sub_configs={
+        ("init", None): get_config(),
+        # Saved gradients include the gradient scaling by 2**16 (default initial value)
+        (None, "fw"): get_config(1e-3, 3e-4),
+        (None, "bw"): get_config(3e-3, 1e-5, scale=2**16),
+        (None, "bias"): get_config(3e-3, 1e-4, scale=2**16),
+        (None, "gradient"): get_config(3e-3, 5e-5, scale=2**16),
+    }
+)
 
 
 # Baseline (also tests data-parallel workers)
@@ -24,33 +84,51 @@ class DistributedTestingConfig:
 )
 
 _SINGLE_GPU_TESTING_CONFIGS = [
+    DistributedTestingConfig(
+        name="bf16",
+        compare="simple",
+        config_args=["model.distributed.training_dtype=bf16"],
+        num_gpus=1,
+        compare_config=_bf16_compare,
+    ),
+    DistributedTestingConfig(
+        name="fp16",
+        compare="simple",
+        config_args=["model.distributed.training_dtype=fp16"],
+        num_gpus=1,
+        compare_config=_fp16_compare,
+    ),
     # Sequence-first baseline
     DistributedTestingConfig(
         name="sf",
-        compare=None,
+        compare="simple",
         config_args=["model.base_model.sequence_first=True"],
         num_gpus=1,
+        compare_config=_compare_layer_mismatch,
     ),
     # 
Cross-entropy splits. DistributedTestingConfig( name="ce4", - compare=None, + compare="simple", config_args=["model.base_model.cross_entropy_splits=4"], num_gpus=1, + compare_config=_compare_layer_mismatch, ), # Micro-sequence baseline DistributedTestingConfig( name="ms", - compare=None, + compare="simple", config_args=["batch.micro_sequence_length=256"], num_gpus=1, + compare_config=_compare_layer_mismatch, ), # Gradient accumulation baseline. DistributedTestingConfig( name="df4", - compare=None, + compare="simple", config_args=["batch.depth_first_micro_batches=4"], num_gpus=1, + compare_config=_compare_layer_mismatch, ), # Breadth-first gradient accumulation. DistributedTestingConfig( @@ -58,6 +136,7 @@ class DistributedTestingConfig: compare="df4", config_args=["batch.breadth_first_micro_batches=4"], num_gpus=1, + compare_config=_compare_layer_match, ), # Mixed gradient accumulation. DistributedTestingConfig( @@ -65,13 +144,15 @@ class DistributedTestingConfig: compare="df4", config_args=["batch.depth_first_micro_batches=2", "batch.breadth_first_micro_batches=2"], num_gpus=1, + compare_config=_compare_layer_match, ), # Sequence-first gradient accumulation baseline. DistributedTestingConfig( name="df4_sf", - compare=None, + compare="simple", config_args=["batch.depth_first_micro_batches=4", "model.base_model.sequence_first=True"], num_gpus=1, + compare_config=_compare_layer_mismatch, ), ] @@ -86,6 +167,7 @@ class DistributedTestingConfig: compare="simple", config_args=[], num_gpus=2, + compare_config=_compare_layer_match, ), # Zero stage 2 DistributedTestingConfig( @@ -93,6 +175,7 @@ class DistributedTestingConfig: compare="simple", config_args=["model.multi_stage.zero_stage=2"], num_gpus=2, + compare_config=_compare_layer_match, ), # Zero stage 3 DistributedTestingConfig( @@ -100,6 +183,7 @@ class DistributedTestingConfig: compare="simple", config_args=["model.multi_stage.zero_stage=3"], num_gpus=2, + compare_config=_compare_layer_match, ), # Depth-first micro-batches DistributedTestingConfig( @@ -107,11 +191,7 @@ class DistributedTestingConfig: compare="df4", config_args=["model.multi_stage.zero_stage=3", "batch.depth_first_micro_batches=4"], num_gpus=2, - compare_config=CompareConfig( - ignore_duplicates=[ - "Global gradient", - ] - ), + compare_config=_z3_accumulation_compare, ), # Sequence-data-parallel DistributedTestingConfig( @@ -119,6 +199,7 @@ class DistributedTestingConfig: compare="sf", config_args=["model.distributed.sequence_data_parallel=2"], num_gpus=2, + compare_config=_compare_layer_match, ), # ===== Tensor-parallel configs # Simple tensor-parallel @@ -127,13 +208,19 @@ class DistributedTestingConfig: compare="simple", config_args=["model.distributed.tensor_parallel=2"], num_gpus=2, + compare_config=_compare_layer_match, ), # Simple sequence-tensor-parallel DistributedTestingConfig( name="stp2", compare="sf", - config_args=["model.distributed.tensor_parallel=2", "model.distributed.sequence_tensor_parallel=True"], + config_args=[ + "model.distributed.tensor_parallel=2", + "model.distributed.sequence_tensor_parallel=True", + "model.base_model.transformer.dropless_moe=False", + ], num_gpus=2, + compare_config=_compare_layer_match, ), # Cross-entropy splits DistributedTestingConfig( @@ -142,10 +229,12 @@ class DistributedTestingConfig: config_args=[ "model.distributed.tensor_parallel=2", "model.distributed.sequence_tensor_parallel=True", + "model.base_model.transformer.dropless_moe=False", "model.base_model.parallel_embeddings=False", 
"model.base_model.cross_entropy_splits=4", ], num_gpus=2, + compare_config=_compare_layer_match, ), # ===== 2d configs (Data + Tensor) # Simple @@ -155,8 +244,10 @@ class DistributedTestingConfig: config_args=[ "model.distributed.tensor_parallel=2", "model.distributed.sequence_tensor_parallel=True", + "model.base_model.transformer.dropless_moe=False", ], num_gpus=4, + compare_config=_compare_layer_match, ), # Depth-first micro-batches, tensor-parallel DistributedTestingConfig( @@ -167,6 +258,7 @@ class DistributedTestingConfig: "batch.depth_first_micro_batches=4", ], num_gpus=4, + compare_config=_compare_layer_match, ), # Breadth-first micro-batches DistributedTestingConfig( @@ -176,9 +268,11 @@ class DistributedTestingConfig: "model.distributed.sequence_data_parallel=2", "model.distributed.tensor_parallel=2", "model.distributed.sequence_tensor_parallel=True", + "model.base_model.transformer.dropless_moe=False", "batch.breadth_first_micro_batches=4", ], num_gpus=4, + compare_config=_compare_layer_match, ), # Sequence-data-parallel DistributedTestingConfig( @@ -188,8 +282,10 @@ class DistributedTestingConfig: "model.distributed.sequence_data_parallel=2", "model.distributed.tensor_parallel=2", "model.distributed.sequence_tensor_parallel=True", + "model.base_model.transformer.dropless_moe=False", ], num_gpus=4, + compare_config=_compare_layer_match, ), # ===== Pipeline-parallel configs # Simple [mb] @@ -202,6 +298,7 @@ class DistributedTestingConfig: "batch.breadth_first_micro_batches=4", ], num_gpus=2, + compare_config=_compare_layer_match, ), # Tied weights on different ranks DistributedTestingConfig( @@ -213,12 +310,7 @@ class DistributedTestingConfig: "batch.breadth_first_micro_batches=4", ], num_gpus=2, - compare_config=CompareConfig( - ignore_duplicates=[ - "layers.0.word_embeddings_weight", - "layers.0.position_embeddings_weight", - ] - ), + compare_config=_pp_tied_weight_compare, ), # Micro-sequence [ms] DistributedTestingConfig( @@ -230,6 +322,7 @@ class DistributedTestingConfig: "batch.micro_sequence_length=256", ], num_gpus=2, + compare_config=_compare_layer_match, ), # ===== 2d configs (Data + Pipeline) # Simple @@ -242,6 +335,7 @@ class DistributedTestingConfig: "batch.breadth_first_micro_batches=4", ], num_gpus=4, + compare_config=_compare_layer_match, ), # ===== 2d configs (Tensor + Pipeline) # Simple [sf, mb] @@ -251,17 +345,13 @@ class DistributedTestingConfig: config_args=[ "model.distributed.tensor_parallel=2", "model.distributed.sequence_tensor_parallel=True", + "model.base_model.transformer.dropless_moe=False", "model.distributed.pipeline_parallel=2", "model.multi_stage.layers_per_stage=2", "batch.breadth_first_micro_batches=4", ], num_gpus=4, - compare_config=CompareConfig( - ignore_duplicates=[ - "layers.0.word_embeddings_weight", - "layers.0.position_embeddings_weight", - ] - ), + compare_config=_pp_tied_weight_compare, ), # ===== Data + Tensor + Pipeline # Simple @@ -270,11 +360,14 @@ class DistributedTestingConfig: compare="mb", config_args=[ "model.distributed.tensor_parallel=2", + "model.distributed.sequence_tensor_parallel=True", + "model.base_model.transformer.dropless_moe=False", "model.distributed.pipeline_parallel=2", "model.multi_stage.layers_per_stage=2", "batch.breadth_first_micro_batches=4", ], num_gpus=8, + compare_config=_compare_layer_match, ), # Tied weights on different ranks DistributedTestingConfig( @@ -288,12 +381,7 @@ class DistributedTestingConfig: "batch.breadth_first_micro_batches=4", ], num_gpus=8, - compare_config=CompareConfig( - 
ignore_duplicates=[ - "layers.0.word_embeddings_weight", - "layers.0.position_embeddings_weight", - ] - ), + compare_config=_pp_tied_weight_compare, ), # Micro-sequence DistributedTestingConfig( @@ -303,11 +391,13 @@ class DistributedTestingConfig: "model.distributed.sequence_data_parallel=2", "model.distributed.tensor_parallel=2", "model.distributed.sequence_tensor_parallel=True", + "model.base_model.transformer.dropless_moe=False", "model.distributed.pipeline_parallel=2", "model.multi_stage.layers_per_stage=2", "batch.micro_sequence_length=256", ], num_gpus=8, + compare_config=_compare_layer_match, ), ] diff --git a/tests/utils/model_configs.py b/tests/utils/model_configs.py index 199d5b72c..f1890aff8 100644 --- a/tests/utils/model_configs.py +++ b/tests/utils/model_configs.py @@ -20,7 +20,8 @@ Starcoder2GPTHuggingfaceCheckpointFormat, ) from fast_llm.models.ssm.config import LLambaHuggingfaceCheckpointFormat -from tests.utils.dataset import DATASET_PREFIX, TEST_VOCAB_SIZE +from tests.utils.dataset import MODEL_DATASET_PREFIX, MODEL_TEST_VOCAB_SIZE +from tests.utils.distributed_configs import DistributedTestingConfig _LOG_LEVEL = int(os.environ.get("LOG_LEVEL", 13)) @@ -55,6 +56,10 @@ class ModelTestingConfig: megatron_args: list[str] | None checkpoint_format: type[CheckpointFormat] | None groups: dict[ModelTestingGroup, ModelTestingGroupAction] + # Scale the comparison thresholds for specific models. + compare_factor: float = 1.0 + # Option to skip specific distributed configuration with name containing any of the provided strings. + skip_tests: tuple[str] = () @functools.cached_property def trainer_config_class(self) -> type[TrainerConfig]: @@ -86,6 +91,9 @@ def model_class(self): def base_model_config_class(self): return self.model_config_class.get_base_model_config_class() + def should_skip(self, distributed_config: DistributedTestingConfig) -> bool: + return any(key in distributed_config.name for key in self.skip_tests) + def _update_and_add_testing_config( old_name: str, @@ -94,8 +102,8 @@ def _update_and_add_testing_config( model_type: str | None = None, extra_args: list[str] | None = None, megatron_args: list[str] | None = ..., - checkpoint_format: CheckpointFormat | None = ..., groups: dict[ModelTestingGroup, ModelTestingGroupAction], + **kwargs, ): config = MODEL_CONFIGS[old_name] updates: dict[str, typing.Any] = { @@ -113,8 +121,7 @@ def _update_and_add_testing_config( updates["megatron_args"] = megatron_args else: updates["megatron_args"] = config.megatron_args + megatron_args - if checkpoint_format is not ...: - updates["checkpoint_format"] = checkpoint_format + updates.update(kwargs) MODEL_CONFIGS[new_name] = dataclasses.replace(config, **updates) @@ -136,7 +143,7 @@ def _update_and_add_testing_config( "model.base_model.transformer.num_attention_heads=8", "model.base_model.transformer.head_groups=8", "model.base_model.transformer.init_method_std=0.022", - f"model.base_model.vocab_size={TEST_VOCAB_SIZE}", + f"model.base_model.vocab_size={MODEL_TEST_VOCAB_SIZE}", f"model.multi_stage.debug_param_init={_LOG_LEVEL}", f"model.multi_stage.debug_layer_outputs={_LOG_LEVEL}", f"model.multi_stage.debug_layer_gradients={_LOG_LEVEL}", @@ -144,7 +151,6 @@ def _update_and_add_testing_config( "model.multi_stage.debug_tensor_parallel=True", "model.distributed.reproducible_init=True", "model.distributed.timeout=20", - "model.distributed.training_dtype=bf16", "training.train_iters=2", "training.num_workers=0", "training.timeout=30", @@ -153,17 +159,17 @@ def _update_and_add_testing_config( 
"data.datasets.training.type=slice", "data.datasets.training.end=0.969", "data.datasets.training.dataset.type=memmap", - f"data.datasets.training.dataset.path={DATASET_PREFIX}", + f"data.datasets.training.dataset.path={MODEL_DATASET_PREFIX}", "data.datasets.validation.type=slice", "data.datasets.validation.begin=0.969", "data.datasets.validation.end=0.999", "data.datasets.validation.dataset.type=memmap", - f"data.datasets.validation.dataset.path={DATASET_PREFIX}", + f"data.datasets.validation.dataset.path={MODEL_DATASET_PREFIX}", "data.datasets.test.type=slice", "data.datasets.test.begin=0.999", "data.datasets.test.end=1", "data.datasets.test.dataset.type=memmap", - f"data.datasets.test.dataset.path={DATASET_PREFIX}", + f"data.datasets.test.dataset.path={MODEL_DATASET_PREFIX}", "optimizer.learning_rate.base=0.0001", ], megatron_args=[ @@ -190,8 +196,8 @@ def _update_and_add_testing_config( "--valid-num-workers=0", "--tokenizer-type=NullTokenizer", # Megatron messes with the vocab size, so we have to subtract 1. - f"--vocab-size={TEST_VOCAB_SIZE - 1}", - f"--data-path={DATASET_PREFIX}", + f"--vocab-size={MODEL_TEST_VOCAB_SIZE - 1}", + f"--data-path={MODEL_DATASET_PREFIX}", "--lr-decay-style=constant", # Initialization is set up to match MCore models (MCore inverts self-attn qkv and dense layers compared to original Megatron) "--use-mcore-models", @@ -358,6 +364,7 @@ def _update_and_add_testing_config( ModelTestingGroup.megatron: ModelTestingGroupAction.not_implemented, ModelTestingGroup.distributed: ModelTestingGroupAction.unimportant, }, + compare_factor=2.0, ) _update_and_add_testing_config( @@ -440,6 +447,7 @@ def _update_and_add_testing_config( ModelTestingGroup.megatron: ModelTestingGroupAction.normal, ModelTestingGroup.distributed: ModelTestingGroupAction.normal, }, + compare_factor=2.0, ) _update_and_add_testing_config( @@ -467,6 +475,9 @@ def _update_and_add_testing_config( # TODO: Fix and bring back to `testing_groups` ModelTestingGroup.distributed: ModelTestingGroupAction.broken, }, + compare_factor=2.0, + # SSMs don't support sequence-first configurations. 
+ skip_tests=("sf", "sdp", "stp", "ms"), ) diff --git a/tests/utils/run_test_script.py b/tests/utils/run_test_script.py index 602afeb23..b8f996a82 100644 --- a/tests/utils/run_test_script.py +++ b/tests/utils/run_test_script.py @@ -2,6 +2,7 @@ import functools import os import pathlib +import pprint import subprocess import sys import typing @@ -10,8 +11,7 @@ from fast_llm.engine.distributed.config import DistributedConfig from fast_llm.utils import Assert -from tests.utils.compare_tensor_logs import compare_tensor_logs -from tests.utils.dataset import get_test_dataset +from tests.utils.dataset import get_model_test_dataset from tests.utils.distributed_configs import DistributedTestingConfig from tests.utils.model_configs import MODEL_CONFIGS, ModelTestingConfig @@ -71,7 +71,7 @@ def do_run_test_script_for_all_models( base_path: pathlib.Path, ): Assert.leq(distributed_testing_config.num_gpus, DistributedConfig.default_world_size) - get_test_dataset() + get_model_test_dataset() args = [ "fast-llm", "train", @@ -112,16 +112,15 @@ def parse_run_distributed_script(args: list[str] | None = None): def compare_results_for_all_models( worker_resources: "WorkerResources", run_test_script_base_path: pathlib.Path, + model_testing_config: ModelTestingConfig, ): - def do_compare_results_for_all_models( - config: DistributedTestingConfig, artifacts: typing.Iterable[str] | None = None - ): + def do_compare_results_for_all_models(config: DistributedTestingConfig): assert config.compare is not None - compare_tensor_logs( + compare_config = config.compare_config.rescale(config.compare_factor * model_testing_config.compare_factor) + pprint.pprint(compare_config) + compare_config.compare_tensor_logs( run_test_script_base_path / config.compare / ARTIFACT_PATH, run_test_script_base_path / config.name / ARTIFACT_PATH, - config.compare_config, - artifacts, ) return do_compare_results_for_all_models diff --git a/tests/utils/utils.py b/tests/utils/utils.py index 54efe0966..25d5221d8 100644 --- a/tests/utils/utils.py +++ b/tests/utils/utils.py @@ -1,6 +1,7 @@ import json import logging import math +import os import pathlib import sys import time @@ -23,9 +24,17 @@ requires_cuda = pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA is not available") - +# Directory for all test data and results. +# Cannot be a fixture because it's used outside testing environment (ex. distributed scripts). TEST_RESULTS_PATH = pathlib.Path("/tmp/fast_llm_tests") +# Directory for data that is shared between independent tests and may not be parallel-safe, +# ex. generated dataset and downloaded files. +if worker_name := os.environ.get("PYTEST_XDIST_WORKER"): + SHARED_RESULT_PATH = TEST_RESULTS_PATH / f"common_{worker_name}" +else: + SHARED_RESULT_PATH = TEST_RESULTS_PATH / "common" + @pytest.fixture(scope="session") def result_path():