
adding LightGBMRanker

imatiach-msft authored and mhamilton723 committed Apr 22, 2019
1 parent fa77857 commit 53c4b9e0fd917b91cd7fb195ebe44822cdd212ee
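
For context, a minimal usage sketch of the estimator this commit introduces. The DataFrame `train` and the column names ("label", "features", "query") are assumptions for illustration, not part of the commit:

    import com.microsoft.ml.spark.LightGBMRanker

    // Hypothetical: "features" holds a Vector, "label" a relevance grade,
    // and "query" an integer identifying each query group.
    val ranker = new LightGBMRanker()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setGroupCol("query")   // group column added by this commit via HasGroupCol
      .setMaxPosition(10)     // optimize NDCG at position 10
    val model = ranker.fit(train)
    model.transform(train).show()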
@@ -161,3 +161,14 @@ trait HasInitScoreCol extends Wrappable {
/** @group getParam */
def getInitScoreCol: String = $(initScoreCol)
}

trait HasGroupCol extends Wrappable {
/** The name of the group column
* @group param
*/
val groupCol = new Param[String](this, "groupCol", "The name of the group column")
/** @group setParam */
def setGroupCol(value: String): this.type = set(groupCol, value)
/** @group getParam */
def getGroupCol: String = $(groupCol)
}
@@ -122,4 +122,9 @@ object DatasetUtils {
def regressionTrainFile(name: String): File =
new File(s"${sys.env("DATASETS_HOME")}/Regression/Train", name)

def rankingTrainFile(name: String): File =
new File(s"${sys.env("DATASETS_HOME")}/Ranking/Train", name)

def rankingTestFile(name: String): File =
new File(s"${sys.env("DATASETS_HOME")}/Ranking/Test", name)
}
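
A one-line usage sketch of the new helpers; the file name rank.train is an assumption:

    // Resolves $DATASETS_HOME/Ranking/Train/rank.train
    val trainFile = DatasetUtils.rankingTrainFile("rank.train")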
@@ -166,6 +166,7 @@ class FuzzingTest extends TestBase {
"org.apache.spark.ml.feature.FastVectorAssembler", // In Spark namespace
"com.microsoft.ml.spark.LightGBMClassifier", // HasFeaturesCol is part of spark's base class
"com.microsoft.ml.spark.LightGBMRegressor", // HasFeaturesCol is part of spark's base class
"com.microsoft.ml.spark.LightGBMRanker", // HasFeaturesCol is part of spark's base class
"com.microsoft.ml.spark.TextFeaturizer" // needs to hide setters from model
)
pipelineStages.foreach { stage =>
@@ -36,10 +36,11 @@ private[spark] abstract class Repository[S <: Schema] {

object FaultToleranceUtils {
def retryWithTimeout[T](times: Int, timeout: Duration)(f: => T): T ={
try{
try {
Await.result(Future(f)(ExecutionContext.global), timeout)
}catch{
case e:Exception if times>=1 =>
} catch {
case e: Exception if times >= 1 =>
print(s"Received exception on call, retrying: $e")
retryWithTimeout(times-1, timeout)(f)
}
}
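
A hedged usage sketch of the helper above; `flakyDownload` is hypothetical. Since `Await.result` throws on timeout and the handler catches any `Exception` while `times >= 1`, both failures and timeouts trigger a retry until the budget is exhausted:

    import scala.concurrent.duration.Duration

    def flakyDownload(): Unit = ??? // hypothetical unreliable call

    // First attempt plus up to 3 retries, each attempt bounded to 30 seconds.
    FaultToleranceUtils.retryWithTimeout(3, Duration(30, "seconds")) {
      flakyDownload()
    }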
@@ -48,15 +48,16 @@ class DownloaderSuite extends TestBase {

test("A downloader should be able to get all Models " +
"and maybeDownload should be fast if models are downloaded", TestBase.Extended) {

d.downloadModels()
val modTimes = d.localModels.map(s =>
new File(s.uri).lastModified())

d.downloadModels()
val modTimes2 = d.localModels.map(s =>
new File(s.uri).lastModified())

val (modTimes, modTimes2) = FaultToleranceUtils.retryWithTimeout(10, Duration.apply(500, "seconds")) {
d.downloadModels()
val modTimes = d.localModels.map(s =>
new File(s.uri).lastModified())

d.downloadModels()
val modTimes2 = d.localModels.map(s =>
new File(s.uri).lastModified())
(modTimes, modTimes2)
}
// No modification on second call because models are cached
assert(modTimes.toList === modTimes2.toList)

@@ -4,7 +4,7 @@
package com.microsoft.ml.spark

import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.sql.{Dataset, Encoders}
import org.apache.spark.sql.{DataFrame, Dataset, Encoders}

import scala.concurrent.Await
import scala.concurrent.duration.{Duration, SECONDS}
@@ -36,7 +36,7 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
}
}

def innerTrain(dataset: Dataset[_]): TrainedModel = {
protected def innerTrain(dataset: Dataset[_]): TrainedModel = {
val sc = dataset.sparkSession.sparkContext
val numCoresPerExec = LightGBMUtils.getNumCoresPerExecutor(dataset)
val numExecutorCores = LightGBMUtils.getNumExecutorCores(dataset, numCoresPerExec)
@@ -62,18 +62,35 @@ trait LightGBMBase[TrainedModel <: Model[TrainedModel]] extends Estimator[Traine
if (get(validationIndicatorCol).isDefined && dataset.columns.contains(getValidationIndicatorCol))
Some(sc.broadcast(df.filter(x => x.getBoolean(x.fieldIndex(getValidationIndicatorCol))).collect()))
else None
val lightGBMBooster = df
val lightGBMBooster = preprocessData(df)
.mapPartitions(TrainUtils.trainLightGBM(networkParams, getLabelCol, getFeaturesCol, get(weightCol),
get(initScoreCol), validationData, log, trainParams, numCoresPerExec))(encoder)
get(initScoreCol), getOptGroupCol, validationData, log, trainParams, numCoresPerExec))(encoder)
.reduce((booster1, _) => booster1)
// Wait for future to complete (should be done by now)
Await.result(future, Duration(getTimeout, SECONDS))
getModel(trainParams, lightGBMBooster)
}

/** Optional group column for ranking, None by default.
* @return The optional group column name; None unless overridden.
*/
protected def getOptGroupCol: Option[String] = None

/** Gets the trained model given the train parameters and booster.
* @return trained model.
*/
protected def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): TrainedModel

/** Gets the training parameters.
* @return train parameters.
*/
protected def getTrainParams(numWorkers: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams

protected def stringFromTrainedModel(model: TrainedModel): String

/** Allows algorithm-specific preprocessing of the dataset.
* @param dataset The dataset to preprocess prior to training.
* @return The preprocessed data.
*/
protected def preprocessData(dataset: DataFrame): DataFrame = dataset
}
@@ -13,7 +13,7 @@ import scala.reflect.runtime.universe.{TypeTag, typeTag}

object LightGBMClassifier extends DefaultParamsReadable[LightGBMClassifier]

/** Trains a LightGBM Binary Classification model, a fast, distributed, high performance gradient boosting
/** Trains a LightGBM Classification model, a fast, distributed, high performance gradient boosting
* framework based on decision tree algorithms.
* For more information please see here: https://github.com/Microsoft/LightGBM.
* For parameter information see here: https://github.com/Microsoft/LightGBM/blob/master/docs/Parameters.rst
@@ -1,3 +1,3 @@
Trains a LightGBM Binary Classification model, a fast, distributed, high performance gradient boosting
Trains a LightGBM Classification model, a fast, distributed, high performance gradient boosting
framework based on decision tree algorithms.
For more information please see here: https://github.com/Microsoft/LightGBM.
@@ -13,6 +13,9 @@ object LightGBMConstants {
/** Default buffer length for model
*/
val defaultBufferLength: Int = 10000
/** Lambdarank ranking objective
*/
val rankObjective: String = "lambdarank"
/** Binary classification objective
*/
val binaryObjective: String = "binary"
@@ -33,7 +33,7 @@ class LightGBMDataset(val dataset: SWIGTYPE_p_void) extends AutoCloseable {
}
}

def addField(field: Array[Double], fieldName: String, numRows: Int): Unit = {
def addFloatField(field: Array[Double], fieldName: String, numRows: Int): Unit = {
// Generate the column and add to dataset
var colArray: Option[SWIGTYPE_p_float] = None
try {
@@ -69,6 +69,24 @@ class LightGBMDataset(val dataset: SWIGTYPE_p_void) extends AutoCloseable {
}
}

def addIntField(field: Array[Int], fieldName: String, numRows: Int): Unit = {
// Generate the column and add to dataset
var colArray: Option[SWIGTYPE_p_int] = None
try {
colArray = Some(lightgbmlib.new_intArray(numRows))
field.zipWithIndex.foreach(ri =>
lightgbmlib.intArray_setitem(colArray.get, ri._2, ri._1))
val colAsVoidPtr = lightgbmlib.int_to_voidp_ptr(colArray.get)
val data32bitType = lightgbmlibConstants.C_API_DTYPE_INT32
LightGBMUtils.validate(
lightgbmlib.LGBM_DatasetSetField(dataset, fieldName, colAsVoidPtr, numRows, data32bitType),
"DatasetSetField")
} finally {
// Free column
colArray.foreach(lightgbmlib.delete_intArray(_))
}
}
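
  // Example (hypothetical usage): LightGBM's C API also accepts a "group" field
  // of int32 group sizes for ranking objectives, so query groups could be set as:
  //   dataset.addIntField(Array(10, 8, 12), "group", 3)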

def setFeatureNames(featureNamesOpt: Option[Array[String]], numCols: Int): Unit = {
// Add in slot names if they exist
featureNamesOpt.foreach { featureNamesVal =>
@@ -0,0 +1,129 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package com.microsoft.ml.spark

import org.apache.spark.ml.param._
import org.apache.spark.ml.util._
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.ml.ranker.{Ranker, RankerModel}
import org.apache.spark.sql._

import scala.reflect.runtime.universe.{TypeTag, typeTag}

object LightGBMRanker extends DefaultParamsReadable[LightGBMRanker]

/** Trains a LightGBM Ranker model, a fast, distributed, high performance gradient boosting
* framework based on decision tree algorithms.
* For more information please see here: https://github.com/Microsoft/LightGBM.
* For parameter information see here: https://github.com/Microsoft/LightGBM/blob/master/docs/Parameters.rst
* @param uid The unique ID.
*/
@InternalWrapper
class LightGBMRanker(override val uid: String)
extends Ranker[Vector, LightGBMRanker, LightGBMRankerModel]
with LightGBMBase[LightGBMRankerModel] {
def this() = this(Identifiable.randomUID("LightGBMRanker"))

// Set default objective to lambdarank, the ranking objective
setDefault(objective -> LightGBMConstants.rankObjective)

val maxPosition = new IntParam(this, "maxPosition", "optimized NDCG at this position")
setDefault(maxPosition -> 20)

def getMaxPosition: Int = $(maxPosition)
def setMaxPosition(value: Int): this.type = set(maxPosition, value)

val labelGain = new DoubleArrayParam(this, "labelGain", "relevant gain for labels, used only in lambdarank application")
setDefault(labelGain -> Array.empty[Double])

def getLabelGain: Array[Double] = $(labelGain)
def setLabelGain(value: Array[Double]): this.type = set(labelGain, value)

def getTrainParams(numWorkers: Int, categoricalIndexes: Array[Int], dataset: Dataset[_]): TrainParams = {
val modelStr = if (getModelString == null || getModelString.isEmpty) None else get(modelString)
RankerTrainParams(getParallelism, getNumIterations, getLearningRate, getNumLeaves,
getObjective, getMaxBin, getBaggingFraction, getBaggingFreq, getBaggingSeed, getEarlyStoppingRound,
getFeatureFraction, getMaxDepth, getMinSumHessianInLeaf, numWorkers, modelStr,
getVerbosity, categoricalIndexes, getBoostingType, getLambdaL1, getLambdaL2, getMaxPosition, getLabelGain)
}

def getModel(trainParams: TrainParams, lightGBMBooster: LightGBMBooster): LightGBMRankerModel = {
new LightGBMRankerModel(uid, lightGBMBooster, getLabelCol, getFeaturesCol, getPredictionCol)
}

def stringFromTrainedModel(model: LightGBMRankerModel): String = {
model.getModel.model
}

override def getOptGroupCol: Option[String] = Some(getGroupCol)

/** For Ranking, we need to sort the data within partitions by group prior to training to ensure training succeeds.
* @param dataset The dataset to preprocess prior to training.
* @return The preprocessed data, sorted within partitions by group.
*/
override def preprocessData(dataset: DataFrame): DataFrame = {
dataset.sortWithinPartitions(getOptGroupCol.get)
}

override def copy(extra: ParamMap): LightGBMRanker = defaultCopy(extra)
}
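
A hedged note on the preprocessing above: lambdarank needs every query group to be contiguous within its partition, which `sortWithinPartitions` provides; callers likely also want each group confined to a single partition. A sketch under those assumptions (`train` and the `query` column are hypothetical):

    import org.apache.spark.sql.functions.col

    // Co-locate each query group in one partition, then let the ranker's
    // sortWithinPartitions(groupCol) make the groups contiguous.
    val grouped = train.repartition(col("query"))
    val model = new LightGBMRanker().setGroupCol("query").fit(grouped)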

/** Model produced by [[LightGBMRanker]]. */
@InternalWrapper
class LightGBMRankerModel(override val uid: String, model: LightGBMBooster, labelColName: String,
featuresColName: String, predictionColName: String)
extends RankerModel[Vector, LightGBMRankerModel]
with ConstructorWritable[LightGBMRankerModel] {

// Update the underlying Spark ML params
// (for proper serialization to work we put them on constructor instead of using copy as in Spark ML)
set(labelCol, labelColName)
set(featuresCol, featuresColName)
set(predictionCol, predictionColName)

override def predict(features: Vector): Double = {
model.score(features, false, false)(0)
}

override def copy(extra: ParamMap): LightGBMRankerModel =
new LightGBMRankerModel(uid, model, labelColName, featuresColName, predictionColName)

override val ttag: TypeTag[LightGBMRankerModel] =
typeTag[LightGBMRankerModel]

override def objectsToSave: List[Any] =
List(uid, model, getLabelCol, getFeaturesCol, getPredictionCol)

def saveNativeModel(filename: String, overwrite: Boolean): Unit = {
val session = SparkSession.builder().getOrCreate()
model.saveNativeModel(session, filename, overwrite)
}

def getFeatureImportances(importanceType: String): Array[Double] = {
model.getFeatureImportances(importanceType)
}

def getModel: LightGBMBooster = this.model
}

object LightGBMRankerModel extends ConstructorReadable[LightGBMRankerModel] {
def loadNativeModelFromFile(filename: String, labelColName: String = "label",
featuresColName: String = "features",
predictionColName: String = "prediction"): LightGBMRankerModel = {
val uid = Identifiable.randomUID("LightGBMRanker")
val session = SparkSession.builder().getOrCreate()
val textRdd = session.read.text(filename)
val text = textRdd.collect().map { row => row.getString(0) }.mkString("\n")
val lightGBMBooster = new LightGBMBooster(text)
new LightGBMRankerModel(uid, lightGBMBooster, labelColName, featuresColName, predictionColName)
}

def loadNativeModelFromString(model: String, labelColName: String = "label",
featuresColName: String = "features",
predictionColName: String = "prediction"): LightGBMRankerModel = {
val uid = Identifiable.randomUID("LightGBMRanker")
val lightGBMBooster = new LightGBMBooster(model)
new LightGBMRankerModel(uid, lightGBMBooster, labelColName, featuresColName, predictionColName)
}
}
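
A round-trip sketch of the persistence helpers above; `model` is a trained LightGBMRankerModel and the path is hypothetical:

    // Save the booster in LightGBM's native text format, then reload it.
    model.saveNativeModel("/tmp/lightgbm_ranker.txt", overwrite = true)
    val reloaded = LightGBMRankerModel.loadNativeModelFromFile("/tmp/lightgbm_ranker.txt")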
@@ -0,0 +1,3 @@
Trains a LightGBM Ranker model, a fast, distributed, high performance gradient boosting
framework based on decision tree algorithms.
For more information please see here: https://github.com/Microsoft/LightGBM.
@@ -0,0 +1,3 @@
Trains a LightGBM Regression model, a fast, distributed, high performance gradient boosting
framework based on decision tree algorithms.
For more information please see here: https://github.com/Microsoft/LightGBM.
@@ -39,11 +39,13 @@ object LightGBMUtils {
new NativeLoader("/com/microsoft/ml/lightgbm").loadLibraryByName(osPrefix + "_lightgbm_swig")
}

def featurizeData(dataset: Dataset[_], labelColumn: String, featuresColumn: String): PipelineModel = {
def featurizeData(dataset: Dataset[_], labelColumn: String, featuresColumn: String,
weightColumn: Option[String] = None, groupColumn: Option[String] = None): PipelineModel = {
// Create pipeline model to featurize the dataset
val oneHotEncodeCategoricals = true
val featuresToHashTo = FeaturizeUtilities.numFeaturesTreeOrNNBased
val featureColumns = dataset.columns.filter(col => col != labelColumn).toSeq
val featureColumns = dataset.columns.filter(col => col != labelColumn &&
weightColumn.forall(_ != col) && groupColumn.forall(_ != col)).toSeq
val featurizer = new Featurize()
.setFeatureColumns(Map(featuresColumn -> featureColumns))
.setOneHotEncodeCategoricals(oneHotEncodeCategoricals)
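
A hedged call sketch for the extended signature above; `rawDF` and the column names are assumptions. Passing the weight and group columns keeps them out of the assembled feature vector:

    val featurizePipeline = LightGBMUtils.featurizeData(
      rawDF, labelColumn = "label", featuresColumn = "features",
      weightColumn = None, groupColumn = Some("query"))
    val featurized = featurizePipeline.transform(rawDF)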
@@ -297,22 +299,6 @@ object LightGBMUtils {
nodes
}

def newDoubleArray(array: Array[Double]): (SWIGTYPE_p_void, SWIGTYPE_p_double) = {
val data = lightgbmlib.new_doubleArray(array.length)
array.zipWithIndex.foreach {
case (value, index) => lightgbmlib.doubleArray_setitem(data, index, value)
}
(lightgbmlib.double_to_voidp_ptr(data), data)
}

def newIntArray(array: Array[Int]): (SWIGTYPE_p_int32_t, SWIGTYPE_p_int) = {
val data = lightgbmlib.new_intArray(array.length)
array.zipWithIndex.foreach {
case (value, index) => lightgbmlib.intArray_setitem(data, index, value)
}
(lightgbmlib.int_to_int32_t_ptr(data), data)
}

def intToPtr(value: Int): SWIGTYPE_p_int64_t = {
val longPtr = lightgbmlib.new_longp()
lightgbmlib.longp_assign(longPtr, value)
@@ -0,0 +1,29 @@
// Copyright (C) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See LICENSE in project root for information.

package org.apache.spark.ml.ranker

// Note: a bit strange to have the mmlspark import here, but it works
import com.microsoft.ml.spark.HasGroupCol
import org.apache.spark.ml.{PredictionModel, Predictor, PredictorParams}

/**
* Ranker base class
*
* @tparam FeaturesType Type of input features. E.g., org.apache.spark.mllib.linalg.Vector
* @tparam Learner Concrete Estimator type
* @tparam M Concrete Model type
*/
abstract class Ranker[FeaturesType,
Learner <: Ranker[FeaturesType, Learner, M],
M <: RankerModel[FeaturesType, M]]
extends Predictor[FeaturesType, Learner, M] with PredictorParams with HasGroupCol

/**
* Model produced by a `Ranker`.
*
* @tparam FeaturesType Type of input features. E.g., org.apache.spark.mllib.linalg.Vector
* @tparam M Concrete Model type.
*/
abstract class RankerModel[FeaturesType, M <: RankerModel[FeaturesType, M]]
extends PredictionModel[FeaturesType, M] with PredictorParams
