
Commit

Merge pull request #882 from ocworld/fix-rename-clusterutils-numcores
Keunhyun Oh committed Jul 7, 2020
1 parent e741993 commit f07e558
Showing 9 changed files with 89 additions and 90 deletions.
30 changes: 15 additions & 15 deletions src/main/scala/com/microsoft/ml/spark/core/utils/ClusterUtil.scala
@@ -11,29 +11,29 @@ import org.apache.spark.sql.{Dataset, SparkSession}
import org.slf4j.Logger

object ClusterUtil {
- /** Get number of cores from dummy dataset for 1 executor.
+ /** Get number of tasks from dummy dataset for 1 executor.
* Note: all executors have same number of cores,
* and this is more reliable than getting value from conf.
* @param dataset The dataset containing the current spark session.
- * @return The number of cores per executor.
+ * @return The number of tasks per executor.
*/
- def getNumCoresPerExecutor(dataset: Dataset[_], log: Logger): Int = {
+ def getNumTasksPerExecutor(dataset: Dataset[_], log: Logger): Int = {
val spark = dataset.sparkSession
val confTaskCpus = getTaskCpus(dataset, log)
try {
val confCores = spark.sparkContext.getConf.get("spark.executor.cores").toInt
- val coresPerExec = confCores / confTaskCpus
- log.info(s"ClusterUtils calculated num cores per executor as $coresPerExec from $confCores " +
+ val tasksPerExec = confCores / confTaskCpus
+ log.info(s"ClusterUtils calculated num tasks per executor as $tasksPerExec from $confCores " +
s"cores and $confTaskCpus task CPUs")
- coresPerExec
+ tasksPerExec
} catch {
case _: NoSuchElementException =>
// If spark.executor.cores is not defined, get the cores based on master
val defaultNumCores = getDefaultNumExecutorCores(spark, log)
- val coresPerExec = defaultNumCores / confTaskCpus
- log.info(s"ClusterUtils calculated num cores per executor as $coresPerExec from " +
+ val tasksPerExec = defaultNumCores / confTaskCpus
+ log.info(s"ClusterUtils calculated num tasks per executor as $tasksPerExec from " +
s"default num cores($defaultNumCores) from master and $confTaskCpus task CPUs")
- coresPerExec
+ tasksPerExec
}
}
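Note (not part of the diff): a minimal sketch of the calculation renamed above, assuming spark.executor.cores is set in the session conf; the fallback through getDefaultNumExecutorCores when it is absent is omitted here, and spark.task.cpus defaults to 1.

import org.apache.spark.sql.SparkSession

// Sketch: number of tasks that can run concurrently on one executor.
def tasksPerExecutor(spark: SparkSession): Int = {
  val conf = spark.sparkContext.getConf
  val executorCores = conf.get("spark.executor.cores").toInt
  val taskCpus = conf.get("spark.task.cpus", "1").toInt
  executorCores / taskCpus
}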

@@ -137,17 +137,17 @@ object ClusterUtil {
.map(_ => java.lang.Runtime.getRuntime.availableProcessors).collect.head
}

- /** Returns the number of executors * number of cores.
+ /** Returns the number of executors * number of tasks.
* @param dataset The dataset containing the current spark session.
- * @param numCoresPerExec The number of cores per executor.
- * @return The number of executors * number of cores.
+ * @param numTasksPerExec The number of tasks per executor.
+ * @return The number of executors * number of tasks.
*/
- def getNumExecutorCores(dataset: Dataset[_], numCoresPerExec: Int, log: Logger): Int = {
+ def getNumExecutorTasks(dataset: Dataset[_], numTasksPerExec: Int, log: Logger): Int = {
val executors = getExecutors(dataset)
log.info(s"Retrieving executors...")
if (!executors.isEmpty) {
log.info(s"Retrieved num executors ${executors.length} with num cores per executor ${numCoresPerExec}")
executors.length * numCoresPerExec
log.info(s"Retrieved num executors ${executors.length} with num tasks per executor $numTasksPerExec")
executors.length * numTasksPerExec
} else {
log.info(s"Could not retrieve executors from blockmanager, trying to get from configuration...")
val master = dataset.sparkSession.sparkContext.master
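Note (illustrative usage, not part of this commit): after the rename the two ClusterUtil helpers compose as below, assuming df is an existing Dataset with an active SparkSession.

import com.microsoft.ml.spark.core.utils.ClusterUtil
import org.slf4j.LoggerFactory

val log = LoggerFactory.getLogger("ClusterUtilExample")
// Tasks that one executor can run in parallel, then the cluster-wide total.
val numTasksPerExec = ClusterUtil.getNumTasksPerExecutor(df, log)
val totalParallelTasks = ClusterUtil.getNumExecutorTasks(df, numTasksPerExec, log)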
30 changes: 15 additions & 15 deletions src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMBase.scala
@@ -92,9 +92,9 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
}

protected def prepareDataframe(dataset: Dataset[_], trainingCols: Array[(String, Seq[DataType])],
- numWorkers: Int): DataFrame = {
+ numTasks: Int): DataFrame = {
val df = castColumns(dataset, trainingCols)
- // Reduce number of partitions to number of executor cores
+ // Reduce number of partitions to number of executor tasks
/* Note: with barrier execution mode we must use repartition instead of coalesce when
* running on spark standalone.
* Using coalesce, we get the error:
@@ -114,18 +114,18 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
* new cluster with more CPU cores or repartition the input RDD(s) to reduce the
* number of slots required to run this barrier stage.
*
- * Hence we still need to estimate the number of workers and repartition even when using
+ * Hence we still need to estimate the number of tasks and repartition even when using
* barrier execution, which is unfortunate as repartition is more expensive than coalesce.
*/
if (getUseBarrierExecutionMode) {
val numPartitions = df.rdd.getNumPartitions
- if (numPartitions > numWorkers) {
- df.repartition(numWorkers)
+ if (numPartitions > numTasks) {
+ df.repartition(numTasks)
} else {
df
}
} else {
- df.coalesce(numWorkers)
+ df.coalesce(numTasks)
}
}
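Note (simplified sketch, not part of the diff): the partition adjustment above reduces to the shape below, where useBarrierMode stands in for getUseBarrierExecutionMode and df and numTasks are in scope. Repartition forces a shuffle, but under barrier execution on standalone a coalesce can still demand more slots than the cluster has, as the comment in the source explains.

// Barrier mode: cap partitions at numTasks via a full repartition.
// Non-barrier mode: coalesce, which avoids a shuffle.
val adjusted =
  if (useBarrierMode) {
    if (df.rdd.getNumPartitions > numTasks) df.repartition(numTasks) else df
  } else {
    df.coalesce(numTasks)
  }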

@@ -165,29 +165,29 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
*/
protected def innerTrain(dataset: Dataset[_], batchIndex: Int): TrainedModel = {
val sc = dataset.sparkSession.sparkContext
- val numCoresPerExec = ClusterUtil.getNumCoresPerExecutor(dataset, log)
+ val numTasksPerExec = ClusterUtil.getNumTasksPerExecutor(dataset, log)
// By default, we try to intelligently calculate the number of executors, but user can override this with numTasks
- val numWorkers =
+ val numTasks =
if (getNumTasks > 0) getNumTasks
else {
- val numExecutorCores = ClusterUtil.getNumExecutorCores(dataset, numCoresPerExec, log)
- min(numExecutorCores, dataset.rdd.getNumPartitions)
+ val numExecutorTasks = ClusterUtil.getNumExecutorTasks(dataset, numTasksPerExec, log)
+ min(numExecutorTasks, dataset.rdd.getNumPartitions)
}
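Note (illustrative, hedged, not part of the diff): the comment above refers to the estimator's numTasks parameter; assuming the usual Spark ML setter pattern (a setNumTasks companion to getNumTasks, which is an assumption here), overriding the automatic calculation would look roughly like this.

import com.microsoft.ml.spark.lightgbm.LightGBMClassifier

// Force exactly 8 training tasks instead of the auto-detected cluster value
// (assumes a setNumTasks setter exists alongside getNumTasks).
val classifier = new LightGBMClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTasks(8)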
// Only get the relevant columns
val trainingCols = getTrainingCols()

- val df = prepareDataframe(dataset, trainingCols, numWorkers)
+ val df = prepareDataframe(dataset, trainingCols, numTasks)

val (inetAddress, port, future) =
- LightGBMUtils.createDriverNodesThread(numWorkers, df, log, getTimeout, getUseBarrierExecutionMode,
+ LightGBMUtils.createDriverNodesThread(numTasks, df, log, getTimeout, getUseBarrierExecutionMode,
getDriverListenPort)

/* Run a parallel job via map partitions to initialize the native library and network,
* translate the data to the LightGBM in-memory representation and train the models
*/
val encoder = Encoders.kryo[LightGBMBooster]

- val trainParams = getTrainParams(numWorkers, getCategoricalIndexes(df), dataset)
+ val trainParams = getTrainParams(numTasks, getCategoricalIndexes(df), dataset)
log.info(s"LightGBM parameters: ${trainParams.toString()}")
val networkParams = NetworkParams(getDefaultListenPort, inetAddress, port, getUseBarrierExecutionMode)
val (trainingData, validationData) =
@@ -200,7 +200,7 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
val schema = preprocessedDF.schema
val columnParams = ColumnParams(getLabelCol, getFeaturesCol, get(weightCol), get(initScoreCol), getOptGroupCol)
val mapPartitionsFunc = TrainUtils.trainLightGBM(batchIndex, networkParams, columnParams, validationData, log,
- trainParams, numCoresPerExec, schema)(_)
+ trainParams, numTasksPerExec, schema)(_)
val lightGBMBooster =
if (getUseBarrierExecutionMode) {
preprocessedDF.rdd.barrier().mapPartitions(mapPartitionsFunc).reduce((booster1, _) => booster1)
@@ -228,7 +228,7 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
*
* @return train parameters.
*/
- protected def getTrainParams(numWorkers: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams
+ protected def getTrainParams(numTasks: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams

protected def stringFromTrainedModel(model: TrainedModel): String

src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMClassifier.scala
@@ -36,7 +36,7 @@ class LightGBMClassifier(override val uid: String)
def getIsUnbalance: Boolean = $(isUnbalance)
def setIsUnbalance(value: Boolean): this.type = set(isUnbalance, value)

- def getTrainParams(numWorkers: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams = {
+ def getTrainParams(numTasks: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams = {
/* The native code for getting numClasses is always 1 unless it is multiclass-classification problem
* so we infer the actual numClasses from the dataset here
*/
@@ -45,7 +45,7 @@ class LightGBMClassifier(override val uid: String)
ClassifierTrainParams(getParallelism, getTopK, getNumIterations, getLearningRate, getNumLeaves, getMaxBin,
getBinSampleCount, getBaggingFraction, getPosBaggingFraction, getNegBaggingFraction,
getBaggingFreq, getBaggingSeed, getEarlyStoppingRound, getImprovementTolerance,
- getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numWorkers, getObjective, modelStr,
+ getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numTasks, getObjective, modelStr,
getIsUnbalance, getVerbosity, categoricalIndexes, actualNumClasses, getBoostFromAverage,
getBoostingType, getLambdaL1, getLambdaL2, getIsProvideTrainingMetric,
getMetric, getMinGainToSplit, getMaxDeltaStep, getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate)
src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMConstants.scala
@@ -31,7 +31,7 @@ object LightGBMConstants {
/** Multiclass classification objective
*/
val MulticlassObjective: String = "multiclass"
- /** Ignore worker status, used to ignore workers that get empty partitions
+ /** Ignore task status, used to ignore tasks that get empty partitions
*/
val IgnoreStatus: String = "ignore"
/** Barrier execution flag telling driver that all tasks have completed
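Note (hypothetical sketch, not from this file): a task that ends up with an empty partition reports IgnoreStatus so the driver does not wait for it; the task-side write could look roughly like this, with driverHost and driverPort taken from the network parameters.

import com.microsoft.ml.spark.lightgbm.LightGBMConstants
import java.io.{BufferedWriter, OutputStreamWriter}
import java.net.Socket

// Hypothetical helper: tell the driver to skip this (empty) task.
def reportEmptyPartition(driverHost: String, driverPort: Int): Unit = {
  val socket = new Socket(driverHost, driverPort)
  val writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream))
  writer.write(LightGBMConstants.IgnoreStatus + "\n")
  writer.flush()
  socket.close()
}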
src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRanker.scala
@@ -47,12 +47,12 @@ class LightGBMRanker(override val uid: String)
def getEvalAt: Array[Int] = $(evalAt)
def setEvalAt(value: Array[Int]): this.type = set(evalAt, value)

- def getTrainParams(numWorkers: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams = {
+ def getTrainParams(numTasks: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams = {
val modelStr = if (getModelString == null || getModelString.isEmpty) None else get(modelString)
RankerTrainParams(getParallelism, getTopK, getNumIterations, getLearningRate, getNumLeaves,
getObjective, getMaxBin, getBinSampleCount, getBaggingFraction, getPosBaggingFraction, getNegBaggingFraction,
getBaggingFreq, getBaggingSeed, getEarlyStoppingRound, getImprovementTolerance,
- getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numWorkers, modelStr,
+ getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numTasks, modelStr,
getVerbosity, categoricalIndexes, getBoostingType, getLambdaL1, getLambdaL2, getMaxPosition, getLabelGain,
getIsProvideTrainingMetric, getMetric, getEvalAt, getMinGainToSplit, getMaxDeltaStep,
getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate)
@@ -84,7 +84,7 @@ class LightGBMRanker(override val uid: String)
override def copy(extra: ParamMap): LightGBMRanker = defaultCopy(extra)

override def prepareDataframe(dataset: Dataset[_], trainingCols: Array[(String, Seq[DataType])],
- numWorkers: Int): DataFrame = {
+ numTasks: Int): DataFrame = {
if (getRepartitionByGroupingColumn) {
val repartitionedDataset = getOptGroupCol match {
case None => dataset
@@ -95,9 +95,9 @@ class LightGBMRanker(override val uid: String)
df
}
}
- super.prepareDataframe(repartitionedDataset, trainingCols, numWorkers)
+ super.prepareDataframe(repartitionedDataset, trainingCols, numTasks)
} else {
- super.prepareDataframe(dataset, trainingCols, numWorkers)
+ super.prepareDataframe(dataset, trainingCols, numTasks)
}
}
}
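Note (conceptual, not the actual implementation, which is truncated above): repartitionByGroupingColumn exists so that all rows of one query group land in the same partition before training; a plausible equivalent, assuming groupCol names the grouping column and numTasks is in scope, is a hash repartition on that column.

import org.apache.spark.sql.functions.col

// Conceptual: co-locate each query group inside a single task's partition.
val groupedDf = dataset.toDF().repartition(numTasks, col(groupCol))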
src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMRegressor.scala
@@ -54,13 +54,13 @@ class LightGBMRegressor(override val uid: String)
def getTweedieVariancePower: Double = $(tweedieVariancePower)
def setTweedieVariancePower(value: Double): this.type = set(tweedieVariancePower, value)

- def getTrainParams(numWorkers: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams = {
+ def getTrainParams(numTasks: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams = {
val modelStr = if (getModelString == null || getModelString.isEmpty) None else get(modelString)
RegressorTrainParams(getParallelism, getTopK, getNumIterations, getLearningRate, getNumLeaves,
getObjective, getAlpha, getTweedieVariancePower, getMaxBin, getBinSampleCount,
getBaggingFraction, getPosBaggingFraction, getNegBaggingFraction, getBaggingFreq, getBaggingSeed,
getEarlyStoppingRound, getImprovementTolerance, getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf,
- numWorkers, modelStr, getVerbosity, categoricalIndexes, getBoostFromAverage, getBoostingType, getLambdaL1,
+ numTasks, modelStr, getVerbosity, categoricalIndexes, getBoostFromAverage, getBoostingType, getLambdaL1,
getLambdaL2, getIsProvideTrainingMetric, getMetric, getMinGainToSplit, getMaxDeltaStep,
getMaxBinByFeature, getMinDataInLeaf, getSlotNames, getDelegate)
}
27 changes: 13 additions & 14 deletions src/main/scala/com/microsoft/ml/spark/lightgbm/LightGBMUtils.scala
@@ -11,7 +11,6 @@ import com.microsoft.ml.lightgbm._
import com.microsoft.ml.spark.core.env.NativeLoader
import com.microsoft.ml.spark.core.utils.ClusterUtil
import com.microsoft.ml.spark.featurize.{Featurize, FeaturizeUtilities}
- import org.apache.spark.lightgbm.BlockManagerUtils
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.attribute._
@@ -111,10 +110,10 @@ object LightGBMUtils {
* waits for the host:port from the executors, and then sends back the
* information to the executors.
*
- * @param numWorkers The total number of training workers to wait for.
+ * @param numTasks The total number of training tasks to wait for.
* @return The address and port of the driver socket.
*/
- def createDriverNodesThread(numWorkers: Int, df: DataFrame,
+ def createDriverNodesThread(numTasks: Int, df: DataFrame,
log: Logger, timeout: Double,
barrierExecutionMode: Boolean,
driverServerPort: Int): (String, Int, Future[Unit]) = {
@@ -128,7 +127,7 @@ object LightGBMUtils {
driverServerSocket.setSoTimeout(duration.toMillis.toInt)
}
val f = Future {
- var emptyWorkerCounter = 0
+ var emptyTaskCounter = 0
val hostAndPorts = ListBuffer[(Socket, String)]()
if (barrierExecutionMode) {
log.info(s"driver using barrier execution mode")
@@ -139,28 +138,28 @@ object LightGBMUtils {
val reader = new BufferedReader(new InputStreamReader(driverSocket.getInputStream))
val comm = reader.readLine()
if (comm == LightGBMConstants.FinishedStatus) {
log.info("driver received all workers from barrier stage")
log.info("driver received all tasks from barrier stage")
finished = true
} else if (comm == LightGBMConstants.IgnoreStatus) {
log.info("driver received ignore status from worker")
log.info("driver received ignore status from task")
} else {
log.info(s"driver received socket from worker: $comm")
log.info(s"driver received socket from task: $comm")
val socketAndComm = (driverSocket, comm)
hostAndPorts += socketAndComm
}
}
} else {
log.info(s"driver expecting $numWorkers connections...")
while (hostAndPorts.size + emptyWorkerCounter < numWorkers) {
log.info(s"driver expecting $numTasks connections...")
while (hostAndPorts.size + emptyTaskCounter < numTasks) {
log.info("driver accepting a new connection...")
val driverSocket = driverServerSocket.accept()
val reader = new BufferedReader(new InputStreamReader(driverSocket.getInputStream))
val comm = reader.readLine()
if (comm == LightGBMConstants.IgnoreStatus) {
log.info("driver received ignore status from worker")
emptyWorkerCounter += 1
log.info("driver received ignore status from task")
emptyTaskCounter += 1
} else {
log.info(s"driver received socket from worker: $comm")
log.info(s"driver received socket from task: $comm")
val socketAndComm = (driverSocket, comm)
hostAndPorts += socketAndComm
}
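Note (stripped-down sketch, not part of the diff): in non-barrier mode the driver loop above keeps accepting connections until every expected task has sent either its host:port or the "ignore" status; the control flow reduces to roughly the following.

import java.io.{BufferedReader, InputStreamReader}
import java.net.ServerSocket
import scala.collection.mutable.ListBuffer

// Simplified: collect host:port strings from numTasks tasks, counting ignores.
def collectTaskEndpoints(server: ServerSocket, numTasks: Int): Seq[String] = {
  val endpoints = ListBuffer[String]()
  var emptyTaskCounter = 0
  while (endpoints.size + emptyTaskCounter < numTasks) {
    val socket = server.accept()
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream))
    reader.readLine() match {
      case "ignore"    => emptyTaskCounter += 1  // task saw an empty partition
      case hostAndPort => endpoints += hostAndPort
    }
  }
  endpoints.toList
}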
@@ -187,13 +186,13 @@ object LightGBMUtils {

/** Returns an integer ID for the current node.
*
- * @return In cluster, returns the executor id. In local case, returns the worker id.
+ * @return In cluster, returns the executor id. In local case, returns the task id.
*/
def getId(): Int = {
val executorId = SparkEnv.get.executorId
val ctx = TaskContext.get
val partId = ctx.partitionId
- // If driver, this is only in test scenario, make each partition a separate worker
+ // If driver, this is only in test scenario, make each partition a separate task
val id = if (executorId == "driver") partId else executorId
val idAsInt = id.toString.toInt
idAsInt
