From daa3452abcdbde10e6890913bf1f5abd6206487a Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng
Date: Mon, 26 Mar 2018 19:06:24 +0800
Subject: [PATCH 1/5] init pr

---
 .../spark/ml/feature/MaxAbsScaler.scala       | 33 +++++++++++--------
 .../org/apache/spark/ml/feature/PCA.scala     | 12 ++++---
 2 files changed, 27 insertions(+), 18 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
index 85f9732f79f67..1938b4006f75f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.attribute.AttributeGroup
 import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param.{ParamMap, Params}
 import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
@@ -30,21 +31,13 @@ import org.apache.spark.mllib.stat.Statistics
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructType
 
 /**
  * Params for [[MaxAbsScaler]] and [[MaxAbsScalerModel]].
  */
 private[feature] trait MaxAbsScalerParams extends Params with HasInputCol with HasOutputCol {
 
-  /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType): StructType = {
-    SchemaUtils.checkColumnType(schema, $(inputCol), new VectorUDT)
-    require(!schema.fieldNames.contains($(outputCol)),
-      s"Output column ${$(outputCol)} already exists.")
-    val outputFields = schema.fields :+ StructField($(outputCol), new VectorUDT, false)
-    StructType(outputFields)
-  }
 }
 
 /**
@@ -84,7 +77,16 @@ class MaxAbsScaler @Since("2.0.0") (@Since("2.0.0") override val uid: String)
 
   @Since("2.0.0")
   override def transformSchema(schema: StructType): StructType = {
-    validateAndTransformSchema(schema)
+    SchemaUtils.checkColumnType(schema, $(inputCol), new VectorUDT)
+    require(!schema.fieldNames.contains($(outputCol)),
+      s"Output column ${$(outputCol)} already exists.")
+    val group = AttributeGroup.fromStructField(schema($(inputCol)))
+    if (group.size < 0) {
+      SchemaUtils.appendColumn(schema, $(outputCol), new VectorUDT)
+    } else {
+      val attrGroup = new AttributeGroup($(outputCol), group.size)
+      SchemaUtils.appendColumn(schema, attrGroup.toStructField())
+    }
   }
 
   @Since("2.0.0")
@@ -120,19 +122,24 @@ class MaxAbsScalerModel private[ml] (
 
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
-    transformSchema(dataset.schema, logging = true)
+    val outputSchema = transformSchema(dataset.schema, logging = true)
     // TODO: this looks hack, we may have to handle sparse and dense vectors separately.
     val maxAbsUnzero = Vectors.dense(maxAbs.toArray.map(x => if (x == 0) 1 else x))
     val reScale = udf { (vector: Vector) =>
       val brz = vector.asBreeze / maxAbsUnzero.asBreeze
      Vectors.fromBreeze(brz)
     }
-    dataset.withColumn($(outputCol), reScale(col($(inputCol))))
+    dataset.withColumn($(outputCol), reScale(col($(inputCol))),
+      outputSchema($(outputCol)).metadata)
   }
 
   @Since("2.0.0")
   override def transformSchema(schema: StructType): StructType = {
-    validateAndTransformSchema(schema)
+    SchemaUtils.checkColumnType(schema, $(inputCol), new VectorUDT)
+    require(!schema.fieldNames.contains($(outputCol)),
+      s"Output column ${$(outputCol)} already exists.")
+    val attrGroup = new AttributeGroup($(outputCol), maxAbs.size)
+    SchemaUtils.appendColumn(schema, attrGroup.toStructField())
   }
 
   @Since("2.0.0")
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
index 4143d864d7930..4038d79e74e3c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml._
+import org.apache.spark.ml.attribute.AttributeGroup
 import org.apache.spark.ml.linalg._
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
@@ -33,7 +34,7 @@ import org.apache.spark.mllib.linalg.VectorImplicits._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.VersionUtils.majorVersion
 
 /**
@@ -56,8 +57,8 @@ private[feature] trait PCAParams extends Params with HasInputCol with HasOutputC
     SchemaUtils.checkColumnType(schema, $(inputCol), new VectorUDT)
     require(!schema.fieldNames.contains($(outputCol)),
       s"Output column ${$(outputCol)} already exists.")
-    val outputFields = schema.fields :+ StructField($(outputCol), new VectorUDT, false)
-    StructType(outputFields)
+    val attrGroup = new AttributeGroup($(outputCol), $(k))
+    SchemaUtils.appendColumn(schema, attrGroup.toStructField())
   }
 }
 
@@ -148,7 +149,7 @@ class PCAModel private[ml] (
    */
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
-    transformSchema(dataset.schema, logging = true)
+    val outputSchema = transformSchema(dataset.schema, logging = true)
     val pcaModel = new feature.PCAModel($(k),
       OldMatrices.fromML(pc).asInstanceOf[OldDenseMatrix],
       OldVectors.fromML(explainedVariance).asInstanceOf[OldDenseVector])
@@ -157,7 +158,8 @@ class PCAModel private[ml] (
     val transformer: Vector => Vector = v => pcaModel.transform(OldVectors.fromML(v)).asML
 
     val pcaOp = udf(transformer)
-    dataset.withColumn($(outputCol), pcaOp(col($(inputCol))))
+    dataset.withColumn($(outputCol), pcaOp(col($(inputCol))),
+      outputSchema($(outputCol)).metadata)
   }
 
   @Since("1.5.0")
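Reviewer sketch for PATCH 1/5, illustrating the intended effect: transformSchema now declares the output vector size as ML attribute metadata, and transform attaches that metadata through the metadata-accepting Dataset.withColumn overload. The snippet below is illustrative only (the object and column names are invented); it relies solely on public APIs: MaxAbsScaler, AttributeGroup.fromStructField, and the DeveloperApi method transformSchema.

    import org.apache.spark.ml.attribute.AttributeGroup
    import org.apache.spark.ml.feature.MaxAbsScaler
    import org.apache.spark.ml.linalg.Vectors
    import org.apache.spark.sql.SparkSession

    object MetadataSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]").appName("metadata-sketch").getOrCreate()
        import spark.implicits._

        // Two 3-dimensional rows, so the fitted model holds a maxAbs vector of size 3.
        val df = Seq(
          (0, Vectors.dense(1.0, -8.0, 2.0)),
          (1, Vectors.dense(2.0, 10.0, -5.0))
        ).toDF("id", "features")

        val model = new MaxAbsScaler()
          .setInputCol("features")
          .setOutputCol("scaled")
          .fit(df)

        // With this patch the output field's metadata carries the vector size,
        // resolvable from the schema alone; previously fromStructField found
        // no metadata and reported -1.
        val outSchema = model.transformSchema(df.schema)
        println(AttributeGroup.fromStructField(outSchema("scaled")).size) // 3

        spark.stop()
      }
    }

Downstream pipeline stages can therefore learn the output width during Pipeline schema propagation, without executing any job.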
From 491bb7dd5ae26217553eef572deb9a5af89aabd7 Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng
Date: Mon, 26 Mar 2018 19:11:41 +0800
Subject: [PATCH 2/5] init pr

---
 .../scala/org/apache/spark/ml/feature/MaxAbsScaler.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
index 1938b4006f75f..b4f760d808baa 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
@@ -136,6 +136,11 @@ class MaxAbsScalerModel private[ml] (
   @Since("2.0.0")
   override def transformSchema(schema: StructType): StructType = {
     SchemaUtils.checkColumnType(schema, $(inputCol), new VectorUDT)
+    val group = AttributeGroup.fromStructField(schema($(inputCol)))
+    if (group.size >= 0) {
+      require(group.size == maxAbs.size,
+        s"Length of input vectors does not match the expected size ${maxAbs.size}")
+    }
     require(!schema.fieldNames.contains($(outputCol)),
       s"Output column ${$(outputCol)} already exists.")
     val attrGroup = new AttributeGroup($(outputCol), maxAbs.size)
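Reviewer sketch for PATCH 2/5: the model now cross-checks a declared input size against the fitted maxAbs vector at schema-resolution time. Continuing the sketch above, e.g. in spark-shell (the 4-dimensional schema below is invented for illustration):

    import org.apache.spark.ml.attribute.AttributeGroup
    import org.apache.spark.sql.types.StructType

    // A schema whose "features" column claims 4 dimensions, fed to the model
    // fit on 3 features above: the new require(...) fails during analysis,
    // before any row is touched.
    val fourDimSchema = new StructType()
      .add(new AttributeGroup("features", 4).toStructField())

    // model.transformSchema(fourDimSchema)
    //   => java.lang.IllegalArgumentException: requirement failed:
    //      Length of input vectors does not match the expected size 3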
From 87f5f2cfd5c73377e0b15547649b346008d055bb Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng
Date: Mon, 26 Mar 2018 19:24:28 +0800
Subject: [PATCH 3/5] init pr

---
 .../org/apache/spark/ml/feature/PCA.scala | 26 +++++++++++--------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
index 4038d79e74e3c..59a3c878829cd 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -52,15 +52,6 @@ private[feature] trait PCAParams extends Params with HasInputCol with HasOutputC
   /** @group getParam */
   def getK: Int = $(k)
 
-  /** Validates and transforms the input schema. */
-  protected def validateAndTransformSchema(schema: StructType): StructType = {
-    SchemaUtils.checkColumnType(schema, $(inputCol), new VectorUDT)
-    require(!schema.fieldNames.contains($(outputCol)),
-      s"Output column ${$(outputCol)} already exists.")
-    val attrGroup = new AttributeGroup($(outputCol), $(k))
-    SchemaUtils.appendColumn(schema, attrGroup.toStructField())
-  }
-
 }
 
 /**
@@ -103,7 +94,11 @@ class PCA @Since("1.5.0") (
 
   @Since("1.5.0")
   override def transformSchema(schema: StructType): StructType = {
-    validateAndTransformSchema(schema)
+    SchemaUtils.checkColumnType(schema, $(inputCol), new VectorUDT)
+    require(!schema.fieldNames.contains($(outputCol)),
+      s"Output column ${$(outputCol)} already exists.")
+    val attrGroup = new AttributeGroup($(outputCol), $(k))
+    SchemaUtils.appendColumn(schema, attrGroup.toStructField())
   }
 
   @Since("1.5.0")
@@ -164,7 +159,16 @@ class PCAModel private[ml] (
 
   @Since("1.5.0")
   override def transformSchema(schema: StructType): StructType = {
-    validateAndTransformSchema(schema)
+    SchemaUtils.checkColumnType(schema, $(inputCol), new VectorUDT)
+    val group = AttributeGroup.fromStructField(schema($(inputCol)))
+    if (group.size < 0) {
+      require(group.size == pc.numRows,
+        s"Length of input vectors does not match the expected size ${pc.numRows}")
+    }
+    require(!schema.fieldNames.contains($(outputCol)),
+      s"Output column ${$(outputCol)} already exists.")
+    val attrGroup = new AttributeGroup($(outputCol), $(k))
+    SchemaUtils.appendColumn(schema, attrGroup.toStructField())
   }
 
   @Since("1.5.0")

From dc4006989144eaee36b37b494a327f9727c4c56d Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng
Date: Mon, 26 Mar 2018 19:24:49 +0800
Subject: [PATCH 4/5] init pr

---
 mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
index 59a3c878829cd..35b42242543f5 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -161,7 +161,7 @@ class PCAModel private[ml] (
   override def transformSchema(schema: StructType): StructType = {
     SchemaUtils.checkColumnType(schema, $(inputCol), new VectorUDT)
     val group = AttributeGroup.fromStructField(schema($(inputCol)))
-    if (group.size < 0) {
+    if (group.size >= 0) {
       require(group.size == pc.numRows,
         s"Length of input vectors does not match the expected size ${pc.numRows}")
     }
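Reviewer note on PATCH 3/5 and 4/5: patch 3 inlines validateAndTransformSchema into PCA and PCAModel but writes the model-side guard inverted (group.size < 0 where it must be >= 0), which patch 4 corrects. The convention being relied on, restated as a standalone sketch (sizeCompatible is an invented name, not part of the patch): AttributeGroup.size is -1 when the vector length is unknown, so the check is skipped for negative sizes and enforced otherwise.

    // Invented helper restating the corrected guard from patches 4 and 5.
    def sizeCompatible(declared: Int, expected: Int): Boolean =
      declared < 0 || declared == expected

    assert(sizeCompatible(-1, 5))  // size unknown: accept, validate at runtime
    assert(sizeCompatible(5, 5))   // declared size matches: accept
    assert(!sizeCompatible(4, 5))  // declared mismatch: reject up front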
From 2098d088caa493924e218a8f9185ede234c1a2be Mon Sep 17 00:00:00 2001
From: Zheng RuiFeng
Date: Wed, 28 Mar 2018 14:59:04 +0800
Subject: [PATCH 5/5] init pr

---
 .../src/main/scala/org/apache/spark/ml/feature/PCA.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
index 35b42242543f5..7ddc7fa27c945 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -95,6 +95,9 @@ class PCA @Since("1.5.0") (
   @Since("1.5.0")
   override def transformSchema(schema: StructType): StructType = {
     SchemaUtils.checkColumnType(schema, $(inputCol), new VectorUDT)
+    val group = AttributeGroup.fromStructField(schema($(inputCol)))
+    require(group.size < 0 || group.size >= $(k),
+      s"Input vector size ${group.size} must be no less than k=${$(k)}")
     require(!schema.fieldNames.contains($(outputCol)),
       s"Output column ${$(outputCol)} already exists.")
     val attrGroup = new AttributeGroup($(outputCol), $(k))
@@ -161,10 +164,8 @@ class PCAModel private[ml] (
   override def transformSchema(schema: StructType): StructType = {
     SchemaUtils.checkColumnType(schema, $(inputCol), new VectorUDT)
     val group = AttributeGroup.fromStructField(schema($(inputCol)))
-    if (group.size >= 0) {
-      require(group.size == pc.numRows,
-        s"Length of input vectors does not match the expected size ${pc.numRows}")
-    }
+    require(group.size < 0 || group.size == pc.numRows,
+      s"Input vector size does not match the expected size ${pc.numRows}")
     require(!schema.fieldNames.contains($(outputCol)),
       s"Output column ${$(outputCol)} already exists.")
     val attrGroup = new AttributeGroup($(outputCol), $(k))
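Reviewer sketch for PATCH 5/5, showing both PCA-side checks end to end, e.g. in spark-shell (column names invented; only public spark-ml APIs are used). An undersized input is rejected while the schema is being resolved, and a valid configuration yields an output column whose metadata already declares k:

    import org.apache.spark.ml.attribute.AttributeGroup
    import org.apache.spark.ml.feature.PCA
    import org.apache.spark.sql.types.StructType

    // Input schema declaring a 3-dimensional "features" vector.
    val inputSchema = new StructType()
      .add(new AttributeGroup("features", 3).toStructField())

    // k larger than the declared input size now fails at schema resolution:
    // new PCA().setInputCol("features").setOutputCol("pcs").setK(5)
    //   .transformSchema(inputSchema)
    //   => requirement failed: Input vector size 3 must be no less than k=5

    val okSchema = new PCA()
      .setInputCol("features")
      .setOutputCol("pcs")
      .setK(2)
      .transformSchema(inputSchema)

    // The output column's metadata already declares size k.
    println(AttributeGroup.fromStructField(okSchema("pcs")).size) // 2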