diff --git a/neuraxle/base.py b/neuraxle/base.py
index 5fb82fc8..5ea7870f 100644
--- a/neuraxle/base.py
+++ b/neuraxle/base.py
@@ -27,7 +27,9 @@
 from abc import ABC, abstractmethod
 from collections import OrderedDict
 from copy import copy
-from typing import Tuple, List, Union, Any
+from typing import Tuple, List, Union, Any, Iterable
+
+from conv import convolved_1d
 
 from neuraxle.hyperparams.space import HyperparameterSpace, HyperparameterSamples
 
@@ -83,6 +85,19 @@ def set_expected_outputs(self, expected_outputs: Any):
     def set_current_ids(self, current_ids: Any):
         self.current_ids = current_ids
 
+    def convolved_1d(self, stride, kernel_size) -> Iterable:
+        conv_current_ids = convolved_1d(stride=stride, iterable=self.current_ids, kernel_size=kernel_size)
+        conv_data_inputs = convolved_1d(stride=stride, iterable=self.data_inputs, kernel_size=kernel_size)
+        conv_expected_outputs = convolved_1d(stride=stride, iterable=self.expected_outputs, kernel_size=kernel_size)
+
+        for current_ids, data_inputs, expected_outputs in zip(conv_current_ids, conv_data_inputs,
+                                                              conv_expected_outputs):
+            yield DataContainer(
+                current_ids=current_ids,
+                data_inputs=data_inputs,
+                expected_outputs=expected_outputs
+            )
+
     def __iter__(self):
         current_ids = self.current_ids
         if self.current_ids is None:
@@ -114,6 +129,11 @@ def append(self, current_id, data_input, expected_output):
         self.data_inputs.append(data_input)
         self.expected_outputs.append(expected_output)
 
+    def concat(self, data_container: DataContainer):
+        self.current_ids.extend(data_container.current_ids)
+        self.data_inputs.extend(data_container.data_inputs)
+        self.expected_outputs.extend(data_container.expected_outputs)
+
 
 class BaseStep(ABC):
     def __init__(
@@ -463,8 +483,8 @@ class MetaStepMixin:
     # TODO: remove equal None, and fix random search at the same time ?
     def __init__(
-        self,
-        wrapped: BaseStep = None
+            self,
+            wrapped: BaseStep = None
     ):
         self.wrapped: BaseStep = wrapped
@@ -556,9 +576,13 @@ def get_best_model(self) -> BaseStep:
 
 class NonFittableMixin:
-    """A pipeline step that requires no fitting: fitting just returns self when called to do no action.
+    """
+    A pipeline step that requires no fitting: fitting simply returns self and takes no action.
+    Note: fit methods are not implemented.
+    """
 
-    Note: fit methods are not implemented"""
+    def handle_fit_transform(self, data_container: DataContainer):
+        return self, self.handle_transform(data_container)
 
     def fit(self, data_inputs, expected_outputs=None) -> 'NonFittableMixin':
         """
@@ -819,6 +843,9 @@ def _step_name_to_index(self, step_name):
         return index
 
     def _step_index_to_name(self, step_index):
+        if step_index == len(self.items()):
+            return None
+
         name, _ = self.steps_as_tuple[step_index]
         return name
@@ -861,12 +888,17 @@ def __getitem__(self, key):
                 raise KeyError(
                     "Start or stop ('{}' or '{}') not found in '{}'.".format(start, stop, self.steps.keys()))
             for key, val in self.steps_as_tuple:
+                if start == stop == key:
+                    new_steps_as_tuple.append((key, val))
+
                 if stop == key:
                     break
+
                 if not started and start == key:
                     started = True
                 if started:
                     new_steps_as_tuple.append((key, val))
+
             self_shallow_copy.steps_as_tuple = new_steps_as_tuple
             self_shallow_copy.steps = OrderedDict(new_steps_as_tuple)
             return self_shallow_copy
@@ -938,6 +970,33 @@ def __len__(self):
         """
         return len(self.steps_as_tuple)
 
+    def split(self, step_type: type) -> List['TruncableSteps']:
+        """
+        Split the truncable steps on every step of the given type.
+
+        :param step_type: step class type to split on.
+        :return: list of truncable steps containing the split steps
+        """
+        sub_pipelines = []
+
+        previous_sub_pipeline_end_index = 0
+        for index, (step_name, step) in enumerate(self.items()):
+            if isinstance(step, step_type):
+                sub_pipelines.append(
+                    self[previous_sub_pipeline_end_index:index + 1]
+                )
+                previous_sub_pipeline_end_index = index + 1
+
+        if previous_sub_pipeline_end_index < len(self.items()):
+            sub_pipelines.append(
+                self[previous_sub_pipeline_end_index:-1]
+            )
+
+        return sub_pipelines
+
+    def ends_with(self, step_type: type) -> bool:
+        return isinstance(self[-1], step_type)
+
 
 class ResumableStepMixin:
     """
@@ -947,3 +1006,83 @@ class ResumableStepMixin:
     @abstractmethod
     def should_resume(self, data_container: DataContainer) -> bool:
         raise NotImplementedError()
+
+
+class Barrier(NonFittableMixin, NonTransformableMixin, BaseStep, ABC):
+    """
+    A Barrier step to be used in a mini-batch sequential pipeline. It forces all the
+    data inputs of a sub-pipeline to reach the barrier before moving on to the next sub-pipeline.
+
+    ``p = MiniBatchSequentialPipeline([
+        SomeStep(),
+        SomeStep(),
+        Barrier(),  # must be a concrete Barrier, ex: Joiner()
+        SomeStep(),
+        SomeStep(),
+        Barrier(),  # must be a concrete Barrier, ex: Joiner()
+    ], batch_size=10)``
+    """
+
+    @abstractmethod
+    def join_transform(self, step: BaseStep, data_container: DataContainer) -> DataContainer:
+        raise NotImplementedError()
+
+    @abstractmethod
+    def join_fit_transform(self, step: BaseStep, data_container: DataContainer) -> Tuple['Any', Iterable]:
+        raise NotImplementedError()
+
+
+class Joiner(Barrier):
+    """
+    A special Barrier step that joins the transformed mini-batches together with the list.extend method.
+    """
+
+    def __init__(self, batch_size):
+        super().__init__()
+        self.batch_size = batch_size
+
+    def join_transform(self, step: BaseStep, data_container: DataContainer) -> Iterable:
+        """
+        Concatenate the pipeline transform output of each batch of self.batch_size together.
+
+        :param step: pipeline to transform on
+        :param data_container: data container to transform
+        :return: joined data container
+        """
+        data_container_batches = data_container.convolved_1d(
+            stride=self.batch_size,
+            kernel_size=self.batch_size
+        )
+
+        output_data_container = ListDataContainer.empty()
+        for data_container_batch in data_container_batches:
+            output_data_container.concat(
+                step._transform_core(data_container_batch)
+            )
+
+        return output_data_container
+
+    def join_fit_transform(self, step: BaseStep, data_container: DataContainer) -> \
+            Tuple['Any', Iterable]:
+        """
+        Concatenate the pipeline fit_transform output of each batch of self.batch_size together.
+
+        :param step: pipeline to fit and transform on
+        :param data_container: data container to fit and transform on
+        :return: fitted step, joined data container
+        """
+        data_container_batches = data_container.convolved_1d(
+            stride=self.batch_size,
+            kernel_size=self.batch_size
+        )
+
+        output_data_container = ListDataContainer.empty()
+        for data_container_batch in data_container_batches:
+            step, data_container_batch = step._fit_transform_core(data_container_batch)
+            output_data_container.concat(
+                data_container_batch
+            )
+
+        return step, output_data_container
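Review note: a minimal sketch of what the new `DataContainer.convolved_1d` method yields, assuming the `conv` package's `convolved_1d` produces windows of `kernel_size` items advancing by `stride` items; the container values below are illustrative, not taken from the test suite:

```python
from neuraxle.base import DataContainer

data_container = DataContainer(
    current_ids=[0, 1, 2, 3],
    data_inputs=[10.0, 11.0, 12.0, 13.0],
    expected_outputs=[0, 1, 0, 1]
)

# With stride == kernel_size the windows do not overlap, so this simply
# cuts the container into mini-batches of 2 items each:
for batch in data_container.convolved_1d(stride=2, kernel_size=2):
    print(batch.data_inputs)  # [10.0, 11.0] on the first pass, then [12.0, 13.0]
```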
diff --git a/neuraxle/pipeline.py b/neuraxle/pipeline.py
index 3773551f..92592c51 100644
--- a/neuraxle/pipeline.py
+++ b/neuraxle/pipeline.py
@@ -23,11 +23,12 @@
 import os
 from abc import ABC, abstractmethod
 from copy import copy
-from typing import Any, Tuple
+from typing import Any, Tuple, List
 
 from joblib import load, dump
 
-from neuraxle.base import BaseStep, TruncableSteps, NamedTupleList, ResumableStepMixin, DataContainer
+from neuraxle.base import BaseStep, TruncableSteps, NamedTupleList, ResumableStepMixin, DataContainer, NonFittableMixin, \
+    Barrier
 from neuraxle.checkpoints import BaseCheckpointStep
 
 DEFAULT_CACHE_FOLDER = 'cache'
@@ -215,7 +216,7 @@ def transform(self, data_inputs: Any):
         :param data_inputs: the data input to transform
         :return: transformed data inputs
         """
-        self.setup() # TODO: perhaps, remove this to pass path in context
+        self.setup()  # TODO: perhaps, remove this to pass path in context
 
         current_ids = self.hash(
             current_ids=None,
@@ -235,7 +236,7 @@ def fit_transform(self, data_inputs, expected_outputs=None) -> ('Pipeline', Any):
         :param data_inputs: the data input to fit on
         :param expected_outputs: the expected data output to fit on
         :return: the pipeline itself
         """
-        self.setup() # TODO: perhaps, remove this to pass path in context
+        self.setup()  # TODO: perhaps, remove this to pass path in context
 
         current_ids = self.hash(
             current_ids=None,
@@ -259,7 +260,7 @@ def fit(self, data_inputs, expected_outputs=None) -> 'Pipeline':
         :param expected_outputs: the expected data output to fit on
         :return: the pipeline itself
         """
-        self.setup() # TODO: perhaps, remove this to pass path in context
+        self.setup()  # TODO: perhaps, remove this to pass path in context
 
         current_ids = self.hash(
             current_ids=None,
@@ -486,3 +487,116 @@ def should_resume(self, data_container: DataContainer) -> bool:
                 return True
 
         return False
+
+
+class MiniBatchSequentialPipeline(NonFittableMixin, Pipeline):
+    """
+    Mini-batch sequential pipeline: a pipeline that processes its data inputs in mini-batches.
+    """
+
+    def __init__(self, steps: NamedTupleList, batch_size):
+        Pipeline.__init__(self, steps)
+        self.batch_size = batch_size
+
+    def transform(self, data_inputs: Any):
+        """
+        :param data_inputs: the data input to transform
+        :return: transformed data inputs
+        """
+        self.setup()
+
+        current_ids = self._create_current_ids(data_inputs)
+        data_container = DataContainer(current_ids=current_ids, data_inputs=data_inputs)
+        data_container = self.handle_transform(data_container)
+
+        return data_container.data_inputs
+
+    def fit_transform(self, data_inputs, expected_outputs=None) -> ('Pipeline', Any):
+        """
+        :param data_inputs: the data input to fit on
+        :param expected_outputs: the expected data output to fit on
+        :return: the pipeline itself, the transformed data inputs
+        """
+        self.setup()
+
+        current_ids = self._create_current_ids(data_inputs)
+        data_container = DataContainer(
+            current_ids=current_ids,
+            data_inputs=data_inputs,
+            expected_outputs=expected_outputs
+        )
+        new_self, data_container = self.handle_fit_transform(data_container)
+
+        return new_self, data_container.data_inputs
+
+    def _create_current_ids(self, data_inputs):
+        current_ids = self.hash(
+            current_ids=None,
+            hyperparameters=self.hyperparams,
+            data_inputs=data_inputs
+        )
+        return current_ids
+
+    def handle_transform(self, data_container: DataContainer) -> DataContainer:
+        """
+        Transform the data container through all the sub-pipelines split by the Barrier steps.
+
+        :param data_container: data container to transform.
+        :return: transformed data container
+        """
+        sub_pipelines = self._create_sub_pipelines()
+
+        for sub_pipeline in sub_pipelines:
+            barrier = sub_pipeline[-1]
+            data_container = barrier.join_transform(
+                step=sub_pipeline,
+                data_container=data_container
+            )
+            current_ids = self.hash(data_container.current_ids, self.hyperparams)
+            data_container.set_current_ids(current_ids)
+
+        return data_container
+
+    def handle_fit_transform(self, data_container: DataContainer) -> \
+            Tuple['MiniBatchSequentialPipeline', DataContainer]:
+        """
+        Fit then transform the data container through all the sub-pipelines split by the Barrier steps.
+
+        :param data_container: data container to fit and transform.
+        :return: fitted pipeline, transformed data container
+        """
+        sub_pipelines = self._create_sub_pipelines()
+        index_start = 0
+
+        for sub_pipeline in sub_pipelines:
+            barrier = sub_pipeline[-1]
+            sub_pipeline, data_container = barrier.join_fit_transform(
+                step=sub_pipeline,
+                data_container=data_container
+            )
+            current_ids = self.hash(data_container.current_ids, self.hyperparams)
+            data_container.set_current_ids(current_ids)
+
+            new_self = self[:index_start] + sub_pipeline
+            if index_start + len(sub_pipeline) < len(self):
+                new_self += self[index_start + len(sub_pipeline):]
+
+            self.steps_as_tuple = new_self.steps_as_tuple
+            index_start += len(sub_pipeline)
+
+        return self, data_container
+
+    def _create_sub_pipelines(self) -> List['MiniBatchSequentialPipeline']:
+        """
+        Create sub-pipelines by splitting the steps on the Barrier steps.
+
+        :return: list of sub-pipelines
+        """
+        sub_pipelines: List[MiniBatchSequentialPipeline] = self.split(Barrier)
+        for sub_pipeline in sub_pipelines:
+            if not sub_pipeline.ends_with(Barrier):
+                raise Exception(
+                    'At least one Barrier step needs to be at the end of a streaming pipeline.'
+                )
+
+        return sub_pipelines
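Review note: to make the control flow concrete, here is a usage sketch of the new pipeline. The 20 items are cut into two batches of 10; each batch flows through the first sub-pipeline and is re-joined at the first Joiner before the joined result streams through the second sub-pipeline the same way. The `Double` step is hypothetical, written here only for illustration:

```python
from neuraxle.base import BaseStep, NonFittableMixin, Joiner
from neuraxle.pipeline import MiniBatchSequentialPipeline


class Double(NonFittableMixin, BaseStep):
    """Hypothetical step for illustration: doubles every data input."""

    def transform(self, data_inputs):
        return [2 * di for di in data_inputs]


p = MiniBatchSequentialPipeline([
    Double(),
    Joiner(batch_size=10),  # barrier: re-joins the two mini-batches of 10
    Double(),
    Joiner(batch_size=10)
], batch_size=10)

outputs = p.transform(list(range(20)))  # every item doubled twice: x * 4
```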
diff --git a/neuraxle/steps/util.py b/neuraxle/steps/util.py
index 437e294c..fe180814 100644
--- a/neuraxle/steps/util.py
+++ b/neuraxle/steps/util.py
@@ -108,12 +108,13 @@ def fit_one(self, data_input, expected_output=None) -> 'FitCallbackStep':
         return self
 
 
-class FitTransformCallbackStep(BaseCallbackStep):
+class TransformCallbackStep(NonFittableMixin, BaseCallbackStep):
     """Call a callback method on transform and inverse transform."""
 
-    def fit(self, data_inputs, expected_outputs=None) -> 'BaseStep':
-        self._fit_callback(data_inputs, expected_outputs)
-        return self
+    def fit_transform(self, data_inputs, expected_outputs=None) -> ('BaseStep', Any):
+        self._callback(data_inputs)
+
+        return self, data_inputs
 
     def transform(self, data_inputs):
         """
@@ -124,7 +125,6 @@ def transform(self, data_inputs):
         :return: the same data as input, unchanged (like the Identity class).
         """
         self._callback(data_inputs)
-
         if self.transform_function is not None:
             return self.transform_function(data_inputs)
 
@@ -168,64 +168,33 @@ def inverse_transform_one(self, processed_output):
         return processed_output
 
 
-class TransformCallbackStep(NonFittableMixin, BaseCallbackStep):
-    """Call a callback method on transform and inverse transform."""
-
-    def fit_transform(self, data_inputs, expected_outputs=None) -> ('BaseStep', Any):
-        self._callback(data_inputs)
+class FitTransformCallbackStep(BaseStep):
+    def __init__(self, transform_callback_function, fit_callback_function, more_arguments: List = tuple(),
+                 transform_function=None,
+                 hyperparams=None):
+        BaseStep.__init__(self, hyperparams)
+        self.transform_function = transform_function
+        self.more_arguments = more_arguments
+        self.fit_callback_function = fit_callback_function
+        self.transform_callback_function = transform_callback_function
 
-        return self, data_inputs
+    def fit(self, data_inputs, expected_outputs=None):
+        self.fit_callback_function((data_inputs, expected_outputs), *self.more_arguments)
+        return self
 
     def transform(self, data_inputs):
-        """
-        Will call the self._callback() with the data being processed and the extra arguments specified.
-        It has no other effect.
-
-        :param data_inputs: the data to process
-        :return: the same data as input, unchanged (like the Identity class).
-        """
-        self._callback(data_inputs)
+        self.transform_callback_function(data_inputs, *self.more_arguments)
         if self.transform_function is not None:
             return self.transform_function(data_inputs)
         return data_inputs
 
-    def transform_one(self, data_input):
-        """
-        Will call the self._callback() with the data being processed and the extra arguments specified.
-        It has no other effect.
-
-        :param data_input: the data to process
-        :return: the same data as input, unchanged (like the Identity class).
-        """
-        self._callback(data_input)
-
+    def fit_transform(self, data_inputs, expected_outputs=None) -> ('BaseStep', Any):
+        self.fit_callback_function((data_inputs, expected_outputs), *self.more_arguments)
+        self.transform_callback_function(data_inputs, *self.more_arguments)
         if self.transform_function is not None:
-            return self.transform_function([data_input])[0]
-
-        return data_input
-
-    def inverse_transform(self, processed_outputs):
-        """
-        Will call the self._callback() with the data being processed and the extra arguments specified.
-        It has no other effect.
-
-        :param processed_outputs: the data to process
-        :return: the same data as input, unchanged (like the Identity class).
-        """
-        self._callback(processed_outputs)
-        return processed_outputs
-
-    def inverse_transform_one(self, processed_output):
-        """
-        Will call the self._callback() with the data being processed and the extra arguments specified.
-        It has no other effect.
+            return self, self.transform_function(data_inputs)
 
-        :param processed_output: the data to process
-        :return: the same data as input, unchanged (like the Identity class).
-        """
-        self._callback(processed_output)
-        return processed_output
+        return self, data_inputs
 
 
 class TapeCallbackFunction:
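Review note: a short sketch of how the reworked FitTransformCallbackStep fills its two tapes, assuming TapeCallbackFunction records its first positional argument in `.data` and any extra argument in `.name_tape`, consistent with the assertions in the tests below:

```python
from neuraxle.steps.util import FitTransformCallbackStep, TapeCallbackFunction

tape_transform = TapeCallbackFunction()
tape_fit = TapeCallbackFunction()
step = FitTransformCallbackStep(tape_transform, tape_fit, ["a"])

step, outputs = step.fit_transform([1, 2, 3], [0, 1, 0])

assert outputs == [1, 2, 3]                       # no transform_function: data passes through
assert tape_fit.data == [([1, 2, 3], [0, 1, 0])]  # fit callback gets (data_inputs, expected_outputs)
assert tape_transform.data == [[1, 2, 3]]         # transform callback gets data_inputs alone
assert tape_fit.name_tape == ["a"]                # extra arguments land in the name tape
```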
diff --git a/requirements.txt b/requirements.txt
index e38b14ce..056f6d2c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -6,3 +6,4 @@ scikit-learn>=0.20.3
 joblib>=0.13.2
 flask>=1.1.1
 flask-restful
+conv
diff --git a/testing/steps/test_value_caching_wrapper.py b/testing/steps/test_value_caching_wrapper.py
index 8a7bfab5..37c9e566 100644
--- a/testing/steps/test_value_caching_wrapper.py
+++ b/testing/steps/test_value_caching_wrapper.py
@@ -1,20 +1,26 @@
 import numpy as np
 
 from neuraxle.pipeline import Pipeline
-from neuraxle.steps.util import PickleValueCachingWrapper, TransformCallbackStep, TapeCallbackFunction, \
+from neuraxle.steps.util import PickleValueCachingWrapper, TapeCallbackFunction, \
     FitTransformCallbackStep
 
 EXPECTED_OUTPUTS = [0.0, 0.0, 0.6931471805599453, 0.6931471805599453]
 
 
+class LogFitTransformCallbackStep(FitTransformCallbackStep):
+    def fit_transform(self, data_inputs, expected_outputs=None):
+        super().fit_transform(data_inputs, expected_outputs)
+        return self, np.log(data_inputs)
+
+
 def test_transform_should_use_cache(tmpdir):
     tape_transform = TapeCallbackFunction()
     tape_fit = TapeCallbackFunction()
     p = Pipeline([
         PickleValueCachingWrapper(
-            FitTransformCallbackStep(
-                callback_function=tape_transform,
-                fit_callback_function=tape_fit,
+            LogFitTransformCallbackStep(
+                tape_transform,
+                tape_fit,
                 transform_function=np.log),
             tmpdir
         )
@@ -32,9 +38,9 @@ def test_fit_transform_should_fit_then_use_cache(tmpdir):
     tape_fit = TapeCallbackFunction()
     p = Pipeline([
         PickleValueCachingWrapper(
-            FitTransformCallbackStep(
-                callback_function=tape_transform,
-                fit_callback_function=tape_fit,
+            LogFitTransformCallbackStep(
+                tape_transform,
+                tape_fit,
                 transform_function=np.log),
             tmpdir
         )
@@ -51,9 +57,9 @@ def test_should_flush_cache_on_every_fit(tmpdir):
     tape_transform = TapeCallbackFunction()
     tape_fit = TapeCallbackFunction()
     wrapper = PickleValueCachingWrapper(
-        FitTransformCallbackStep(
-            callback_function=tape_transform,
-            fit_callback_function=tape_fit,
+        LogFitTransformCallbackStep(
+            tape_transform,
+            tape_fit,
             transform_function=np.log),
         tmpdir
     )
diff --git a/testing/test_minibatch_sequential_pipeline.py b/testing/test_minibatch_sequential_pipeline.py
new file mode 100644
index 00000000..e4e4b32d
--- /dev/null
+++ b/testing/test_minibatch_sequential_pipeline.py
@@ -0,0 +1,99 @@
+import numpy as np
+
+from neuraxle.base import Joiner
+from neuraxle.pipeline import MiniBatchSequentialPipeline, Barrier
+from neuraxle.steps.util import TransformCallbackStep, TapeCallbackFunction, FitTransformCallbackStep
+
+
+class MultiplyBy2TransformCallbackStep(TransformCallbackStep):
+    def transform(self, data_inputs):
+        super().transform(data_inputs)
+
+        return list(np.array(data_inputs) * 2)
+
+
+def test_mini_batch_sequential_pipeline_should_transform_steps_sequentially_for_each_barrier_for_each_batch():
+    # Given
+    tape1 = TapeCallbackFunction()
+    tape2 = TapeCallbackFunction()
+    tape3 = TapeCallbackFunction()
+    tape4 = TapeCallbackFunction()
+    p = MiniBatchSequentialPipeline([
+        MultiplyBy2TransformCallbackStep(tape1, ["1"]),
+        MultiplyBy2TransformCallbackStep(tape2, ["2"]),
+        Joiner(batch_size=10),
+        MultiplyBy2TransformCallbackStep(tape3, ["3"]),
+        MultiplyBy2TransformCallbackStep(tape4, ["4"]),
+        Joiner(batch_size=10)
+    ], batch_size=10)
+
+    # When
+    outputs = p.transform(range(20))
+
+    # Then
+    assert outputs == [0, 16, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 256, 272, 288, 304]
+
+    assert tape1.data == [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]]
+    assert tape1.name_tape == ["1", "1"]
+
+    assert tape2.data == [[0, 2, 4, 6, 8, 10, 12, 14, 16, 18], [20, 22, 24, 26, 28, 30, 32, 34, 36, 38]]
+    assert tape2.name_tape == ["2", "2"]
+
+    assert tape3.data == [[0, 4, 8, 12, 16, 20, 24, 28, 32, 36], [40, 44, 48, 52, 56, 60, 64, 68, 72, 76]]
+    assert tape3.name_tape == ["3", "3"]
+
+    assert tape4.data == [[0, 8, 16, 24, 32, 40, 48, 56, 64, 72], [80, 88, 96, 104, 112, 120, 128, 136, 144, 152]]
+    assert tape4.name_tape == ["4", "4"]
+
+
+class MultiplyBy2FitTransformCallbackStep(FitTransformCallbackStep):
+    def fit_transform(self, data_inputs, expected_outputs=None):
+        super().fit_transform(data_inputs, expected_outputs)
+
+        return self, list(np.array(data_inputs) * 2)
+
+
+def test_mini_batch_sequential_pipeline_should_fit_transform_steps_sequentially_for_each_barrier_for_each_batch():
+    # Given
+    tape1 = TapeCallbackFunction()
+    tape1_fit = TapeCallbackFunction()
+    tape2 = TapeCallbackFunction()
+    tape2_fit = TapeCallbackFunction()
+    tape3 = TapeCallbackFunction()
+    tape3_fit = TapeCallbackFunction()
+    tape4 = TapeCallbackFunction()
+    tape4_fit = TapeCallbackFunction()
+    p = MiniBatchSequentialPipeline([
+        MultiplyBy2FitTransformCallbackStep(tape1, tape1_fit, ["1"]),
+        MultiplyBy2FitTransformCallbackStep(tape2, tape2_fit, ["2"]),
+        Joiner(batch_size=10),
+        MultiplyBy2FitTransformCallbackStep(tape3, tape3_fit, ["3"]),
+        MultiplyBy2FitTransformCallbackStep(tape4, tape4_fit, ["4"]),
+        Joiner(batch_size=10)
+    ], batch_size=10)
+
+    # When
+    p, outputs = p.fit_transform(range(20), range(20))
+
+    # Then
+    assert outputs == [0, 16, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 256, 272, 288, 304]
+
+    assert tape1.data == [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]]
+    assert tape1_fit.data == [([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
+                              ([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19])]
+    assert tape1.name_tape == ["1", "1"]
+
+    assert tape2.data == [[0, 2, 4, 6, 8, 10, 12, 14, 16, 18], [20, 22, 24, 26, 28, 30, 32, 34, 36, 38]]
+    assert tape2_fit.data == [([0, 2, 4, 6, 8, 10, 12, 14, 16, 18], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
+                              ([20, 22, 24, 26, 28, 30, 32, 34, 36, 38], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19])]
+    assert tape2.name_tape == ["2", "2"]
+
+    assert tape3.data == [[0, 4, 8, 12, 16, 20, 24, 28, 32, 36], [40, 44, 48, 52, 56, 60, 64, 68, 72, 76]]
+    assert tape3_fit.data == [([0, 4, 8, 12, 16, 20, 24, 28, 32, 36], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
+                              ([40, 44, 48, 52, 56, 60, 64, 68, 72, 76], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19])]
+    assert tape3.name_tape == ["3", "3"]
+
+    assert tape4.data == [[0, 8, 16, 24, 32, 40, 48, 56, 64, 72], [80, 88, 96, 104, 112, 120, 128, 136, 144, 152]]
+    assert tape4_fit.data == [([0, 8, 16, 24, 32, 40, 48, 56, 64, 72], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
+                              ([80, 88, 96, 104, 112, 120, 128, 136, 144, 152], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19])]
+    assert tape4.name_tape == ["4", "4"]
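Review note: a quick sanity check on the expected outputs asserted in both tests above. The four MultiplyBy2 steps each double the data, so every input comes out multiplied by 2 ** 4 = 16:

```python
assert [x * 2 ** 4 for x in range(20)] == [
    0, 16, 32, 48, 64, 80, 96, 112, 128, 144,
    160, 176, 192, 208, 224, 240, 256, 272, 288, 304
]
```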
diff --git a/testing/test_truncable_steps.py b/testing/test_truncable_steps.py
new file mode 100644
index 00000000..8a16a440
--- /dev/null
+++ b/testing/test_truncable_steps.py
@@ -0,0 +1,36 @@
+from neuraxle.base import BaseStep, NonFittableMixin
+from neuraxle.pipeline import Pipeline
+from testing.test_pipeline import SomeStep
+
+
+class SomeSplitStep(NonFittableMixin, BaseStep):
+    def fit(self, data_inputs, expected_outputs=None) -> 'NonFittableMixin':
+        pass
+
+    def fit_transform(self, data_inputs, expected_outputs=None):
+        pass
+
+    def transform(self, data_inputs):
+        pass
+
+
+def test_truncable_steps_should_split_by_type():
+    pipeline = Pipeline([
+        SomeStep(),
+        SomeStep(),
+        SomeSplitStep(),
+        SomeStep(),
+        SomeStep(),
+        SomeSplitStep(),
+        SomeStep(),
+    ])
+
+    sub_pipelines = pipeline.split(SomeSplitStep)
+
+    assert 'SomeStep' in sub_pipelines[0]
+    assert 'SomeStep1' in sub_pipelines[0]
+    assert 'SomeSplitStep' in sub_pipelines[0]
+    assert 'SomeStep2' in sub_pipelines[1]
+    assert 'SomeStep3' in sub_pipelines[1]
+    assert 'SomeSplitStep1' in sub_pipelines[1]
+    assert 'SomeStep4' in sub_pipelines[2]
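Review note: `ends_with` is what `MiniBatchSequentialPipeline._create_sub_pipelines` uses to validate the sub-pipelines produced by `split`. A sketch of the expected behaviour on the pipeline from the test above, assuming the trailing remainder keeps only the last `SomeStep`:

```python
# Each matching step closes its sub-pipeline, so the first two
# sub-pipelines end with a SomeSplitStep while the trailing
# remainder does not:
assert sub_pipelines[0].ends_with(SomeSplitStep)
assert sub_pipelines[1].ends_with(SomeSplitStep)
assert not sub_pipelines[2].ends_with(SomeSplitStep)
```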