[SPARK-32676][3.0][ML] Fix double caching in KMeans/BiKMeans #29528

Closed · wants to merge 1 commit
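Summary (not the author's original PR description): the caching decision used to be split across the two layers and keyed off different objects. `KMeans.fit` / `BisectingKMeans.fit` in spark.ml persisted the freshly built `instances` RDD whenever the input Dataset was uncached, while the spark.mllib side checked the storage level of that RDD and cached its own derived vector RDD. When a user had already cached the Dataset, the RDD-level check still saw `StorageLevel.NONE`, so the same data ended up cached twice (the user's Dataset plus the internal zipped vectors). With this patch, `fit` only computes a `handlePersistence` flag from the Dataset's storage level and passes it into `runWithWeight`, which caches exactly one RDD (the zipped `VectorWithNorm` data when nothing was cached, otherwise just the norms) and unpersists it when training finishes.

The sketch below is illustrative only: a hypothetical `train` helper and object name, a local SparkSession, and a plain tuple standing in for `VectorWithNorm`. It is not code from the patch; it just shows the single-caching pattern the patch moves into `runWithWeight`.

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object SingleCachingSketch {

  // Stand-in for the MLlib-side runWithWeight: caches exactly one derived RDD,
  // chosen by the handlePersistence flag the caller computed, and unpersists it.
  def train(instances: RDD[(Vector, Double)], handlePersistence: Boolean): Long = {
    val norms = instances.map { case (v, _) => Vectors.norm(v, 2.0) }
    val vectors = instances.zip(norms).map { case ((v, w), norm) => (v, w, norm) }
    if (handlePersistence) {
      vectors.persist(StorageLevel.MEMORY_AND_DISK) // input was uncached: cache the heavy RDD
    } else {
      norms.persist(StorageLevel.MEMORY_AND_DISK)   // input already cached: cache only the norms
    }
    val n = vectors.count()                         // stand-in for the iterative clustering loop
    if (handlePersistence) vectors.unpersist() else norms.unpersist()
    n
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("caching-sketch").getOrCreate()
    val instances: RDD[(Vector, Double)] = spark.sparkContext
      .parallelize(Seq(Vectors.dense(1.0, 2.0), Vectors.dense(3.0, 4.0)))
      .map(v => (v, 1.0))
    // The caller decides handlePersistence once, from its own storage state,
    // so the two layers no longer cache independently.
    val handlePersistence = instances.getStorageLevel == StorageLevel.NONE
    println(train(instances, handlePersistence))
    spark.stop()
  }
}
```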
mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
@@ -28,9 +28,8 @@ import org.apache.spark.ml.util._
 import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.clustering.{BisectingKMeans => MLlibBisectingKMeans,
   BisectingKMeansModel => MLlibBisectingKMeansModel}
-import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
+import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
 import org.apache.spark.mllib.linalg.VectorImplicits._
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
@@ -275,21 +274,6 @@ class BisectingKMeans @Since("2.0.0") (
   override def fit(dataset: Dataset[_]): BisectingKMeansModel = instrumented { instr =>
     transformSchema(dataset.schema, logging = true)
 
-    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
-    val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) {
-      col($(weightCol)).cast(DoubleType)
-    } else {
-      lit(1.0)
-    }
-
-    val instances: RDD[(OldVector, Double)] = dataset
-      .select(DatasetUtils.columnToVector(dataset, getFeaturesCol), w).rdd.map {
-      case Row(point: Vector, weight: Double) => (OldVectors.fromML(point), weight)
-    }
-    if (handlePersistence) {
-      instances.persist(StorageLevel.MEMORY_AND_DISK)
-    }
-
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
     instr.logParams(this, featuresCol, predictionCol, k, maxIter, seed,
@@ -301,11 +285,18 @@ class BisectingKMeans @Since("2.0.0") (
       .setMinDivisibleClusterSize($(minDivisibleClusterSize))
       .setSeed($(seed))
       .setDistanceMeasure($(distanceMeasure))
-    val parentModel = bkm.runWithWeight(instances, Some(instr))
-    val model = copyValues(new BisectingKMeansModel(uid, parentModel).setParent(this))
-    if (handlePersistence) {
-      instances.unpersist()
+
+    val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) {
+      col($(weightCol)).cast(DoubleType)
+    } else {
+      lit(1.0)
     }
+    val instances = dataset.select(DatasetUtils.columnToVector(dataset, getFeaturesCol), w)
+      .rdd.map { case Row(point: Vector, weight: Double) => (OldVectors.fromML(point), weight) }
+
+    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
+    val parentModel = bkm.runWithWeight(instances, handlePersistence, Some(instr))
+    val model = copyValues(new BisectingKMeansModel(uid, parentModel).setParent(this))
 
     val summary = new BisectingKMeansSummary(
       model.transform(dataset),
33 changes: 12 additions & 21 deletions mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala
@@ -31,7 +31,6 @@ import org.apache.spark.ml.util.Instrumentation.instrumented
 import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans, KMeansModel => MLlibKMeansModel}
 import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
 import org.apache.spark.mllib.linalg.VectorImplicits._
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
@@ -329,22 +328,6 @@ class KMeans @Since("1.5.0") (
   override def fit(dataset: Dataset[_]): KMeansModel = instrumented { instr =>
     transformSchema(dataset.schema, logging = true)
 
-    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
-    val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) {
-      col($(weightCol)).cast(DoubleType)
-    } else {
-      lit(1.0)
-    }
-
-    val instances: RDD[(OldVector, Double)] = dataset
-      .select(DatasetUtils.columnToVector(dataset, getFeaturesCol), w).rdd.map {
-      case Row(point: Vector, weight: Double) => (OldVectors.fromML(point), weight)
-    }
-
-    if (handlePersistence) {
-      instances.persist(StorageLevel.MEMORY_AND_DISK)
-    }
-
     instr.logPipelineStage(this)
     instr.logDataset(dataset)
     instr.logParams(this, featuresCol, predictionCol, k, initMode, initSteps, distanceMeasure,
@@ -357,8 +340,19 @@
       .setSeed($(seed))
       .setEpsilon($(tol))
       .setDistanceMeasure($(distanceMeasure))
-    val parentModel = algo.runWithWeight(instances, Option(instr))
+
+    val w = if (isDefined(weightCol) && $(weightCol).nonEmpty) {
+      col($(weightCol)).cast(DoubleType)
+    } else {
+      lit(1.0)
+    }
+    val instances = dataset.select(DatasetUtils.columnToVector(dataset, getFeaturesCol), w)
+      .rdd.map { case Row(point: Vector, weight: Double) => (OldVectors.fromML(point), weight) }
+
+    val handlePersistence = dataset.storageLevel == StorageLevel.NONE
+    val parentModel = algo.runWithWeight(instances, handlePersistence, Some(instr))
     val model = copyValues(new KMeansModel(uid, parentModel).setParent(this))
+
     val summary = new KMeansSummary(
       model.transform(dataset),
       $(predictionCol),
@@ -369,9 +363,6 @@

     model.setSummary(Some(summary))
     instr.logNamedValue("clusterSizes", summary.clusterSizes)
-    if (handlePersistence) {
-      instances.unpersist()
-    }
     model
   }

mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
@@ -153,30 +153,25 @@ class BisectingKMeans private (
     this
   }
 
-  private[spark] def run(
-      input: RDD[Vector],
-      instr: Option[Instrumentation]): BisectingKMeansModel = {
-    val instances: RDD[(Vector, Double)] = input.map {
-      case (point) => (point, 1.0)
-    }
-    runWithWeight(instances, None)
-  }
-
   private[spark] def runWithWeight(
-      input: RDD[(Vector, Double)],
+      instances: RDD[(Vector, Double)],
+      handlePersistence: Boolean,
       instr: Option[Instrumentation]): BisectingKMeansModel = {
-    val d = input.map(_._1.size).first
+    val d = instances.map(_._1.size).first
     logInfo(s"Feature dimension: $d.")
 
-    val dMeasure: DistanceMeasure = DistanceMeasure.decodeFromString(this.distanceMeasure)
-    // Compute and cache vector norms for fast distance computation.
-    val norms = input.map(d => Vectors.norm(d._1, 2.0))
-    val vectors = input.zip(norms).map {
-      case ((x, weight), norm) => new VectorWithNorm(x, norm, weight)
-    }
-    if (input.getStorageLevel == StorageLevel.NONE) {
+    val dMeasure = DistanceMeasure.decodeFromString(this.distanceMeasure)
+    val norms = instances.map(d => Vectors.norm(d._1, 2.0))
+    val vectors = instances.zip(norms)
+      .map { case ((x, weight), norm) => new VectorWithNorm(x, norm, weight) }
+
+    if (handlePersistence) {
       vectors.persist(StorageLevel.MEMORY_AND_DISK)
+    } else {
+      // Compute and cache vector norms for fast distance computation.
+      norms.persist(StorageLevel.MEMORY_AND_DISK)
     }
+
     var assignments = vectors.map(v => (ROOT_INDEX, v))
     var activeClusters = summarize(d, assignments, dMeasure)
     instr.foreach(_.logNumExamples(activeClusters.values.map(_.size).sum))
@@ -244,13 +239,11 @@ class BisectingKMeans private (
       }
       level += 1
     }
-    if (preIndices != null) {
-      preIndices.unpersist()
-    }
-    if (indices != null) {
-      indices.unpersist()
-    }
-    vectors.unpersist()
+
+    if (preIndices != null) { preIndices.unpersist() }
+    if (indices != null) { indices.unpersist() }
+    if (handlePersistence) { vectors.unpersist() } else { norms.unpersist() }
+
     val clusters = activeClusters ++ inactiveClusters
     val root = buildTree(clusters, dMeasure)
     val totalCost = root.leafNodes.map(_.cost).sum
@@ -264,7 +257,9 @@
    */
   @Since("1.6.0")
   def run(input: RDD[Vector]): BisectingKMeansModel = {
-    run(input, None)
+    val instances = input.map(point => (point, 1.0))
+    val handlePersistence = input.getStorageLevel == StorageLevel.NONE
+    runWithWeight(instances, handlePersistence, None)
   }
 
   /**
33 changes: 15 additions & 18 deletions mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -209,30 +209,27 @@
    */
   @Since("0.8.0")
   def run(data: RDD[Vector]): KMeansModel = {
-    val instances: RDD[(Vector, Double)] = data.map {
-      case (point) => (point, 1.0)
-    }
-    runWithWeight(instances, None)
+    val instances = data.map(point => (point, 1.0))
+    val handlePersistence = data.getStorageLevel == StorageLevel.NONE
+    runWithWeight(instances, handlePersistence, None)
   }
 
   private[spark] def runWithWeight(
-      data: RDD[(Vector, Double)],
+      instances: RDD[(Vector, Double)],
+      handlePersistence: Boolean,
       instr: Option[Instrumentation]): KMeansModel = {
+    val norms = instances.map { case (v, _) => Vectors.norm(v, 2.0) }
+    val vectors = instances.zip(norms)
+      .map { case ((v, w), norm) => new VectorWithNorm(v, norm, w) }
 
-    // Compute squared norms and cache them.
-    val norms = data.map { case (v, _) =>
-      Vectors.norm(v, 2.0)
-    }
-
-    val zippedData = data.zip(norms).map { case ((v, w), norm) =>
-      new VectorWithNorm(v, norm, w)
-    }
-
-    if (data.getStorageLevel == StorageLevel.NONE) {
-      zippedData.persist(StorageLevel.MEMORY_AND_DISK)
+    if (handlePersistence) {
+      vectors.persist(StorageLevel.MEMORY_AND_DISK)
+    } else {
+      // Compute squared norms and cache them.
+      norms.persist(StorageLevel.MEMORY_AND_DISK)
     }
-    val model = runAlgorithmWithWeight(zippedData, instr)
-    zippedData.unpersist()
+    val model = runAlgorithmWithWeight(vectors, instr)
+    if (handlePersistence) { vectors.unpersist() } else { norms.unpersist() }
 
     model
   }
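The public entry points keep their signatures; only the internal caching changes. Below is a usage sketch, not part of the patch (hypothetical input path, assumes a spark-shell `SparkContext` named `sc`): because `run` now derives `handlePersistence` from the input RDD's storage level, pre-caching the input means the algorithm caches only the lightweight norms instead of a second full copy of the vectors.

```scala
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.storage.StorageLevel

// Hypothetical input path; each line holds space-separated feature values.
val data = sc.textFile("data/mllib/kmeans_data.txt")
  .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
  .persist(StorageLevel.MEMORY_AND_DISK) // already cached, so handlePersistence is false inside run

val model = new KMeans().setK(2).setMaxIterations(10).run(data)
data.unpersist()
```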