Fix/parallelism (#3)
* add example with parallel processing of feature pipelines

* fix big validation size

* refactoring of computations manager

* add settings configuration for parallelism settings

* add seq and parallel impls

* add comp manager

* fixed interface for folds computing

* refactoring

* bugfix of deepcopy

* add test for sequential and parallel computations manager

---------

Co-authored-by: fonhorst <fonhorst@alipoov.nb@gmail.com>
fonhorst and fonhorst committed Jun 5, 2023
1 parent 6d81978 commit 3ea26af
Showing 38 changed files with 1,582 additions and 698 deletions.
@@ -17,7 +17,7 @@

# settings and data
cv = 5
feat_pipe = "lgb_adv" # linear, lgb_simple or lgb_adv
feat_pipe = "linear" # linear, lgb_simple or lgb_adv
dataset_name = "lama_test_dataset"
dataset = get_dataset(dataset_name)
df = spark.read.csv(dataset.path, header=True)
@@ -32,4 +32,4 @@
ds = feature_pipe.fit_transform(ds)

# save processed data
ds.save(f"/tmp/{dataset_name}__{feat_pipe}__features.dataset")
ds.save(f"/tmp/{dataset_name}__{feat_pipe}__features.dataset", save_mode='overwrite')
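The features dataset written above is what the new parallel examples in this commit load back in. A minimal sketch of the reload step, assuming the same path convention and the PlainCachePersistenceManager that appears later in this diff:

from sparklightautoml.dataset.base import SparkDataset
from sparklightautoml.dataset.persistence import PlainCachePersistenceManager

# Reload the saved features dataset (path is illustrative, mirroring the save above).
ds = SparkDataset.load(
    path=f"/tmp/{dataset_name}__{feat_pipe}__features.dataset",
    persistence_manager=PlainCachePersistenceManager()
)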
50 changes: 50 additions & 0 deletions examples/spark/parallel/feature-processing-parallel.py
@@ -0,0 +1,50 @@
import logging.config

from examples.spark.examples_utils import get_spark_session, get_dataset
from sparklightautoml.computations.parallel import ParallelComputationsManager
from sparklightautoml.pipelines.features.base import SparkFeaturesPipeline
from sparklightautoml.pipelines.features.lgb_pipeline import SparkLGBAdvancedPipeline, SparkLGBSimpleFeatures
from sparklightautoml.pipelines.features.linear_pipeline import SparkLinearFeatures
from sparklightautoml.reader.base import SparkToSparkReader
from sparklightautoml.tasks.base import SparkTask
from sparklightautoml.utils import logging_config, VERBOSE_LOGGING_FORMAT

logging.config.dictConfig(logging_config(level=logging.DEBUG, log_filename='/tmp/slama.log'))
logging.basicConfig(level=logging.DEBUG, format=VERBOSE_LOGGING_FORMAT)
logger = logging.getLogger(__name__)


feature_pipelines = {
"linear": SparkLinearFeatures(),
"lgb_simple": SparkLGBSimpleFeatures(),
"lgb_adv": SparkLGBAdvancedPipeline()
}


if __name__ == "__main__":
spark = get_spark_session()

# settings and data
cv = 5
dataset_name = "lama_test_dataset"
parallelism = 2

dataset = get_dataset(dataset_name)
df = spark.read.csv(dataset.path, header=True)

computations_manager = ParallelComputationsManager(parallelism=parallelism)
task = SparkTask(name=dataset.task_type)
reader = SparkToSparkReader(task=task, cv=cv, advanced_roles=False)

ds = reader.fit_read(train_data=df, roles=dataset.roles)

def build_task(name: str, feature_pipe: SparkFeaturesPipeline):
def func():
logger.info(f"Calculating feature pipeline: {name}")
feature_pipe.fit_transform(ds).data.write.mode('overwrite').format('noop').save()
logger.info(f"Finished calculating pipeline: {name}")
return func

tasks = [build_task(name, feature_pipe) for name, feature_pipe in feature_pipelines.items()]

computations_manager.session(tasks)
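For comparison with the parallel session above, the same task closures can also be run one at a time without any computations manager; a minimal sequential sketch (plain Python, no SLAMA API assumed):

# Sequential baseline: execute each feature-pipeline task in turn
# instead of handing the list to ParallelComputationsManager.
for task in tasks:
    task()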
90 changes: 90 additions & 0 deletions examples/spark/parallel/mlalgo-folds-parallel.py
@@ -0,0 +1,90 @@
import logging
from logging import config
from typing import Tuple, Union

import os

from lightautoml.ml_algo.tuning.base import DefaultTuner
from lightautoml.ml_algo.utils import tune_and_fit_predict
from pyspark.sql import functions as sf

from sparklightautoml.computations.parallel import ParallelComputationsManager
from sparklightautoml.dataset.base import SparkDataset
from sparklightautoml.dataset.persistence import PlainCachePersistenceManager
from sparklightautoml.ml_algo.boost_lgbm import SparkBoostLGBM
from sparklightautoml.ml_algo.linear_pyspark import SparkLinearLBFGS
from sparklightautoml.utils import logging_config, VERBOSE_LOGGING_FORMAT, log_exec_timer
from sparklightautoml.validation.iterators import SparkFoldsIterator
from examples.spark.examples_utils import get_spark_session

logging.config.dictConfig(logging_config(level=logging.DEBUG, log_filename='/tmp/slama.log'))
logging.basicConfig(level=logging.DEBUG, format=VERBOSE_LOGGING_FORMAT)
logger = logging.getLogger(__name__)


def train_test_split(dataset: SparkDataset, test_slice_or_fold_num: Union[float, int] = 0.2) \
-> Tuple[SparkDataset, SparkDataset]:

if isinstance(test_slice_or_fold_num, float):
assert 0 <= test_slice_or_fold_num <= 1
train, test = dataset.data.randomSplit([1 - test_slice_or_fold_num, test_slice_or_fold_num])
else:
train = dataset.data.where(sf.col(dataset.folds_column) != test_slice_or_fold_num)
test = dataset.data.where(sf.col(dataset.folds_column) == test_slice_or_fold_num)

train_dataset, test_dataset = dataset.empty(), dataset.empty()
train_dataset.set_data(train, dataset.features, roles=dataset.roles)
test_dataset.set_data(test, dataset.features, roles=dataset.roles)

return train_dataset, test_dataset


if __name__ == "__main__":
spark = get_spark_session()

# available feat_pipe: linear, lgb_simple or lgb_adv
# available ml_algo: linear_l2, lgb
# feat_pipe, ml_algo_name = "linear", "linear_l2"
feat_pipe, ml_algo_name = "lgb_adv", "lgb"
parallelism = 1
dataset_name = os.environ.get("DATASET", "lama_test_dataset")

# load and prepare data
ds = SparkDataset.load(
path=f"/tmp/{dataset_name}__{feat_pipe}__features.dataset",
persistence_manager=PlainCachePersistenceManager()
)
train_ds, test_ds = train_test_split(ds, test_slice_or_fold_num=4)
train_ds, test_ds = train_ds.persist(), test_ds.persist()

# create main entities
computations_manager = ParallelComputationsManager(parallelism=parallelism)
iterator = SparkFoldsIterator(train_ds)#.convert_to_holdout_iterator()
if ml_algo_name == "lgb":
ml_algo = SparkBoostLGBM(experimental_parallel_mode=True, computations_settings=computations_manager)
else:
ml_algo = SparkLinearLBFGS(default_params={'regParam': [1e-5]}, computations_settings=computations_manager)

score = ds.task.get_dataset_metric()

# fit and predict
with log_exec_timer("Model fitting"):
model, oof_preds = tune_and_fit_predict(ml_algo, DefaultTuner(), iterator)
with log_exec_timer("Model inference"):
test_preds = ml_algo.predict(test_ds)

# estimate oof and test metrics
oof_metric_value = score(oof_preds.data.select(
SparkDataset.ID_COLUMN,
sf.col(ds.target_column).alias('target'),
sf.col(ml_algo.prediction_feature).alias('prediction')
))

test_metric_value = score(test_preds.data.select(
SparkDataset.ID_COLUMN,
sf.col(ds.target_column).alias('target'),
sf.col(ml_algo.prediction_feature).alias('prediction')
))

print(f"OOF metric: {oof_metric_value}")
print(f"Test metric: {test_metric_value}")
File renamed without changes.
2 changes: 1 addition & 1 deletion examples/spark/tabular-preset-automl.py
@@ -56,7 +56,7 @@ def main(spark: SparkSession, dataset_name: str, seed: int):
},
linear_l2_params={'default_params': {'regParam': [1e-5]}},
reader_params={"cv": cv, "advanced_roles": False},
parallelism_mode=("no_parallelism", -1)
computation_settings=("no_parallelism", -1)
)

oof_predictions = automl.fit_predict(
Binary file modified jars/spark-lightautoml_2.12-0.1.1.jar
@@ -55,10 +55,17 @@ object SomeFunctions {

def executors(): java.util.List[java.lang.String] = {
import scala.collection.JavaConverters._
SparkSession.active.sparkContext.env.blockManager.master.getMemoryStatus
.map { case (blockManagerId, _) => blockManagerId}
.filter(_.executorId != "driver")
.map { executor => s"executor_${executor.host}_${executor.executorId}"}
.toList.asJava
if (SparkSession.active.sparkContext.master.startsWith("local[")) {
SparkSession.active.sparkContext.env.blockManager.master.getMemoryStatus
.map { case (blockManagerId, _) => blockManagerId }
.map { executor => s"executor_${executor.host}_${executor.executorId}" }
.toList.asJava
} else {
SparkSession.active.sparkContext.env.blockManager.master.getMemoryStatus
.map { case (blockManagerId, _) => blockManagerId }
.filter(_.executorId != "driver")
.map { executor => s"executor_${executor.host}_${executor.executorId}" }
.toList.asJava
}
}
}
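The Scala change above special-cases local masters: with a local[...] master the driver's block manager is the only one available, so it is no longer filtered out of the executor list. The same condition can be checked from the Python side; a small illustrative sketch, not part of this commit:

# In local[...] mode the driver doubles as the only executor slot.
spark = get_spark_session()
is_local_mode = spark.sparkContext.master.startswith("local[")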
23 changes: 5 additions & 18 deletions sparklightautoml/automl/base.py
@@ -18,8 +18,8 @@
from pyspark.sql.session import SparkSession

from .blend import SparkBlender, SparkBestModelSelector
from ..computations.manager import PoolType, build_named_parallelism_settings, \
build_computations_manager, ComputationsManager
from ..computations.builder import build_computations_manager
from ..computations.base import ComputationsManager, ComputationsSettings
from ..dataset.base import SparkDataset, PersistenceLevel, PersistenceManager
from ..dataset.persistence import PlainCachePersistenceManager
from ..pipelines.base import TransformerInputOutputRoles
@@ -29,15 +29,11 @@
from ..utils import ColumnsSelectorTransformer, SparkDataFrame
from ..validation.base import SparkBaseTrainValidIterator, mark_as_train, mark_as_val
from ..validation.iterators import SparkFoldsIterator, SparkHoldoutIterator, SparkDummyIterator
from lightautoml.reader.base import RolesDict
from lightautoml.utils.logging import set_stdout_level, verbosity_to_loglevel
from lightautoml.utils.timer import PipelineTimer

logger = logging.getLogger(__name__)

# Either path/full url, or pyspark.sql.DataFrame
ReadableIntoSparkDf = Union[str, SparkDataFrame]
ParallelismMode = Union[Tuple[str, int], Dict[str, Any]]


class SparkAutoML(TransformerInputOutputRoles):
@@ -87,7 +83,7 @@ def __init__(
blender: Optional[SparkBlender] = None,
skip_conn: bool = False,
return_all_predictions: bool = False,
parallelism_mode: ParallelismMode = ("no_parallelism", -1)
computation_settings: Optional[ComputationsSettings] = ("no_parallelism", -1)
):
"""
@@ -124,9 +120,8 @@ def __init__(
if reader and levels:
self._initialize(reader, levels, timer, blender, skip_conn, return_all_predictions)

self._parallelism_settings = self._parse_parallelism_mode(parallelism_mode)
self._computations_manager: Optional[ComputationsManager] = \
build_computations_manager(self._parallelism_settings)
build_computations_manager(computation_settings)

@property
def input_roles(self) -> Optional[RolesDict]:
@@ -529,7 +524,7 @@ def _parallel_level(self,
for k, ml_pipe in enumerate(level)
]

results = self._computations_manager.compute(fit_tasks, pool_type=PoolType.ml_pipelines)
results = self._computations_manager.compute(fit_tasks)

ml_pipes = [ml_pipe for ml_pipe, _ in results]
ml_pipes_preds = [pipe_preds for _, pipe_preds in results]
@@ -544,14 +539,6 @@

return ml_pipes, ml_pipes_preds, flg_last_level

@staticmethod
def _parse_parallelism_mode(parallelism_mode: ParallelismMode):
if isinstance(parallelism_mode, Tuple):
mode, parallelism = parallelism_mode
return build_named_parallelism_settings(mode, parallelism)

return parallelism_mode


def _do_fit(ml_pipe: SparkMLPipeline, iterator: SparkBaseTrainValidIterator) \
-> Optional[Tuple[SparkMLPipeline, SparkDataset]]:
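A minimal sketch of the renamed constructor argument (formerly parallelism_mode), based only on what this diff shows. The tuple form below is the default carried over from the old API; richer ComputationsSettings values are accepted per the type hint but are not constructed in this commit:

from sparklightautoml.automl.base import SparkAutoML

# Default, non-parallel behaviour as shipped in this commit.
automl = SparkAutoML(
    reader=reader,          # hypothetical SparkToSparkReader instance
    levels=levels,          # hypothetical list of ML pipeline levels
    computation_settings=("no_parallelism", -1),
)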
10 changes: 5 additions & 5 deletions sparklightautoml/automl/presets/base.py
@@ -1,17 +1,17 @@
import logging
import os
import shutil
from typing import Optional, Any, Sequence, Iterable, Union
from typing import Optional, Any, Sequence, Iterable

import torch
import yaml
from lightautoml.utils.logging import verbosity_to_loglevel, set_stdout_level, add_filehandler
from lightautoml.utils.timer import PipelineTimer

from sparklightautoml.automl.base import SparkAutoML, ParallelismMode
from sparklightautoml.automl.base import SparkAutoML
from sparklightautoml.computations.base import ComputationsSettings
from sparklightautoml.dataset.base import SparkDataset, PersistenceManager
from sparklightautoml.tasks.base import SparkTask
from sparklightautoml.utils import SparkDataFrame

logger = logging.getLogger(__name__)

@@ -45,7 +45,7 @@ def __init__(
gpu_ids: Optional[str] = "all",
timing_params: Optional[dict] = None,
config_path: Optional[str] = None,
parallelism_mode: ParallelismMode = ("no_parallelism", -1),
computation_settings: Optional[ComputationsSettings] = ("no_parallelism", -1),
**kwargs: Any,
):
"""
@@ -76,7 +76,7 @@ def __init__(
**kwargs: Not used.
"""
super().__init__(parallelism_mode=parallelism_mode)
super().__init__(computation_settings=computation_settings)

self._set_config(config_path)

