From ca477c9485b2a9c43b60cfd7251a522e8a5bd687 Mon Sep 17 00:00:00 2001 From: alexbrillant Date: Mon, 30 Sep 2019 18:07:34 -0400 Subject: [PATCH 01/13] Implement MiniBatchSequentialPipeline Wip --- neuraxle/base.py | 34 ++++++ neuraxle/pipeline.py | 189 ++++++++++++++++++++++++++++- requirements.txt | 1 + testing/test_streaming_pipeline.py | 2 + testing/test_truncable_steps.py | 36 ++++++ 5 files changed, 257 insertions(+), 5 deletions(-) create mode 100644 testing/test_streaming_pipeline.py create mode 100644 testing/test_truncable_steps.py diff --git a/neuraxle/base.py b/neuraxle/base.py index e4c2c85f..ead34096 100644 --- a/neuraxle/base.py +++ b/neuraxle/base.py @@ -561,6 +561,9 @@ class NonFittableMixin: Note: fit methods are not implemented""" + def handle_fit_transform(self, data_container: DataContainer): + return self, self.handle_transform(data_container) + def fit(self, data_inputs, expected_outputs=None) -> 'NonFittableMixin': """ Don't fit. @@ -862,12 +865,17 @@ def __getitem__(self, key): raise KeyError( "Start or stop ('{}' or '{}') not found in '{}'.".format(start, stop, self.steps.keys())) for key, val in self.steps_as_tuple: + if start == stop == key: + new_steps_as_tuple.append((key, val)) + if stop == key: break + if not started and start == key: started = True if started: new_steps_as_tuple.append((key, val)) + self_shallow_copy.steps_as_tuple = new_steps_as_tuple self_shallow_copy.steps = OrderedDict(new_steps_as_tuple) return self_shallow_copy @@ -939,6 +947,32 @@ def __len__(self): """ return len(self.steps_as_tuple) + def split(self, type_name_to_split_from: str) -> List['TruncableSteps']: + """ + Split truncable steps by a step class name. + + :param type_name_to_split_from: step class name to split from. + :return: list of truncable steps containing the splitted steps + """ + sub_pipelines = [] + + previous_sub_pipeline_end_index = 0 + for index, (step_name, step) in enumerate(self.items()): + if step.__class__.__name__ == type_name_to_split_from: + sub_pipelines.append( + self[previous_sub_pipeline_end_index:index + 1] + ) + previous_sub_pipeline_end_index = index + 1 + + sub_pipelines.append( + self[previous_sub_pipeline_end_index:-1] + ) + + return sub_pipelines + + def ends_with_type_name(self, type_name: str): + return self[-1].__class__.__name__ == type_name + class ResumableStepMixin: """ diff --git a/neuraxle/pipeline.py b/neuraxle/pipeline.py index 3773551f..559a851a 100644 --- a/neuraxle/pipeline.py +++ b/neuraxle/pipeline.py @@ -23,13 +23,17 @@ import os from abc import ABC, abstractmethod from copy import copy -from typing import Any, Tuple +from typing import Any, Tuple, List, Iterable +from conv import convolved_1d from joblib import load, dump -from neuraxle.base import BaseStep, TruncableSteps, NamedTupleList, ResumableStepMixin, DataContainer +from neuraxle.base import BaseStep, TruncableSteps, NamedTupleList, ResumableStepMixin, DataContainer, NonFittableMixin, \ + NonTransformableMixin from neuraxle.checkpoints import BaseCheckpointStep +BARRIER_STEP_NAME = 'Barrier' + DEFAULT_CACHE_FOLDER = 'cache' @@ -215,7 +219,7 @@ def transform(self, data_inputs: Any): :param data_inputs: the data input to transform :return: transformed data inputs """ - self.setup() # TODO: perhaps, remove this to pass path in context + self.setup() # TODO: perhaps, remove this to pass path in context current_ids = self.hash( current_ids=None, @@ -235,7 +239,7 @@ def fit_transform(self, data_inputs, expected_outputs=None) -> ('Pipeline', Any) :param expected_outputs: the 
expected data output to fit on :return: the pipeline itself """ - self.setup() # TODO: perhaps, remove this to pass path in context + self.setup() # TODO: perhaps, remove this to pass path in context current_ids = self.hash( current_ids=None, @@ -259,7 +263,7 @@ def fit(self, data_inputs, expected_outputs=None) -> 'Pipeline': :param expected_outputs: the expected data output to fit on :return: the pipeline itself """ - self.setup() # TODO: perhaps, remove this to pass path in context + self.setup() # TODO: perhaps, remove this to pass path in context current_ids = self.hash( current_ids=None, @@ -486,3 +490,178 @@ def should_resume(self, data_container: DataContainer) -> bool: return True return False + + +""" +Idea for checkpoints : + + The streaming pipeline algorithm could go find the optional checkpoint step for each sub pipeline. + In the future, a ResumableStreamingPipeline that extends this class should exist to support checkpoints. + + The Barrier should ideally join the data so that the ram does not blow up (iterable, lazy loading, cache ??) + Maybe we can implement a LazyLoadingDataContainer/CachedDataContainer class or something that could be returned. + + pipeline = Pipeline([ + MiniBatchSequentialPipeline([ + + A(), + B(), + Barrier(joiner=Joiner()), + [Checkpoint()] + + C(), + D(), + Barrier(joiner=Joiner()), + [Checkpoint()] + + ]), + Model() + ]) + + pipeline = Pipeline([ + MiniBatchSequentialPipeline([ + ParallelWrapper([ + NonFittableA(), + NonFittableB(), + ]) + Barrier(joiner=Joiner()), + [Checkpoint()] + + C(), + D(), + Barrier(joiner=Joiner()), + [Checkpoint()] + + ]), + Model() + ]) +""" + + +class Barrier(NonFittableMixin, NonTransformableMixin, BaseStep): + pass + + +class MiniBatchSequentialPipeline(NonFittableMixin, Pipeline): + """ + Streaming Pipeline class to create a pipeline for streaming, and batch processing. + """ + + def __init__(self, steps: NamedTupleList, batch_size): + Pipeline.__init__(self, steps) + self.batch_size = batch_size + + def handle_transform(self, data_container: DataContainer) -> DataContainer: + """ + Transform all sub pipelines splitted by the Barrier steps. + + :param data_container: data container to transform. + :return: data container + """ + sub_pipelines = self._create_sub_pipelines() + + for sub_pipeline in sub_pipelines: + data_container = self._handle_transform_sub_pipeline(sub_pipeline, data_container) + + return data_container + + def handle_fit_transform(self, data_container: DataContainer) -> \ + Tuple['MiniBatchSequentialPipeline', DataContainer]: + """ + Transform all sub pipelines splitted by the Barrier steps. + + :param data_container: data container to transform. + :return: data container + """ + sub_pipelines = self._create_sub_pipelines() + + for sub_pipeline in sub_pipelines: + new_self, data_container = self._handle_fit_transform_sub_pipeline(sub_pipeline, data_container) + + return self, data_container + + def _handle_transform_sub_pipeline(self, sub_pipeline, data_container) -> DataContainer: + """ + Transform sub pipeline using join transform. 
+ + :param sub_pipeline: sub pipeline to be used to transform data container + :param data_container: data container to transform + :return: + """ + data_inputs = sub_pipeline.join_transform(data_container.data_inputs) + data_container.set_data_inputs(data_inputs) + + current_ids = self.hash(data_container.current_ids, self.hyperparams, data_inputs) + data_container.set_current_ids(current_ids) + + return data_container + + def _handle_fit_transform_sub_pipeline(self, sub_pipeline, data_container) -> \ + Tuple['MiniBatchSequentialPipeline', DataContainer]: + """ + Fit Transform sub pipeline using join fit transform. + + :param sub_pipeline: sub pipeline to be used to transform data container + :param data_container: data container to fit transform + :return: fitted self, transformed data container + """ + _, data_inputs = sub_pipeline.join_fit_transform( + data_inputs=data_container.data_inputs, + expected_outputs=data_container.expected_outputs + ) + data_container.set_data_inputs(data_inputs) + + current_ids = self.hash(data_container.current_ids, self.hyperparams, data_inputs) + data_container.set_current_ids(current_ids) + + return self, data_container + + def _create_sub_pipelines(self) -> List['MiniBatchSequentialPipeline']: + """ + Create sub pipelines by splitting the steps by the join type name. + + :return: list of sub pipelines + """ + sub_pipelines: List[MiniBatchSequentialPipeline] = self.split(BARRIER_STEP_NAME) + for sub_pipeline in sub_pipelines: + if not sub_pipeline.ends_with_type_name(BARRIER_STEP_NAME): + raise Exception( + 'At least one Barrier step needs to be at the end of a streaming pipeline. '.format( + self.join_type_name) + ) + + return sub_pipelines + + def join_transform(self, data_inputs: Iterable) -> Iterable: + """ + Concatenate the transform output of each batch of self.batch_size together. + + :param data_inputs: + :return: + """ + outputs = [] + for batch in convolved_1d( + iterable=data_inputs, + kernel_size=self.batch_size + ): + batch_outputs = super().transform(batch) + outputs.extend(batch_outputs) # TODO: use a joiner here + + return outputs + + def join_fit_transform(self, data_inputs: Iterable) -> Tuple['MiniBatchSequentialPipeline', Iterable]: + """ + Concatenate the fit transform output of each batch of self.batch_size together. 
+ + :param data_inputs: + :return: fitted self, transformed data inputs + """ + outputs = [] + for batch in convolved_1d( + iterable=data_inputs, + kernel_size=self.batch_size + ): + _, batch_outputs = super().fit_transform(batch) + outputs.extend(batch_outputs) # TODO: use a joiner here + + return self, outputs diff --git a/requirements.txt b/requirements.txt index e38b14ce..056f6d2c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ scikit-learn>=0.20.3 joblib>=0.13.2 flask>=1.1.1 flask-restful +conv diff --git a/testing/test_streaming_pipeline.py b/testing/test_streaming_pipeline.py new file mode 100644 index 00000000..9ae676f4 --- /dev/null +++ b/testing/test_streaming_pipeline.py @@ -0,0 +1,2 @@ +def test_streaming_pipeline_should(): + pass diff --git a/testing/test_truncable_steps.py b/testing/test_truncable_steps.py new file mode 100644 index 00000000..469c3003 --- /dev/null +++ b/testing/test_truncable_steps.py @@ -0,0 +1,36 @@ +from neuraxle.base import BaseStep, NonFittableMixin +from neuraxle.pipeline import Pipeline +from testing.test_pipeline import SomeStep + + +class SomeSplitStep(NonFittableMixin, BaseStep): + def fit(self, data_inputs, expected_outputs=None) -> 'NonFittableMixin': + pass + + def fit_transform(self, data_inputs, expected_outputs=None): + pass + + def transform(self, data_inputs): + pass + + +def test_truncable_steps_should_split_by_type(): + pipeline = Pipeline([ + SomeStep(), + SomeStep(), + SomeSplitStep(), + SomeStep(), + SomeStep(), + SomeSplitStep(), + SomeStep(), + ]) + + sub_pipelines = pipeline.split('SomeSplitStep') + + assert 'SomeStep' in sub_pipelines[0] + assert 'SomeStep1' in sub_pipelines[0] + assert 'SomeSplitStep' in sub_pipelines[0] + assert 'SomeStep2' in sub_pipelines[1] + assert 'SomeStep3' in sub_pipelines[1] + assert 'SomeSplitStep1' in sub_pipelines[1] + assert 'SomeStep4' in sub_pipelines[2] From 8e058b534bc55c7ad06863dd4c7166a9f65031a6 Mon Sep 17 00:00:00 2001 From: alexbrillant Date: Mon, 30 Sep 2019 18:26:35 -0400 Subject: [PATCH 02/13] Add Transform, And Fit Transform To MiniBatchSequentialPipeline --- neuraxle/pipeline.py | 39 +++++++++++++++++++ testing/test_minibatch_sequential_pipeline.py | 15 +++++++ testing/test_streaming_pipeline.py | 2 - 3 files changed, 54 insertions(+), 2 deletions(-) create mode 100644 testing/test_minibatch_sequential_pipeline.py delete mode 100644 testing/test_streaming_pipeline.py diff --git a/neuraxle/pipeline.py b/neuraxle/pipeline.py index 559a851a..4a8a6c89 100644 --- a/neuraxle/pipeline.py +++ b/neuraxle/pipeline.py @@ -551,6 +551,45 @@ def __init__(self, steps: NamedTupleList, batch_size): Pipeline.__init__(self, steps) self.batch_size = batch_size + def transform(self, data_inputs: Any): + """ + :param data_inputs: the data input to transform + :return: transformed data inputs + """ + self.setup() + + current_ids = self.hash( + current_ids=None, + hyperparameters=self.hyperparams, + data_inputs=data_inputs + ) + data_container = DataContainer(current_ids=current_ids, data_inputs=data_inputs) + data_container = self.handle_transform(data_container) + + return data_container.data_inputs + + def fit_transform(self, data_inputs, expected_outputs=None) -> ('Pipeline', Any): + """ + :param data_inputs: the data input to fit on + :param expected_outputs: the expected data output to fit on + :return: the pipeline itself + """ + self.setup() + + current_ids = self.hash( + current_ids=None, + hyperparameters=self.hyperparams, + data_inputs=data_inputs + ) + data_container = 
DataContainer( + current_ids=current_ids, + data_inputs=data_inputs, + expected_outputs=expected_outputs + ) + new_self, data_container = self.handle_transform(data_container) + + return new_self, data_container.data_inputs + def handle_transform(self, data_container: DataContainer) -> DataContainer: """ Transform all sub pipelines splitted by the Barrier steps. diff --git a/testing/test_minibatch_sequential_pipeline.py b/testing/test_minibatch_sequential_pipeline.py new file mode 100644 index 00000000..7c08691f --- /dev/null +++ b/testing/test_minibatch_sequential_pipeline.py @@ -0,0 +1,15 @@ +from neuraxle.pipeline import MiniBatchSequentialPipeline, Barrier +from testing.test_pipeline import SomeStep + + +def test_streaming_pipeline_should_transform_steps_sequentially_for_each_batch(): + p = MiniBatchSequentialPipeline([ + SomeStep(), + SomeStep(), + Barrier(), + SomeStep(), + SomeStep(), + Barrier() + ], batch_size=10) + + p.transform() diff --git a/testing/test_streaming_pipeline.py b/testing/test_streaming_pipeline.py deleted file mode 100644 index 9ae676f4..00000000 --- a/testing/test_streaming_pipeline.py +++ /dev/null @@ -1,2 +0,0 @@ -def test_streaming_pipeline_should(): - pass From 4367b305dd97d6ba73ab2105d30e2322f3c7f086 Mon Sep 17 00:00:00 2001 From: alexbrillant Date: Mon, 30 Sep 2019 18:36:43 -0400 Subject: [PATCH 03/13] MiniBatchSequentialPipeline unit tests WIP --- testing/test_minibatch_sequential_pipeline.py | 38 ++++++++++++++++++- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/testing/test_minibatch_sequential_pipeline.py b/testing/test_minibatch_sequential_pipeline.py index 7c08691f..a4934f4d 100644 --- a/testing/test_minibatch_sequential_pipeline.py +++ b/testing/test_minibatch_sequential_pipeline.py @@ -2,7 +2,7 @@ from testing.test_pipeline import SomeStep -def test_streaming_pipeline_should_transform_steps_sequentially_for_each_batch(): +def test_mini_batch_sequential_pipeline_should_transform_steps_sequentially_for_each_barrier_for_each_batch(): p = MiniBatchSequentialPipeline([ SomeStep(), SomeStep(), @@ -12,4 +12,38 @@ def test_streaming_pipeline_should_transform_steps_sequentially_for_each_batch() Barrier() ], batch_size=10) - p.transform() + outputs = p.transform(range(100)) + + # assert steps have received the right batches + # assert steps have transformed the data correctly + + +def test_mini_batch_sequential_pipeline_should_fit_transform_steps_sequentially_for_each_barrier_for_each_batch(): + p = MiniBatchSequentialPipeline([ + SomeStep(), + SomeStep(), + Barrier(), + SomeStep(), + SomeStep(), + Barrier() + ], batch_size=10) + + outputs = p.transform(range(100)) + + # assert steps have received the right batches + # assert steps have transformed the data correctly + # assert steps have been fitted with the right batches in the right order + + +def test_mini_batch_sequential_pipeline_joiner(): + p = MiniBatchSequentialPipeline([ + SomeStep(), + SomeStep(), + Barrier(), + SomeStep(), + SomeStep(), + Barrier() + ], batch_size=10) + + # TODO: joiner ???????? 
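
The mini-batching in this series is built on convolved_1d from the conv package
(added to requirements.txt in patch 01). A minimal sketch of that primitive,
assuming it accepts the iterable, kernel_size and stride keyword arguments
exactly as the pipeline code passes them:

    from conv import convolved_1d

    data_inputs = list(range(20))

    # kernel_size == stride == batch_size yields the two disjoint
    # mini-batches [0, 1, ..., 9] and [10, 11, ..., 19] that the finished
    # tests in the next patch assert against.
    for batch in convolved_1d(iterable=data_inputs, kernel_size=10, stride=10):
        print(list(batch))

Without an explicit stride the windows appear to overlap rather than partition
the data, which is what the stride=self.batch_size argument added in the next
patch addresses.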
+ From 7d82d7b3273282cba9b0f4c6f1ddf4b3ce8fd7e3 Mon Sep 17 00:00:00 2001 From: alexbrillant Date: Tue, 1 Oct 2019 11:44:25 -0400 Subject: [PATCH 04/13] Fix MiniBatchSequentialPipeline & implement unit tests --- neuraxle/base.py | 10 +- neuraxle/pipeline.py | 15 ++- neuraxle/steps/util.py | 16 +++ testing/test_minibatch_sequential_pipeline.py | 101 +++++++++++++----- 4 files changed, 108 insertions(+), 34 deletions(-) diff --git a/neuraxle/base.py b/neuraxle/base.py index ead34096..369616f6 100644 --- a/neuraxle/base.py +++ b/neuraxle/base.py @@ -823,6 +823,9 @@ def _step_name_to_index(self, step_name): return index def _step_index_to_name(self, step_index): + if step_index == len(self.items()): + return None + name, _ = self.steps_as_tuple[step_index] return name @@ -964,9 +967,10 @@ def split(self, type_name_to_split_from: str) -> List['TruncableSteps']: ) previous_sub_pipeline_end_index = index + 1 - sub_pipelines.append( - self[previous_sub_pipeline_end_index:-1] - ) + if previous_sub_pipeline_end_index < len(self.items()): + sub_pipelines.append( + self[previous_sub_pipeline_end_index:-1] + ) return sub_pipelines diff --git a/neuraxle/pipeline.py b/neuraxle/pipeline.py index 4a8a6c89..a89308e5 100644 --- a/neuraxle/pipeline.py +++ b/neuraxle/pipeline.py @@ -586,7 +586,7 @@ def fit_transform(self, data_inputs, expected_outputs=None) -> ('Pipeline', Any) data_inputs=data_inputs, expected_outputs=expected_outputs ) - new_self, data_container = self.handle_transform(data_container) + new_self, data_container = self.handle_fit_transform(data_container) return new_self, data_container.data_inputs @@ -680,6 +680,7 @@ def join_transform(self, data_inputs: Iterable) -> Iterable: """ outputs = [] for batch in convolved_1d( + stride=self.batch_size, iterable=data_inputs, kernel_size=self.batch_size ): @@ -688,19 +689,23 @@ def join_transform(self, data_inputs: Iterable) -> Iterable: return outputs - def join_fit_transform(self, data_inputs: Iterable) -> Tuple['MiniBatchSequentialPipeline', Iterable]: + def join_fit_transform(self, data_inputs: Iterable, expected_outputs: Iterable = None) -> \ + Tuple['MiniBatchSequentialPipeline', Iterable]: """ Concatenate the fit transform output of each batch of self.batch_size together. 
- :param data_inputs: + :param data_inputs: data inputs to fit transform on + :param expected_outputs: expected outputs to fit :return: fitted self, transformed data inputs """ outputs = [] for batch in convolved_1d( - iterable=data_inputs, + stride=self.batch_size, + iterable=zip(data_inputs, expected_outputs), kernel_size=self.batch_size ): - _, batch_outputs = super().fit_transform(batch) + di_eo_list = list(zip(*batch)) + _, batch_outputs = super().fit_transform(list(di_eo_list[0]), list(di_eo_list[1])) outputs.extend(batch_outputs) # TODO: use a joiner here return self, outputs diff --git a/neuraxle/steps/util.py b/neuraxle/steps/util.py index c49f3133..fb53d701 100644 --- a/neuraxle/steps/util.py +++ b/neuraxle/steps/util.py @@ -137,6 +137,21 @@ def inverse_transform_one(self, processed_output): return processed_output +class FitTransformCallbackStep(BaseStep): + def __init__(self, transform_callback_function, fit_callback_function, more_arguments: List = tuple(), + hyperparams=None): + BaseStep.__init__(self, hyperparams) + self.more_arguments = more_arguments + self.fit_callback_function = fit_callback_function + self.transform_callback_function = transform_callback_function + + def fit_transform(self, data_inputs, expected_outputs=None) -> ('BaseStep', Any): + self.fit_callback_function((data_inputs, expected_outputs)) + self.transform_callback_function(data_inputs) + + return self, data_inputs + + class TapeCallbackFunction: """This class's purpose is to be sent to the callback to accumulate information. @@ -265,6 +280,7 @@ def save(self, pipeline: 'Pipeline', data_container: DataContainer) -> 'Pipeline def load(self, pipeline: 'Pipeline', data_container: DataContainer) -> 'Pipeline': return pipeline + class OutputTransformerMixin: """ Base output transformer step that can modify data inputs, and expected_outputs at the same time. 
diff --git a/testing/test_minibatch_sequential_pipeline.py b/testing/test_minibatch_sequential_pipeline.py index a4934f4d..f8421aa3 100644 --- a/testing/test_minibatch_sequential_pipeline.py +++ b/testing/test_minibatch_sequential_pipeline.py @@ -1,49 +1,98 @@ +import numpy as np + from neuraxle.pipeline import MiniBatchSequentialPipeline, Barrier -from testing.test_pipeline import SomeStep +from neuraxle.steps.util import TransformCallbackStep, TapeCallbackFunction, FitTransformCallbackStep + + +class MultiplyBy2TransformCallbackStep(TransformCallbackStep): + def transform(self, data_inputs): + super().transform(data_inputs) + + return list(np.array(data_inputs) * 2) def test_mini_batch_sequential_pipeline_should_transform_steps_sequentially_for_each_barrier_for_each_batch(): + # Given + tape1 = TapeCallbackFunction() + tape2 = TapeCallbackFunction() + tape3 = TapeCallbackFunction() + tape4 = TapeCallbackFunction() p = MiniBatchSequentialPipeline([ - SomeStep(), - SomeStep(), + MultiplyBy2TransformCallbackStep(tape1, ["1"]), + MultiplyBy2TransformCallbackStep(tape2, ["2"]), Barrier(), - SomeStep(), - SomeStep(), + MultiplyBy2TransformCallbackStep(tape3, ["3"]), + MultiplyBy2TransformCallbackStep(tape4, ["4"]), Barrier() ], batch_size=10) - outputs = p.transform(range(100)) + # When + outputs = p.transform(range(20)) + + # Then + assert outputs == [0, 16, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 256, 272, 288, 304] + + assert tape1.data == [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]] + assert tape1.name_tape == ["1", "1"] + + assert tape2.data == [[0, 2, 4, 6, 8, 10, 12, 14, 16, 18], [20, 22, 24, 26, 28, 30, 32, 34, 36, 38]] + assert tape2.name_tape == ["2", "2"] + + assert tape3.data == [[0, 4, 8, 12, 16, 20, 24, 28, 32, 36], [40, 44, 48, 52, 56, 60, 64, 68, 72, 76]] + assert tape3.name_tape == ["3", "3"] - # assert steps have received the right batches - # assert steps have transformed the data correctly + assert tape4.data == [[0, 8, 16, 24, 32, 40, 48, 56, 64, 72], [80, 88, 96, 104, 112, 120, 128, 136, 144, 152]] + assert tape4.name_tape == ["4", "4"] + + +class MultiplyBy2FitTransformCallbackStep(FitTransformCallbackStep): + def fit_transform(self, data_inputs, expected_outputs=None): + super().fit_transform(data_inputs, expected_outputs) + + return self, list(np.array(data_inputs) * 2) def test_mini_batch_sequential_pipeline_should_fit_transform_steps_sequentially_for_each_barrier_for_each_batch(): + # Given + tape1 = TapeCallbackFunction() + tape1_fit = TapeCallbackFunction() + tape2 = TapeCallbackFunction() + tape2_fit = TapeCallbackFunction() + tape3 = TapeCallbackFunction() + tape3_fit = TapeCallbackFunction() + tape4 = TapeCallbackFunction() + tape4_fit = TapeCallbackFunction() p = MiniBatchSequentialPipeline([ - SomeStep(), - SomeStep(), + MultiplyBy2FitTransformCallbackStep(tape1, tape1_fit, ["1"]), + MultiplyBy2FitTransformCallbackStep(tape2, tape2_fit, ["2"]), Barrier(), - SomeStep(), - SomeStep(), + MultiplyBy2FitTransformCallbackStep(tape3, tape3_fit, ["3"]), + MultiplyBy2FitTransformCallbackStep(tape4, tape4_fit, ["4"]), Barrier() ], batch_size=10) - outputs = p.transform(range(100)) + # When + p, outputs = p.fit_transform(range(20), range(20)) - # assert steps have received the right batches - # assert steps have transformed the data correctly - # assert steps have been fitted with the right batches in the right order + # Then + assert outputs == [0, 16, 32, 48, 64, 80, 96, 112, 128, 144, 160, 176, 192, 208, 224, 240, 
256, 272, 288, 304] + assert tape1.data == [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]] + assert tape1_fit.data == [([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), + ([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19])] + assert tape1.name_tape == ["1", "1"] -def test_mini_batch_sequential_pipeline_joiner(): - p = MiniBatchSequentialPipeline([ - SomeStep(), - SomeStep(), - Barrier(), - SomeStep(), - SomeStep(), - Barrier() - ], batch_size=10) + assert tape2.data == [[0, 2, 4, 6, 8, 10, 12, 14, 16, 18], [20, 22, 24, 26, 28, 30, 32, 34, 36, 38]] + assert tape2_fit.data == [([0, 2, 4, 6, 8, 10, 12, 14, 16, 18], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), + ([20, 22, 24, 26, 28, 30, 32, 34, 36, 38], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19])] + assert tape2.name_tape == ["2", "2"] - # TODO: joiner ???????? + assert tape3.data == [[0, 4, 8, 12, 16, 20, 24, 28, 32, 36], [40, 44, 48, 52, 56, 60, 64, 68, 72, 76]] + assert tape3_fit.data == [([0, 4, 8, 12, 16, 20, 24, 28, 32, 36], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), + ([40, 44, 48, 52, 56, 60, 64, 68, 72, 76], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19])] + assert tape3.name_tape == ["3", "3"] + assert tape4.data == [[0, 8, 16, 24, 32, 40, 48, 56, 64, 72], [80, 88, 96, 104, 112, 120, 128, 136, 144, 152]] + assert tape4_fit.data == [([0, 8, 16, 24, 32, 40, 48, 56, 64, 72], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]), + ([80, 88, 96, 104, 112, 120, 128, 136, 144, 152], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19])] + assert tape4.name_tape == ["4", "4"] From fd8a31c6865668cb3ce35608162c2ea360f23450 Mon Sep 17 00:00:00 2001 From: alexbrillant Date: Tue, 1 Oct 2019 11:52:33 -0400 Subject: [PATCH 05/13] FitTransformCallbackStep : Fix Arguments Passed In Callback Functions --- neuraxle/steps/util.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/neuraxle/steps/util.py b/neuraxle/steps/util.py index fb53d701..ec61e1ce 100644 --- a/neuraxle/steps/util.py +++ b/neuraxle/steps/util.py @@ -146,8 +146,8 @@ def __init__(self, transform_callback_function, fit_callback_function, more_argu self.transform_callback_function = transform_callback_function def fit_transform(self, data_inputs, expected_outputs=None) -> ('BaseStep', Any): - self.fit_callback_function((data_inputs, expected_outputs)) - self.transform_callback_function(data_inputs) + self.fit_callback_function((data_inputs, expected_outputs), *self.more_arguments) + self.transform_callback_function(data_inputs, *self.more_arguments) return self, data_inputs From db7cb552dd210565de6edce7d007755bd126df58 Mon Sep 17 00:00:00 2001 From: alexbrillant Date: Tue, 1 Oct 2019 12:07:29 -0400 Subject: [PATCH 06/13] Fix Docstring For MiniBatchSequentialPipeline --- neuraxle/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neuraxle/pipeline.py b/neuraxle/pipeline.py index a89308e5..b5c48ffc 100644 --- a/neuraxle/pipeline.py +++ b/neuraxle/pipeline.py @@ -544,7 +544,7 @@ class Barrier(NonFittableMixin, NonTransformableMixin, BaseStep): class MiniBatchSequentialPipeline(NonFittableMixin, Pipeline): """ - Streaming Pipeline class to create a pipeline for streaming, and batch processing. + Mini Batch Sequential Pipeline class to create a pipeline processing data inputs in batch. 
""" def __init__(self, steps: NamedTupleList, batch_size): From 44199250653d4341d8f5f3579f7221ef82be32b6 Mon Sep 17 00:00:00 2001 From: alexbrillant Date: Wed, 2 Oct 2019 13:11:16 -0400 Subject: [PATCH 07/13] Pass Type Instead to BaseStep.ends_with method --- neuraxle/base.py | 4 ++-- neuraxle/pipeline.py | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/neuraxle/base.py b/neuraxle/base.py index 369616f6..ceb8cf8c 100644 --- a/neuraxle/base.py +++ b/neuraxle/base.py @@ -974,8 +974,8 @@ def split(self, type_name_to_split_from: str) -> List['TruncableSteps']: return sub_pipelines - def ends_with_type_name(self, type_name: str): - return self[-1].__class__.__name__ == type_name + def ends_with(self, step_type: type): + return type(self[-1]) == step_type class ResumableStepMixin: diff --git a/neuraxle/pipeline.py b/neuraxle/pipeline.py index b5c48ffc..2167f69a 100644 --- a/neuraxle/pipeline.py +++ b/neuraxle/pipeline.py @@ -663,10 +663,9 @@ def _create_sub_pipelines(self) -> List['MiniBatchSequentialPipeline']: """ sub_pipelines: List[MiniBatchSequentialPipeline] = self.split(BARRIER_STEP_NAME) for sub_pipeline in sub_pipelines: - if not sub_pipeline.ends_with_type_name(BARRIER_STEP_NAME): + if not sub_pipeline.ends_with(type(Barrier())): raise Exception( - 'At least one Barrier step needs to be at the end of a streaming pipeline. '.format( - self.join_type_name) + 'At least one Barrier step needs to be at the end of a streaming pipeline.' ) return sub_pipelines From 7c4282dff49be3733536ae2bcaee2db2cce12963 Mon Sep 17 00:00:00 2001 From: alexbrillant Date: Wed, 2 Oct 2019 15:52:07 -0400 Subject: [PATCH 08/13] Implement Joiner Barrier Properly For Sequential Pipeline --- neuraxle/base.py | 116 +++++++++++++- neuraxle/pipeline.py | 149 +++--------------- testing/test_minibatch_sequential_pipeline.py | 9 +- testing/test_truncable_steps.py | 2 +- 4 files changed, 132 insertions(+), 144 deletions(-) diff --git a/neuraxle/base.py b/neuraxle/base.py index ceb8cf8c..c94d34cb 100644 --- a/neuraxle/base.py +++ b/neuraxle/base.py @@ -27,7 +27,9 @@ from abc import ABC, abstractmethod from collections import OrderedDict from copy import copy -from typing import Tuple, List, Union, Any +from typing import Tuple, List, Union, Any, Iterable + +from conv import convolved_1d from neuraxle.hyperparams.space import HyperparameterSpace, HyperparameterSamples @@ -84,6 +86,19 @@ def set_expected_outputs(self, expected_outputs: Any): def set_current_ids(self, current_ids: Any): self.current_ids = current_ids + def convolved_1d(self, stride, kernel_size) -> Iterable: + conv_current_ids = convolved_1d(stride=stride, iterable=self.current_ids, kernel_size=kernel_size) + conv_data_inputs = convolved_1d(stride=stride, iterable=self.data_inputs, kernel_size=kernel_size) + conv_expected_outputs = convolved_1d(stride=stride, iterable=self.expected_outputs, kernel_size=kernel_size) + + for current_ids, data_inputs, expected_outputs in zip(conv_current_ids, conv_data_inputs, + conv_expected_outputs): + yield DataContainer( + current_ids=current_ids, + data_inputs=data_inputs, + expected_outputs=expected_outputs + ) + def __iter__(self): current_ids = self.current_ids if self.current_ids is None: @@ -115,6 +130,11 @@ def append(self, current_id, data_input, expected_output): self.data_inputs.append(data_input) self.expected_outputs.append(expected_output) + def concat(self, data_container: DataContainer): + self.current_ids.extend(data_container.current_ids) + 
self.data_inputs.extend(data_container.data_inputs) + self.expected_outputs.extend(data_container.expected_outputs) + class BaseStep(ABC): def __init__( @@ -464,8 +484,8 @@ class MetaStepMixin: # TODO: remove equal None, and fix random search at the same time ? def __init__( - self, - wrapped: BaseStep = None + self, + wrapped: BaseStep = None ): self.wrapped: BaseStep = wrapped @@ -640,7 +660,7 @@ def are_steps_before_index_the_same(self, other: 'TruncableSteps', index: int): return True - def _load_saved_pipeline_steps_before_index(self, saved_pipeline: 'TruncableSteps', index: int): + def _replace_pipeline_steps_before_index_by_other_pipeline_steps_before_index(self, saved_pipeline: 'TruncableSteps', index: int): """ Load the cached pipeline steps before the index into the current steps @@ -950,18 +970,18 @@ def __len__(self): """ return len(self.steps_as_tuple) - def split(self, type_name_to_split_from: str) -> List['TruncableSteps']: + def split(self, step_type: type) -> List['TruncableSteps']: """ Split truncable steps by a step class name. - :param type_name_to_split_from: step class name to split from. + :param step_type: step class type to split from. :return: list of truncable steps containing the splitted steps """ sub_pipelines = [] previous_sub_pipeline_end_index = 0 for index, (step_name, step) in enumerate(self.items()): - if step.__class__.__name__ == type_name_to_split_from: + if isinstance(step, step_type): sub_pipelines.append( self[previous_sub_pipeline_end_index:index + 1] ) @@ -975,7 +995,7 @@ def split(self, type_name_to_split_from: str) -> List['TruncableSteps']: return sub_pipelines def ends_with(self, step_type: type): - return type(self[-1]) == step_type + return isinstance(self[-1], step_type) class ResumableStepMixin: @@ -986,3 +1006,83 @@ class ResumableStepMixin: @abstractmethod def should_resume(self, data_container: DataContainer) -> bool: raise NotImplementedError() + + +class Barrier(NonFittableMixin, NonTransformableMixin, BaseStep, ABC): + """ + A Barrier step to be used in a minibatch sequential pipeline. It forces all the + data inputs to get to the barrier in a sub pipeline before going through to the next sub-pipeline. + + + ``p = MiniBatchSequentialPipeline([ + SomeStep(), + SomeStep(), + Barrier(), # must be a concrete Barrier ex: Joiner() + SomeStep(), + SomeStep(), + Barrier(), # must be a concrete Barrier ex: Joiner() + ], batch_size=10)`` + + """ + + @abstractmethod + def join_transform(self, step: BaseStep, data_container: DataContainer) -> DataContainer: + raise NotImplementedError() + + @abstractmethod + def join_fit_transform(self, step: BaseStep, data_container: DataContainer) -> Tuple['Any', Iterable]: + raise NotImplementedError() + + +class Joiner(Barrier): + """ + A Special Barrier step that joins the transformed mini batches together with list.extend method. + """ + + def __init__(self, batch_size): + super().__init__() + self.batch_size = batch_size + + def join_transform(self, step: BaseStep, data_container: DataContainer) -> Iterable: + """ + Concatenate the pipeline transform output of each batch of self.batch_size together. 
+ + :param step: pipeline to transform on + :param data_container: data container to transform + :return: + """ + data_container_batches = data_container.convolved_1d( + stride=self.batch_size, + kernel_size=self.batch_size + ) + + output_data_container = ListDataContainer.empty() + for data_container_batch in data_container_batches: + output_data_container.concat( + step._transform_core(data_container_batch) + ) + + return output_data_container + + def join_fit_transform(self, step: BaseStep, data_container: DataContainer) -> \ + Tuple['Any', Iterable]: + """ + Concatenate the pipeline fit transform output of each batch of self.batch_size together. + + :param step: pipeline to fit transform on + :param data_container: data container to fit transform on + :return: fitted self, transformed data inputs + """ + data_container_batches = data_container.convolved_1d( + stride=self.batch_size, + kernel_size=self.batch_size + ) + + output_data_container = ListDataContainer.empty() + for data_container_batch in data_container_batches: + step, data_container_batch = step._fit_transform_core(data_container_batch) + output_data_container.concat( + data_container_batch + ) + + return step, output_data_container diff --git a/neuraxle/pipeline.py b/neuraxle/pipeline.py index 2167f69a..c1b03726 100644 --- a/neuraxle/pipeline.py +++ b/neuraxle/pipeline.py @@ -29,7 +29,7 @@ from joblib import load, dump from neuraxle.base import BaseStep, TruncableSteps, NamedTupleList, ResumableStepMixin, DataContainer, NonFittableMixin, \ - NonTransformableMixin + NonTransformableMixin, Barrier from neuraxle.checkpoints import BaseCheckpointStep BARRIER_STEP_NAME = 'Barrier' @@ -417,7 +417,7 @@ def _load_checkpoint(self, data_container: DataContainer) -> Tuple[NamedTupleLis ): return self.steps_as_tuple, data_container - self._load_saved_pipeline_steps_before_index( + self._replace_pipeline_steps_before_index_by_other_pipeline_steps_before_index( saved_pipeline=saved_pipeline, index=new_starting_step_index ) @@ -492,56 +492,6 @@ def should_resume(self, data_container: DataContainer) -> bool: return False -""" -Idea for checkpoints : - - The streaming pipeline algorithm could go find the optional checkpoint step for each sub pipeline. - In the future, a ResumableStreamingPipeline that extends this class should exist to support checkpoints. - - The Barrier should ideally join the data so that the ram does not blow up (iterable, lazy loading, cache ??) - Maybe we can implement a LazyLoadingDataContainer/CachedDataContainer class or something that could be returned. - - pipeline = Pipeline([ - MiniBatchSequentialPipeline([ - - A(), - B(), - Barrier(joiner=Joiner()), - [Checkpoint()] - - C(), - D(), - Barrier(joiner=Joiner()), - [Checkpoint()] - - ]), - Model() - ]) - - pipeline = Pipeline([ - MiniBatchSequentialPipeline([ - ParallelWrapper([ - NonFittableA(), - NonFittableB(), - ]) - Barrier(joiner=Joiner()), - [Checkpoint()] - - C(), - D(), - Barrier(joiner=Joiner()), - [Checkpoint()] - - ]), - Model() - ]) -""" - - -class Barrier(NonFittableMixin, NonTransformableMixin, BaseStep): - pass - - class MiniBatchSequentialPipeline(NonFittableMixin, Pipeline): """ Mini Batch Sequential Pipeline class to create a pipeline processing data inputs in batch. 
@@ -600,7 +550,13 @@ def handle_transform(self, data_container: DataContainer) -> DataContainer: sub_pipelines = self._create_sub_pipelines() for sub_pipeline in sub_pipelines: - data_container = self._handle_transform_sub_pipeline(sub_pipeline, data_container) + barrier = sub_pipeline[-1] + data_container = barrier.join_transform( + step=sub_pipeline, + data_container=data_container + ) + current_ids = self.hash(data_container.current_ids, self.hyperparams) + data_container.set_current_ids(current_ids) return data_container @@ -615,43 +571,13 @@ def handle_fit_transform(self, data_container: DataContainer) -> \ sub_pipelines = self._create_sub_pipelines() for sub_pipeline in sub_pipelines: - new_self, data_container = self._handle_fit_transform_sub_pipeline(sub_pipeline, data_container) - - return self, data_container - - def _handle_transform_sub_pipeline(self, sub_pipeline, data_container) -> DataContainer: - """ - Transform sub pipeline using join transform. - - :param sub_pipeline: sub pipeline to be used to transform data container - :param data_container: data container to transform - :return: - """ - data_inputs = sub_pipeline.join_transform(data_container.data_inputs) - data_container.set_data_inputs(data_inputs) - - current_ids = self.hash(data_container.current_ids, self.hyperparams, data_inputs) - data_container.set_current_ids(current_ids) - - return data_container - - def _handle_fit_transform_sub_pipeline(self, sub_pipeline, data_container) -> \ - Tuple['MiniBatchSequentialPipeline', DataContainer]: - """ - Fit Transform sub pipeline using join fit transform. - - :param sub_pipeline: sub pipeline to be used to transform data container - :param data_container: data container to fit transform - :return: fitted self, transformed data container - """ - _, data_inputs = sub_pipeline.join_fit_transform( - data_inputs=data_container.data_inputs, - expected_outputs=data_container.expected_outputs - ) - data_container.set_data_inputs(data_inputs) - - current_ids = self.hash(data_container.current_ids, self.hyperparams, data_inputs) - data_container.set_current_ids(current_ids) + barrier = sub_pipeline[-1] + sub_pipeline, data_container = barrier.join_fit_transform( + step=sub_pipeline, + data_container=data_container + ) + current_ids = self.hash(data_container.current_ids, self.hyperparams) + data_container.set_current_ids(current_ids) return self, data_container @@ -661,50 +587,11 @@ def _create_sub_pipelines(self) -> List['MiniBatchSequentialPipeline']: :return: list of sub pipelines """ - sub_pipelines: List[MiniBatchSequentialPipeline] = self.split(BARRIER_STEP_NAME) + sub_pipelines: List[MiniBatchSequentialPipeline] = self.split(Barrier) for sub_pipeline in sub_pipelines: - if not sub_pipeline.ends_with(type(Barrier())): + if not sub_pipeline.ends_with(Barrier): raise Exception( 'At least one Barrier step needs to be at the end of a streaming pipeline.' ) return sub_pipelines - - def join_transform(self, data_inputs: Iterable) -> Iterable: - """ - Concatenate the transform output of each batch of self.batch_size together. 
- - :param data_inputs: - :return: - """ - outputs = [] - for batch in convolved_1d( - stride=self.batch_size, - iterable=data_inputs, - kernel_size=self.batch_size - ): - batch_outputs = super().transform(batch) - outputs.extend(batch_outputs) # TODO: use a joiner here - - return outputs - - def join_fit_transform(self, data_inputs: Iterable, expected_outputs: Iterable = None) -> \ - Tuple['MiniBatchSequentialPipeline', Iterable]: - """ - Concatenate the fit transform output of each batch of self.batch_size together. - - :param data_inputs: data inputs to fit transform on - :param expected_outputs: expected outputs to fit - :return: fitted self, transformed data inputs - """ - outputs = [] - for batch in convolved_1d( - stride=self.batch_size, - iterable=zip(data_inputs, expected_outputs), - kernel_size=self.batch_size - ): - di_eo_list = list(zip(*batch)) - _, batch_outputs = super().fit_transform(list(di_eo_list[0]), list(di_eo_list[1])) - outputs.extend(batch_outputs) # TODO: use a joiner here - - return self, outputs diff --git a/testing/test_minibatch_sequential_pipeline.py b/testing/test_minibatch_sequential_pipeline.py index f8421aa3..e4e4b32d 100644 --- a/testing/test_minibatch_sequential_pipeline.py +++ b/testing/test_minibatch_sequential_pipeline.py @@ -1,5 +1,6 @@ import numpy as np +from neuraxle.base import Joiner from neuraxle.pipeline import MiniBatchSequentialPipeline, Barrier from neuraxle.steps.util import TransformCallbackStep, TapeCallbackFunction, FitTransformCallbackStep @@ -20,10 +21,10 @@ def test_mini_batch_sequential_pipeline_should_transform_steps_sequentially_for_ p = MiniBatchSequentialPipeline([ MultiplyBy2TransformCallbackStep(tape1, ["1"]), MultiplyBy2TransformCallbackStep(tape2, ["2"]), - Barrier(), + Joiner(batch_size=10), MultiplyBy2TransformCallbackStep(tape3, ["3"]), MultiplyBy2TransformCallbackStep(tape4, ["4"]), - Barrier() + Joiner(batch_size=10) ], batch_size=10) # When @@ -65,10 +66,10 @@ def test_mini_batch_sequential_pipeline_should_fit_transform_steps_sequentially_ p = MiniBatchSequentialPipeline([ MultiplyBy2FitTransformCallbackStep(tape1, tape1_fit, ["1"]), MultiplyBy2FitTransformCallbackStep(tape2, tape2_fit, ["2"]), - Barrier(), + Joiner(batch_size=10), MultiplyBy2FitTransformCallbackStep(tape3, tape3_fit, ["3"]), MultiplyBy2FitTransformCallbackStep(tape4, tape4_fit, ["4"]), - Barrier() + Joiner(batch_size=10) ], batch_size=10) # When diff --git a/testing/test_truncable_steps.py b/testing/test_truncable_steps.py index 469c3003..8a16a440 100644 --- a/testing/test_truncable_steps.py +++ b/testing/test_truncable_steps.py @@ -25,7 +25,7 @@ def test_truncable_steps_should_split_by_type(): SomeStep(), ]) - sub_pipelines = pipeline.split('SomeSplitStep') + sub_pipelines = pipeline.split(SomeSplitStep) assert 'SomeStep' in sub_pipelines[0] assert 'SomeStep1' in sub_pipelines[0] From bae18e9267a2672c2a29ba0e7ee0e0f891f907d3 Mon Sep 17 00:00:00 2001 From: alexbrillant Date: Wed, 2 Oct 2019 16:20:55 -0400 Subject: [PATCH 09/13] Fit Subpipeline Steps properly inside MiniBatchSequentialPipeline --- neuraxle/pipeline.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/neuraxle/pipeline.py b/neuraxle/pipeline.py index c1b03726..7a0309c6 100644 --- a/neuraxle/pipeline.py +++ b/neuraxle/pipeline.py @@ -23,13 +23,12 @@ import os from abc import ABC, abstractmethod from copy import copy -from typing import Any, Tuple, List, Iterable +from typing import Any, Tuple, List -from conv import convolved_1d from joblib import load, 
dump from neuraxle.base import BaseStep, TruncableSteps, NamedTupleList, ResumableStepMixin, DataContainer, NonFittableMixin, \ - NonTransformableMixin, Barrier + Barrier from neuraxle.checkpoints import BaseCheckpointStep BARRIER_STEP_NAME = 'Barrier' @@ -569,6 +568,7 @@ def handle_fit_transform(self, data_container: DataContainer) -> \ :return: data container """ sub_pipelines = self._create_sub_pipelines() + index_start = 0 for sub_pipeline in sub_pipelines: barrier = sub_pipeline[-1] @@ -579,6 +579,13 @@ def handle_fit_transform(self, data_container: DataContainer) -> \ current_ids = self.hash(data_container.current_ids, self.hyperparams) data_container.set_current_ids(current_ids) + new_self = self[:index_start] + sub_pipeline + if index_start + len(sub_pipeline) < len(self): + new_self += self[index_start + len(sub_pipeline):] + + self.steps_as_tuple = new_self.steps_as_tuple + index_start += len(sub_pipeline) + return self, data_container def _create_sub_pipelines(self) -> List['MiniBatchSequentialPipeline']: From db6cf6f0251a7e3e014570c55beea2a68fab04f1 Mon Sep 17 00:00:00 2001 From: alexbrillant Date: Wed, 2 Oct 2019 16:47:12 -0400 Subject: [PATCH 10/13] Remove Dead Code --- neuraxle/base.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/neuraxle/base.py b/neuraxle/base.py index c94d34cb..7412391b 100644 --- a/neuraxle/base.py +++ b/neuraxle/base.py @@ -581,9 +581,6 @@ class NonFittableMixin: Note: fit methods are not implemented""" - def handle_fit_transform(self, data_container: DataContainer): - return self, self.handle_transform(data_container) - def fit(self, data_inputs, expected_outputs=None) -> 'NonFittableMixin': """ Don't fit. From a7f4a9857f58952d2302100c804b9b3b5e215192 Mon Sep 17 00:00:00 2001 From: alexbrillant Date: Wed, 2 Oct 2019 16:49:54 -0400 Subject: [PATCH 11/13] Remove Dead Code --- neuraxle/pipeline.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/neuraxle/pipeline.py b/neuraxle/pipeline.py index 7a0309c6..c2fe0261 100644 --- a/neuraxle/pipeline.py +++ b/neuraxle/pipeline.py @@ -31,8 +31,6 @@ Barrier from neuraxle.checkpoints import BaseCheckpointStep -BARRIER_STEP_NAME = 'Barrier' - DEFAULT_CACHE_FOLDER = 'cache' From 433c9fefffb26ef5b780349052d958cdc9d84e36 Mon Sep 17 00:00:00 2001 From: alexbrillant Date: Wed, 2 Oct 2019 16:55:10 -0400 Subject: [PATCH 12/13] Extract Method To Create Current Id --- neuraxle/pipeline.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/neuraxle/pipeline.py b/neuraxle/pipeline.py index c2fe0261..72cfd378 100644 --- a/neuraxle/pipeline.py +++ b/neuraxle/pipeline.py @@ -505,11 +505,7 @@ def transform(self, data_inputs: Any): """ self.setup() - current_ids = self.hash( - current_ids=None, - hyperparameters=self.hyperparams, - data_inputs=data_inputs - ) + current_ids = self._create_current_ids(data_inputs) data_container = DataContainer(current_ids=current_ids, data_inputs=data_inputs) data_container = self.handle_transform(data_container) @@ -523,11 +519,7 @@ def fit_transform(self, data_inputs, expected_outputs=None) -> ('Pipeline', Any) """ self.setup() - current_ids = self.hash( - current_ids=None, - hyperparameters=self.hyperparams, - data_inputs=data_inputs - ) + current_ids = self._create_current_ids(data_inputs) data_container = DataContainer( current_ids=current_ids, data_inputs=data_inputs, @@ -537,6 +529,14 @@ def fit_transform(self, data_inputs, expected_outputs=None) -> ('Pipeline', Any) return new_self, data_container.data_inputs + def _create_current_ids(self, 
data_inputs): + current_ids = self.hash( + current_ids=None, + hyperparameters=self.hyperparams, + data_inputs=data_inputs + ) + return current_ids + def handle_transform(self, data_container: DataContainer) -> DataContainer: """ Transform all sub pipelines splitted by the Barrier steps. From 0d53e130e76a47fbcefbd20b074c2bce056f01c1 Mon Sep 17 00:00:00 2001 From: alexbrillant Date: Wed, 2 Oct 2019 17:20:22 -0400 Subject: [PATCH 13/13] Remove Unwanted Rename --- neuraxle/base.py | 2 +- neuraxle/pipeline.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/neuraxle/base.py b/neuraxle/base.py index 7412391b..4c4e40ca 100644 --- a/neuraxle/base.py +++ b/neuraxle/base.py @@ -657,7 +657,7 @@ def are_steps_before_index_the_same(self, other: 'TruncableSteps', index: int): return True - def _replace_pipeline_steps_before_index_by_other_pipeline_steps_before_index(self, saved_pipeline: 'TruncableSteps', index: int): + def _load_saved_pipeline_steps_before_index(self, saved_pipeline: 'TruncableSteps', index: int): """ Load the cached pipeline steps before the index into the current steps diff --git a/neuraxle/pipeline.py b/neuraxle/pipeline.py index 72cfd378..92592c51 100644 --- a/neuraxle/pipeline.py +++ b/neuraxle/pipeline.py @@ -414,7 +414,7 @@ def _load_checkpoint(self, data_container: DataContainer) -> Tuple[NamedTupleLis ): return self.steps_as_tuple, data_container - self._replace_pipeline_steps_before_index_by_other_pipeline_steps_before_index( + self._load_saved_pipeline_steps_before_index( saved_pipeline=saved_pipeline, index=new_starting_step_index )
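
The API at the end of the series is exercised by the updated tests above: a
MiniBatchSequentialPipeline whose sub-pipelines each end in a Joiner barrier
that joins the transformed mini-batches back together. A minimal usage sketch,
mirroring those tests (MultiplyByN is a hypothetical stand-in step, not part of
this changeset):

    import numpy as np

    from neuraxle.base import BaseStep, NonFittableMixin, Joiner
    from neuraxle.pipeline import MiniBatchSequentialPipeline


    class MultiplyByN(NonFittableMixin, BaseStep):
        # Hypothetical step: multiplies every data input by n.
        def __init__(self, n):
            BaseStep.__init__(self)
            self.n = n

        def transform(self, data_inputs):
            return list(np.array(data_inputs) * self.n)


    p = MiniBatchSequentialPipeline([
        MultiplyByN(2),
        MultiplyByN(2),
        Joiner(batch_size=10),  # each sub-pipeline must end with a Barrier
        MultiplyByN(2),
        MultiplyByN(2),
        Joiner(batch_size=10),
    ], batch_size=10)

    # range(20) is processed as two batches of 10, each multiplied by 2
    # four times, then joined back into a single list by the Joiners.
    outputs = p.transform(range(20))
    assert outputs == [16 * i for i in range(20)]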