[SPARK-22883][ML][TEST] Streaming tests for spark.ml.feature, from A to H

## What changes were proposed in this pull request?

Adds structured streaming tests using testTransformer for these suites (the general pattern is sketched right after the list):
* BinarizerSuite
* BucketedRandomProjectionLSHSuite
* BucketizerSuite
* ChiSqSelectorSuite
* CountVectorizerSuite
* DCTSuite.scala
* ElementwiseProductSuite
* FeatureHasherSuite
* HashingTFSuite

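The change is mechanical in most of these suites: a batch-only `transform(...).collect().foreach` check becomes a single `testTransformer` call, which MLTest runs against both the batch DataFrame and a streaming Dataset built from the same rows. The pattern below is taken from the Binarizer hunk further down (suite setup elided):

```scala
// Before: batch-only check
binarizer.transform(dataFrame).select("binarized_feature", "expected").collect().foreach {
  case Row(x: Double, y: Double) =>
    assert(x === y, "The feature value is not correct after binarization.")
}

// After: the same assertion, exercised on both batch and streaming input
testTransformer[(Double, Double)](dataFrame, binarizer, "binarized_feature", "expected") {
  case Row(x: Double, y: Double) =>
    assert(x === y, "The feature value is not correct after binarization.")
}
```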
## How was this patch tested?

It tests itself because it is a bunch of tests!
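For reviewers who want to see roughly what the streaming half of such a test amounts to, here is a sketch; it is an assumption about the mechanics (a MemoryStream feeding a memory sink), not the actual MLTest implementation, and the helper name is hypothetical:

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.sql.{Encoder, Row, SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.MemoryStream

// Hypothetical helper (not part of this PR): feed `data` through `transformer`
// as a structured stream and apply `check` to every output row.
def checkOnStream[A: Encoder](
    spark: SparkSession,
    data: Seq[A],
    transformer: Transformer,
    outputCols: Seq[String])(check: Row => Unit): Unit = {
  implicit val sqlContext: SQLContext = spark.sqlContext
  val source = MemoryStream[A]                        // in-memory streaming source
  val transformed = transformer
    .transform(source.toDF())                         // plan against a streaming DataFrame
    .select(outputCols.head, outputCols.tail: _*)
  val query = transformed.writeStream
    .format("memory")                                 // results land in an in-memory table
    .queryName("transformed_stream")
    .start()
  try {
    source.addData(data: _*)
    query.processAllAvailable()
    spark.table("transformed_stream").collect().foreach(check)
  } finally {
    query.stop()
  }
}
```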

Author: Joseph K. Bradley <joseph@databricks.com>

Closes #20111 from jkbradley/SPARK-22883-streaming-featureAM.
jkbradley committed Mar 2, 2018
1 parent 34811e0 commit 119f6a0
Showing 9 changed files with 126 additions and 101 deletions.
mllib/src/test/scala/org/apache/spark/ml/feature/BinarizerSuite.scala
@@ -17,14 +17,12 @@

package org.apache.spark.ml.feature

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.DefaultReadWriteTest
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.sql.{DataFrame, Row}

class BinarizerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
class BinarizerSuite extends MLTest with DefaultReadWriteTest {

import testImplicits._

@@ -47,7 +45,7 @@ class BinarizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defau
.setInputCol("feature")
.setOutputCol("binarized_feature")

binarizer.transform(dataFrame).select("binarized_feature", "expected").collect().foreach {
testTransformer[(Double, Double)](dataFrame, binarizer, "binarized_feature", "expected") {
case Row(x: Double, y: Double) =>
assert(x === y, "The feature value is not correct after binarization.")
}
mllib/src/test/scala/org/apache/spark/ml/feature/BucketedRandomProjectionLSHSuite.scala
@@ -20,16 +20,15 @@ package org.apache.spark.ml.feature
import breeze.numerics.{cos, sin}
import breeze.numerics.constants.Pi

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.{Dataset, Row}

class BucketedRandomProjectionLSHSuite
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
class BucketedRandomProjectionLSHSuite extends MLTest with DefaultReadWriteTest {

import testImplicits._

@transient var dataset: Dataset[_] = _

@@ -98,6 +97,21 @@ class BucketedRandomProjectionLSHSuite
MLTestingUtils.checkCopyAndUids(brp, brpModel)
}

test("BucketedRandomProjectionLSH: streaming transform") {
val brp = new BucketedRandomProjectionLSH()
.setNumHashTables(2)
.setInputCol("keys")
.setOutputCol("values")
.setBucketLength(1.0)
.setSeed(12345)
val brpModel = brp.fit(dataset)

testTransformer[Tuple1[Vector]](dataset.toDF(), brpModel, "values") {
case Row(values: Seq[_]) =>
assert(values.length === brp.getNumHashTables)
}
}

test("BucketedRandomProjectionLSH: test of LSH property") {
// Project from 2 dimensional Euclidean Space to 1 dimensions
val brp = new BucketedRandomProjectionLSH()
mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala
@@ -23,14 +23,13 @@ import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.DefaultReadWriteTest
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
class BucketizerSuite extends MLTest with DefaultReadWriteTest {

import testImplicits._

@@ -50,7 +49,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
.setOutputCol("result")
.setSplits(splits)

bucketizer.transform(dataFrame).select("result", "expected").collect().foreach {
testTransformer[(Double, Double)](dataFrame, bucketizer, "result", "expected") {
case Row(x: Double, y: Double) =>
assert(x === y,
s"The feature value is not correct after bucketing. Expected $y but found $x")
@@ -84,7 +83,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
.setOutputCol("result")
.setSplits(splits)

bucketizer.transform(dataFrame).select("result", "expected").collect().foreach {
testTransformer[(Double, Double)](dataFrame, bucketizer, "result", "expected") {
case Row(x: Double, y: Double) =>
assert(x === y,
s"The feature value is not correct after bucketing. Expected $y but found $x")
@@ -103,7 +102,7 @@ class BucketizerSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
.setSplits(splits)

bucketizer.setHandleInvalid("keep")
bucketizer.transform(dataFrame).select("result", "expected").collect().foreach {
testTransformer[(Double, Double)](dataFrame, bucketizer, "result", "expected") {
case Row(x: Double, y: Double) =>
assert(x === y,
s"The feature value is not correct after bucketing. Expected $y but found $x")
mllib/src/test/scala/org/apache/spark/ml/feature/ChiSqSelectorSuite.scala
@@ -17,16 +17,15 @@

package org.apache.spark.ml.feature

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.{Dataset, Row}

class ChiSqSelectorSuite extends SparkFunSuite with MLlibTestSparkContext
with DefaultReadWriteTest {
class ChiSqSelectorSuite extends MLTest with DefaultReadWriteTest {

import testImplicits._

@transient var dataset: Dataset[_] = _

@@ -119,32 +118,32 @@ class ChiSqSelectorSuite extends SparkFunSuite with MLlibTestSparkContext
test("Test Chi-Square selector: numTopFeatures") {
val selector = new ChiSqSelector()
.setOutputCol("filtered").setSelectorType("numTopFeatures").setNumTopFeatures(1)
val model = ChiSqSelectorSuite.testSelector(selector, dataset)
val model = testSelector(selector, dataset)
MLTestingUtils.checkCopyAndUids(selector, model)
}

test("Test Chi-Square selector: percentile") {
val selector = new ChiSqSelector()
.setOutputCol("filtered").setSelectorType("percentile").setPercentile(0.17)
ChiSqSelectorSuite.testSelector(selector, dataset)
testSelector(selector, dataset)
}

test("Test Chi-Square selector: fpr") {
val selector = new ChiSqSelector()
.setOutputCol("filtered").setSelectorType("fpr").setFpr(0.02)
ChiSqSelectorSuite.testSelector(selector, dataset)
testSelector(selector, dataset)
}

test("Test Chi-Square selector: fdr") {
val selector = new ChiSqSelector()
.setOutputCol("filtered").setSelectorType("fdr").setFdr(0.12)
ChiSqSelectorSuite.testSelector(selector, dataset)
testSelector(selector, dataset)
}

test("Test Chi-Square selector: fwe") {
val selector = new ChiSqSelector()
.setOutputCol("filtered").setSelectorType("fwe").setFwe(0.12)
ChiSqSelectorSuite.testSelector(selector, dataset)
testSelector(selector, dataset)
}

test("read/write") {
@@ -163,18 +162,19 @@ class ChiSqSelectorSuite extends SparkFunSuite with MLlibTestSparkContext
assert(expected.selectedFeatures === actual.selectedFeatures)
}
}
}

object ChiSqSelectorSuite {

private def testSelector(selector: ChiSqSelector, dataset: Dataset[_]): ChiSqSelectorModel = {
val selectorModel = selector.fit(dataset)
selectorModel.transform(dataset).select("filtered", "topFeature").collect()
.foreach { case Row(vec1: Vector, vec2: Vector) =>
private def testSelector(selector: ChiSqSelector, data: Dataset[_]): ChiSqSelectorModel = {
val selectorModel = selector.fit(data)
testTransformer[(Double, Vector, Vector)](data.toDF(), selectorModel,
"filtered", "topFeature") {
case Row(vec1: Vector, vec2: Vector) =>
assert(vec1 ~== vec2 absTol 1e-1)
}
}
selectorModel
}
}

object ChiSqSelectorSuite {

/**
* Mapping from all Params to valid settings which differ from the defaults.
mllib/src/test/scala/org/apache/spark/ml/feature/CountVectorizerSuite.scala
@@ -16,16 +16,13 @@
*/
package org.apache.spark.ml.feature

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.Row

class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
with DefaultReadWriteTest {
class CountVectorizerSuite extends MLTest with DefaultReadWriteTest {

import testImplicits._

@@ -50,7 +47,7 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
val cv = new CountVectorizerModel(Array("a", "b", "c", "d"))
.setInputCol("words")
.setOutputCol("features")
cv.transform(df).select("features", "expected").collect().foreach {
testTransformer[(Int, Seq[String], Vector)](df, cv, "features", "expected") {
case Row(features: Vector, expected: Vector) =>
assert(features ~== expected absTol 1e-14)
}
@@ -72,7 +69,7 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
MLTestingUtils.checkCopyAndUids(cv, cvm)
assert(cvm.vocabulary.toSet === Set("a", "b", "c", "d", "e"))

cvm.transform(df).select("features", "expected").collect().foreach {
testTransformer[(Int, Seq[String], Vector)](df, cvm, "features", "expected") {
case Row(features: Vector, expected: Vector) =>
assert(features ~== expected absTol 1e-14)
}
@@ -100,7 +97,7 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
.fit(df)
assert(cvModel2.vocabulary === Array("a", "b"))

cvModel2.transform(df).select("features", "expected").collect().foreach {
testTransformer[(Int, Seq[String], Vector)](df, cvModel2, "features", "expected") {
case Row(features: Vector, expected: Vector) =>
assert(features ~== expected absTol 1e-14)
}
@@ -113,7 +110,7 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
.fit(df)
assert(cvModel3.vocabulary === Array("a", "b"))

cvModel3.transform(df).select("features", "expected").collect().foreach {
testTransformer[(Int, Seq[String], Vector)](df, cvModel3, "features", "expected") {
case Row(features: Vector, expected: Vector) =>
assert(features ~== expected absTol 1e-14)
}
@@ -219,7 +216,7 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
.setInputCol("words")
.setOutputCol("features")
.setMinTF(3)
cv.transform(df).select("features", "expected").collect().foreach {
testTransformer[(Int, Seq[String], Vector)](df, cv, "features", "expected") {
case Row(features: Vector, expected: Vector) =>
assert(features ~== expected absTol 1e-14)
}
@@ -238,7 +235,7 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
.setInputCol("words")
.setOutputCol("features")
.setMinTF(0.3)
cv.transform(df).select("features", "expected").collect().foreach {
testTransformer[(Int, Seq[String], Vector)](df, cv, "features", "expected") {
case Row(features: Vector, expected: Vector) =>
assert(features ~== expected absTol 1e-14)
}
@@ -258,7 +255,7 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
.setOutputCol("features")
.setBinary(true)
.fit(df)
cv.transform(df).select("features", "expected").collect().foreach {
testTransformer[(Int, Seq[String], Vector)](df, cv, "features", "expected") {
case Row(features: Vector, expected: Vector) =>
assert(features ~== expected absTol 1e-14)
}
@@ -268,7 +265,7 @@ class CountVectorizerSuite extends SparkFunSuite with MLlibTestSparkContext
.setInputCol("words")
.setOutputCol("features")
.setBinary(true)
cv2.transform(df).select("features", "expected").collect().foreach {
testTransformer[(Int, Seq[String], Vector)](df, cv2, "features", "expected") {
case Row(features: Vector, expected: Vector) =>
assert(features ~== expected absTol 1e-14)
}
14 changes: 5 additions & 9 deletions mllib/src/test/scala/org/apache/spark/ml/feature/DCTSuite.scala
@@ -21,16 +21,14 @@ import scala.beans.BeanInfo

import edu.emory.mathcs.jtransforms.dct.DoubleDCT_1D

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.util.DefaultReadWriteTest
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.sql.Row

@BeanInfo
case class DCTTestData(vec: Vector, wantedVec: Vector)

class DCTSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
class DCTSuite extends MLTest with DefaultReadWriteTest {

import testImplicits._

@@ -72,11 +70,9 @@ class DCTSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
.setOutputCol("resultVec")
.setInverse(inverse)

transformer.transform(dataset)
.select("resultVec", "wantedVec")
.collect()
.foreach { case Row(resultVec: Vector, wantedVec: Vector) =>
assert(Vectors.sqdist(resultVec, wantedVec) < 1e-6)
testTransformer[(Vector, Vector)](dataset, transformer, "resultVec", "wantedVec") {
case Row(resultVec: Vector, wantedVec: Vector) =>
assert(Vectors.sqdist(resultVec, wantedVec) < 1e-6)
}
}
}
mllib/src/test/scala/org/apache/spark/ml/feature/ElementwiseProductSuite.scala
@@ -17,13 +17,31 @@

package org.apache.spark.ml.feature

import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.util.DefaultReadWriteTest
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.sql.Row

class ElementwiseProductSuite
extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
class ElementwiseProductSuite extends MLTest with DefaultReadWriteTest {

import testImplicits._

test("streaming transform") {
val scalingVec = Vectors.dense(0.1, 10.0)
val data = Seq(
(Vectors.dense(0.1, 1.0), Vectors.dense(0.01, 10.0)),
(Vectors.dense(0.0, -1.1), Vectors.dense(0.0, -11.0))
)
val df = spark.createDataFrame(data).toDF("features", "expected")
val ep = new ElementwiseProduct()
.setInputCol("features")
.setOutputCol("actual")
.setScalingVec(scalingVec)
testTransformer[(Vector, Vector)](df, ep, "actual", "expected") {
case Row(actual: Vector, expected: Vector) =>
assert(actual ~== expected relTol 1e-14)
}
}

test("read/write") {
val ep = new ElementwiseProduct()
(Diffs for the remaining two files, FeatureHasherSuite and HashingTFSuite, are not shown.)
