Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MiniBatchSequentialPipeline & Joiner previously called Streaming pipeline #87

Merged
merged 14 commits into from Oct 3, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 34 additions & 0 deletions neuraxle/base.py
Expand Up @@ -561,6 +561,9 @@ class NonFittableMixin:

Note: fit methods are not implemented"""

def handle_fit_transform(self, data_container: DataContainer):
return self, self.handle_transform(data_container)

def fit(self, data_inputs, expected_outputs=None) -> 'NonFittableMixin':
"""
Don't fit.
Expand Down Expand Up @@ -862,12 +865,17 @@ def __getitem__(self, key):
raise KeyError(
"Start or stop ('{}' or '{}') not found in '{}'.".format(start, stop, self.steps.keys()))
for key, val in self.steps_as_tuple:
if start == stop == key:
new_steps_as_tuple.append((key, val))

if stop == key:
break

if not started and start == key:
started = True
if started:
new_steps_as_tuple.append((key, val))

self_shallow_copy.steps_as_tuple = new_steps_as_tuple
self_shallow_copy.steps = OrderedDict(new_steps_as_tuple)
return self_shallow_copy
Expand Down Expand Up @@ -939,6 +947,32 @@ def __len__(self):
"""
return len(self.steps_as_tuple)

def split(self, type_name_to_split_from: str) -> List['TruncableSteps']:
"""
Split truncable steps by a step class name.

:param type_name_to_split_from: step class name to split from.
:return: list of truncable steps containing the splitted steps
"""
sub_pipelines = []

previous_sub_pipeline_end_index = 0
for index, (step_name, step) in enumerate(self.items()):
if step.__class__.__name__ == type_name_to_split_from:
sub_pipelines.append(
self[previous_sub_pipeline_end_index:index + 1]
)
previous_sub_pipeline_end_index = index + 1

sub_pipelines.append(
self[previous_sub_pipeline_end_index:-1]
)

return sub_pipelines

def ends_with_type_name(self, type_name: str):
return self[-1].__class__.__name__ == type_name
alexbrillant marked this conversation as resolved.
Show resolved Hide resolved


class ResumableStepMixin:
alexbrillant marked this conversation as resolved.
Show resolved Hide resolved
"""
Expand Down
228 changes: 223 additions & 5 deletions neuraxle/pipeline.py
Expand Up @@ -23,13 +23,17 @@
import os
from abc import ABC, abstractmethod
from copy import copy
from typing import Any, Tuple
from typing import Any, Tuple, List, Iterable

from conv import convolved_1d
from joblib import load, dump

from neuraxle.base import BaseStep, TruncableSteps, NamedTupleList, ResumableStepMixin, DataContainer
from neuraxle.base import BaseStep, TruncableSteps, NamedTupleList, ResumableStepMixin, DataContainer, NonFittableMixin, \
NonTransformableMixin
from neuraxle.checkpoints import BaseCheckpointStep

BARRIER_STEP_NAME = 'Barrier'
alexbrillant marked this conversation as resolved.
Show resolved Hide resolved

DEFAULT_CACHE_FOLDER = 'cache'


Expand Down Expand Up @@ -215,7 +219,7 @@ def transform(self, data_inputs: Any):
:param data_inputs: the data input to transform
:return: transformed data inputs
"""
self.setup() # TODO: perhaps, remove this to pass path in context
self.setup() # TODO: perhaps, remove this to pass path in context

current_ids = self.hash(
current_ids=None,
Expand All @@ -235,7 +239,7 @@ def fit_transform(self, data_inputs, expected_outputs=None) -> ('Pipeline', Any)
:param expected_outputs: the expected data output to fit on
:return: the pipeline itself
"""
self.setup() # TODO: perhaps, remove this to pass path in context
self.setup() # TODO: perhaps, remove this to pass path in context

