Commit

init

zhengruifeng committed Jul 2, 2020
1 parent ced8e0e commit 5d7432d
Showing 9 changed files with 157 additions and 49 deletions.
@@ -42,7 +42,7 @@ import org.apache.spark.storage.StorageLevel
/** Params for linear SVM Classifier. */
private[classification] trait LinearSVCParams extends ClassifierParams with HasRegParam
with HasMaxIter with HasFitIntercept with HasTol with HasStandardization with HasWeightCol
- with HasAggregationDepth with HasThreshold with HasBlockSize {
+ with HasAggregationDepth with HasThreshold with HasMaxBlockMemoryInMB {

/**
* Param for threshold in binary classification prediction.
@@ -156,24 +156,9 @@ class LinearSVC @Since("2.2.0") (
def setAggregationDepth(value: Int): this.type = set(aggregationDepth, value)
setDefault(aggregationDepth -> 2)

- /**
- * Set block size for stacking input data in matrices.
- * If blockSize == 1, then stacking will be skipped, and each vector is treated individually;
- * If blockSize > 1, then vectors will be stacked to blocks, and high-level BLAS routines
- * will be used if possible (for example, GEMV instead of DOT, GEMM instead of GEMV).
- * Recommended size is between 10 and 1000. An appropriate choice of the block size depends
- * on the sparsity and dim of input datasets, the underlying BLAS implementation (for example,
- * f2jBLAS, OpenBLAS, intel MKL) and its configuration (for example, number of threads).
- * Note that existing BLAS implementations are mainly optimized for dense matrices, if the
- * input dataset is sparse, stacking may bring no performance gain, the worse is possible
- * performance regression.
- * Default is 1.
- *
- * @group expertSetParam
- */
@Since("3.1.0")
- def setBlockSize(value: Int): this.type = set(blockSize, value)
- setDefault(blockSize -> 1)
+ def setMaxBlockMemoryInMB(value: Int): this.type = set(maxBlockMemoryInMB, value)
+ setDefault(maxBlockMemoryInMB -> 0)
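+ // maxBlockMemoryInMB = 0 (the default) keeps the row-based training path;
+ // a positive value stacks rows into InstanceBlocks whose estimated size is
+ // capped at roughly this many megabytes (see enableBlockify below).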

@Since("2.2.0")
override def copy(extra: ParamMap): LinearSVC = defaultCopy(extra)
@@ -182,17 +167,19 @@ class LinearSVC @Since("2.2.0") (
instr.logPipelineStage(this)
instr.logDataset(dataset)
instr.logParams(this, labelCol, weightCol, featuresCol, predictionCol, rawPredictionCol,
- regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth, blockSize)
+ regParam, maxIter, fitIntercept, tol, standardization, threshold, aggregationDepth,
+ maxBlockMemoryInMB)
+ val enableBlockify = $(maxBlockMemoryInMB) > 0

val instances = extractInstances(dataset)
.setName("training instances")

- if (dataset.storageLevel == StorageLevel.NONE && $(blockSize) == 1) {
+ if (dataset.storageLevel == StorageLevel.NONE && !enableBlockify) {
instances.persist(StorageLevel.MEMORY_AND_DISK)
}

var requestedMetrics = Seq("mean", "std", "count")
- if ($(blockSize) != 1) requestedMetrics +:= "numNonZeros"
+ if (enableBlockify) requestedMetrics +:= "numNonZeros"
val (summarizer, labelSummarizer) = Summarizer
.getClassificationSummarizers(instances, $(aggregationDepth), requestedMetrics)

Expand All @@ -204,14 +191,10 @@ class LinearSVC @Since("2.2.0") (
instr.logNamedValue("lowestLabelWeight", labelSummarizer.histogram.min.toString)
instr.logNamedValue("highestLabelWeight", labelSummarizer.histogram.max.toString)
instr.logSumOfWeights(summarizer.weightSum)
- if ($(blockSize) > 1) {
+ if (enableBlockify) {
val scale = 1.0 / summarizer.count / numFeatures
val sparsity = 1 - summarizer.numNonzeros.toArray.map(_ * scale).sum
instr.logNamedValue("sparsity", sparsity.toString)
- if (sparsity > 0.5) {
- instr.logWarning(s"sparsity of input dataset is $sparsity, " +
- s"which may hurt performance in high-level BLAS.")
- }
}

val numClasses = MetadataUtils.getNumClasses(dataset.schema($(labelCol))) match {
@@ -250,10 +233,10 @@ class LinearSVC @Since("2.2.0") (
Note that the intercept in scaled space and original space is the same;
as a result, no scaling is needed.
*/
- val (rawCoefficients, objectiveHistory) = if ($(blockSize) == 1) {
- trainOnRows(instances, featuresStd, regularization, optimizer)
- } else {
+ val (rawCoefficients, objectiveHistory) = if (enableBlockify) {
trainOnBlocks(instances, featuresStd, regularization, optimizer)
+ } else {
+ trainOnRows(instances, featuresStd, regularization, optimizer)
}
if (instances.getStorageLevel != StorageLevel.NONE) instances.unpersist()

@@ -331,9 +314,10 @@ class LinearSVC @Since("2.2.0") (
val func = StandardScalerModel.getTransformFunc(Array.empty, inverseStd, false, true)
iter.map { case Instance(label, weight, vec) => Instance(label, weight, func(vec)) }
}
- val blocks = InstanceBlock.blokify(standardized, $(blockSize))
+ val maxMemoryUsage = $(maxBlockMemoryInMB) * 1024L * 1024L
+ val blocks = InstanceBlock.blokifyWithMaxMemoryUsage(standardized, maxMemoryUsage)
.persist(StorageLevel.MEMORY_AND_DISK)
- .setName(s"training blocks (blockSize=${$(blockSize)})")
+ .setName(s"training blocks (maxBlockMemoryInMB=${$(maxBlockMemoryInMB)})")

val getAggregatorFunc = new BlockHingeAggregator($(fitIntercept))(_)
val costFun = new RDDLossFunction(blocks, getAggregatorFunc,
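For reference, a minimal usage sketch of the new param (not part of this commit's diff; `training` stands in for any DataFrame of labels and feature vectors):

import org.apache.spark.ml.classification.LinearSVC

val lsvc = new LinearSVC()
  .setMaxIter(10)
  .setRegParam(0.1)
  .setMaxBlockMemoryInMB(1) // stack rows into ~1 MB blocks; 0 (the default) trains row by row
val model = lsvc.fit(training)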
56 changes: 56 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/feature/Instance.scala
@@ -17,6 +17,8 @@

package org.apache.spark.ml.feature

+ import scala.collection.mutable

import org.apache.spark.ml.linalg._
import org.apache.spark.rdd.RDD

@@ -100,6 +102,18 @@ private[spark] case class InstanceBlock(

private[spark] object InstanceBlock {

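+ // Estimate the in-memory size (in bytes) of a block of numRows instances with
+ // numCols features and nnz non-zeros in total: the cheaper of dense vs. sparse
+ // matrix storage, plus the label and weight arrays (one double each per row)
+ // and their two array headers. For example, dense rows of 1000 doubles cost
+ // ~8 KB each, so roughly 130 of them fit a 1 MB budget.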
+ private def getBlockSize(
+ numCols: Int,
+ numRows: Int,
+ nnz: Int): Long = {
+ val doubleBytes = java.lang.Double.BYTES
+ val arrayHeader = 12L
+ val denseSize = Matrices.getDenseSize(numCols, numRows)
+ val sparseSize = Matrices.getSparseSize(nnz, numRows + 1)
+ val matrixSize = math.min(denseSize, sparseSize)
+ matrixSize + doubleBytes * numRows * 2 + arrayHeader * 2
+ }

def fromInstances(instances: Seq[Instance]): InstanceBlock = {
val labels = instances.map(_.label).toArray
val weights = if (instances.exists(_.weight != 1)) {
@@ -114,6 +128,48 @@ private[spark] object InstanceBlock {
def blokify(instances: RDD[Instance], blockSize: Int): RDD[InstanceBlock] = {
instances.mapPartitions(_.grouped(blockSize).map(InstanceBlock.fromInstances))
}

+ def blokifyWithMaxMemoryUsage(
+ iterator: Iterator[Instance],
+ maxMemoryUsage: Long): Iterator[InstanceBlock] = {
+ require(maxMemoryUsage > 0)
+ val buff = mutable.ArrayBuilder.make[Instance]
+ var numCols = -1
+ var count = 0
+ var nnz = 0

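+ // Greedy packing: append instances until the estimated size of the buffer
+ // plus the next instance would exceed maxMemoryUsage, then emit the buffered
+ // instances as one InstanceBlock and start a new buffer.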
+ iterator.flatMap { instance =>
+ if (numCols < 0) numCols = instance.features.size
+ require(numCols == instance.features.size)
+ val n = instance.features.numNonzeros
+ var block = Option.empty[InstanceBlock]
+ if (getBlockSize(numCols, count + 1, nnz + n) > maxMemoryUsage) {
+ val instances = buff.result()
+ if (instances.nonEmpty) block = Some(InstanceBlock.fromInstances(instances))
+ buff.clear()
+ count = 0
+ nnz = 0
+ require(getBlockSize(numCols, 1, n) <= maxMemoryUsage,
+ s"instance $instance exceeds memory limit $maxMemoryUsage")
+ }
+ buff += instance
+ count += 1
+ nnz += n
+ block.iterator
+ } ++ {
+ val instances = buff.result()
+ if (instances.nonEmpty) {
+ Iterator.single(InstanceBlock.fromInstances(instances))
+ } else Iterator.empty
+ }
+ }

+ def blokifyWithMaxMemoryUsage(
+ instances: RDD[Instance],
+ maxMemoryUsage: Long): RDD[InstanceBlock] = {
+ require(maxMemoryUsage > 0)
+ instances.mapPartitions(iter => blokifyWithMaxMemoryUsage(iter, maxMemoryUsage))
+ }
}


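For reference, a rough sketch of the new helper's behavior (not part of this commit's diff; Instance and InstanceBlock are private[spark], so this only compiles inside the org.apache.spark.ml.feature package, and exact block sizes depend on the size estimate's constants):

import org.apache.spark.ml.feature.{Instance, InstanceBlock}
import org.apache.spark.ml.linalg.Vectors

// 1000 dense rows of 1000 doubles: ~8 KB per row, so a 1 MB budget should
// yield blocks of roughly 130 rows (about 8 blocks in total).
val instances = Iterator.tabulate(1000) { i =>
  Instance((i % 2).toDouble, 1.0, Vectors.dense(Array.fill(1000)(1.0)))
}
InstanceBlock
  .blokifyWithMaxMemoryUsage(instances, 1L * 1024L * 1024L)
  .foreach(block => println(block.instanceIterator.size))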
@@ -107,8 +107,10 @@ private[shared] object SharedParamsCodeGen {
"validation."),
ParamDesc[Int]("blockSize", "block size for stacking input data in matrices. Data is " +
"stacked within partitions. If block size is more than remaining data in a partition " +
- "then it is adjusted to the size of this data.",
- isValid = "ParamValidators.gt(0)", isExpertParam = true)
+ "then it is adjusted to the size of this data",
+ isValid = "ParamValidators.gt(0)", isExpertParam = true),
+ ParamDesc[Int]("maxBlockMemoryInMB", "Maximum memory in MB allocated to stack instances " +
+ "to a block", isValid = "ParamValidators.gtEq(0)", isExpertParam = true)
)

val code = genSharedParams(params)
@@ -554,12 +554,28 @@ trait HasValidationIndicatorCol extends Params {
trait HasBlockSize extends Params {

/**
- * Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data..
+ * Param for block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.
* @group expertParam
*/
- final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data.", ParamValidators.gt(0))
+ final val blockSize: IntParam = new IntParam(this, "blockSize", "block size for stacking input data in matrices. Data is stacked within partitions. If block size is more than remaining data in a partition then it is adjusted to the size of this data", ParamValidators.gt(0))

/** @group expertGetParam */
final def getBlockSize: Int = $(blockSize)
}

+ /**
+ * Trait for shared param maxBlockMemoryInMB. This trait may be changed or
+ * removed between minor versions.
+ */
+ trait HasMaxBlockMemoryInMB extends Params {
+
+ /**
+ * Param for Maximum memory in MB allocated to stack instances to a block.
+ * @group expertParam
+ */
+ final val maxBlockMemoryInMB: IntParam = new IntParam(this, "maxBlockMemoryInMB", "Maximum memory in MB allocated to stack instances to a block", ParamValidators.gtEq(0))
+
+ /** @group expertGetParam */
+ final def getMaxBlockMemoryInMB: Int = $(maxBlockMemoryInMB)
+ }
// scalastyle:on
@@ -214,8 +214,8 @@ class LinearSVCSuite extends MLTest with DefaultReadWriteTest {
.setFitIntercept(fitIntercept)
.setMaxIter(5)
val model = lsvc.fit(dataset)
- Seq(4, 16, 64).foreach { blockSize =>
- val model2 = lsvc.setBlockSize(blockSize).fit(dataset)
+ Seq(0, 1).foreach { memUsage =>
+ val model2 = lsvc.setMaxBlockMemoryInMB(memUsage).fit(dataset)
assert(model.intercept ~== model2.intercept relTol 1e-9)
assert(model.coefficients ~== model2.coefficients relTol 1e-9)
}
@@ -17,6 +17,8 @@

package org.apache.spark.ml.feature

+ import scala.util.Random

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.Kryo._
import org.apache.spark.ml.linalg.Vectors
@@ -74,4 +76,33 @@ class InstanceSuite extends SparkFunSuite{
}
}

+ test("InstanceBlock: memory usage limit") {
+ val rng = new Random(123L)
+ val instances = Seq.fill(1000) {
+ Instance(rng.nextDouble, rng.nextDouble, Vectors.dense(Array.fill(1000)(rng.nextDouble)))
+ }

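+ // Round-trip: blockification must preserve the order and content of all instances.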
+ Seq(1, 2, 3).foreach { maxBlockMemoryInMB =>
+ val maxMemoryUsage = maxBlockMemoryInMB * 1024L * 1024L
+ val blocks = InstanceBlock.blokifyWithMaxMemoryUsage(
+ instances.iterator, maxMemoryUsage).toSeq
+ val flatten = blocks.flatMap { block => block.instanceIterator }
+ assert(instances.size == flatten.size)
+ assert(instances.iterator.zip(flatten.iterator).forall(t => t._1 === t._2))
+ }

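+ // A single instance whose estimated size fits the 1 MB budget yields exactly one block.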
+ Seq(2, 4, 64).foreach { i =>
+ val instanceIter = Iterator.single(Instance(rng.nextDouble, rng.nextDouble,
+ Vectors.dense(Array.fill(1024 * i)(rng.nextDouble))))
+ assert(InstanceBlock.blokifyWithMaxMemoryUsage(instanceIter, 1024L * 1024L).size == 1)
+ }

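+ // A single instance larger than the 1 MB budget cannot be blockified and must throw.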
+ Seq(128, 256).foreach { i =>
+ val instanceIter = Iterator.single(Instance(rng.nextDouble, rng.nextDouble,
+ Vectors.dense(Array.fill(1024 * i)(rng.nextDouble))))
+ intercept[IllegalArgumentException] {
+ InstanceBlock.blokifyWithMaxMemoryUsage(instanceIter, 1024L * 1024L).size
+ }
+ }
+ }
}
22 changes: 11 additions & 11 deletions python/pyspark/ml/classification.py
@@ -499,7 +499,7 @@ def recallByThreshold(self):

class _LinearSVCParams(_ClassifierParams, HasRegParam, HasMaxIter, HasFitIntercept, HasTol,
HasStandardization, HasWeightCol, HasAggregationDepth, HasThreshold,
- HasBlockSize):
+ HasMaxBlockMemoryInMB):
"""
Params for :py:class:`LinearSVC` and :py:class:`LinearSVCModel`.
@@ -548,8 +548,8 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
LinearSVCModel...
>>> model.getThreshold()
0.5
- >>> model.getBlockSize()
- 1
+ >>> model.getMaxBlockMemoryInMB()
+ 0
>>> model.coefficients
DenseVector([0.0, -0.2792, -0.1833])
>>> model.intercept
@@ -588,19 +588,19 @@ class LinearSVC(_JavaClassifier, _LinearSVCParams, JavaMLWritable, JavaMLReadabl
def __init__(self, featuresCol="features", labelCol="label", predictionCol="prediction",
maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
- aggregationDepth=2, blockSize=1):
+ aggregationDepth=2, maxBlockMemoryInMB=0):
"""
__init__(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \
fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
- aggregationDepth=2, blockSize=1):
+ aggregationDepth=2, maxBlockMemoryInMB=0):
"""
super(LinearSVC, self).__init__()
self._java_obj = self._new_java_obj(
"org.apache.spark.ml.classification.LinearSVC", self.uid)
self._setDefault(maxIter=100, regParam=0.0, tol=1e-6, fitIntercept=True,
standardization=True, threshold=0.0, aggregationDepth=2,
- blockSize=1)
+ maxBlockMemoryInMB=0)
kwargs = self._input_kwargs
self.setParams(**kwargs)

@@ -609,12 +609,12 @@ def __init__(self, featuresCol="features", labelCol="label", predictionCol="pred
def setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction",
maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction",
fitIntercept=True, standardization=True, threshold=0.0, weightCol=None,
- aggregationDepth=2, blockSize=1):
+ aggregationDepth=2, maxBlockMemoryInMB=0):
"""
setParams(self, featuresCol="features", labelCol="label", predictionCol="prediction", \
maxIter=100, regParam=0.0, tol=1e-6, rawPredictionCol="rawPrediction", \
fitIntercept=True, standardization=True, threshold=0.0, weightCol=None, \
- aggregationDepth=2, blockSize=1):
+ aggregationDepth=2, maxBlockMemoryInMB=0):
Sets params for Linear SVM Classifier.
"""
kwargs = self._input_kwargs
@@ -680,11 +680,11 @@ def setAggregationDepth(self, value):
return self._set(aggregationDepth=value)

@since("3.1.0")
- def setBlockSize(self, value):
+ def setMaxBlockMemoryInMB(self, value):
"""
- Sets the value of :py:attr:`blockSize`.
+ Sets the value of :py:attr:`maxBlockMemoryInMB`.
"""
- return self._set(blockSize=value)
+ return self._set(maxBlockMemoryInMB=value)


class LinearSVCModel(_JavaClassificationModel, _LinearSVCParams, JavaMLWritable, JavaMLReadable,
4 changes: 3 additions & 1 deletion python/pyspark/ml/param/_shared_params_code_gen.py
@@ -167,7 +167,9 @@ def get$Name(self):
None, "TypeConverters.toString"),
("blockSize", "block size for stacking input data in matrices. Data is stacked within "
"partitions. If block size is more than remaining data in a partition then it is "
- "adjusted to the size of this data.", None, "TypeConverters.toInt")]
+ "adjusted to the size of this data.", None, "TypeConverters.toInt"),
+ ("maxBlockMemoryInMB", "Maximum memory in MB allocated to stack instances to a block.",
+ None, "TypeConverters.toInt")]

code = []
for name, doc, defaultValueStr, typeConverter in shared:
17 changes: 17 additions & 0 deletions python/pyspark/ml/param/shared.py
@@ -597,3 +597,20 @@ def getBlockSize(self):
Gets the value of blockSize or its default value.
"""
return self.getOrDefault(self.blockSize)


+ class HasMaxBlockMemoryInMB(Params):
+ """
+ Mixin for param maxBlockMemoryInMB: Maximum memory in MB allocated to stack instances to a block.
+ """
+
+ maxBlockMemoryInMB = Param(Params._dummy(), "maxBlockMemoryInMB", "Maximum memory in MB allocated to stack instances to a block.", typeConverter=TypeConverters.toInt)
+
+ def __init__(self):
+ super(HasMaxBlockMemoryInMB, self).__init__()
+
+ def getMaxBlockMemoryInMB(self):
+ """
+ Gets the value of maxBlockMemoryInMB or its default value.
+ """
+ return self.getOrDefault(self.maxBlockMemoryInMB)
