This repository has been archived by the owner on Jan 9, 2024. It is now read-only.

Merge pull request #147 from georgianpartners/parallelprocess_serialization

Parallelprocess serialization
jzhang-gp committed Aug 28, 2019
2 parents d5153a2 + 06b96d5 commit 9b10ef8
Showing 10 changed files with 421 additions and 52 deletions.
3 changes: 2 additions & 1 deletion foreshadow/intents/base.py
@@ -1,9 +1,10 @@
"""Base Intent for all intent definitions."""

from foreshadow.base import BaseEstimator, TransformerMixin
from foreshadow.serializers import ConcreteSerializerMixin


-class BaseIntent(BaseEstimator, TransformerMixin):
+class BaseIntent(BaseEstimator, TransformerMixin, ConcreteSerializerMixin):
"""Base for all intent definitions.
For each intent subclass a class attribute called `confidence_computation`
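For context, a rough usage sketch of what the new mixin buys. MyIntent and its threshold parameter are hypothetical, and dict_serialize/dict_deserialize are the mixin methods that appear later in this diff:

    # Hedged sketch: with ConcreteSerializerMixin mixed in, a concrete
    # intent gains dict-based (de)serialization. MyIntent is illustrative.
    from foreshadow.intents.base import BaseIntent

    class MyIntent(BaseIntent):
        def __init__(self, threshold=0.5):
            self.threshold = threshold

    data = MyIntent(threshold=0.9).dict_serialize(deep=True)  # plain dict
    restored = MyIntent.dict_deserialize(data)  # rebuilt from that dict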
83 changes: 81 additions & 2 deletions foreshadow/parallelprocessor.py
@@ -1,5 +1,7 @@
"""Foreshadow extension of feature union for handling dataframes."""

import inspect

import pandas as pd
from sklearn.externals.joblib import Parallel, delayed
from sklearn.pipeline import (
@@ -10,11 +12,14 @@
)

from foreshadow.base import BaseEstimator
+from foreshadow.utils.common import ConfigureColumnSharerMixin

-from .serializers import ConcreteSerializerMixin
+from .serializers import PipelineSerializerMixin, _make_serializable


-class ParallelProcessor(FeatureUnion, ConcreteSerializerMixin):
+class ParallelProcessor(
+    FeatureUnion, PipelineSerializerMixin, ConfigureColumnSharerMixin
+):
"""Class to support parallel operation on dataframes.
This class functions similarly to a FeatureUnion except it divides a given
@@ -61,6 +66,80 @@ def __init__(
transformer_list, n_jobs, transformer_weights
)

def dict_serialize(self, deep=True):
"""Serialize the selected params of parallel_process.
Args:
deep (bool): see super
Returns:
dict: parallel_process serialized in customized form.
"""
params = self.get_params(deep=deep)
selected_params = self.__create_selected_params(params)

return _make_serializable(
selected_params, serialize_args=self.serialize_params
)

def configure_column_sharer(self, column_sharer):
"""Configure column sharer in each dynamic pipeline of the transformer_list.
Args:
column_sharer: a column_sharer instance
"""
for transformer_triple in self.transformer_list:
dynamic_pipeline = transformer_triple[1]
for step in dynamic_pipeline.steps:
step[1].column_sharer = column_sharer

def __create_selected_params(self, params):
"""Select only the params in the init signature.
Args:
params: params returned from the get_params method.
Returns:
dict: params that are in the init method signature.
"""
init_params = inspect.signature(self.__init__).parameters
selected_params = {
name: params.pop(name) for name in init_params if name != "self"
}
selected_params["transformer_list"] = self.__convert_transformer_list(
selected_params["transformer_list"]
)
return selected_params
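A minimal standalone sketch of the signature-based filtering done here, with a toy Example class standing in for the processor:

    import inspect

    class Example:
        def __init__(self, a, b=2):
            self.a, self.b = a, b

    params = {"a": 1, "b": 2, "extra": 3}  # e.g. from get_params(deep=True)
    init_names = inspect.signature(Example.__init__).parameters
    selected = {n: params.pop(n) for n in init_names if n != "self"}
    print(selected)  # {'a': 1, 'b': 2}; "extra" stays behind in params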

@staticmethod
def __convert_transformer_list(transformer_list):
"""Convert the transformer list into a desired form.
Initially the transformer list has a form of
[("group_num", dynamic_pipeline, ["col1", "col2", ...]), ...].
We convert it into a form of
[{"col1,col2,col3,...": dynamic_pipeline}, ...].
Args:
transformer_list: the transformer list in the parallel_processor
Returns:
list: converted transformer list.
"""
result = []
for transformer_triple in transformer_list:
converted = {}
column_groups = transformer_triple[2]
dynamic_pipeline = transformer_triple[1]
converted[",".join(column_groups)] = dynamic_pipeline
result.append(converted)
return result
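The same conversion as a self-contained snippet, with strings standing in for the real pipeline objects:

    triples = [
        ("group: 0", "pipeline_0", ["col1", "col2"]),
        ("group: 1", "pipeline_1", ["col3"]),
    ]
    converted = [{",".join(cols): pipe} for _, pipe, cols in triples]
    print(converted)  # [{'col1,col2': 'pipeline_0'}, {'col3': 'pipeline_1'}]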

def get_params(self, deep=True):
"""Return parameters of internal transformers.
1 change: 1 addition & 0 deletions foreshadow/pipeline.py
@@ -1,4 +1,5 @@
"""A serializable form of sklearn pipelines."""

import inspect
import re

120 changes: 116 additions & 4 deletions foreshadow/preparer.py
@@ -1,15 +1,22 @@
"""Data preparation and foreshadow pipeline."""

import inspect

from sklearn.pipeline import Pipeline

-from foreshadow.pipeline import PipelineSerializerMixin
+from foreshadow.serializers import (
+    PipelineSerializerMixin,
+    _make_deserializable,
+    _make_serializable,
+)
from foreshadow.steps import (
CleanerMapper,
FeatureEngineererMapper,
FeatureReducerMapper,
IntentMapper,
Preprocessor,
)
from foreshadow.utils.common import ConfigureColumnSharerMixin

from .concrete import NoTransform

@@ -42,8 +49,10 @@ def _none_to_dict(name, val, column_sharer=None):
return val


-class DataPreparer(Pipeline, PipelineSerializerMixin):
-    """Predefined pipeline for foreshadow workflow. This Pipeline has 5 steps.
+class DataPreparer(
+    Pipeline, PipelineSerializerMixin, ConfigureColumnSharerMixin
+):
+    """Predefined pipeline for the foreshadow workflow.
1. Cleaning
2. Intent selection (data type, one of Categorical, Numerical, and Text)
@@ -72,7 +81,7 @@ def __init__(
preprocessor_kwargs=None,
reducer_kwargs=None,
y_var=None,
-        **kwargs,
+        **kwargs
):
cleaner_kwargs_ = _none_to_dict(
"cleaner_kwargs", cleaner_kwargs, column_sharer
@@ -117,3 +126,106 @@ def _get_params(self, attr, deep=True):
out.update({"steps": steps}) # manually
# adding steps to the get_params()
return out

def dict_serialize(self, deep=True):
"""Serialize the data preparer.
Args:
deep: see super.
Returns:
dict: serialized data preparer.
"""
params = self.get_params(deep=deep)
selected_params = self.__create_selected_params(params)
serialized = _make_serializable(
selected_params, serialize_args=self.serialize_params
)
column_sharer_serialized = serialized.pop("column_sharer", None)
serialized = self.__remove_key_from(serialized, target="column_sharer")
# Add back the column_sharer in the end only once.
serialized["column_sharer"] = column_sharer_serialized
steps = serialized["steps"]
steps_reformatted = [{step[0]: step[1]} for step in steps]
serialized["steps"] = steps_reformatted
return serialized

@classmethod
def dict_deserialize(cls, data):
"""Deserialize the data preparer.
Args:
data: serialized data preparer in JSON format.
Returns:
a reconstructed data preparer.
"""
params = _make_deserializable(data)
params["steps"] = [list(step.items())[0] for step in params["steps"]]
deserialized = cls(**params)

deserialized.configure_column_sharer(deserialized.column_sharer)

return deserialized
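A hedged round-trip sketch of the above. Only dict_serialize/dict_deserialize appear in this diff; the ColumnSharer import path is an assumption:

    from foreshadow.columnsharer import ColumnSharer  # assumed module path
    from foreshadow.preparer import DataPreparer

    preparer = DataPreparer(column_sharer=ColumnSharer())
    data = preparer.dict_serialize(deep=True)       # one column_sharer block
    restored = DataPreparer.dict_deserialize(data)  # steps share one instance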

def configure_column_sharer(self, column_sharer):
"""Configure column sharer for all the underlying components recursively.
Args:
column_sharer: the column sharer instance.
"""
for step in self.steps:
step[1].configure_column_sharer(column_sharer)

def __create_selected_params(self, params):
"""Extract params in the init method signature plus the steps.
Args:
params: params returned from get_params
Returns:
dict: selected params
"""
init_params = inspect.signature(self.__init__).parameters
selected_params = {
name: params.pop(name)
for name in init_params
if name not in ["self", "kwargs"]
}
selected_params["steps"] = params.pop("steps")
return selected_params

def __remove_key_from(self, data, target="column_sharer"):
"""Remove all column sharer block recursively from serialized data preparer.
Only the column sharer in the data preparer is preserved.
Args:
data: serialized data preparer (raw)
target: string that should match as a suffix of a key
Returns:
dict: a cleaned up serialized data preparer
"""
if isinstance(data, dict):
matching_keys = [key for key in data if key.endswith(target)]
for mk in matching_keys:
del data[mk]
data = {
key: self.__remove_key_from(data[key], target=target)
for key in data
}
elif isinstance(data, list):
data = [
self.__remove_key_from(item, target=target) for item in data
]
return data
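A condensed, behavior-equivalent sketch of this recursion (filtering and recursing in a single pass instead of deleting first):

    def remove_key_from(data, target="column_sharer"):
        # Drop every dict key ending in `target`, at any nesting depth.
        if isinstance(data, dict):
            return {k: remove_key_from(v, target)
                    for k, v in data.items() if not k.endswith(target)}
        if isinstance(data, list):
            return [remove_key_from(item, target) for item in data]
        return data

    nested = {"keep": 1, "column_sharer": 2,
              "steps": [{"cleaner_column_sharer": 3}]}
    print(remove_key_from(nested))  # {'keep': 1, 'steps': [{}]}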


if __name__ == "__main__":
pass
61 changes: 45 additions & 16 deletions foreshadow/serializers.py
@@ -38,12 +38,13 @@ def _make_serializable(data, serialize_args={}):
return data
except TypeError:
if isinstance(data, dict):
-            return {
+            result = {
                 k: _make_serializable(v, serialize_args=serialize_args)
                 for k, v in data.items()
             }
-        elif hasattr(data, "__next__"):
-            return [
+        # elif hasattr(data, "__iter__"):  # I don't think __next__ is correct
+        elif isinstance(data, (list, tuple)):
+            result = [
_make_serializable(v, serialize_args=serialize_args)
for v in data
]
@@ -52,9 +53,11 @@
# serialize it using the same args that were passed into the top
# level serialize method
         if hasattr(data, "serialize"):
-            return data.serialize(**serialize_args)
+            result = data.serialize(**serialize_args)
         else:
-            return _pickler.flatten(data)
+            result = _pickler.flatten(data)

return result
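The reason isinstance(data, (list, tuple)) replaces hasattr(data, "__next__"): lists and tuples are iterable but are not iterators, so the old check never matched them:

    print(hasattr([1, 2], "__next__"))        # False: a list is not an iterator
    print(hasattr(iter([1, 2]), "__next__"))  # True: its iterator is
    print(isinstance([1, 2], (list, tuple)))  # True: what the new check catches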


def _make_deserializable(data):
Expand All @@ -71,8 +74,9 @@ def _make_deserializable(data):
if isinstance(data, dict):
if any("py/" in s for s in data.keys()):
return _unpickler.restore(data)
if any("method" in s for s in data.keys()):
return _obj_deserializer_helper(data)
if any("_method" == s for s in data.keys()):
# TODO add test, watch out for keys like 'hash_method'
return deserialize(data)
else:
new_data = {}
for k, v in data.items():
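The TODO above is the reason exact equality replaces substring matching; a sketch of the false positive the old check allowed:

    data = {"hash_method": "md5"}  # an ordinary parameter, not serializer metadata
    print(any("method" in s for s in data))   # True: old substring check misfires
    print(any(s == "_method" for s in data))  # False: new exact check is safe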
@@ -271,8 +275,9 @@ def dict_serialize(self, deep=True):
dict: The initialization parameters of the transformer.
"""
+        to_serialize = self.get_params(deep)
         return _make_serializable(
-            self.get_params(deep), serialize_args=self.serialize_params
+            to_serialize, serialize_args=self.serialize_params
         )

@classmethod
@@ -440,21 +445,45 @@ class PipelineSerializerMixin(ConcreteSerializerMixin):
"""An custom serialization method to allow pipelines serialization."""

def dict_serialize(self, deep=False):
"""Serialize the init parameters (dictionary form) of a pipeline.
"""Serialize a pipeline by serializing selected fields.
Note:
This recursively serializes the individual steps to facilitate a
human readable form.
Steps in the pipeline are reformatted as {"step_name": step}
Args:
deep (bool): If True, will return the parameters for this estimator
recursively
deep: see super
Returns:
dict: The initialization parameters of the pipeline.
dict: serialized pipeline
"""
return super().dict_serialize(deep=deep)
to_serialize = {}
all_params = self.get_params(deep=deep)
to_serialize["memory"] = all_params.pop("memory", None)
to_serialize["steps"] = all_params.pop("steps")
serialized = _make_serializable(
to_serialize, serialize_args=self.serialize_params
)
serialized["steps"] = [
{step[0]: step[1]} for step in serialized["steps"]
]
return serialized

@classmethod
def dict_deserialize(cls, data):
"""Deserialize pipeline from JSON.
Steps in the pipeline are reformatted to [("step_name", step), ...]
Args:
data: the serialized pipeline.
Returns:
a reconstructed pipeline
"""
params = _make_deserializable(data)
params["steps"] = [list(step.items())[0] for step in params["steps"]]
return cls(**params)
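The steps reshaping in these two methods is a simple inverse pair, as this self-contained snippet shows (strings stand in for real steps):

    steps = [("scaler", "scaler_obj"), ("model", "model_obj")]
    as_dicts = [{name: step} for name, step in steps]  # dict_serialize form
    back = [list(d.items())[0] for d in as_dicts]      # dict_deserialize form
    assert back == steps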


def deserialize(data):
2 changes: 2 additions & 0 deletions foreshadow/smart/__init__.py
@@ -9,6 +9,7 @@
TextEncoder,
)
from foreshadow.smart.cleaner import Cleaner # noqa: F401
from foreshadow.smart.feature_engineerer import FeatureEngineerer # noqa: F401
from foreshadow.smart.feature_reducer import FeatureReducer
from foreshadow.smart.flatten import Flatten # noqa: F401
from foreshadow.smart.intentresolver import IntentResolver
@@ -26,4 +27,5 @@
"Cleaner",
"IntentResolver",
"FeatureReducer",
"FeatureEngineerer",
]