current_ids = self.hash(
current_ids=None,
Expand All @@ -259,7 +263,7 @@ def fit(self, data_inputs, expected_outputs=None) -> 'Pipeline':
:param expected_outputs: the expected data output to fit on
:return: the pipeline itself
"""
self.setup() # TODO: perhaps, remove this to pass path in context
self.setup() # TODO: perhaps, remove this to pass path in context

current_ids = self.hash(
current_ids=None,
Expand Down Expand Up @@ -486,3 +490,217 @@ def should_resume(self, data_container: DataContainer) -> bool:
return True

return False


"""
Idea for checkpoints :

The streaming pipeline algorithm could go find the optional checkpoint step for each sub pipeline.
In the future, a ResumableStreamingPipeline that extends this class should exist to support checkpoints.

The Barrier should ideally join the data so that the ram does not blow up (iterable, lazy loading, cache ??)
Maybe we can implement a LazyLoadingDataContainer/CachedDataContainer class or something that could be returned.
alexbrillant marked this conversation as resolved.
Show resolved Hide resolved

pipeline = Pipeline([
MiniBatchSequentialPipeline([

A(),
B(),
Barrier(joiner=Joiner()),
alexbrillant marked this conversation as resolved.
Show resolved Hide resolved
[Checkpoint()]
alexbrillant marked this conversation as resolved.
Show resolved Hide resolved

C(),
D(),
Barrier(joiner=Joiner()),
[Checkpoint()]

]),
Model()
])

pipeline = Pipeline([
MiniBatchSequentialPipeline([
ParallelWrapper([
NonFittableA(),
NonFittableB(),
])
Barrier(joiner=Joiner()),
[Checkpoint()]

C(),
D(),
Barrier(joiner=Joiner()),
[Checkpoint()]

]),
Model()
])
"""


class Barrier(NonFittableMixin, NonTransformableMixin, BaseStep):
alexbrillant marked this conversation as resolved.
Show resolved Hide resolved
pass


class MiniBatchSequentialPipeline(NonFittableMixin, Pipeline):
"""
Streaming Pipeline class to create a pipeline for streaming, and batch processing.
"""

def __init__(self, steps: NamedTupleList, batch_size):
Pipeline.__init__(self, steps)
self.batch_size = batch_size

def transform(self, data_inputs: Any):
"""
:param data_inputs: the data input to transform
:return: transformed data inputs
"""
self.setup()

current_ids = self.hash(
current_ids=None,
hyperparameters=self.hyperparams,
data_inputs=data_inputs
)
alexbrillant marked this conversation as resolved.
Show resolved Hide resolved
data_container = DataContainer(current_ids=current_ids, data_inputs=data_inputs)
data_container = self.handle_transform(data_container)

return data_container.data_inputs

def fit_transform(self, data_inputs, expected_outputs=None) -> ('Pipeline', Any):
"""
:param data_inputs: the data input to fit on
:param expected_outputs: the expected data output to fit on
:return: the pipeline itself
"""
self.setup()

current_ids = self.hash(
current_ids=None,
hyperparameters=self.hyperparams,
data_inputs=data_inputs
)
data_container = DataContainer(
current_ids=current_ids,
data_inputs=data_inputs,
expected_outputs=expected_outputs
)
new_self, data_container = self.handle_transform(data_container)

return new_self, data_container.data_inputs

def handle_transform(self, data_container: DataContainer) -> DataContainer:
"""
Transform all sub pipelines splitted by the Barrier steps.

:param data_container: data container to transform.
:return: data container
"""
sub_pipelines = self._create_sub_pipelines()

for sub_pipeline in sub_pipelines:
data_container = self._handle_transform_sub_pipeline(sub_pipeline, data_container)

return data_container

def handle_fit_transform(self, data_container: DataContainer) -> \
Tuple['MiniBatchSequentialPipeline', DataContainer]:
"""
Transform all sub pipelines splitted by the Barrier steps.

:param data_container: data container to transform.
:return: data container
"""
sub_pipelines = self._create_sub_pipelines()

for sub_pipeline in sub_pipelines:
new_self, data_container = self._handle_fit_transform_sub_pipeline(sub_pipeline, data_container)

return self, data_container

def _handle_transform_sub_pipeline(self, sub_pipeline, data_container) -> DataContainer:
"""
Transform sub pipeline using join transform.
alexbrillant marked this conversation as resolved.
Show resolved Hide resolved

:param sub_pipeline: sub pipeline to be used to transform data container
:param data_container: data container to transform
:return:
"""
data_inputs = sub_pipeline.join_transform(data_container.data_inputs)
data_container.set_data_inputs(data_inputs)

current_ids = self.hash(data_container.current_ids, self.hyperparams, data_inputs)
data_container.set_current_ids(current_ids)
alexbrillant marked this conversation as resolved.
Show resolved Hide resolved

return data_container

def _handle_fit_transform_sub_pipeline(self, sub_pipeline, data_container) -> \
Tuple['MiniBatchSequentialPipeline', DataContainer]:
"""
Fit Transform sub pipeline using join fit transform.

:param sub_pipeline: sub pipeline to be used to transform data container
:param data_container: data container to fit transform
:return: fitted self, transformed data container
"""
_, data_inputs = sub_pipeline.join_fit_transform(
data_inputs=data_container.data_inputs,
expected_outputs=data_container.expected_outputs
)
data_container.set_data_inputs(data_inputs)

current_ids = self.hash(data_container.current_ids, self.hyperparams, data_inputs)
data_container.set_current_ids(current_ids)

return self, data_container

def _create_sub_pipelines(self) -> List['MiniBatchSequentialPipeline']:
"""
Create sub pipelines by splitting the steps by the join type name.

:return: list of sub pipelines
"""
sub_pipelines: List[MiniBatchSequentialPipeline] = self.split(BARRIER_STEP_NAME)
for sub_pipeline in sub_pipelines:
if not sub_pipeline.ends_with_type_name(BARRIER_STEP_NAME):
alexbrillant marked this conversation as resolved.
Show resolved Hide resolved
raise Exception(
'At least one Barrier step needs to be at the end of a streaming pipeline. '.format(
alexbrillant marked this conversation as resolved.
Show resolved Hide resolved
self.join_type_name)
)

return sub_pipelines

def join_transform(self, data_inputs: Iterable) -> Iterable:
"""
Concatenate the transform output of each batch of self.batch_size together.

:param data_inputs:
:return:
"""
outputs = []
for batch in convolved_1d(
iterable=data_inputs,
kernel_size=self.batch_size
):
batch_outputs = super().transform(batch)
outputs.extend(batch_outputs) # TODO: use a joiner here

return outputs

def join_fit_transform(self, data_inputs: Iterable) -> Tuple['MiniBatchSequentialPipeline', Iterable]:
"""
Concatenate the fit transform output of each batch of self.batch_size together.

:param data_inputs:
:return: fitted self, transformed data inputs
"""
outputs = []
for batch in convolved_1d(
iterable=data_inputs,
kernel_size=self.batch_size
):
_, batch_outputs = super().fit_transform(batch)
outputs.extend(batch_outputs) # TODO: use a joiner here
alexbrillant marked this conversation as resolved.
Show resolved Hide resolved

return self, outputs
1 change: 1 addition & 0 deletions requirements.txt
Expand Up @@ -6,3 +6,4 @@ scikit-learn>=0.20.3
joblib>=0.13.2
flask>=1.1.1
flask-restful
conv
49 changes: 49 additions & 0 deletions testing/test_minibatch_sequential_pipeline.py
@@ -0,0 +1,49 @@
from neuraxle.pipeline import MiniBatchSequentialPipeline, Barrier
from testing.test_pipeline import SomeStep


def test_mini_batch_sequential_pipeline_should_transform_steps_sequentially_for_each_barrier_for_each_batch():
p = MiniBatchSequentialPipeline([
SomeStep(),
SomeStep(),
Barrier(),
alexbrillant marked this conversation as resolved.
Show resolved Hide resolved
SomeStep(),
SomeStep(),
Barrier()
], batch_size=10)

outputs = p.transform(range(100))

# assert steps have received the right batches
# assert steps have transformed the data correctly


def test_mini_batch_sequential_pipeline_should_fit_transform_steps_sequentially_for_each_barrier_for_each_batch():
p = MiniBatchSequentialPipeline([
SomeStep(),
SomeStep(),
Barrier(),
SomeStep(),
SomeStep(),
Barrier()
], batch_size=10)

outputs = p.transform(range(100))

# assert steps have received the right batches
# assert steps have transformed the data correctly
# assert steps have been fitted with the right batches in the right order


def test_mini_batch_sequential_pipeline_joiner():
alexbrillant marked this conversation as resolved.
Show resolved Hide resolved
p = MiniBatchSequentialPipeline([
SomeStep(),
SomeStep(),
Barrier(),
SomeStep(),
SomeStep(),
Barrier()
], batch_size=10)

# TODO: joiner ????????