Commit

Paper/shuffle opt (#4)
* small fix to run Scala-based tests on local-cluster instead of local

* add the full coalescer

* add parametrized tests for the full coalescer

* fix problem with SparkSession.getActiveSession()

* add correct settings for simple parallelism mode

* add parallel_optuna script

---------

Co-authored-by: fonhorst <fonhorst@alipoov.nb@gmail.com>
fonhorst and fonhorst committed Jun 8, 2023
1 parent 3ea26af commit bc7e75b
Showing 21 changed files with 454 additions and 123 deletions.
12 changes: 6 additions & 6 deletions examples/spark/examples_utils.py
@@ -7,9 +7,9 @@
from pyspark.sql import SparkSession

from sparklightautoml.dataset import persistence
-from sparklightautoml.utils import SparkDataFrame
+from sparklightautoml.utils import SparkDataFrame, get_current_session

-BUCKET_NUMS = 16
+BUCKET_NUMS = 6
PERSISTENCE_MANAGER_ENV_VAR = "PERSISTENCE_MANAGER"
BASE_DATASETS_PATH = "file:///opt/spark_data/"

@@ -24,7 +24,7 @@ class Dataset:
file_format_options: Dict[str, Any] = field(default_factory=lambda: {"header": True, "escape": "\""})

def load(self) -> SparkDataFrame:
-spark = SparkSession.getActiveSession()
+spark = get_current_session()
return spark.read.format(self.file_format).options(**self.file_format_options).load(self.path)


@@ -108,7 +108,7 @@ def prepare_test_and_train(
) -> Tuple[SparkDataFrame, SparkDataFrame]:
assert 0 <= test_size <= 1

-spark = SparkSession.getActiveSession()
+spark = get_current_session()

execs = int(spark.conf.get('spark.executor.instances', '1'))
cores = int(spark.conf.get('spark.executor.cores', '8'))
@@ -200,13 +200,13 @@ class FSOps:
"""
@staticmethod
def get_sc() -> SparkContext:
-spark = SparkSession.getActiveSession()
+spark = get_current_session()
sc = spark.sparkContext
return sc

@staticmethod
def get_default_fs() -> str:
-spark = SparkSession.getActiveSession()
+spark = get_current_session()
hadoop_conf = spark._jsc.hadoopConfiguration()
default_fs = hadoop_conf.get("fs.defaultFS")
return default_fs
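Note on the getActiveSession() fix above: the diff switches every call site to get_current_session, which is imported from sparklightautoml.utils but whose body is not part of this commit. A minimal sketch of what such a fallback helper might look like (an assumption for illustration, not the library's actual implementation):

from pyspark.sql import SparkSession


def get_current_session() -> SparkSession:
    # getActiveSession() may return None, e.g. when called from a thread other than the
    # one that created the session; fall back to the builder, which returns the default
    # session if one is already running.
    spark = SparkSession.getActiveSession()
    if spark is None:
        spark = SparkSession.builder.getOrCreate()
    return spark
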
2 changes: 1 addition & 1 deletion examples/spark/parallel/feature-processing-only.py
@@ -17,7 +17,7 @@

# settings and data
cv = 5
feat_pipe = "linear" # linear, lgb_simple or lgb_adv
feat_pipe = "lgb_adv" # linear, lgb_simple or lgb_adv
dataset_name = "lama_test_dataset"
dataset = get_dataset(dataset_name)
df = spark.read.csv(dataset.path, header=True)
104 changes: 104 additions & 0 deletions examples/spark/parallel/parallel_optuna.py
@@ -0,0 +1,104 @@
import logging
from logging import config
from typing import Tuple, Union, Callable

import optuna
from lightautoml.ml_algo.tuning.optuna import TunableAlgo
from lightautoml.ml_algo.utils import tune_and_fit_predict
from pyspark.sql import functions as sf

from examples.spark.examples_utils import get_spark_session
from sparklightautoml.computations.parallel import ParallelComputationsManager
from sparklightautoml.dataset.base import SparkDataset
from sparklightautoml.dataset.persistence import PlainCachePersistenceManager
from sparklightautoml.ml_algo.boost_lgbm import SparkBoostLGBM
from sparklightautoml.ml_algo.tuning.parallel_optuna import ParallelOptunaTuner
from sparklightautoml.utils import logging_config, VERBOSE_LOGGING_FORMAT
from sparklightautoml.validation.base import SparkBaseTrainValidIterator
from sparklightautoml.validation.iterators import SparkFoldsIterator

logging.config.dictConfig(logging_config(level=logging.DEBUG, log_filename='/tmp/slama.log'))
logging.basicConfig(level=logging.DEBUG, format=VERBOSE_LOGGING_FORMAT)
logger = logging.getLogger(__name__)


class ProgressReportingOptunaTuner(ParallelOptunaTuner):
def _get_objective(self,
ml_algo: TunableAlgo,
estimated_n_trials: int,
train_valid_iterator: SparkBaseTrainValidIterator) \
-> Callable[[optuna.trial.Trial], Union[float, int]]:
obj_func = super()._get_objective(ml_algo, estimated_n_trials, train_valid_iterator)

def func(*args, **kwargs):
obj_score = obj_func(*args, **kwargs)
logger.info(f"Objective score: {obj_score}")
return obj_score

return func


def train_test_split(dataset: SparkDataset, test_slice_or_fold_num: Union[float, int] = 0.2) \
-> Tuple[SparkDataset, SparkDataset]:

if isinstance(test_slice_or_fold_num, float):
assert 0 <= test_slice_or_fold_num <= 1
train, test = dataset.data.randomSplit([1 - test_slice_or_fold_num, test_slice_or_fold_num])
else:
train = dataset.data.where(sf.col(dataset.folds_column) != test_slice_or_fold_num)
test = dataset.data.where(sf.col(dataset.folds_column) == test_slice_or_fold_num)

train_dataset, test_dataset = dataset.empty(), dataset.empty()
train_dataset.set_data(train, dataset.features, roles=dataset.roles)
test_dataset.set_data(test, dataset.features, roles=dataset.roles)

return train_dataset, test_dataset


if __name__ == "__main__":
spark = get_spark_session()

feat_pipe = "lgb_adv" # linear, lgb_simple or lgb_adv
dataset_name = "lama_test_dataset"
parallelism = 3

# load and prepare data
ds = SparkDataset.load(
path=f"/tmp/{dataset_name}__{feat_pipe}__features.dataset",
persistence_manager=PlainCachePersistenceManager()
)
train_ds, test_ds = train_test_split(ds, test_slice_or_fold_num=4)

# create main entities
computations_manager = ParallelComputationsManager(parallelism=parallelism, use_location_prefs_mode=True)
iterator = SparkFoldsIterator(train_ds).convert_to_holdout_iterator()
tuner = ProgressReportingOptunaTuner(
n_trials=10,
timeout=300,
parallelism=parallelism,
computations_manager=computations_manager
)
ml_algo = SparkBoostLGBM(default_params={"numIterations": 500}, computations_settings=computations_manager)
score = ds.task.get_dataset_metric()

# fit and predict
model, oof_preds = tune_and_fit_predict(ml_algo, tuner, iterator)
test_preds = ml_algo.predict(test_ds)

# estimate oof and test metrics
oof_metric_value = score(oof_preds.data.select(
SparkDataset.ID_COLUMN,
sf.col(ds.target_column).alias('target'),
sf.col(ml_algo.prediction_feature).alias('prediction')
))

test_metric_value = score(test_preds.data.select(
SparkDataset.ID_COLUMN,
sf.col(ds.target_column).alias('target'),
sf.col(ml_algo.prediction_feature).alias('prediction')
))

print(f"OOF metric: {oof_metric_value}")
print(f"Test metric: {oof_metric_value}")

spark.stop()
2 changes: 1 addition & 1 deletion examples/spark/tabular-preset-automl.py
@@ -56,7 +56,7 @@ def main(spark: SparkSession, dataset_name: str, seed: int):
},
linear_l2_params={'default_params': {'regParam': [1e-5]}},
reader_params={"cv": cv, "advanced_roles": False},
computation_settings=("no_parallelism", -1)
computation_settings=("parallelism", 3)
)

oof_predictions = automl.fit_predict(
Binary file modified jars/spark-lightautoml_2.12-0.1.1.jar
@@ -1,9 +1,13 @@
package org.apache.spark.lightautoml.utils

import org.apache.spark.Partitioner
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.rdd.{PartitionPruningRDD, ShuffledRDD}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

import scala.collection.JavaConverters._

class PrefferedLocsPartitionCoalescerTransformer(override val uid: String,
@@ -49,6 +53,10 @@ class PrefferedLocsPartitionCoalescerTransformer(override val uid: String,
override def transformSchema(schema: StructType): StructType = schema.copy()
}

// Partitioner that treats the record key itself as the target partition index
// (keys are expected to be Ints in [0, numPartitions)).
class TrivialPartitioner(override val numPartitions: Int) extends Partitioner {
override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}


object SomeFunctions {
def func[T](x: T): T = x
@@ -68,4 +76,107 @@ object SomeFunctions {
.toList.asJava
}
}

def test_func(df: DataFrame): Long = {
df.rdd.barrier().mapPartitions(SomeFunctions.func).count()
}

def test_sleep(df: DataFrame, sleep_millis: Int = 5000): Array[Row] = {
df.rdd.mapPartitions(x => {Thread.sleep(sleep_millis); x}).collect()
}

/**
* Makes numSlots copies of the dataset and produces a list of DataFrames, each of which is a copy of the initial dataset.
* Every copy is coalesced onto its own subset of executors by setting the appropriate preferred locations.
* Subsequent map and aggregate operations on a returned DataFrame should then run only on the executors it is matched with.
* Be aware that:
* 1. There may be some unused cores if the number of cores x the number of executors cannot be divided
* by the number of slots without remainder
* 2. The number of slots will be reduced down to the number of cores x the number of executors if the number of slots is greater
* 3. Enabling enforce_division_without_reminder will lead to an exception
* if the number of cores x the number of executors cannot be divided by the number of slots without remainder
* */
def duplicateOnNumSlotsWithLocationsPreferences(df: DataFrame,
numSlots: Int,
materialize_base_rdd: Boolean = true,
enforce_division_without_reminder: Boolean = true):
(java.util.List[DataFrame], DataFrame) = {
// prepare and identify params for slots
val spark = SparkSession.active
val master = spark.sparkContext.master
val execs = SomeFunctions.executors()
val numExecs = execs.size()
val foundCoresNum = spark.conf.getOption("spark.executor.cores") match {
case Some(cores_option) => Some(cores_option.toInt)
case None => if (master.startsWith("local-cluster")){
val cores = master.slice("local-cluster[".length, master.length - 1).split(',')(1).trim.toInt
Some(cores)
} else if (master.startsWith("local")) {
val num_cores = master.slice("local[".length, master.length - 1)
val cores = if (num_cores == "*") { java.lang.Runtime.getRuntime.availableProcessors } else { num_cores.toInt }
Some(cores)
} else {
None
}
}

val numPartitions = numExecs * foundCoresNum.get

if (enforce_division_without_reminder) {
assert(numPartitions % numSlots == 0,
s"Resulting num partitions should be exactly dividable by num slots: $numPartitions % $numSlots != 0")
assert(numExecs % numSlots == 0,
s"Resulting num executors should be exactly dividable by num slots: $numExecs % $numSlots != 0")
}

val realNumSlots = math.min(numSlots, numPartitions)

val partitionsPerSlot = numPartitions / realNumSlots
val prefLocsForAllPartitions = execs.asScala.flatMap(e_id => (0 until foundCoresNum.get).map(_ => e_id)).toList

val partition_id_col = "__partition_id"
// prepare the initial dataset by duplicating its content and assigning a partition id to each duplicated row
val duplicated_df = df
.withColumn(
partition_id_col,
explode(array((0 until realNumSlots).map(x => lit(x)):_*))
)
.withColumn(
partition_id_col,
col(partition_id_col) * lit(partitionsPerSlot)
+ (lit(partitionsPerSlot) * rand(seed = 42)).cast("int")
)

// repartition the duplicated dataset to force each copy into its own subset of partitions;
// this should work even with the standard HashPartitioner
val new_rdd_2 = new ShuffledRDD[Int, Row, Row](
duplicated_df.rdd.map(row => (row.getInt(row.fieldIndex(partition_id_col)), row)),
new TrivialPartitioner(numPartitions)
).map(x => x._2)

val new_rdd_df = spark.createDataFrame(new_rdd_2.coalesce(
numPartitions,
shuffle = false,
partitionCoalescer = Some(new PrefferedLocsPartitionCoalescer(prefLocsForAllPartitions))
), duplicated_df.schema).cache()

// not sure whether this materialization is needed to perform all operations in parallel
val copies_rdd_df = if (materialize_base_rdd) {
val cached_rdd_df = new_rdd_df.cache()
cached_rdd_df.write.mode("overwrite").format("noop").save()
cached_rdd_df
} else {
new_rdd_df
}

// select the subsets of partitions that contain independent copies of the initial dataset,
// assign them preferred locations and convert the resulting RDDs into DataFrames
val prefLocsDfs = (0 until realNumSlots)
.map (slotId => new PartitionPruningRDD(copies_rdd_df.rdd, x => x / partitionsPerSlot == slotId))
.map(rdd => spark.createDataFrame(rdd, schema = duplicated_df.schema).drop(partition_id_col))
.toList
.asJava

(prefLocsDfs, copies_rdd_df)
}
}
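To make the doc comment of duplicateOnNumSlotsWithLocationsPreferences concrete, here is a small worked example of the slot arithmetic (the cluster layout is assumed purely for illustration and is not part of the commit): with 2 executors of 4 cores each and numSlots = 2, the two copies of the dataset are spread over 8 partitions, 4 contiguous partitions per copy, which is exactly the block that PartitionPruningRDD later selects for each slot.

# Assumed layout: 2 executors x 4 cores, 2 slots requested (illustration only).
num_execs, cores_per_exec, num_slots = 2, 4, 2
num_partitions = num_execs * cores_per_exec        # 8 partitions in total
assert num_partitions % num_slots == 0 and num_execs % num_slots == 0
partitions_per_slot = num_partitions // num_slots  # 4 partitions per copy
for slot in range(num_slots):
    parts = list(range(slot * partitions_per_slot, (slot + 1) * partitions_per_slot))
    print(f"copy {slot} -> partitions {parts}")    # copy 0 -> [0, 1, 2, 3]; copy 1 -> [4, 5, 6, 7]
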
@@ -33,7 +33,7 @@ class TestBalancedUnionPartitionCoalescer extends AnyFunSuite with BeforeAndAfte
val spark: SparkSession = SparkSession
.builder()
.master(s"local-cluster[$num_workers, $num_cores, 1024]")
.config("spark.jars", "target/scala-2.12/spark-lightautoml_2.12-0.1.jar")
.config("spark.jars", "target/scala-2.12/spark-lightautoml_2.12-0.1.1.jar")
.getOrCreate()

override protected def afterAll(): Unit = {
