
Commit: Fix/parallel coalescer (#5)

* minor corrections to the logic of parallel computations to overcome barrier stage issues

---------

Co-authored-by: fonhorst <fonhorst@alipoov.nb@gmail.com>
fonhorst committed Jun 10, 2023
1 parent bc7e75b commit 9dad3ab
Showing 6 changed files with 75 additions and 15 deletions.
examples/spark/tabular-preset-automl.py: 3 changes (2 additions, 1 deletion)
@@ -29,7 +29,8 @@ def main(spark: SparkSession, dataset_name: str, seed: int):
# 2. use_algos = [["lgb_tuned"]]
# 3. use_algos = [["linear_l2"]]
# 4. use_algos = [["lgb", "linear_l2"], ["lgb"]]
use_algos = [["lgb", "linear_l2"], ["lgb"]]
# use_algos = [["lgb", "linear_l2"], ["lgb"]]
use_algos = [["lgb"]]
cv = 3
dataset = get_dataset(dataset_name)

Binary file modified jars/spark-lightautoml_2.12-0.1.1.jar
@@ -8,6 +8,7 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import java.util
import scala.collection.JavaConverters._

class PrefferedLocsPartitionCoalescerTransformer(override val uid: String,
@@ -158,7 +159,7 @@ object SomeFunctions {
numPartitions,
shuffle = false,
partitionCoalescer = Some(new PrefferedLocsPartitionCoalescer(prefLocsForAllPartitions))
), duplicated_df.schema).cache()
), duplicated_df.schema)

// not sure whether it is needed to perform all operations in parallel
val copies_rdd_df = if (materialize_base_rdd) {
@@ -171,11 +172,41 @@

// select subsets of partitions that contain independent copies of the initial dataset
// assign them preferred locations and convert the resulting RDDs into DataFrames
val prefLocsDfs = (0 until realNumSlots)
// We need to localCheckpoint these datasets because the downstream SynapseML LightGBM
// cannot work with PartitionPruningRDD. The precise error is:
// org.apache.spark.scheduler.BarrierJobUnsupportedRDDChainException: [SPARK-24820][SPARK-24821]:
// Barrier execution mode does not allow the following pattern of RDD chain within a barrier stage:
// 1. Ancestor RDDs that have different number of partitions from the resulting RDD
// (e.g. union()/coalesce()/first()/take()/PartitionPruningRDD).
// A workaround for first()/take() can be barrierRdd.collect().head (scala) or barrierRdd.collect()[0] (python)
// 2. An RDD that depends on multiple barrier RDDs (e.g. barrierRdd1.zip(barrierRdd2)).
// at org.apache.spark.scheduler.DAGScheduler.checkBarrierStageWithRDDChainPattern(DAGScheduler.scala:447)
// at org.apache.spark.scheduler.DAGScheduler.createResultStage(DAGScheduler.scala:590)
// At the same time, we cannot skip the barrier stage in SynapseML LightGBM,
// because that would make LightGBM apply coalesce internally and break
// the preferred locations set earlier
val prefLocsDfs = new util.ArrayList[DataFrame]()
val threads = (0 until realNumSlots)
.map (slotId => new PartitionPruningRDD(copies_rdd_df.rdd, x => x / partitionsPerSlot == slotId))
.map(rdd => spark.createDataFrame(rdd, schema = duplicated_df.schema).drop(partition_id_col))
.map {
rdd =>
val thread = new Thread {
override def run(): Unit = {
val df = spark.createDataFrame(
rdd,
schema = duplicated_df.schema
).drop(partition_id_col).localCheckpoint(true)
prefLocsDfs.add(df)
}
}
thread.start()
thread
}
.toList
.asJava

threads.foreach(_.join())

copies_rdd_df.unpersist()

(prefLocsDfs, copies_rdd_df)
}
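
For context, a minimal standalone PySpark sketch (not taken from this repository; the session setup and sizes are illustrative assumptions) of the workaround described in the comment block above: localCheckpoint materializes the partitions and truncates the RDD lineage, so a subsequent barrier stage no longer sees a coalesce or PartitionPruningRDD ancestor with a different number of partitions.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[4]").getOrCreate()

# A DataFrame whose lineage contains a repartitioning step (coalesce).
df = spark.range(0, 1000, numPartitions=8).coalesce(2)

# Running a barrier stage directly on this lineage raises
# BarrierJobUnsupportedRDDChainException (SPARK-24820 / SPARK-24821):
# df.rdd.barrier().mapPartitions(lambda it: [sum(1 for _ in it)]).collect()

# localCheckpoint(eager=True) cuts the lineage, so the barrier stage only sees
# the checkpointed RDD and the partition-count check passes.
checkpointed = df.localCheckpoint(eager=True)
counts = checkpointed.rdd.barrier().mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
print(counts)  # roughly 500 rows per partition
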
sparklightautoml/computations/parallel.py: 13 changes (8 additions, 5 deletions)
@@ -131,11 +131,14 @@ def _make_slots_on_dataset_copies_coalesced_into_preffered_locations(self, datas
logger.info(f"Coalescing dataset into multiple copies (num copies: {self._parallelism}) "
f"with specified preferred locations")

dfs, self._base_pref_locs_df = duplicate_on_num_slots_with_locations_preferences(
df=dataset.data,
num_slots=self._parallelism,
enforce_division_without_reminder=False
)
if self._parallelism > 1:
dfs, self._base_pref_locs_df = duplicate_on_num_slots_with_locations_preferences(
df=dataset.data,
num_slots=self._parallelism,
enforce_division_without_reminder=False
)
else:
dfs, self._base_pref_locs_df = [dataset.data], None

assert len(dfs) > 0, "No dataframe slots were prepared, cannot continue"

sparklightautoml/ml_algo/boost_lgbm.py: 25 changes (24 additions, 1 deletion)
@@ -1,4 +1,5 @@
import logging
import warnings
from copy import copy
from typing import Dict, Optional, Tuple, Union, cast, List, Any

@@ -432,6 +433,7 @@ def fit_predict_single_fold(self,
validationIndicatorCol=validation_column,
verbosity=verbose_eval,
useSingleDatasetMode=self._use_single_dataset_mode,
useBarrierExecutionMode=self._use_barrier_execution_mode,
isProvideTrainingMetric=True,
chunkSize=self._chunk_size,
)
@@ -502,8 +504,29 @@ def _ensure_validation_size(self, full_data: SparkDataFrame, validation_column:
f"Reducing its size down according to max_validation_size setting:"
f" {self._max_validation_size}")
full_data = full_data.where(
~sf.col(validation_column) or sf.rand(seed=self._seed) < self._max_validation_size / val_data_size
(sf.col(validation_column) != sf.lit(1)) |
(sf.rand(seed=self._seed) < sf.lit(self._max_validation_size / val_data_size))
)

# check for partitions that are empty or contain only validation rows, as they may lead to hanging
rows = (
full_data
.withColumn("__partition_id__", sf.spark_partition_id())
.groupby("__partition_id__").agg(
sf.sum(validation_column).alias("val_values"),
sf.count("*").alias("all_values")
)
.collect()
)
for row in rows:
if row["val_values"] == row["all_values"] or row["all_values"] == 0:
warnings.warn(f"Empty or validation-only partition encountered: partition id - {row['__partition_id__']}, "
f"validation values count in the partition - {row['val_values']}, "
f"all values count in the partition - {row['all_values']}")
raise ValueError(f"Empty or validation-only partition encountered: partition id - {row['__partition_id__']}, "
f"validation values count in the partition - {row['val_values']}, "
f"all values count in the partition - {row['all_values']}")

return full_data

def _build_transformer(self) -> Transformer:
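
A side note on the filter change above: PySpark Columns cannot be used in Python boolean contexts, so an expression like ~sf.col(validation_column) or ... fails at runtime with "Cannot convert column into bool"; the condition has to be written with the bitwise | operator and explicit comparisons, as the new code does. A minimal standalone sketch (the column name and keep fraction are illustrative, not from the repository):

import pyspark.sql.functions as sf
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()

# Toy frame with a 0/1 validation indicator column.
df = spark.range(100).withColumn("is_val", (sf.col("id") % 5 == 0).cast("int"))

keep_fraction = 0.5  # illustrative downsampling fraction for validation rows

# bool() on a Column raises an error, so `or`/`not` are replaced by `|`/`~`
# together with explicit comparisons against literals.
filtered = df.where(
    (sf.col("is_val") != sf.lit(1))
    | (sf.rand(seed=42) < sf.lit(keep_fraction))
)
print(filtered.count())
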
sparklightautoml/ml_algo/tuning/parallel_optuna.py: 10 changes (6 additions, 4 deletions)
@@ -1,14 +1,14 @@
import logging
from copy import deepcopy
from typing import Optional, Tuple, Callable, Iterable, Union
from typing import Optional, Tuple, Callable, Union

import optuna
from lightautoml.ml_algo.tuning.optuna import OptunaTuner, TunableAlgo
from lightautoml.validation.base import HoldoutIterator, TrainValidIterator
from lightautoml.validation.base import HoldoutIterator

from sparklightautoml.computations.builder import build_computations_manager
from sparklightautoml.computations.base import ComputationsSettings, \
ComputationsManager, ComputationsSession
ComputationsSession
from sparklightautoml.computations.builder import build_computations_manager
from sparklightautoml.computations.sequential import SequentialComputationsManager
from sparklightautoml.computations.utils import deecopy_tviter_without_dataset
from sparklightautoml.dataset.base import SparkDataset
@@ -172,6 +172,8 @@ def objective(trial: optuna.trial.Trial) -> float:
suggested_params=_ml_algo.init_params_on_input(tv_iter),
)

logger.debug(f"Sampled ml_algo params: {_ml_algo.params}")

output_dataset = _ml_algo.fit_predict(train_valid_iterator=tv_iter)

return _ml_algo.score(output_dataset)
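
For reference, a minimal standalone Optuna sketch (toy objective and illustrative parameter names, not taken from this repository) of the pattern behind the added debug line: log the sampled parameters inside the objective before the fit starts, so a failed or hanging trial can be traced back to a concrete parameter set.

import logging

import optuna

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)


def objective(trial: optuna.trial.Trial) -> float:
    # Sample a parameter set for this trial.
    params = {
        "learning_rate": trial.suggest_float("learning_rate", 1e-3, 0.3, log=True),
        "num_leaves": trial.suggest_int("num_leaves", 16, 256),
    }
    # Log the sampled values before any potentially long or failing fit.
    logger.debug(f"Sampled params: {params}")
    # ... fit a model with params and return its validation score ...
    return 0.0  # placeholder score


study = optuna.create_study(direction="maximize")
study.optimize(objective, n_trials=5)
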
