This repository has been archived by the owner on Jan 9, 2024. It is now read-only.

Commit 2a77c84

Fixing bugs in code on missing cache_manager in y_preparer and enable multiprocessing option in both CLI and API (#175)

jzhang-gp committed Nov 21, 2019
1 parent c864045 commit 2a77c84
Showing 8 changed files with 56 additions and 84 deletions.
1 change: 1 addition & 0 deletions foreshadow/cachemanager.py
@@ -68,6 +68,7 @@ def __init__(self, *args, **kwargs):
             "metastat": True,
             "graph": True,
             "override": True,
+            "config": True,
         }
         self.__acceptable_keys = PrettyDefaultDict(get_false, acceptable_keys)

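The one-line change above registers "config" as an acceptable cache-manager key; the rest of the commit depends on it. A minimal sketch of what this enables, assuming dict-style access as used by the call sites later in this diff (ConfigKey is introduced in foreshadow/utils/constants.py below):

    from foreshadow.cachemanager import CacheManager
    from foreshadow.utils import ConfigKey

    cm = CacheManager()
    # "config" is now in the acceptable-keys table, so the write is retained:
    cm["config"][ConfigKey.N_JOBS] = -1
    assert cm["config"][ConfigKey.N_JOBS] == -1
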
4 changes: 1 addition & 3 deletions foreshadow/console.py
@@ -224,9 +224,7 @@ def generate_model(args): # noqa: C901
         raise ValueError("Invalid Level. Only levels 1 and 3 supported.")

     if cargs.multiprocess:
-        # TODO reconsider this implementation as it will not work if
-        # foreshadow is used as a library/API.
-        config.set_multiprocess(True)
+        fs.configure_multiprocessing(-1)
         logging.info("multiprocessing enabled.")

     return fs, X_train, y_train, X_test, y_test
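The deleted TODO is resolved by routing the CLI through the same per-instance API that library users call, rather than the old process-global config.set_multiprocess flag. A hedged sketch of the equivalent library usage (constructor arguments borrowed from the tests below; generate_model's data loading is elided):

    from foreshadow.foreshadow import Foreshadow
    from foreshadow.utils import ProblemType
    from sklearn.linear_model import LinearRegression

    fs = Foreshadow(
        estimator=LinearRegression(), problem_type=ProblemType.REGRESSION
    )
    # Same call the CLI branch makes; -1 follows the sklearn n_jobs
    # convention of one worker per core.
    fs.configure_multiprocessing(-1)
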
21 changes: 18 additions & 3 deletions foreshadow/foreshadow.py
@@ -19,8 +19,13 @@
     ConcreteSerializerMixin,
     _make_deserializable,
 )

-from foreshadow.utils import Override, ProblemType, check_df, get_transformer
+from foreshadow.utils import (
+    ConfigKey,
+    Override,
+    ProblemType,
+    check_df,
+    get_transformer,
+)


 class Foreshadow(BaseEstimator, ConcreteSerializerMixin):
@@ -150,7 +155,8 @@ def y_preparer(self, yp):
             raise ValueError("Invalid value passed as y_preparer")
         else:
             self._y_preprocessor = DataPreparer(
-                cache_manager=CacheManager(), y_var=True,
+                cache_manager=CacheManager(),
+                y_var=True,
                 problem_type=self.problem_type,
             )

@@ -540,3 +546,12 @@ def override_intent(self, column_name: str, intent: str) -> NoReturn:
             "_".join([Override.INTENT, column_name])
         ] = intent
         self.X_preparer.cache_manager["intent"][column_name] = intent
+
+    def configure_multiprocessing(self, n_job: int = 1) -> NoReturn:
+        """Configure the multiprocessing option.
+
+        Args:
+            n_job: the number of processes used to run the job.
+
+        """
+        self.X_preparer.cache_manager["config"][ConfigKey.N_JOBS] = n_job
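A short usage sketch of the new method (constructor arguments as in the tests further down); the assert shows where the value lands, mirroring the dict-style access used by override_intent above:

    shadow = Foreshadow(
        estimator=LogisticRegression(), problem_type=ProblemType.CLASSIFICATION
    )
    shadow.configure_multiprocessing(n_job=-1)
    # configure_multiprocessing reduces to a single cache-manager write:
    assert shadow.X_preparer.cache_manager["config"][ConfigKey.N_JOBS] == -1
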
14 changes: 11 additions & 3 deletions foreshadow/preparer.py
@@ -14,7 +14,6 @@
     IntentMapper,
     Preprocessor,
 )
-
 from foreshadow.utils import ConfigureCacheManagerMixin, ProblemType

 from .concrete import NoTransform
@@ -121,7 +120,14 @@ def __init__(
         if problem_type == ProblemType.REGRESSION:
             steps = [("output", NoTransform())]
         elif problem_type == ProblemType.CLASSIFICATION:
-            steps = [("output", CategoricalEncoder(y_var=True))]
+            steps = [
+                (
+                    "output",
+                    CategoricalEncoder(
+                        y_var=True, cache_manager=cache_manager
+                    ),
+                )
+            ]
         else:
             raise ValueError(
                 "Invalid Problem " "Type {}".format(problem_type)
@@ -193,7 +199,9 @@ def configure_cache_manager(self, cache_manager):
         """
         for step in self.steps:
-            if hasattr(step[1], "configure_cache_manager"):
+            if self.y_var:
+                step[1].cache_manager = cache_manager
+            elif hasattr(step[1], "configure_cache_manager"):
                 step[1].configure_cache_manager(cache_manager)

     def __remove_key_from(self, data, target="cache_manager"):
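These two hunks are the missing-cache_manager fix named in the commit title. Previously the y-path's output step was built as CategoricalEncoder(y_var=True) with no cache manager, and configure_cache_manager skipped it because CategoricalEncoder lacks a configure_cache_manager hook, so the encoder never received the shared cache. A condensed sketch of the repaired reconfiguration logic (tuple unpacking assumed from the step[1] access in the diff):

    def configure_cache_manager(self, cache_manager):
        for name, transformer in self.steps:
            if self.y_var:
                # y-path steps (e.g. CategoricalEncoder) have no configure
                # hook, so assign the shared cache manager directly.
                transformer.cache_manager = cache_manager
            elif hasattr(transformer, "configure_cache_manager"):
                # X-path steps delegate the cache manager downward.
                transformer.configure_cache_manager(cache_manager)
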
78 changes: 7 additions & 71 deletions foreshadow/steps/preparerstep.py
@@ -3,10 +3,10 @@

 from foreshadow.base import BaseEstimator, TransformerMixin
 from foreshadow.concrete.internals.notransform import NoTransform
-from foreshadow.config import config
 from foreshadow.logging import logging
 from foreshadow.parallelprocessor import ParallelProcessor
 from foreshadow.serializers import _make_deserializable
+from foreshadow.utils import ConfigKey
 from foreshadow.utils.common import ConfigureCacheManagerMixin

 from ..cachemanager import CacheManager
@@ -168,74 +168,6 @@ def _check_parallelizable_batch(group_process, group_name):
     return None


-def _batch_parallelize(column_mapping):
-    """Batch parallelizes any groups in column_mapping if not parallelized.
-
-    _check_parallelizable_batch will parallelize a group of columns across
-    all steps of transformers if possible. The rest that are left have
-    interdependencies and so the best we can do is to parallelize each step
-    across all groups of columns. This helper performs that task and creates a
-    Pipeline of steps that is parallelized across each group of cols at each
-    step. This enabled format two of inputs, where columns can be shuffled
-    around between steps.
-
-    Args:
-        column_mapping: the column_mapping from self.get_mapping()
-
-    Returns:
-        list of steps for Pipeline, all_cols
-        all_cols is the set of all cols that needs to be passed, as a list.
-
-    Raises:
-        ValueError: number inputs do not equal number of steps.
-
-    """
-    total_steps = len(column_mapping[0])
-    steps = []  # each individual step, or dim1, will go in here.
-    all_cols = set()
-    for step_number in range(total_steps):
-        groups = []
-        for group_name, group_process in column_mapping:
-            if group_process.step_inputs is not None:  # we do not have a
-                # transformer_list yet for this group.
-                inputs = column_mapping["inputs"]
-                steps = column_mapping["steps"]
-                if len(inputs) != len(steps):
-                    raise ValueError(
-                        "number of inputs: {} does not equal "
-                        "number of steps: {}".format(len(inputs), len(steps))
-                    )
-                list_of_steps = column_mapping[group_name]
-                step_for_group = list_of_steps[step_number]
-                transformer = step_for_group[0]
-                cols = step_for_group[1]
-                groups.append((group_name, transformer, cols))
-                for col in cols:
-                    all_cols.add(col)
-        transformer_list = [
-            [
-                "group: {}, transformer: {}".format(
-                    group_name, transformer.__name__
-                ),
-                transformer,
-                cols,
-            ]
-            for group_name, transformer, cols in groups
-        ]  # this is one step parallelized across the columns (dim1
-        # parallelized across dim2).
-        steps.append(
-            (
-                "step: {}".format(step_number),
-                ParallelProcessor(
-                    transformer_list,
-                    n_jobs=config.get_n_jobs_config(),
-                    collapse_index=True,
-                ),
-            )
-        )  # list of steps for final pipeline.
-    return steps, list(all_cols)


 class PreparerStep(
     BaseEstimator,
     TransformerMixin,
@@ -292,7 +224,7 @@ def configure_cache_manager(self, cache_manager):
         """
         super().configure_cache_manager(cache_manager)
-        if isinstance(self._parallel_process, ParallelProcessor):
+        if hasattr(self._parallel_process, "configure_cache_manager"):
             self._parallel_process.configure_cache_manager(cache_manager)

     def dict_serialize(self, deep=False):
@@ -460,9 +392,13 @@ def parallelize_smart_steps(self, X):
         ]
         if len(group_transformer_list) == 0:
             return NoTransform()
+
+        if self.cache_manager["config"][ConfigKey.N_JOBS] is None:
+            self.cache_manager["config"][ConfigKey.N_JOBS] = 1
+
         return ParallelProcessor(
             group_transformer_list,
-            n_jobs=config.get_n_jobs_config(),
+            n_jobs=self.cache_manager["config"][ConfigKey.N_JOBS],
             collapse_index=True,
         )

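The explicit None check exists because an unconfigured cache-manager entry comes back empty rather than raising (the acceptable-keys table earlier in this diff is a PrettyDefaultDict, and the value store presumably behaves the same way), so the step falls back to a sequential default and caches it before building the processor. A condensed sketch of the resolution order:

    n_jobs = self.cache_manager["config"][ConfigKey.N_JOBS]
    if n_jobs is None:
        # configure_multiprocessing was never called: run single-process
        # and write that choice back for subsequent steps.
        n_jobs = self.cache_manager["config"][ConfigKey.N_JOBS] = 1
    return ParallelProcessor(
        group_transformer_list, n_jobs=n_jobs, collapse_index=True
    )
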
13 changes: 10 additions & 3 deletions foreshadow/tests/test_foreshadow.py
@@ -859,7 +859,9 @@ def test_foreshadow_serialization_adults_small_classification_override():
         X_df, y_df, test_size=0.2
     )

-    shadow = Foreshadow(estimator=LogisticRegression())
+    shadow = Foreshadow(
+        estimator=LogisticRegression(), problem_type=ProblemType.CLASSIFICATION
+    )
     shadow.fit(X_train, y_train)
     shadow.to_json("foreshadow_adults_small_logistic_regression_1.json")
     score1 = shadow.score(X_test, y_test)
@@ -896,7 +898,9 @@ def test_foreshadow_adults_small_classification_override_upfront():
         X_df, y_df, test_size=0.2
     )

-    shadow = Foreshadow(estimator=LogisticRegression())
+    shadow = Foreshadow(
+        estimator=LogisticRegression(), problem_type=ProblemType.CLASSIFICATION
+    )

     from foreshadow.intents import IntentType
@@ -953,7 +957,7 @@ def test_foreshadow_serialization_adults_classification():
     assertions.assertAlmostEqual(score1, score2, places=3)


-def test_foreshadow_serialization_boston_housing_regression():
+def test_foreshadow_serialization_boston_housing_regression_multiprocessing():
     from foreshadow.foreshadow import Foreshadow
     import pandas as pd
     import numpy as np
@@ -975,12 +979,15 @@ def test_foreshadow_serialization_boston_housing_regression():
         estimator=LinearRegression(), problem_type=ProblemType.REGRESSION
     )

+    shadow.configure_multiprocessing(n_job=-1)
+
     shadow.fit(X_train, y_train)
     shadow.to_json("foreshadow_boston_housing_linear_regression.json")

     shadow2 = Foreshadow.from_json(
         "foreshadow_boston_housing_linear_regression.json"
     )
+
     shadow2.fit(X_train, y_train)

     score1 = shadow.score(X_test, y_test)
3 changes: 2 additions & 1 deletion foreshadow/utils/__init__.py
@@ -8,7 +8,7 @@
     get_config_path,
     get_transformer,
 )
-from foreshadow.utils.constants import EstimatorFamily, ProblemType
+from foreshadow.utils.constants import ConfigKey, EstimatorFamily, ProblemType
 from foreshadow.utils.data_summary import (
     get_outliers,
     mode_freq,
@@ -49,4 +49,5 @@
     "ProblemType",
     "EstimatorFamily",
     "Override",
+    "ConfigKey",
 ]
6 changes: 6 additions & 0 deletions foreshadow/utils/constants.py
@@ -15,3 +15,9 @@ class EstimatorFamily:
     SVM = "svm"
     RF = "random_forest"
     NN = "neural_network"
+
+
+class ConfigKey:
+    """Constants for configuration keys in foreshadow."""
+
+    N_JOBS = "n_jobs"
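ConfigKey gives the "config" slot's string keys a single home, so the writer in Foreshadow.configure_multiprocessing and the reader in PreparerStep.parallelize_smart_steps cannot drift apart. A tiny illustration (both call sites appear earlier in this diff):

    from foreshadow.utils import ConfigKey

    assert ConfigKey.N_JOBS == "n_jobs"
    # Writer:  cache_manager["config"][ConfigKey.N_JOBS] = n_job   (foreshadow.py)
    # Reader:  n_jobs = cache_manager["config"][ConfigKey.N_JOBS]  (preparerstep.py)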
