From ed890d35ff1e9edbe2a557f68732835b3e911906 Mon Sep 17 00:00:00 2001 From: Lu WANG Date: Mon, 16 Apr 2018 10:32:02 -0700 Subject: [PATCH 1/8] add Array input support for KMeans --- .../apache/spark/ml/clustering/KMeans.scala | 33 +++++++++++++++---- .../spark/ml/clustering/KMeansSuite.scala | 29 ++++++++++++++++ 2 files changed, 56 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 987a4285ebad4..1bc420697ba5c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.{Estimator, Model} -import org.apache.spark.ml.linalg.{Vector, VectorUDT} +import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ @@ -32,7 +32,7 @@ import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.functions.{col, udf} -import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, IntegerType, StructType} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.VersionUtils.majorVersion @@ -90,7 +90,12 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFe * @return output schema */ protected def validateAndTransformSchema(schema: StructType): StructType = { - SchemaUtils.checkColumnType(schema, $(featuresCol), new VectorUDT) + val typeCandidates = List( new VectorUDT, + new ArrayType(DoubleType, true), + new ArrayType(DoubleType, false), + new ArrayType(FloatType, true), + new ArrayType(FloatType, false)) + SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates) SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType) } } @@ -123,8 +128,15 @@ class KMeansModel private[ml] ( @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) - val predictUDF = udf((vector: Vector) => predict(vector)) - dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) + // val predictUDF = udf((vector: Vector) => predict(vector)) + if (dataset.schema($(featuresCol)).dataType.equals(new VectorUDT)) { + val predictUDF = udf((vector: Vector) => predict(vector)) + dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) + } else { + val predictUDF = udf((vector: Seq[_]) => + predict(Vectors.dense(vector.asInstanceOf[Seq[Double]].toArray))) + dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) + } } @Since("1.5.0") @@ -144,7 +156,12 @@ class KMeansModel private[ml] ( // TODO: Replace the temp fix when we have proper evaluators defined for clustering. 
@Since("2.0.0") def computeCost(dataset: Dataset[_]): Double = { - SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT) + val typeCandidates = List( new VectorUDT, + new ArrayType(DoubleType, true), + new ArrayType(DoubleType, false), + new ArrayType(FloatType, true), + new ArrayType(FloatType, false)) + SchemaUtils.checkColumnTypes(dataset.schema, $(featuresCol), typeCandidates) val data: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } @@ -305,6 +322,8 @@ class KMeans @Since("1.5.0") ( @Since("1.5.0") def setSeed(value: Long): this.type = set(seed, value) + case class SeqDouble(value: Seq[Double]) + @Since("2.0.0") override def fit(dataset: Dataset[_]): KMeansModel = { transformSchema(dataset.schema, logging = true) @@ -312,6 +331,8 @@ class KMeans @Since("1.5.0") ( val handlePersistence = dataset.storageLevel == StorageLevel.NONE val instances: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) + case Row(point: Seq[_]) => + OldVectors.fromML(Vectors.dense(point.asInstanceOf[Seq[Double]].toArray)) } if (handlePersistence) { diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala index 32830b39407ad..2595715735f30 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} +import org.apache.spark.sql.functions._ private[clustering] case class TestRow(features: Vector) @@ -194,6 +195,34 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultR assert(e.getCause.getMessage.contains("Cosine distance is not defined")) } + test("KMean with Array input") { + val featuresColName = "array_model_features" + + val arrayUDF = udf { (features: Vector) => + features.toArray + } + val newdataset = dataset.withColumn(featuresColName, arrayUDF(col("features")) ) + + val kmeans = new KMeans() + .setFeaturesCol(featuresColName) + + assert(kmeans.getK === 2) + assert(kmeans.getFeaturesCol === featuresColName) + assert(kmeans.getPredictionCol === "prediction") + assert(kmeans.getMaxIter === 20) + assert(kmeans.getInitMode === MLlibKMeans.K_MEANS_PARALLEL) + assert(kmeans.getInitSteps === 2) + assert(kmeans.getTol === 1e-4) + assert(kmeans.getDistanceMeasure === DistanceMeasure.EUCLIDEAN) + val model = kmeans.setMaxIter(1).fit(newdataset) + + MLTestingUtils.checkCopyAndUids(kmeans, model) + assert(model.hasSummary) + val copiedModel = model.copy(ParamMap.empty) + assert(copiedModel.hasSummary) + } + + test("read/write") { def checkModelData(model: KMeansModel, model2: KMeansModel): Unit = { assert(model.clusterCenters === model2.clusterCenters) From badb0cc5ca6ca69bb8e8fc0fce5ea05a4100bca0 Mon Sep 17 00:00:00 2001 From: Lu WANG Date: Mon, 16 Apr 2018 10:49:00 -0700 Subject: [PATCH 2/8] remove redundent code --- .../src/main/scala/org/apache/spark/ml/clustering/KMeans.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala 
b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 1bc420697ba5c..1a5e2b67b1d9a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -322,8 +322,6 @@ class KMeans @Since("1.5.0") ( @Since("1.5.0") def setSeed(value: Long): this.type = set(seed, value) - case class SeqDouble(value: Seq[Double]) - @Since("2.0.0") override def fit(dataset: Dataset[_]): KMeansModel = { transformSchema(dataset.schema, logging = true) From 6d222a3f257c850e653c9c048fb8c15e44d2c48f Mon Sep 17 00:00:00 2001 From: Lu WANG Date: Mon, 16 Apr 2018 15:10:16 -0700 Subject: [PATCH 3/8] make sure the code works for Float type and add the unit test --- .../apache/spark/ml/clustering/KMeans.scala | 35 +++++++++---- .../spark/ml/clustering/KMeansSuite.scala | 52 ++++++++++++------- 2 files changed, 59 insertions(+), 28 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 1a5e2b67b1d9a..f126218acb4a8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -129,14 +129,21 @@ class KMeansModel private[ml] ( override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) // val predictUDF = udf((vector: Vector) => predict(vector)) - if (dataset.schema($(featuresCol)).dataType.equals(new VectorUDT)) { - val predictUDF = udf((vector: Vector) => predict(vector)) - dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) - } else { - val predictUDF = udf((vector: Seq[_]) => - predict(Vectors.dense(vector.asInstanceOf[Seq[Double]].toArray))) - dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) + val predictUDF = if (dataset.schema($(featuresCol)).dataType.equals(new VectorUDT)) { + udf((vector: Vector) => predict(vector)) } + else { + udf((vector: Seq[_]) => { + val featureArray = Array.fill[Double](vector.size)(0.0) + for (idx <- 0 until vector.size) { + featureArray(idx) = vector(idx).toString().toDouble + } + OldVectors.fromML(Vectors.dense(featureArray)) + predict(Vectors.dense(featureArray)) + }) + } + + dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) } @Since("1.5.0") @@ -164,6 +171,12 @@ class KMeansModel private[ml] ( SchemaUtils.checkColumnTypes(dataset.schema, $(featuresCol), typeCandidates) val data: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) + case Row(point: Seq[_]) => + val featureArray = Array.fill[Double](point.size)(0.0) + for (idx <- 0 until point.size) { + featureArray(idx) = point(idx).toString().toDouble + } + OldVectors.fromML(Vectors.dense(featureArray)) } parentModel.computeCost(data) } @@ -330,8 +343,12 @@ class KMeans @Since("1.5.0") ( val instances: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) case Row(point: Seq[_]) => - OldVectors.fromML(Vectors.dense(point.asInstanceOf[Seq[Double]].toArray)) - } + val featureArray = Array.fill[Double](point.size)(0.0) + for (idx <- 0 until point.size) { + featureArray(idx) = point(idx).toString().toDouble + } + OldVectors.fromML(Vectors.dense(featureArray)) + } if (handlePersistence) { instances.persist(StorageLevel.MEMORY_AND_DISK) diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala index 2595715735f30..85a4e701ed11a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala @@ -27,6 +27,7 @@ import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, IntegerType, StructType} private[clustering] case class TestRow(features: Vector) @@ -196,30 +197,43 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultR } test("KMean with Array input") { - val featuresColName = "array_model_features" + val featuresColNameD = "array_double_features" + val featuresColNameF = "array_float_features" - val arrayUDF = udf { (features: Vector) => - features.toArray + val doubleUDF = udf { (features: Vector) => + val featureArray = Array.fill[Double](features.size)(0.0) + features.foreachActive((idx, value) => featureArray(idx) = value.toFloat) + featureArray + } + val floatUDF = udf { (features: Vector) => + val featureArray = Array.fill[Float](features.size)(0.0f) + features.foreachActive((idx, value) => featureArray(idx) = value.toFloat) + featureArray } - val newdataset = dataset.withColumn(featuresColName, arrayUDF(col("features")) ) - val kmeans = new KMeans() - .setFeaturesCol(featuresColName) + val newdatasetD = dataset.withColumn(featuresColNameD, doubleUDF(col("features"))) + .drop("features") + val newdatasetF = dataset.withColumn(featuresColNameF, floatUDF(col("features"))) + .drop("features") - assert(kmeans.getK === 2) - assert(kmeans.getFeaturesCol === featuresColName) - assert(kmeans.getPredictionCol === "prediction") - assert(kmeans.getMaxIter === 20) - assert(kmeans.getInitMode === MLlibKMeans.K_MEANS_PARALLEL) - assert(kmeans.getInitSteps === 2) - assert(kmeans.getTol === 1e-4) - assert(kmeans.getDistanceMeasure === DistanceMeasure.EUCLIDEAN) - val model = kmeans.setMaxIter(1).fit(newdataset) + assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false))) + assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false))) + + val kmeansD = new KMeans().setK(k).setFeaturesCol(featuresColNameD).setSeed(1) + val kmeansF = new KMeans().setK(k).setFeaturesCol(featuresColNameF).setSeed(1) + val modelD = kmeansD.fit(newdatasetD) + val modelF = kmeansF.fit(newdatasetF) + + val transformedD = modelD.transform(newdatasetD) + val transformedF = modelF.transform(newdatasetF) + + val predictDifference = transformedD.select("prediction") + .except(transformedF.select("prediction")) + + assert(predictDifference.count() == 0) + + assert(modelD.computeCost(newdatasetD) == modelF.computeCost(newdatasetF) ) - MLTestingUtils.checkCopyAndUids(kmeans, model) - assert(model.hasSummary) - val copiedModel = model.copy(ParamMap.empty) - assert(copiedModel.hasSummary) } From 009b918c8734b19f9f9b34a31c23d6ad582c7465 Mon Sep 17 00:00:00 2001 From: Lu WANG Date: Thu, 19 Apr 2018 11:33:04 -0700 Subject: [PATCH 4/8] consolidating featuretovector --- .../apache/spark/ml/clustering/KMeans.scala | 85 +++++++++++++------ 1 file changed, 59 insertions(+), 26 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala 
b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index f126218acb4a8..41994ffaa314d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -30,7 +30,7 @@ import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Dataset, Row} +import org.apache.spark.sql.{Column, DataFrame, Dataset, Row} import org.apache.spark.sql.functions.{col, udf} import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, IntegerType, StructType} import org.apache.spark.storage.StorageLevel @@ -91,9 +91,7 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFe */ protected def validateAndTransformSchema(schema: StructType): StructType = { val typeCandidates = List( new VectorUDT, - new ArrayType(DoubleType, true), new ArrayType(DoubleType, false), - new ArrayType(FloatType, true), new ArrayType(FloatType, false)) SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates) SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType) @@ -125,25 +123,32 @@ class KMeansModel private[ml] ( @Since("2.0.0") def setPredictionCol(value: String): this.type = set(predictionCol, value) + @Since("2.4.0") + def featureToVector(dataset: Dataset[_], col: Column): Column = { + val featuresDataType = dataset.schema(getFeaturesCol).dataType + val transferUDF = featuresDataType match { + case _: VectorUDT => udf((vector: Vector) => vector) + case fdt: ArrayType => fdt.elementType match { + case _: FloatType => udf(f = (vector: Seq[Float]) => { + val featureArray = Array.fill[Double](vector.size)(0.0) + vector.indices.foreach(idx => featureArray(idx) = vector(idx).toDouble) + Vectors.dense(featureArray) + }) + case _: DoubleType => udf((vector: Seq[Double]) => { + Vectors.dense(vector.toArray) + }) + } + } + transferUDF(col) + } + @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) - // val predictUDF = udf((vector: Vector) => predict(vector)) - val predictUDF = if (dataset.schema($(featuresCol)).dataType.equals(new VectorUDT)) { - udf((vector: Vector) => predict(vector)) - } - else { - udf((vector: Seq[_]) => { - val featureArray = Array.fill[Double](vector.size)(0.0) - for (idx <- 0 until vector.size) { - featureArray(idx) = vector(idx).toString().toDouble - } - OldVectors.fromML(Vectors.dense(featureArray)) - predict(Vectors.dense(featureArray)) - }) - } - dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol)))) + val predictUDF = udf((vector: Vector) => predict(vector)) + + dataset.withColumn($(predictionCol), predictUDF(featureToVector(dataset, col(getFeaturesCol)))) } @Since("1.5.0") @@ -164,20 +169,24 @@ class KMeansModel private[ml] ( @Since("2.0.0") def computeCost(dataset: Dataset[_]): Double = { val typeCandidates = List( new VectorUDT, - new ArrayType(DoubleType, true), new ArrayType(DoubleType, false), - new ArrayType(FloatType, true), new ArrayType(FloatType, false)) SchemaUtils.checkColumnTypes(dataset.schema, $(featuresCol), typeCandidates) - val data: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { + + /* val data: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) case 
Row(point: Seq[_]) => val featureArray = Array.fill[Double](point.size)(0.0) - for (idx <- 0 until point.size) { - featureArray(idx) = point(idx).toString().toDouble + for (idx <- point.indices) { + featureArray(idx) = point(idx).toString.toDouble } OldVectors.fromML(Vectors.dense(featureArray)) } + */ + val data: RDD[OldVector] = dataset.select(featureToVector(dataset, col(getFeaturesCol))) + .rdd.map { + case Row(point: Vector) => OldVectors.fromML(point) + } parentModel.computeCost(data) } @@ -335,21 +344,45 @@ class KMeans @Since("1.5.0") ( @Since("1.5.0") def setSeed(value: Long): this.type = set(seed, value) + @Since("2.4.0") + def featureToVector(dataset: Dataset[_], col: Column): Column = { + val featuresDataType = dataset.schema(getFeaturesCol).dataType + val transferUDF = featuresDataType match { + case _: VectorUDT => udf((vector: Vector) => vector) + case fdt: ArrayType => fdt.elementType match { + case _: FloatType => udf(f = (vector: Seq[Float]) => { + val featureArray = Array.fill[Double](vector.size)(0.0) + vector.indices.foreach(idx => featureArray(idx) = vector(idx).toDouble) + Vectors.dense(featureArray) + }) + case _: DoubleType => udf((vector: Seq[Double]) => { + Vectors.dense(vector.toArray) + }) + } + } + transferUDF(col) + } + @Since("2.0.0") override def fit(dataset: Dataset[_]): KMeansModel = { transformSchema(dataset.schema, logging = true) val handlePersistence = dataset.storageLevel == StorageLevel.NONE + val instances: RDD[OldVector] = dataset.select(featureToVector(dataset, col(getFeaturesCol))) + .rdd.map { + case Row(point: Vector) => OldVectors.fromML(point) + } + /* val instances: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { case Row(point: Vector) => OldVectors.fromML(point) case Row(point: Seq[_]) => val featureArray = Array.fill[Double](point.size)(0.0) - for (idx <- 0 until point.size) { - featureArray(idx) = point(idx).toString().toDouble + for (idx <- point.indices) { + featureArray(idx) = point(idx).toString.toDouble } OldVectors.fromML(Vectors.dense(featureArray)) } - +*/ if (handlePersistence) { instances.persist(StorageLevel.MEMORY_AND_DISK) } From cd988c7f2e4c2cb1f2006e264a24529a72c9a5cf Mon Sep 17 00:00:00 2001 From: Lu WANG Date: Fri, 20 Apr 2018 12:23:36 -0700 Subject: [PATCH 5/8] change featureToVector to KMeanParams and add the scala docs add validateSchema and use it in computeCost addressed the comments from @jkbradley --- .../apache/spark/ml/clustering/KMeans.scala | 110 +++++++----------- 1 file changed, 41 insertions(+), 69 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 41994ffaa314d..304da383927b4 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -85,17 +85,50 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFe def getInitSteps: Int = $(initSteps) /** - * Validates and transforms the input schema. + * Validates the input schema. * @param schema input schema - * @return output schema */ - protected def validateAndTransformSchema(schema: StructType): StructType = { + protected def validateSchema(schema: StructType): Unit = { val typeCandidates = List( new VectorUDT, new ArrayType(DoubleType, false), new ArrayType(FloatType, false)) SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates) + } + /** + * Validates and transforms the input schema. 
+ * @param schema input schema + * @return output schema + */ + protected def validateAndTransformSchema(schema: StructType): StructType = { + validateSchema(schema) SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType) } + + /** + * preprocessing the input feature column to Vector + * @param dataset DataFrame with columns for features + * @param colName column name for features + * @return Vector feature column + */ + @Since("2.4.0") + protected def featureToVector(dataset: Dataset[_], colName: String): Column = { + val featuresDataType = dataset.schema(colName).dataType + featuresDataType match { + case _: VectorUDT => col(colName) + case fdt: ArrayType => + val transferUDF = fdt.elementType match { + case _: FloatType => udf(f = (vector: Seq[Float]) => { + val featureArray = Array.fill[Double](vector.size)(0.0) + vector.indices.foreach(idx => featureArray(idx) = vector(idx).toDouble) + Vectors.dense(featureArray) + }) + case _: DoubleType => udf((vector: Seq[Double]) => { + Vectors.dense(vector.toArray) + }) + } + transferUDF(col(colName)) + } + } } /** @@ -123,32 +156,13 @@ class KMeansModel private[ml] ( @Since("2.0.0") def setPredictionCol(value: String): this.type = set(predictionCol, value) - @Since("2.4.0") - def featureToVector(dataset: Dataset[_], col: Column): Column = { - val featuresDataType = dataset.schema(getFeaturesCol).dataType - val transferUDF = featuresDataType match { - case _: VectorUDT => udf((vector: Vector) => vector) - case fdt: ArrayType => fdt.elementType match { - case _: FloatType => udf(f = (vector: Seq[Float]) => { - val featureArray = Array.fill[Double](vector.size)(0.0) - vector.indices.foreach(idx => featureArray(idx) = vector(idx).toDouble) - Vectors.dense(featureArray) - }) - case _: DoubleType => udf((vector: Seq[Double]) => { - Vectors.dense(vector.toArray) - }) - } - } - transferUDF(col) - } - @Since("2.0.0") override def transform(dataset: Dataset[_]): DataFrame = { transformSchema(dataset.schema, logging = true) val predictUDF = udf((vector: Vector) => predict(vector)) - dataset.withColumn($(predictionCol), predictUDF(featureToVector(dataset, col(getFeaturesCol)))) + dataset.withColumn($(predictionCol), predictUDF(featureToVector(dataset, getFeaturesCol))) } @Since("1.5.0") @@ -168,22 +182,9 @@ class KMeansModel private[ml] ( // TODO: Replace the temp fix when we have proper evaluators defined for clustering. 
@Since("2.0.0") def computeCost(dataset: Dataset[_]): Double = { - val typeCandidates = List( new VectorUDT, - new ArrayType(DoubleType, false), - new ArrayType(FloatType, false)) - SchemaUtils.checkColumnTypes(dataset.schema, $(featuresCol), typeCandidates) + validateSchema(dataset.schema) - /* val data: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { - case Row(point: Vector) => OldVectors.fromML(point) - case Row(point: Seq[_]) => - val featureArray = Array.fill[Double](point.size)(0.0) - for (idx <- point.indices) { - featureArray(idx) = point(idx).toString.toDouble - } - OldVectors.fromML(Vectors.dense(featureArray)) - } - */ - val data: RDD[OldVector] = dataset.select(featureToVector(dataset, col(getFeaturesCol))) + val data: RDD[OldVector] = dataset.select(featureToVector(dataset, getFeaturesCol)) .rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } @@ -344,45 +345,16 @@ class KMeans @Since("1.5.0") ( @Since("1.5.0") def setSeed(value: Long): this.type = set(seed, value) - @Since("2.4.0") - def featureToVector(dataset: Dataset[_], col: Column): Column = { - val featuresDataType = dataset.schema(getFeaturesCol).dataType - val transferUDF = featuresDataType match { - case _: VectorUDT => udf((vector: Vector) => vector) - case fdt: ArrayType => fdt.elementType match { - case _: FloatType => udf(f = (vector: Seq[Float]) => { - val featureArray = Array.fill[Double](vector.size)(0.0) - vector.indices.foreach(idx => featureArray(idx) = vector(idx).toDouble) - Vectors.dense(featureArray) - }) - case _: DoubleType => udf((vector: Seq[Double]) => { - Vectors.dense(vector.toArray) - }) - } - } - transferUDF(col) - } - @Since("2.0.0") override def fit(dataset: Dataset[_]): KMeansModel = { transformSchema(dataset.schema, logging = true) val handlePersistence = dataset.storageLevel == StorageLevel.NONE - val instances: RDD[OldVector] = dataset.select(featureToVector(dataset, col(getFeaturesCol))) + val instances: RDD[OldVector] = dataset.select(featureToVector(dataset, getFeaturesCol)) .rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } - /* - val instances: RDD[OldVector] = dataset.select(col($(featuresCol))).rdd.map { - case Row(point: Vector) => OldVectors.fromML(point) - case Row(point: Seq[_]) => - val featureArray = Array.fill[Double](point.size)(0.0) - for (idx <- point.indices) { - featureArray(idx) = point(idx).toString.toDouble - } - OldVectors.fromML(Vectors.dense(featureArray)) - } -*/ + if (handlePersistence) { instances.persist(StorageLevel.MEMORY_AND_DISK) } From 3ffb32291503a628c63a7014d27f2313f04c5497 Mon Sep 17 00:00:00 2001 From: Lu WANG Date: Mon, 23 Apr 2018 13:23:25 -0700 Subject: [PATCH 6/8] change featureToVector and validateSchema to private functions --- .../main/scala/org/apache/spark/ml/clustering/KMeans.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 304da383927b4..72c0b11f17c1f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -88,7 +88,7 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFe * Validates the input schema. 
* @param schema input schema */ - protected def validateSchema(schema: StructType): Unit = { + private[clustering] def validateSchema(schema: StructType): Unit = { val typeCandidates = List( new VectorUDT, new ArrayType(DoubleType, false), new ArrayType(FloatType, false)) @@ -111,7 +111,7 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFe * @return Vector feature column */ @Since("2.4.0") - protected def featureToVector(dataset: Dataset[_], colName: String): Column = { + private[clustering] def featureToVector(dataset: Dataset[_], colName: String): Column = { val featuresDataType = dataset.schema(colName).dataType featuresDataType match { case _: VectorUDT => col(colName) From 3e012fba3470c0f938e33cd3c783dff5ee068fcf Mon Sep 17 00:00:00 2001 From: Lu WANG Date: Mon, 23 Apr 2018 14:04:09 -0700 Subject: [PATCH 7/8] move featureToVector to util, so that other methods could use it to allow array input --- .../apache/spark/ml/clustering/KMeans.scala | 38 +++---------- .../apache/spark/ml/util/DatasetUtils.scala | 54 +++++++++++++++++++ 2 files changed, 61 insertions(+), 31 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 1f2e4b6c6cd93..50ef6f05cbc20 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkException import org.apache.spark.annotation.{Experimental, Since} import org.apache.spark.ml.{Estimator, Model, PipelineStage} -import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT} +import org.apache.spark.ml.linalg.{Vector, VectorUDT} import org.apache.spark.ml.param._ import org.apache.spark.ml.param.shared._ import org.apache.spark.ml.util._ @@ -32,7 +32,7 @@ import org.apache.spark.mllib.clustering.{DistanceMeasure, KMeans => MLlibKMeans import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors} import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.functions.{col, udf} import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, IntegerType, StructType} import org.apache.spark.storage.StorageLevel @@ -105,32 +105,6 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFe validateSchema(schema) SchemaUtils.appendColumn(schema, $(predictionCol), IntegerType) } - - /** - * preprocessing the input feature column to Vector - * @param dataset DataFrame with columns for features - * @param colName column name for features - * @return Vector feature column - */ - @Since("2.4.0") - private[clustering] def featureToVector(dataset: Dataset[_], colName: String): Column = { - val featuresDataType = dataset.schema(colName).dataType - featuresDataType match { - case _: VectorUDT => col(colName) - case fdt: ArrayType => - val transferUDF = fdt.elementType match { - case _: FloatType => udf(f = (vector: Seq[Float]) => { - val featureArray = Array.fill[Double](vector.size)(0.0) - vector.indices.foreach(idx => featureArray(idx) = vector(idx).toDouble) - Vectors.dense(featureArray) - }) - case _: DoubleType => udf((vector: Seq[Double]) => { - 
Vectors.dense(vector.toArray) - }) - } - transferUDF(col(colName)) - } - } } /** @@ -164,7 +138,8 @@ class KMeansModel private[ml] ( val predictUDF = udf((vector: Vector) => predict(vector)) - dataset.withColumn($(predictionCol), predictUDF(featureToVector(dataset, getFeaturesCol))) + dataset.withColumn($(predictionCol), + predictUDF(DatasetUtils.columnToVector(dataset, getFeaturesCol))) } @Since("1.5.0") @@ -186,7 +161,7 @@ class KMeansModel private[ml] ( def computeCost(dataset: Dataset[_]): Double = { validateSchema(dataset.schema) - val data: RDD[OldVector] = dataset.select(featureToVector(dataset, getFeaturesCol)) + val data: RDD[OldVector] = dataset.select(DatasetUtils.columnToVector(dataset, getFeaturesCol)) .rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } @@ -375,7 +350,8 @@ class KMeans @Since("1.5.0") ( transformSchema(dataset.schema, logging = true) val handlePersistence = dataset.storageLevel == StorageLevel.NONE - val instances: RDD[OldVector] = dataset.select(featureToVector(dataset, getFeaturesCol)) + val instances: RDD[OldVector] = dataset.select( + DatasetUtils.columnToVector(dataset, getFeaturesCol)) .rdd.map { case Row(point: Vector) => OldVectors.fromML(point) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala new file mode 100644 index 0000000000000..9fb96219c2221 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.util + +import org.apache.spark.annotation.Since +import org.apache.spark.ml.linalg.{Vectors, VectorUDT} +import org.apache.spark.sql.{Column, Dataset} +import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType} + + +private[spark] object DatasetUtils { + + /** + * preprocessing the input feature column to Vector + * @param dataset DataFrame with columns for features + * @param colName column name for features + * @return Vector feature column + */ + @Since("2.4.0") + def columnToVector(dataset: Dataset[_], colName: String): Column = { + val featuresDataType = dataset.schema(colName).dataType + featuresDataType match { + case _: VectorUDT => col(colName) + case fdt: ArrayType => + val transferUDF = fdt.elementType match { + case _: FloatType => udf(f = (vector: Seq[Float]) => { + val featureArray = Array.fill[Double](vector.size)(0.0) + vector.indices.foreach(idx => featureArray(idx) = vector(idx).toDouble) + Vectors.dense(featureArray) + }) + case _: DoubleType => udf((vector: Seq[Double]) => { + Vectors.dense(vector.toArray) + }) + } + transferUDF(col(colName)) + } + } +} From c4e1a51551993008d7b082b112d2296cbc4eb97b Mon Sep 17 00:00:00 2001 From: Lu WANG Date: Mon, 23 Apr 2018 16:57:01 -0700 Subject: [PATCH 8/8] fix the comments and correct code style --- .../apache/spark/ml/clustering/KMeans.scala | 3 +- .../apache/spark/ml/util/DatasetUtils.scala | 31 ++++++++++++------- .../spark/ml/clustering/KMeansSuite.scala | 9 ++---- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala index 50ef6f05cbc20..d475c726e6f08 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/KMeans.scala @@ -33,7 +33,7 @@ import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors import org.apache.spark.mllib.linalg.VectorImplicits._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} -import org.apache.spark.sql.functions.{col, udf} +import org.apache.spark.sql.functions.udf import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, IntegerType, StructType} import org.apache.spark.storage.StorageLevel import org.apache.spark.util.VersionUtils.majorVersion @@ -94,6 +94,7 @@ private[clustering] trait KMeansParams extends Params with HasMaxIter with HasFe val typeCandidates = List( new VectorUDT, new ArrayType(DoubleType, false), new ArrayType(FloatType, false)) + SchemaUtils.checkColumnTypes(schema, $(featuresCol), typeCandidates) } /** diff --git a/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala b/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala index 9fb96219c2221..52619cb65489a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/util/DatasetUtils.scala @@ -17,7 +17,6 @@ package org.apache.spark.ml.util -import org.apache.spark.annotation.Since import org.apache.spark.ml.linalg.{Vectors, VectorUDT} import org.apache.spark.sql.{Column, Dataset} import org.apache.spark.sql.functions.{col, udf} @@ -27,28 +26,38 @@ import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType} private[spark] object DatasetUtils { /** - * preprocessing the input feature column to Vector - * @param dataset DataFrame with columns for 
features - * @param colName column name for features - * @return Vector feature column + * Cast a column in a Dataset to Vector type. + * + * The supported data types of the input column are + * - Vector + * - float/double type Array. + * + * Note: The returned column does not have Metadata. + * + * @param dataset input DataFrame + * @param colName column name. + * @return Vector column */ - @Since("2.4.0") def columnToVector(dataset: Dataset[_], colName: String): Column = { - val featuresDataType = dataset.schema(colName).dataType - featuresDataType match { + val columnDataType = dataset.schema(colName).dataType + columnDataType match { case _: VectorUDT => col(colName) case fdt: ArrayType => val transferUDF = fdt.elementType match { case _: FloatType => udf(f = (vector: Seq[Float]) => { - val featureArray = Array.fill[Double](vector.size)(0.0) - vector.indices.foreach(idx => featureArray(idx) = vector(idx).toDouble) - Vectors.dense(featureArray) + val inputArray = Array.fill[Double](vector.size)(0.0) + vector.indices.foreach(idx => inputArray(idx) = vector(idx).toDouble) + Vectors.dense(inputArray) }) case _: DoubleType => udf((vector: Seq[Double]) => { Vectors.dense(vector.toArray) }) + case other => + throw new IllegalArgumentException(s"Array[$other] column cannot be cast to Vector") } transferUDF(col(colName)) + case other => + throw new IllegalArgumentException(s"$other column cannot be cast to Vector") } } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala index 9ddd26ee3a290..5445ebe5c95eb 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/KMeansSuite.scala @@ -220,25 +220,20 @@ class KMeansSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultR .drop("features") val newdatasetF = dataset.withColumn(featuresColNameF, floatUDF(col("features"))) .drop("features") - assert(newdatasetD.schema(featuresColNameD).dataType.equals(new ArrayType(DoubleType, false))) assert(newdatasetF.schema(featuresColNameF).dataType.equals(new ArrayType(FloatType, false))) - val kmeansD = new KMeans().setK(k).setFeaturesCol(featuresColNameD).setSeed(1) - val kmeansF = new KMeans().setK(k).setFeaturesCol(featuresColNameF).setSeed(1) + val kmeansD = new KMeans().setK(k).setMaxIter(1).setFeaturesCol(featuresColNameD).setSeed(1) + val kmeansF = new KMeans().setK(k).setMaxIter(1).setFeaturesCol(featuresColNameF).setSeed(1) val modelD = kmeansD.fit(newdatasetD) val modelF = kmeansF.fit(newdatasetF) - val transformedD = modelD.transform(newdatasetD) val transformedF = modelF.transform(newdatasetF) val predictDifference = transformedD.select("prediction") .except(transformedF.select("prediction")) - assert(predictDifference.count() == 0) - assert(modelD.computeCost(newdatasetD) == modelF.computeCost(newdatasetF) ) - }
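
Note (not part of the patches above): the series adds array-input support to KMeans but does not include an end-user example. The following is a minimal, hypothetical sketch of how the feature could be exercised once the patches are applied; it assumes an existing SparkSession named `spark`, and the column names are illustrative rather than taken from the patch.

// Build a small DataFrame with both an Array[Double] column and a Vector column.
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.linalg.Vectors

val df = spark.createDataFrame(Seq(
  (Array(0.0, 0.0), Vectors.dense(0.0, 0.0)),
  (Array(1.0, 1.0), Vectors.dense(1.0, 1.0)),
  (Array(9.0, 8.0), Vectors.dense(9.0, 8.0)),
  (Array(8.0, 9.0), Vectors.dense(8.0, 9.0))
)).toDF("array_features", "vector_features")

// With the patched build, KMeans accepts the Array[Double] column directly...
val arrayModel = new KMeans().setK(2).setSeed(1L)
  .setFeaturesCol("array_features")
  .fit(df)

// ...and should produce the same clustering as the equivalent Vector-based model.
val vectorModel = new KMeans().setK(2).setSeed(1L)
  .setFeaturesCol("vector_features")
  .fit(df)

arrayModel.transform(df).select("prediction").show()

Internally, both fits go through DatasetUtils.columnToVector (patch 7/8), which converts Array[Float] and Array[Double] columns to dense Vectors before handing the data to the MLlib implementation.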