
fix: fix barrier execution mode with repartition for spark standalone

imatiach-msft authored and mhamilton723 committed Aug 26, 2019
1 parent 1e186ad commit 9805996143d4cf174895ff2e08bb61fd2c99c4f1
@@ -79,9 +79,9 @@ can be converted to PMML format through the

By default LightGBM uses the regular Spark paradigm for launching tasks and communicates with the driver to coordinate task execution.
The driver thread aggregates all task host:port information and then communicates the full list back to the workers in order for NetworkInit to be called.
There have been issues on certain cluster configurations because the driver needs to know how many tasks there are, and this computation is surprisingly non-trivial in Spark; if the expected number of tasks differs from the actual number, initialization deadlocks.
The upcoming v0.18 release adds a new UseBarrierExecutionMode flag which, when enabled, uses a barrier() stage to block all tasks.
The barrier execution mode simplifies the logic for aggregating host:port information across all tasks.
To use it in Scala, call setUseBarrierExecutionMode(true), for example:

val lgbm = new LightGBMClassifier()
  .setUseBarrierExecutionMode(true)
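For context on what the driver-side coordination amounts to: each task reports its host:port, and the driver joins the full list that NetworkInit consumes. A toy sketch of that aggregation step (hypothetical helper name and addresses, not the actual MMLSpark implementation):

```scala
object HostPortAggregation {
  // Driver-side step: join every task's host:port into the single
  // comma-separated list handed to LightGBM's NetworkInit.
  def aggregate(workers: Seq[(String, Int)]): String =
    workers.map { case (host, port) => s"$host:$port" }.sorted.mkString(",")

  def main(args: Array[String]): Unit = {
    val nodes = Seq(("10.0.0.2", 12400), ("10.0.0.1", 12400))
    println(aggregate(nodes)) // prints "10.0.0.1:12400,10.0.0.2:12400"
  }
}
```

With barrier execution the list can be gathered within the barrier stage itself, instead of the driver having to precompute how many task entries to wait for.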
@@ -45,7 +45,7 @@ package (?:com\.microsoft\.ml\.spark|org\.apache\.spark|com\.microsoft\.CNTK|com
</check>
<check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true"><parameters>
<parameter name="regex">^[a-z][A-Za-z0-9]*(_=)?$</parameter></parameters></check>
<check level="warning" class="org.scalastyle.scalariform.ClassTypeParameterChecker" enabled="true"><parameters>
<check level="warning" class="org.scalastyle.scalariform.ClassTypeParameterChecker" enabled="false"><parameters>
<parameter name="regex">^[A-Z_]$</parameter></parameters></check>
<check level="error" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check>
<check level="error" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true"><parameters>
@@ -39,12 +39,45 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
}
}

  protected def prepareDataframe(dataset: Dataset[_], trainingCols: ListBuffer[String],
                                 numWorkers: Int): DataFrame = {
    // Reduce number of partitions to number of executor cores
    val df = dataset.select(trainingCols.map(name => col(name)): _*).toDF()
    /* Note: with barrier execution mode we must use repartition instead of coalesce when
     * running on spark standalone.
     * Using coalesce, we get the error:
     *
     * org.apache.spark.scheduler.BarrierJobUnsupportedRDDChainException:
     * [SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following
     * pattern of RDD chain within a barrier stage:
     * 1. Ancestor RDDs that have different number of partitions from the resulting
     * RDD (eg. union()/coalesce()/first()/take()/PartitionPruningRDD). A workaround
     * for first()/take() can be barrierRdd.collect().head (scala) or barrierRdd.collect()[0] (python).
     * 2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)).
     *
     * Without repartition, we may hit the error:
     * org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed: [SPARK-24819]:
     * Barrier execution mode does not allow run a barrier stage that requires more
     * slots than the total number of slots in the cluster currently. Please init a
     * new cluster with more CPU cores or repartition the input RDD(s) to reduce the
     * number of slots required to run this barrier stage.
     *
     * Hence we still need to estimate the number of workers and repartition even when using
     * barrier execution, which is unfortunate as repartition is more expensive than coalesce.
     */
    if (getUseBarrierExecutionMode) {
      val numPartitions = df.rdd.getNumPartitions
      if (numPartitions > numWorkers) {
        df.repartition(numWorkers)
      } else {
        df
      }
    } else {
      df.coalesce(numWorkers)
    }
  }

  protected def getTrainingCols(): ListBuffer[String] = {
    val trainingCols = ListBuffer(getLabelCol, getFeaturesCol)
    if (get(weightCol).isDefined) {
      trainingCols += getWeightCol
@@ -58,8 +91,18 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
    if (get(initScoreCol).isDefined) {
      trainingCols += getInitScoreCol
    }
    trainingCols
}

  protected def innerTrain(dataset: Dataset[_]): TrainedModel = {
    val sc = dataset.sparkSession.sparkContext
    val numCoresPerExec = ClusterUtil.getNumCoresPerExecutor(dataset, log)
    val numExecutorCores = ClusterUtil.getNumExecutorCores(dataset, numCoresPerExec)
    val numWorkers = min(numExecutorCores, dataset.rdd.getNumPartitions)
    // Only get the relevant columns
    val trainingCols = getTrainingCols()
    val df = prepareDataframe(dataset, trainingCols, numWorkers)

    val (inetAddress, port, future) =
      LightGBMUtils.createDriverNodesThread(numWorkers, df, log, getTimeout, getUseBarrierExecutionMode)
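The partitioning decision in prepareDataframe can be summarized as a pure function. A sketch for illustration (choosePartitioning is a hypothetical name, not part of this commit):

```scala
object PartitioningDecision {
  // Mirrors the branch in prepareDataframe: barrier mode must not change the
  // partition count via coalesce (SPARK-24820), so it repartitions only when
  // there are more partitions than available worker slots; otherwise the
  // dataframe passes through unchanged. Non-barrier mode keeps using coalesce.
  def choosePartitioning(numPartitions: Int, numWorkers: Int,
                         barrierMode: Boolean): (String, Int) =
    if (barrierMode) {
      if (numPartitions > numWorkers) ("repartition", numWorkers)
      else ("unchanged", numPartitions)
    } else ("coalesce", numWorkers)

  def main(args: Array[String]): Unit = {
    // e.g. 200 input partitions and 16 executor cores => numWorkers = 16
    println(choosePartitioning(200, 16, barrierMode = true))  // (repartition,16)
    println(choosePartitioning(8, 16, barrierMode = true))    // (unchanged,8)
    println(choosePartitioning(200, 16, barrierMode = false)) // (coalesce,16)
  }
}
```

The asymmetry is the cost of the fix: repartition triggers a full shuffle while coalesce only merges partitions locally, but only repartition is legal inside a barrier stage.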
