fix barrier execution mode with repartition for spark standalone #651

Merged
4 changes: 2 additions & 2 deletions docs/lightgbm.md
@@ -79,9 +79,9 @@ can be converted to PMML format through the

By default LightGBM uses the regular Spark paradigm for launching tasks and communicates with the driver to coordinate task execution.
The driver thread aggregates all task host:port information and then communicates the full list back to the workers in order for NetworkInit to be called.
-There have been some issues on certain cluster configurations because the driver needs to know how many tasks there are, and this computation is surprisingly non-trivial in spark.
+This requires the driver to know how many tasks there are, and if the expected number of tasks is different from the actual number this will cause the initialization to deadlock.
With the next v0.18 release there is a new UseBarrierExecutionMode flag, which, when activated, uses the barrier() stage to block all tasks.
-The barrier execution mode simplifies the logic to aggregate host:port information across all tasks, so the driver will no longer need to precompute the number of tasks in advance.
+The barrier execution mode simplifies the logic to aggregate host:port information across all tasks.
To use it in Scala, you can call setUseBarrierExecutionMode(true), for example:

val lgbm = new LightGBMClassifier()
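For a fuller picture of that flag, a minimal end-to-end sketch is shown below; the synthetic data, column names, and package location are assumptions for illustration and are not part of this change:

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import com.microsoft.ml.spark.LightGBMClassifier // assumed v0.18-era package location

val spark = SparkSession.builder().appName("lightgbm-barrier-mode").getOrCreate()
import spark.implicits._

// Tiny synthetic training set: a vector "features" column and a numeric "label" column.
val train = Seq(
  (Vectors.dense(1.0, 2.0), 0.0),
  (Vectors.dense(3.0, 4.0), 1.0)
).toDF("features", "label")

val lgbm = new LightGBMClassifier()
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setUseBarrierExecutionMode(true) // opt in to the barrier() stage coordination
val model = lgbm.fit(train)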
2 changes: 1 addition & 1 deletion scalastyle-config.xml
@@ -45,7 +45,7 @@ package (?:com\.microsoft\.ml\.spark|org\.apache\.spark|com\.microsoft\.CNTK|com
</check>
<check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true"><parameters>
<parameter name="regex">^[a-z][A-Za-z0-9]*(_=)?$</parameter></parameters></check>
<check level="warning" class="org.scalastyle.scalariform.ClassTypeParameterChecker" enabled="true"><parameters>
<check level="warning" class="org.scalastyle.scalariform.ClassTypeParameterChecker" enabled="false"><parameters>
<parameter name="regex">^[A-Z_]$</parameter></parameters></check>
<check level="error" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check>
<check level="error" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true"><parameters>
2 changes: 1 addition & 1 deletion scalastyle-test-config.xml
@@ -45,7 +45,7 @@ package (?:com\.microsoft\.ml\.spark|org\.apache\.spark|com\.microsoft\.CNTK|com
</check>
<check level="warning" class="org.scalastyle.scalariform.MethodNamesChecker" enabled="true"><parameters>
<parameter name="regex">^[a-z][A-Za-z0-9]*(_=)?$</parameter></parameters></check>
<check level="warning" class="org.scalastyle.scalariform.ClassTypeParameterChecker" enabled="true"><parameters>
<check level="warning" class="org.scalastyle.scalariform.ClassTypeParameterChecker" enabled="false"><parameters>
<parameter name="regex">^[A-Z_]$</parameter></parameters></check>
<check level="error" class="org.scalastyle.scalariform.EqualsHashCodeChecker" enabled="true"></check>
<check level="error" class="org.scalastyle.scalariform.IllegalImportsChecker" enabled="true"><parameters>
@@ -39,12 +39,45 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
    }
  }

-  protected def innerTrain(dataset: Dataset[_]): TrainedModel = {
-    val sc = dataset.sparkSession.sparkContext
-    val numCoresPerExec = ClusterUtil.getNumCoresPerExecutor(dataset, log)
-    val numExecutorCores = ClusterUtil.getNumExecutorCores(dataset, numCoresPerExec)
-    val numWorkers = min(numExecutorCores, dataset.rdd.getNumPartitions)
-    // Only get the relevant columns
+  protected def prepareDataframe(dataset: Dataset[_], trainingCols: ListBuffer[String],
+                                 numWorkers: Int): DataFrame = {
+    // Reduce number of partitions to number of executor cores
+    val df = dataset.select(trainingCols.map(name => col(name)): _*).toDF()
+    /* Note: with barrier execution mode we must use repartition instead of coalesce when
+     * running on spark standalone.
+     * Using coalesce, we get the error:
+     *
+     * org.apache.spark.scheduler.BarrierJobUnsupportedRDDChainException:
+     * [SPARK-24820][SPARK-24821]: Barrier execution mode does not allow the following
+     * pattern of RDD chain within a barrier stage:
+     * 1. Ancestor RDDs that have different number of partitions from the resulting
+     * RDD (eg. union()/coalesce()/first()/take()/PartitionPruningRDD). A workaround
+     * for first()/take() can be barrierRdd.collect().head (scala) or barrierRdd.collect()[0] (python).
+     * 2. An RDD that depends on multiple barrier RDDs (eg. barrierRdd1.zip(barrierRdd2)).
+     *
+     * Without repartition, we may hit the error:
+     * org.apache.spark.scheduler.BarrierJobSlotsNumberCheckFailed: [SPARK-24819]:
+     * Barrier execution mode does not allow run a barrier stage that requires more
+     * slots than the total number of slots in the cluster currently. Please init a
+     * new cluster with more CPU cores or repartition the input RDD(s) to reduce the
+     * number of slots required to run this barrier stage.
+     *
+     * Hence we still need to estimate the number of workers and repartition even when using
+     * barrier execution, which is unfortunate as repartition is more expensive than coalesce.
+     */
+    if (getUseBarrierExecutionMode) {
+      val numPartitions = df.rdd.getNumPartitions
+      if (numPartitions > numWorkers) {
+        df.repartition(numWorkers)
+      } else {
+        df
+      }
+    } else {
+      df.coalesce(numWorkers)
+    }
+  }
+
+  protected def getTrainingCols(): ListBuffer[String] = {
    val trainingCols = ListBuffer(getLabelCol, getFeaturesCol)
    if (get(weightCol).isDefined) {
      trainingCols += getWeightCol
@@ -58,8 +91,18 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
    if (get(initScoreCol).isDefined) {
      trainingCols += getInitScoreCol
    }
-    // Reduce number of partitions to number of executor cores
-    val df = dataset.select(trainingCols.map(name => col(name)): _*).toDF().coalesce(numWorkers)
+    trainingCols
+  }
+
+  protected def innerTrain(dataset: Dataset[_]): TrainedModel = {
+    val sc = dataset.sparkSession.sparkContext
+    val numCoresPerExec = ClusterUtil.getNumCoresPerExecutor(dataset, log)
+    val numExecutorCores = ClusterUtil.getNumExecutorCores(dataset, numCoresPerExec)
+    val numWorkers = min(numExecutorCores, dataset.rdd.getNumPartitions)
+    // Only get the relevant columns
+    val trainingCols = getTrainingCols()
+    val df = prepareDataframe(dataset, trainingCols, numWorkers)
+
    val (inetAddress, port, future) =
      LightGBMUtils.createDriverNodesThread(numWorkers, df, log, getTimeout, getUseBarrierExecutionMode)

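To make the repartition-versus-coalesce trade-off from the prepareDataframe comment concrete, the following is a simplified standalone sketch of the same decision; the helper name capPartitions and the example data are hypothetical, not code from this PR. With barrier execution the DataFrame is repartitioned down to the worker count only when it has more partitions than workers, since coalesce is rejected inside a barrier stage; the non-barrier path keeps the cheaper coalesce.

import org.apache.spark.sql.{DataFrame, SparkSession}

// Simplified stand-in for the logic in prepareDataframe: cap the number of
// partitions at the number of worker slots that will run LightGBM tasks.
def capPartitions(df: DataFrame, numWorkers: Int, useBarrierMode: Boolean): DataFrame = {
  if (useBarrierMode) {
    // Barrier stages reject coalesce() in the RDD chain (SPARK-24820/24821),
    // so repartition, and only when there are more partitions than worker slots.
    if (df.rdd.getNumPartitions > numWorkers) df.repartition(numWorkers) else df
  } else {
    // Outside barrier mode, coalesce() avoids a full shuffle and is cheaper.
    df.coalesce(numWorkers)
  }
}

val spark = SparkSession.builder().appName("barrier-repartition-sketch").getOrCreate()
val data = spark.range(0, 1000, 1, 16).toDF("id") // 16 input partitions
val prepared = capPartitions(data, numWorkers = 4, useBarrierMode = true)
// prepared now has 4 partitions, so a subsequent barrier() stage will not request
// more slots than the cluster can provide (SPARK-24819).

Guarding on getNumPartitions matters because repartitioning a DataFrame that already has no more partitions than workers would only add an unnecessary shuffle.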