From 4802c6769d3b4c89faec3a8f0264ecd03117ceed Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Mon, 30 Mar 2015 17:37:11 +0800 Subject: [PATCH 01/10] add IDF transformer and test suite --- .../org/apache/spark/ml/feature/IDF.scala | 114 ++++++++++++++++++ .../apache/spark/ml/feature/IDFSuite.scala | 90 ++++++++++++++ 2 files changed, 204 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala create mode 100644 mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala new file mode 100644 index 0000000000000..98a21228aa497 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.ml._ +import org.apache.spark.ml.param._ +import org.apache.spark.mllib.feature +import org.apache.spark.mllib.linalg.{Vector, VectorUDT} +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * Params for [[IDF]] and [[IDFModel]]. + */ +private[feature] trait IDFParams extends Params with HasInputCol with HasOutputCol { + val minDocFreq = new IntParam( + this, "minDocFreq", "minimum of documents in which a term should appear for filtering", Some(0)) + + def getMinDocFreq: Int = { + get(minDocFreq) + } + + def setMinDocFreq(value: Int): this.type = { + set(minDocFreq, value) + } +} + +/** + * :: AlphaComponent :: + * Compute the Inverse Document Frequency (IDF) given a collection of documents. 
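+ *
+ * The IDF of a term t is computed as idf(t) = log((m + 1) / (d(t) + 1)), where m is the number
+ * of documents in the collection and d(t) is the number of documents that contain t; transformed
+ * vectors are the input term-frequency vectors scaled elementwise by these weights (this is the
+ * formula exercised by the accompanying test suite).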
+ */ +@AlphaComponent +class IDF extends Estimator[IDFModel] with IDFParams { + + /** @group setParam */ + def setInputCol(value: String): this.type = set(inputCol, value) + + /** @group setParam */ + def setOutputCol(value: String): this.type = set(outputCol, value) + + override def fit(dataset: DataFrame, paramMap: ParamMap): IDFModel = { + transformSchema(dataset.schema, paramMap, logging = true) + val map = this.paramMap ++ paramMap + val input = dataset.select(map(inputCol)).map { case Row(v: Vector) => v } + val idf = new feature.IDF(getMinDocFreq).fit(input) + val model = new IDFModel(this, map, idf) + Params.inheritValues(map, this, model) + model + } + + override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { + val map = this.paramMap ++ paramMap + val inputType = schema(map(inputCol)).dataType + require(inputType.isInstanceOf[VectorUDT], + s"Input column ${map(inputCol)} must be a vector column") + require(!schema.fieldNames.contains(map(outputCol)), + s"Output column ${map(outputCol)} already exists.") + val outputFields = schema.fields :+ StructField(map(outputCol), new VectorUDT, false) + StructType(outputFields) + } +} + +/** + * :: AlphaComponent :: + * Model fitted by [[IDF]]. + */ +@AlphaComponent +class IDFModel private[ml] ( + override val parent: IDF, + override val fittingParamMap: ParamMap, + idfModel: feature.IDFModel) + extends Model[IDFModel] with IDFParams { + + /** @group setParam */ + def setInputCol(value: String): this.type = set(inputCol, value) + + /** @group setParam */ + def setOutputCol(value: String): this.type = set(outputCol, value) + + override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { + transformSchema(dataset.schema, paramMap, logging = true) + val map = this.paramMap ++ paramMap + val idf = udf((v: Vector) => { idfModel.transform(v) } : Vector) + dataset.withColumn(map(outputCol), idf(col(map(inputCol)))) + } + + override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { + val map = this.paramMap ++ paramMap + val inputType = schema(map(inputCol)).dataType + require(inputType.isInstanceOf[VectorUDT], + s"Input column ${map(inputCol)} must be a vector column") + require(!schema.fieldNames.contains(map(outputCol)), + s"Output column ${map(outputCol)} already exists.") + val outputFields = schema.fields :+ StructField(map(outputCol), new VectorUDT, false) + StructType(outputFields) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala new file mode 100644 index 0000000000000..27c08e0fbf416 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.util.TestingUtils._ +import org.apache.spark.sql.{DataFrame, Row, SQLContext} + +private case class DataSet(features: Vector) + +class IDFSuite extends FunSuite with MLlibTestSparkContext { + + @transient var data: Array[Vector] = _ + @transient var dataFrame: DataFrame = _ + @transient var idf: IDF = _ + @transient var expectedModel: Vector = _ + @transient var resultWithDefaultParam: Array[Vector] = _ + @transient var resultWithSetParam: Array[Vector] = _ + + override def beforeAll(): Unit = { + super.beforeAll() + + val n = 4 + data = Array( + Vectors.sparse(n, Array(1, 3), Array(1.0, 2.0)), + Vectors.dense(0.0, 1.0, 2.0, 3.0), + Vectors.sparse(n, Array(1), Array(1.0)) + ) + val m = data.size + expectedModel = Vectors.dense(Array(0, 3, 1, 2).map { x => + math.log((m + 1.0) / (x + 1.0)) + }) + + resultWithDefaultParam = Array( + Vectors.dense(1.0 * expectedModel(1), 2.0 * expectedModel(3)), + Vectors.dense(0.0, 1.0 * expectedModel(1), 2.0 * expectedModel(2), 3.0 * expectedModel(3)), + Vectors.dense(1.0 * expectedModel(1), 0.0, 0.0) + ) + + val sqlContext = new SQLContext(sc) + val dataFrame = sc.parallelize(data, 2) + idf = new IDF() + .setInputCol("features") + .setOutputCol("idf_value") + } + + def collectResult(result: DataFrame): Array[Vector] = { + result.select("idf_value").collect().map { + case Row(features: Vector) => features + } + } + + def assertValues(lhs: Array[Vector], rhs: Array[Vector]): Unit = { + assert((lhs, rhs).zipped.forall { (vector1, vector2) => + vector1 ~== vector2 absTol 1E-5 + }, "The vector value is not correct after normalization.") + } + + test("Normalization with default parameter") { + val idfModel = idf.fit(dataFrame) + val tfIdf = collectResult(idfModel.transform(dataFrame)) + + assertValues(tfIdf, result) + } + + test("Normalization with setter") { + val idfModel = idf.setMinDocFreq(1).fit(dataFrame) + val tfIdf = collectResult(idfModel.transform(dataFrame)) + + assertValues(tfIdf, result) + } +} From 2aa4be0e1d7ce052f8c901c6d9462c611c3a920a Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Mon, 30 Mar 2015 20:51:32 +0800 Subject: [PATCH 02/10] clean test suite --- .../apache/spark/ml/feature/IDFSuite.scala | 44 ++++++++++++------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala index 27c08e0fbf416..cd78696732304 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala @@ -19,19 +19,15 @@ package org.apache.spark.ml.feature import org.scalatest.FunSuite -import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} +import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.sql.{DataFrame, Row, SQLContext} -private case class DataSet(features: Vector) - class IDFSuite extends FunSuite with MLlibTestSparkContext { - @transient var data: Array[Vector] = _ @transient var dataFrame: DataFrame = _ @transient var idf: IDF = _ - 
@transient var expectedModel: Vector = _ @transient var resultWithDefaultParam: Array[Vector] = _ @transient var resultWithSetParam: Array[Vector] = _ @@ -39,24 +35,40 @@ class IDFSuite extends FunSuite with MLlibTestSparkContext { super.beforeAll() val n = 4 - data = Array( + val data = Array( Vectors.sparse(n, Array(1, 3), Array(1.0, 2.0)), Vectors.dense(0.0, 1.0, 2.0, 3.0), Vectors.sparse(n, Array(1), Array(1.0)) ) val m = data.size - expectedModel = Vectors.dense(Array(0, 3, 1, 2).map { x => + + val expectedDefaultModel = Vectors.dense(Array(0, 3, 1, 2).map { x => math.log((m + 1.0) / (x + 1.0)) }) - resultWithDefaultParam = Array( - Vectors.dense(1.0 * expectedModel(1), 2.0 * expectedModel(3)), - Vectors.dense(0.0, 1.0 * expectedModel(1), 2.0 * expectedModel(2), 3.0 * expectedModel(3)), - Vectors.dense(1.0 * expectedModel(1), 0.0, 0.0) - ) + val expectedSetModel = Vectors.dense(Array(0, 3, 1, 2).map { x => + if (x > 0) { + math.log((m + 1.0) / (x + 1.0)) + } else { + 0 + } + }) + + val result: (Vector) => Array[Vector] = { model: Vector => + Array( + Vectors.sparse(n, Array(1, 3), Array(1.0 * model(1), 2.0 * model(3))), + Vectors.dense(0.0, 1.0 * model(1), 2.0 * model(2), 3.0 * model(3)), + Vectors.sparse(n, Array(1), Array(1.0 * model(1))) + ) + } + + resultWithDefaultParam = result(expectedDefaultModel) + + resultWithSetParam = result(expectedSetModel) val sqlContext = new SQLContext(sc) - val dataFrame = sc.parallelize(data, 2) + import sqlContext.implicits._ + dataFrame = sc.parallelize(data, 2).map(Tuple1.apply).toDF("features") idf = new IDF() .setInputCol("features") .setOutputCol("idf_value") @@ -71,20 +83,20 @@ class IDFSuite extends FunSuite with MLlibTestSparkContext { def assertValues(lhs: Array[Vector], rhs: Array[Vector]): Unit = { assert((lhs, rhs).zipped.forall { (vector1, vector2) => vector1 ~== vector2 absTol 1E-5 - }, "The vector value is not correct after normalization.") + }, "The vector value is not correct after IDF.") } test("Normalization with default parameter") { val idfModel = idf.fit(dataFrame) val tfIdf = collectResult(idfModel.transform(dataFrame)) - assertValues(tfIdf, result) + assertValues(tfIdf, resultWithDefaultParam) } test("Normalization with setter") { val idfModel = idf.setMinDocFreq(1).fit(dataFrame) val tfIdf = collectResult(idfModel.transform(dataFrame)) - assertValues(tfIdf, result) + assertValues(tfIdf, resultWithSetParam) } } From 03fbecbe1e5deb08bcf178aab4c5fbf2e99d468c Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 2 Apr 2015 10:47:41 +0800 Subject: [PATCH 03/10] remove duplicated code --- .../org/apache/spark/ml/feature/IDF.scala | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala index 98a21228aa497..3dee85edb189d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala @@ -40,6 +40,20 @@ private[feature] trait IDFParams extends Params with HasInputCol with HasOutputC def setMinDocFreq(value: Int): this.type = { set(minDocFreq, value) } + + /** + * Validate and transform the input schema. 
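+   * Requires the input column to be a vector column and appends the output vector column to the
+   * schema; shared by [[IDF]] and [[IDFModel]] so the check is not duplicated.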
+ */ + protected def validateAndTransformSchema(schema: StructType, paramMap: ParamMap): StructType = { + val map = this.paramMap ++ paramMap + val inputType = schema(map(inputCol)).dataType + require(inputType.isInstanceOf[VectorUDT], + s"Input column ${map(inputCol)} must be a vector column") + require(!schema.fieldNames.contains(map(outputCol)), + s"Output column ${map(outputCol)} already exists.") + val outputFields = schema.fields :+ StructField(map(outputCol), new VectorUDT, false) + StructType(outputFields) + } } /** @@ -66,14 +80,7 @@ class IDF extends Estimator[IDFModel] with IDFParams { } override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { - val map = this.paramMap ++ paramMap - val inputType = schema(map(inputCol)).dataType - require(inputType.isInstanceOf[VectorUDT], - s"Input column ${map(inputCol)} must be a vector column") - require(!schema.fieldNames.contains(map(outputCol)), - s"Output column ${map(outputCol)} already exists.") - val outputFields = schema.fields :+ StructField(map(outputCol), new VectorUDT, false) - StructType(outputFields) + validateAndTransformSchema(schema, paramMap) } } @@ -97,18 +104,11 @@ class IDFModel private[ml] ( override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { transformSchema(dataset.schema, paramMap, logging = true) val map = this.paramMap ++ paramMap - val idf = udf((v: Vector) => { idfModel.transform(v) } : Vector) + val idf = udf((v: Vector) => { idfModel.transform(v) }) dataset.withColumn(map(outputCol), idf(col(map(inputCol)))) } override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { - val map = this.paramMap ++ paramMap - val inputType = schema(map(inputCol)).dataType - require(inputType.isInstanceOf[VectorUDT], - s"Input column ${map(inputCol)} must be a vector column") - require(!schema.fieldNames.contains(map(outputCol)), - s"Output column ${map(outputCol)} already exists.") - val outputFields = schema.fields :+ StructField(map(outputCol), new VectorUDT, false) - StructType(outputFields) + validateAndTransformSchema(schema, paramMap) } } From 2add6917372ed997aa5d23a81e4d4266f09517f0 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 9 Apr 2015 14:51:04 +0800 Subject: [PATCH 04/10] fix code style and test --- .../org/apache/spark/ml/feature/IDF.scala | 14 +-- .../apache/spark/ml/feature/IDFSuite.scala | 113 ++++++++++-------- 2 files changed, 71 insertions(+), 56 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala index 3dee85edb189d..2af3ff812337d 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala @@ -33,13 +33,11 @@ private[feature] trait IDFParams extends Params with HasInputCol with HasOutputC val minDocFreq = new IntParam( this, "minDocFreq", "minimum of documents in which a term should appear for filtering", Some(0)) - def getMinDocFreq: Int = { - get(minDocFreq) - } + /** @group getParam */ + def getMinDocFreq: Int = get(minDocFreq) - def setMinDocFreq(value: Int): this.type = { - set(minDocFreq, value) - } + /** @group setParam */ + def setMinDocFreq(value: Int): this.type = set(minDocFreq, value) /** * Validate and transform the input schema. 
@@ -104,8 +102,8 @@ class IDFModel private[ml] ( override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { transformSchema(dataset.schema, paramMap, logging = true) val map = this.paramMap ++ paramMap - val idf = udf((v: Vector) => { idfModel.transform(v) }) - dataset.withColumn(map(outputCol), idf(col(map(inputCol)))) + val idf: Vector => Vector = (vec) => idfModel.transform(vec) + dataset.withColumn(map(outputCol), callUDF(idf, new VectorUDT, col(map(inputCol)))) } override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala index cd78696732304..32f880033453a 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala @@ -19,84 +19,101 @@ package org.apache.spark.ml.feature import org.scalatest.FunSuite -import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector, Vectors} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.sql.{DataFrame, Row, SQLContext} class IDFSuite extends FunSuite with MLlibTestSparkContext { - @transient var dataFrame: DataFrame = _ - @transient var idf: IDF = _ - @transient var resultWithDefaultParam: Array[Vector] = _ - @transient var resultWithSetParam: Array[Vector] = _ + def getResultFromDF(result: DataFrame): Array[Vector] = { + result.select("idf_value").collect().map { + case Row(features: Vector) => features + } + } + + def assertValues(lhs: Array[Vector], rhs: Array[Vector]): Unit = { + assert((lhs, rhs).zipped.forall { (vector1, vector2) => + vector1 ~== vector2 absTol 1E-5 + }, "The vector value is not correct after IDF.") + } - override def beforeAll(): Unit = { - super.beforeAll() + def getResultFromVector(dataSet: Array[Vector], model: Vector): Array[Vector] = { + dataSet.map { + case data: DenseVector => + val res = data.toArray.zip(model.toArray).map { case (x, y) => x * y } + Vectors.dense(res) + case data: SparseVector => + val res = data.indices.zip(data.values).map { case (id, value) => + (id, value * model(id)) + }.filter(_._2 != 0.0) + Vectors.sparse(res.size, res) + } + } + test("Normalization with default parameter") { val n = 4 + val data = Array( Vectors.sparse(n, Array(1, 3), Array(1.0, 2.0)), Vectors.dense(0.0, 1.0, 2.0, 3.0), Vectors.sparse(n, Array(1), Array(1.0)) ) + val m = data.size - val expectedDefaultModel = Vectors.dense(Array(0, 3, 1, 2).map { x => + val sqlContext = new SQLContext(sc) + import sqlContext.implicits._ + + val dataFrame = sc.parallelize(data, 2).map(Tuple1.apply).toDF("features") + + val idf = new IDF() + .setInputCol("features") + .setOutputCol("idf_value") + + val expectedModel = Vectors.dense(Array(0, 3, 1, 2).map { x => math.log((m + 1.0) / (x + 1.0)) }) - val expectedSetModel = Vectors.dense(Array(0, 3, 1, 2).map { x => - if (x > 0) { - math.log((m + 1.0) / (x + 1.0)) - } else { - 0 - } - }) + val idfModel = idf.fit(dataFrame) - val result: (Vector) => Array[Vector] = { model: Vector => - Array( - Vectors.sparse(n, Array(1, 3), Array(1.0 * model(1), 2.0 * model(3))), - Vectors.dense(0.0, 1.0 * model(1), 2.0 * model(2), 3.0 * model(3)), - Vectors.sparse(n, Array(1), Array(1.0 * model(1))) - ) - } + assertValues( + getResultFromDF(idfModel.transform(dataFrame)), + getResultFromVector(data, 
expectedModel)) + } - resultWithDefaultParam = result(expectedDefaultModel) + test("Normalization with setter") { + val n = 4 - resultWithSetParam = result(expectedSetModel) + val data = Array( + Vectors.sparse(n, Array(1, 3), Array(1.0, 2.0)), + Vectors.dense(0.0, 1.0, 2.0, 3.0), + Vectors.sparse(n, Array(1), Array(1.0)) + ) + + val m = data.size val sqlContext = new SQLContext(sc) import sqlContext.implicits._ - dataFrame = sc.parallelize(data, 2).map(Tuple1.apply).toDF("features") - idf = new IDF() - .setInputCol("features") - .setOutputCol("idf_value") - } - - def collectResult(result: DataFrame): Array[Vector] = { - result.select("idf_value").collect().map { - case Row(features: Vector) => features - } - } - def assertValues(lhs: Array[Vector], rhs: Array[Vector]): Unit = { - assert((lhs, rhs).zipped.forall { (vector1, vector2) => - vector1 ~== vector2 absTol 1E-5 - }, "The vector value is not correct after IDF.") - } + val dataFrame = sc.parallelize(data, 2).map(Tuple1.apply).toDF("features") - test("Normalization with default parameter") { - val idfModel = idf.fit(dataFrame) - val tfIdf = collectResult(idfModel.transform(dataFrame)) + val idf = new IDF() + .setInputCol("features") + .setOutputCol("idf_value") - assertValues(tfIdf, resultWithDefaultParam) - } + val expectedModel = Vectors.dense(Array(0, 3, 1, 2).map { x => + if (x > 0) { + math.log((m + 1.0) / (x + 1.0)) + } else { + 0 + } + }) - test("Normalization with setter") { val idfModel = idf.setMinDocFreq(1).fit(dataFrame) - val tfIdf = collectResult(idfModel.transform(dataFrame)) - assertValues(tfIdf, resultWithSetParam) + assertValues( + getResultFromDF(idfModel.transform(dataFrame)), + getResultFromVector(data, expectedModel)) } } From 5760b491100ea0377162956372e246bf6179f38a Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 9 Apr 2015 15:26:19 +0800 Subject: [PATCH 05/10] fix code style --- .../apache/spark/ml/feature/IDFSuite.scala | 49 ++++++------------- 1 file changed, 16 insertions(+), 33 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala index 32f880033453a..e4bec4383a069 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.ml.feature import org.scalatest.FunSuite -import org.apache.spark.mllib.linalg.{SparseVector, DenseVector, Vector, Vectors} +import org.apache.spark.mllib.linalg.{Vector, Vectors, DenseVector, SparseVector} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ import org.apache.spark.sql.{DataFrame, Row, SQLContext} @@ -46,72 +46,55 @@ class IDFSuite extends FunSuite with MLlibTestSparkContext { case data: SparseVector => val res = data.indices.zip(data.values).map { case (id, value) => (id, value * model(id)) - }.filter(_._2 != 0.0) - Vectors.sparse(res.size, res) + } + Vectors.sparse(data.size, res) } } test("Normalization with default parameter") { - val n = 4 - + val numOfFeatures = 4 val data = Array( - Vectors.sparse(n, Array(1, 3), Array(1.0, 2.0)), + Vectors.sparse(numOfFeatures, Array(1, 3), Array(1.0, 2.0)), Vectors.dense(0.0, 1.0, 2.0, 3.0), - Vectors.sparse(n, Array(1), Array(1.0)) + Vectors.sparse(numOfFeatures, Array(1), Array(1.0)) ) - - val m = data.size + val numOfData = data.size val sqlContext = new SQLContext(sc) import sqlContext.implicits._ - val dataFrame = sc.parallelize(data, 
2).map(Tuple1.apply).toDF("features") - val idf = new IDF() - .setInputCol("features") - .setOutputCol("idf_value") + val idfModel = new IDF().setInputCol("features").setOutputCol("idf_value").fit(dataFrame) val expectedModel = Vectors.dense(Array(0, 3, 1, 2).map { x => - math.log((m + 1.0) / (x + 1.0)) + math.log((numOfData + 1.0) / (x + 1.0)) }) - val idfModel = idf.fit(dataFrame) - assertValues( getResultFromDF(idfModel.transform(dataFrame)), getResultFromVector(data, expectedModel)) } test("Normalization with setter") { - val n = 4 - + val numOfFeatures = 4 val data = Array( - Vectors.sparse(n, Array(1, 3), Array(1.0, 2.0)), + Vectors.sparse(numOfFeatures, Array(1, 3), Array(1.0, 2.0)), Vectors.dense(0.0, 1.0, 2.0, 3.0), - Vectors.sparse(n, Array(1), Array(1.0)) + Vectors.sparse(numOfFeatures, Array(1), Array(1.0)) ) - - val m = data.size + val numOfData = data.size val sqlContext = new SQLContext(sc) import sqlContext.implicits._ - val dataFrame = sc.parallelize(data, 2).map(Tuple1.apply).toDF("features") - val idf = new IDF() - .setInputCol("features") - .setOutputCol("idf_value") + val idfModel = new IDF().setInputCol("features").setOutputCol("idf_value").setMinDocFreq(1) + .fit(dataFrame) val expectedModel = Vectors.dense(Array(0, 3, 1, 2).map { x => - if (x > 0) { - math.log((m + 1.0) / (x + 1.0)) - } else { - 0 - } + if (x > 0) math.log((numOfData + 1.0) / (x + 1.0)) else 0 }) - val idfModel = idf.setMinDocFreq(1).fit(dataFrame) - assertValues( getResultFromDF(idfModel.transform(dataFrame)), getResultFromVector(data, expectedModel)) From aef2cdff43297b2038151e766ede0f39929c2c05 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Thu, 9 Apr 2015 15:35:09 +0800 Subject: [PATCH 06/10] add doc and group for param --- mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala index 2af3ff812337d..5a7974123a0a3 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala @@ -30,6 +30,11 @@ import org.apache.spark.sql.types.{StructField, StructType} * Params for [[IDF]] and [[IDFModel]]. */ private[feature] trait IDFParams extends Params with HasInputCol with HasOutputCol { + + /** + * The minimum of documents in which a term should appear. 
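+   * Terms that appear in fewer documents than this are assigned an IDF of 0 by the underlying
+   * mllib.feature.IDF implementation. Default: 0 (no filtering).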
+ * @group param + */ val minDocFreq = new IntParam( this, "minDocFreq", "minimum of documents in which a term should appear for filtering", Some(0)) From 5867c096186b6cdcccc54ae15f5d37bb112bfd60 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 22 Apr 2015 15:48:47 +0800 Subject: [PATCH 07/10] refine IDF transformer with new interfaces --- .../org/apache/spark/ml/feature/IDF.scala | 34 +++++++++---------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala index 5a7974123a0a3..201b4cb3df859 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala @@ -20,26 +20,29 @@ package org.apache.spark.ml.feature import org.apache.spark.annotation.AlphaComponent import org.apache.spark.ml._ import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util.SchemaUtils import org.apache.spark.mllib.feature import org.apache.spark.mllib.linalg.{Vector, VectorUDT} import org.apache.spark.sql._ import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.types.StructType /** * Params for [[IDF]] and [[IDFModel]]. */ -private[feature] trait IDFParams extends Params with HasInputCol with HasOutputCol { +private[feature] trait IDFBase extends Params with HasInputCol with HasOutputCol { /** * The minimum of documents in which a term should appear. * @group param */ val minDocFreq = new IntParam( - this, "minDocFreq", "minimum of documents in which a term should appear for filtering", Some(0)) + this, "minDocFreq", "minimum of documents in which a term should appear for filtering") + setDefault(minDocFreq -> 0) /** @group getParam */ - def getMinDocFreq: Int = get(minDocFreq) + def getMinDocFreq: Int = getOrDefault(minDocFreq) /** @group setParam */ def setMinDocFreq(value: Int): this.type = set(minDocFreq, value) @@ -48,14 +51,9 @@ private[feature] trait IDFParams extends Params with HasInputCol with HasOutputC * Validate and transform the input schema. */ protected def validateAndTransformSchema(schema: StructType, paramMap: ParamMap): StructType = { - val map = this.paramMap ++ paramMap - val inputType = schema(map(inputCol)).dataType - require(inputType.isInstanceOf[VectorUDT], - s"Input column ${map(inputCol)} must be a vector column") - require(!schema.fieldNames.contains(map(outputCol)), - s"Output column ${map(outputCol)} already exists.") - val outputFields = schema.fields :+ StructField(map(outputCol), new VectorUDT, false) - StructType(outputFields) + val map = extractParamMap(paramMap) + SchemaUtils.checkColumnType(schema, map(inputCol), new VectorUDT) + SchemaUtils.appendColumn(schema, map(outputCol), new VectorUDT) } } @@ -64,7 +62,7 @@ private[feature] trait IDFParams extends Params with HasInputCol with HasOutputC * Compute the Inverse Document Frequency (IDF) given a collection of documents. 
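 *
 * Example (a sketch; `df` is a DataFrame with a vector column named "features"):
 * {{{
 *   val idfModel = new IDF().setInputCol("features").setOutputCol("idfValue").fit(df)
 *   val rescaled = idfModel.transform(df)
 * }}}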
*/ @AlphaComponent -class IDF extends Estimator[IDFModel] with IDFParams { +class IDF extends Estimator[IDFModel] with IDFBase { /** @group setParam */ def setInputCol(value: String): this.type = set(inputCol, value) @@ -74,7 +72,7 @@ class IDF extends Estimator[IDFModel] with IDFParams { override def fit(dataset: DataFrame, paramMap: ParamMap): IDFModel = { transformSchema(dataset.schema, paramMap, logging = true) - val map = this.paramMap ++ paramMap + val map = extractParamMap(paramMap) val input = dataset.select(map(inputCol)).map { case Row(v: Vector) => v } val idf = new feature.IDF(getMinDocFreq).fit(input) val model = new IDFModel(this, map, idf) @@ -96,7 +94,7 @@ class IDFModel private[ml] ( override val parent: IDF, override val fittingParamMap: ParamMap, idfModel: feature.IDFModel) - extends Model[IDFModel] with IDFParams { + extends Model[IDFModel] with IDFBase { /** @group setParam */ def setInputCol(value: String): this.type = set(inputCol, value) @@ -106,9 +104,9 @@ class IDFModel private[ml] ( override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { transformSchema(dataset.schema, paramMap, logging = true) - val map = this.paramMap ++ paramMap - val idf: Vector => Vector = (vec) => idfModel.transform(vec) - dataset.withColumn(map(outputCol), callUDF(idf, new VectorUDT, col(map(inputCol)))) + val map = extractParamMap(paramMap) + val idf = udf { vec: Vector => idfModel.transform(vec) } + dataset.withColumn(map(outputCol), idf(col(map(inputCol)))) } override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { From c9c3759644bc1603306c803ad81542964e32f4f0 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 22 Apr 2015 16:18:27 +0800 Subject: [PATCH 08/10] simplify test suite --- .../org/apache/spark/ml/feature/IDF.scala | 1 + .../apache/spark/ml/feature/IDFSuite.scala | 73 +++++++++---------- 2 files changed, 37 insertions(+), 37 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala index 201b4cb3df859..0f9e0a76b0a21 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala @@ -39,6 +39,7 @@ private[feature] trait IDFBase extends Params with HasInputCol with HasOutputCol */ val minDocFreq = new IntParam( this, "minDocFreq", "minimum of documents in which a term should appear for filtering") + setDefault(minDocFreq -> 0) /** @group getParam */ diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala index e4bec4383a069..eaee3443c1f23 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala @@ -19,26 +19,21 @@ package org.apache.spark.ml.feature import org.scalatest.FunSuite -import org.apache.spark.mllib.linalg.{Vector, Vectors, DenseVector, SparseVector} +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} import org.apache.spark.mllib.util.MLlibTestSparkContext import org.apache.spark.mllib.util.TestingUtils._ -import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.{Row, SQLContext} class IDFSuite extends FunSuite with MLlibTestSparkContext { - def getResultFromDF(result: DataFrame): Array[Vector] = { - result.select("idf_value").collect().map { - case Row(features: Vector) => features - } - } + @transient var sqlContext: 
SQLContext = _ - def assertValues(lhs: Array[Vector], rhs: Array[Vector]): Unit = { - assert((lhs, rhs).zipped.forall { (vector1, vector2) => - vector1 ~== vector2 absTol 1E-5 - }, "The vector value is not correct after IDF.") + override def beforeAll(): Unit = { + super.beforeAll() + sqlContext = new SQLContext(sc) } - def getResultFromVector(dataSet: Array[Vector], model: Vector): Array[Vector] = { + def scaleDataWithIDF(dataSet: Array[Vector], model: Vector): Array[Vector] = { dataSet.map { case data: DenseVector => val res = data.toArray.zip(model.toArray).map { case (x, y) => x * y } @@ -51,7 +46,7 @@ class IDFSuite extends FunSuite with MLlibTestSparkContext { } } - test("Normalization with default parameter") { + test("compute IDF with default parameter") { val numOfFeatures = 4 val data = Array( Vectors.sparse(numOfFeatures, Array(1, 3), Array(1.0, 2.0)), @@ -59,23 +54,25 @@ class IDFSuite extends FunSuite with MLlibTestSparkContext { Vectors.sparse(numOfFeatures, Array(1), Array(1.0)) ) val numOfData = data.size - - val sqlContext = new SQLContext(sc) - import sqlContext.implicits._ - val dataFrame = sc.parallelize(data, 2).map(Tuple1.apply).toDF("features") - - val idfModel = new IDF().setInputCol("features").setOutputCol("idf_value").fit(dataFrame) - - val expectedModel = Vectors.dense(Array(0, 3, 1, 2).map { x => + val idf = Vectors.dense(Array(0, 3, 1, 2).map { x => math.log((numOfData + 1.0) / (x + 1.0)) }) + val expected = scaleDataWithIDF(data, idf) + + val df = sqlContext.createDataFrame(data.zip(expected)).toDF("features", "expected") + + val idfModel = new IDF() + .setInputCol("features") + .setOutputCol("idfValue") + .fit(df) - assertValues( - getResultFromDF(idfModel.transform(dataFrame)), - getResultFromVector(data, expectedModel)) + idfModel.transform(df).select("idfValue", "expected").collect().foreach { + case Row(x: Vector, y: Vector) => + assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.") + } } - test("Normalization with setter") { + test("compute IDF with setter") { val numOfFeatures = 4 val data = Array( Vectors.sparse(numOfFeatures, Array(1, 3), Array(1.0, 2.0)), @@ -83,20 +80,22 @@ class IDFSuite extends FunSuite with MLlibTestSparkContext { Vectors.sparse(numOfFeatures, Array(1), Array(1.0)) ) val numOfData = data.size - - val sqlContext = new SQLContext(sc) - import sqlContext.implicits._ - val dataFrame = sc.parallelize(data, 2).map(Tuple1.apply).toDF("features") - - val idfModel = new IDF().setInputCol("features").setOutputCol("idf_value").setMinDocFreq(1) - .fit(dataFrame) - - val expectedModel = Vectors.dense(Array(0, 3, 1, 2).map { x => + val idf = Vectors.dense(Array(0, 3, 1, 2).map { x => if (x > 0) math.log((numOfData + 1.0) / (x + 1.0)) else 0 }) + val expected = scaleDataWithIDF(data, idf) - assertValues( - getResultFromDF(idfModel.transform(dataFrame)), - getResultFromVector(data, expectedModel)) + val df = sqlContext.createDataFrame(data.zip(expected)).toDF("features", "expected") + + val idfModel = new IDF() + .setInputCol("features") + .setOutputCol("idfValue") + .setMinDocFreq(1) + .fit(df) + + idfModel.transform(df).select("idfValue", "expected").collect().foreach { + case Row(x: Vector, y: Vector) => + assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.") + } } } From d1699673013468f5b1461ffd37097c8286df9aad Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Wed, 22 Apr 2015 16:41:29 +0800 Subject: [PATCH 09/10] add final to param and IDF class --- 
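Note: `final` on the param keeps subclasses from overriding its definition, and `final class IDF`
keeps the estimator itself from being extended.
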
mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala index 0f9e0a76b0a21..3d4ae631bd9d5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala @@ -37,7 +37,7 @@ private[feature] trait IDFBase extends Params with HasInputCol with HasOutputCol * The minimum of documents in which a term should appear. * @group param */ - val minDocFreq = new IntParam( + final val minDocFreq = new IntParam( this, "minDocFreq", "minimum of documents in which a term should appear for filtering") setDefault(minDocFreq -> 0) @@ -63,7 +63,7 @@ private[feature] trait IDFBase extends Params with HasInputCol with HasOutputCol * Compute the Inverse Document Frequency (IDF) given a collection of documents. */ @AlphaComponent -class IDF extends Estimator[IDFModel] with IDFBase { +final class IDF extends Estimator[IDFModel] with IDFBase { /** @group setParam */ def setInputCol(value: String): this.type = set(inputCol, value) From 741db31f112469141a22634a406ab20feb13e678 Mon Sep 17 00:00:00 2001 From: Xusen Yin Date: Fri, 24 Apr 2015 16:19:13 +0800 Subject: [PATCH 10/10] get param from new paramMap --- mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala index 3d4ae631bd9d5..e6a62d998bb97 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala @@ -75,7 +75,7 @@ final class IDF extends Estimator[IDFModel] with IDFBase { transformSchema(dataset.schema, paramMap, logging = true) val map = extractParamMap(paramMap) val input = dataset.select(map(inputCol)).map { case Row(v: Vector) => v } - val idf = new feature.IDF(getMinDocFreq).fit(input) + val idf = new feature.IDF(map(minDocFreq)).fit(input) val model = new IDFModel(this, map, idf) Params.inheritValues(map, this, model) model
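
Taken together, the series yields the following end-to-end usage, distilled from IDFSuite (a
sketch: `sc` is an existing SparkContext, and the column names are the ones used in the tests):

    import org.apache.spark.ml.feature.IDF
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Three documents as term-frequency vectors over a vocabulary of four terms.
    val data = Seq(
      Vectors.sparse(4, Array(1, 3), Array(1.0, 2.0)),
      Vectors.dense(0.0, 1.0, 2.0, 3.0),
      Vectors.sparse(4, Array(1), Array(1.0))
    )
    val df = sc.parallelize(data, 2).map(Tuple1.apply).toDF("features")

    // Fit the model; terms appearing in fewer than minDocFreq documents receive an IDF of 0.
    val idfModel = new IDF()
      .setInputCol("features")
      .setOutputCol("idfValue")
      .setMinDocFreq(1)
      .fit(df)

    // Each output vector is the input scaled elementwise by the learned IDF weights.
    idfModel.transform(df).select("idfValue").collect().foreach(println)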