
[SPARK-22883][ML] ML test for StructuredStreaming: spark.ml.feature, I-M

This backports #20964 to branch-2.3.

## What changes were proposed in this pull request?

Adds Structured Streaming tests using testTransformer for these suites (a sketch of the testTransformer pattern follows the list):
* IDF
* Imputer
* Interaction
* MaxAbsScaler
* MinHashLSH
* MinMaxScaler
* NGram
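
All of the converted suites go through MLTest.testTransformer, which applies the same per-row check to the transformer's output computed on a batch DataFrame and on a streaming DataFrame built from the same rows. Below is a minimal sketch of that pattern using MaxAbsScaler; the suite name and the data are illustrative only and are not part of this patch:

```scala
package org.apache.spark.ml.feature

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.util.MLTest
import org.apache.spark.sql.Row

// Illustrative suite name; the real suites in this patch follow the same shape.
class MaxAbsScalerStreamingSketch extends MLTest {

  import testImplicits._

  test("MaxAbsScaler output matches expected values, batch and streaming") {
    // Column max-abs values are (2.0, 2.0), so the expected scaled vectors
    // are simply the inputs divided by 2.
    val df = Seq(
      (Vectors.dense(1.0, -2.0), Vectors.dense(0.5, -1.0)),
      (Vectors.dense(2.0, 0.0), Vectors.dense(1.0, 0.0))
    ).toDF("features", "expected")

    val model = new MaxAbsScaler()
      .setInputCol("features")
      .setOutputCol("scaled")
      .fit(df)

    // testTransformer runs the model over df as a batch Dataset and again as a
    // streaming Dataset, then applies this check to every resulting row.
    testTransformer[(Vector, Vector)](df, model, "expected", "scaled") {
      case Row(expected: Vector, scaled: Vector) =>
        assert(expected === scaled, s"Expected $expected but got $scaled")
    }
  }
}
```

The per-suite changes below replace the old transform().collect().foreach checks with this call.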

## How was this patch tested?

It is a bunch of tests!

Author: Joseph K. Bradley <joseph@databricks.com>

Author: Joseph K. Bradley <joseph@databricks.com>

Closes #21042 from jkbradley/SPARK-22883-part2-2.3backport.
jkbradley committed Apr 11, 2018
1 parent 320269e commit acfc156df551632007a47b7ec7a7c901a713082d
@@ -17,17 +17,15 @@

package org.apache.spark.ml.feature

-import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.ml.util.TestingUtils._
import org.apache.spark.mllib.feature.{IDFModel => OldIDFModel}
import org.apache.spark.mllib.linalg.VectorImplicits._
-import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.Row

-class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class IDFSuite extends MLTest with DefaultReadWriteTest {

import testImplicits._

@@ -57,7 +55,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
Vectors.dense(0.0, 1.0, 2.0, 3.0),
Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
)
-val numOfData = data.size
+val numOfData = data.length
val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
math.log((numOfData + 1.0) / (x + 1.0))
})
@@ -72,7 +70,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead

MLTestingUtils.checkCopyAndUids(idfEst, idfModel)

-idfModel.transform(df).select("idfValue", "expected").collect().foreach {
+testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
case Row(x: Vector, y: Vector) =>
assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.")
}
@@ -85,7 +83,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
Vectors.dense(0.0, 1.0, 2.0, 3.0),
Vectors.sparse(numOfFeatures, Array(1), Array(1.0))
)
-val numOfData = data.size
+val numOfData = data.length
val idf = Vectors.dense(Array(0, 3, 1, 2).map { x =>
if (x > 0) math.log((numOfData + 1.0) / (x + 1.0)) else 0
})
@@ -99,7 +97,7 @@ class IDFSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
.setMinDocFreq(1)
.fit(df)

-idfModel.transform(df).select("idfValue", "expected").collect().foreach {
+testTransformer[(Vector, Vector)](df, idfModel, "idfValue", "expected") {
case Row(x: Vector, y: Vector) =>
assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.")
}
@@ -16,13 +16,12 @@
*/
package org.apache.spark.ml.feature

-import org.apache.spark.{SparkException, SparkFunSuite}
-import org.apache.spark.ml.util.DefaultReadWriteTest
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.SparkException
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.mllib.util.TestingUtils._
import org.apache.spark.sql.{DataFrame, Row}

-class ImputerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class ImputerSuite extends MLTest with DefaultReadWriteTest {

test("Imputer for Double with default missing Value NaN") {
val df = spark.createDataFrame( Seq(
@@ -76,6 +75,28 @@ class ImputerSuite extends SparkFunSuite with MLlibTestSparkContext with Default
ImputerSuite.iterateStrategyTest(imputer, df)
}

test("Imputer should work with Structured Streaming") {
val localSpark = spark
import localSpark.implicits._
val df = Seq[(java.lang.Double, Double)](
(4.0, 4.0),
(10.0, 10.0),
(10.0, 10.0),
(Double.NaN, 8.0),
(null, 8.0)
).toDF("value", "expected_mean_value")
val imputer = new Imputer()
.setInputCols(Array("value"))
.setOutputCols(Array("out"))
.setStrategy("mean")
val model = imputer.fit(df)
testTransformer[(java.lang.Double, Double)](df, model, "expected_mean_value", "out") {
case Row(exp: java.lang.Double, out: Double) =>
assert((exp.isNaN && out.isNaN) || (exp == out),
s"Imputed values differ. Expected: $exp, actual: $out")
}
}

test("Imputer throws exception when surrogate cannot be computed") {
val df = spark.createDataFrame( Seq(
(0, Double.NaN, 1.0, 1.0),
@@ -164,8 +185,6 @@ object ImputerSuite {
* @param df DataFrame with columns "id", "value", "expected_mean", "expected_median"
*/
def iterateStrategyTest(imputer: Imputer, df: DataFrame): Unit = {
-val inputCols = imputer.getInputCols
-
Seq("mean", "median").foreach { strategy =>
imputer.setStrategy(strategy)
val model = imputer.fit(df)
@@ -19,15 +19,15 @@ package org.apache.spark.ml.feature

import scala.collection.mutable.ArrayBuilder

-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.SparkException
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.DefaultReadWriteTest
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col

-class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class InteractionSuite extends MLTest with DefaultReadWriteTest {

import testImplicits._

@@ -63,24 +63,25 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def

test("numeric interaction") {
val data = Seq(
-(2, Vectors.dense(3.0, 4.0)),
-(1, Vectors.dense(1.0, 5.0))
-).toDF("a", "b")
+(2, Vectors.dense(3.0, 4.0), Vectors.dense(6.0, 8.0)),
+(1, Vectors.dense(1.0, 5.0), Vectors.dense(1.0, 5.0))
+).toDF("a", "b", "expected")
val groupAttr = new AttributeGroup(
"b",
Array[Attribute](
NumericAttribute.defaultAttr.withName("foo"),
NumericAttribute.defaultAttr.withName("bar")))
val df = data.select(
col("a").as("a", NumericAttribute.defaultAttr.toMetadata()),
col("b").as("b", groupAttr.toMetadata()))
col("b").as("b", groupAttr.toMetadata()),
col("expected"))
val trans = new Interaction().setInputCols(Array("a", "b")).setOutputCol("features")
+testTransformer[(Int, Vector, Vector)](df, trans, "features", "expected") {
+case Row(features: Vector, expected: Vector) =>
+assert(features === expected)
+}

val res = trans.transform(df)
-val expected = Seq(
-(2, Vectors.dense(3.0, 4.0), Vectors.dense(6.0, 8.0)),
-(1, Vectors.dense(1.0, 5.0), Vectors.dense(1.0, 5.0))
-).toDF("a", "b", "features")
-assert(res.collect() === expected.collect())
val attrs = AttributeGroup.fromStructField(res.schema("features"))
val expectedAttrs = new AttributeGroup(
"features",
@@ -92,9 +93,9 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def

test("nominal interaction") {
val data = Seq(
-(2, Vectors.dense(3.0, 4.0)),
-(1, Vectors.dense(1.0, 5.0))
-).toDF("a", "b")
+(2, Vectors.dense(3.0, 4.0), Vectors.dense(0, 0, 0, 0, 3, 4)),
+(1, Vectors.dense(1.0, 5.0), Vectors.dense(0, 0, 1, 5, 0, 0))
+).toDF("a", "b", "expected")
val groupAttr = new AttributeGroup(
"b",
Array[Attribute](
@@ -103,14 +104,15 @@ class InteractionSuite extends SparkFunSuite with MLlibTestSparkContext with Def
val df = data.select(
col("a").as(
"a", NominalAttribute.defaultAttr.withValues(Array("up", "down", "left")).toMetadata()),
col("b").as("b", groupAttr.toMetadata()))
col("b").as("b", groupAttr.toMetadata()),
col("expected"))
val trans = new Interaction().setInputCols(Array("a", "b")).setOutputCol("features")
+testTransformer[(Int, Vector, Vector)](df, trans, "features", "expected") {
+case Row(features: Vector, expected: Vector) =>
+assert(features === expected)
+}

val res = trans.transform(df)
-val expected = Seq(
-(2, Vectors.dense(3.0, 4.0), Vectors.dense(0, 0, 0, 0, 3, 4)),
-(1, Vectors.dense(1.0, 5.0), Vectors.dense(0, 0, 1, 5, 0, 0))
-).toDF("a", "b", "features")
-assert(res.collect() === expected.collect())
val attrs = AttributeGroup.fromStructField(res.schema("features"))
val expectedAttrs = new AttributeGroup(
"features",
@@ -14,15 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.feature

-import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{Vector, Vectors}
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.sql.Row

-class MaxAbsScalerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class MaxAbsScalerSuite extends MLTest with DefaultReadWriteTest {

import testImplicits._

@@ -45,9 +44,10 @@ class MaxAbsScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
.setOutputCol("scaled")

val model = scaler.fit(df)
-model.transform(df).select("expected", "scaled").collect()
-.foreach { case Row(vector1: Vector, vector2: Vector) =>
-assert(vector1.equals(vector2), s"MaxAbsScaler ut error: $vector2 should be $vector1")
+testTransformer[(Vector, Vector)](df, model, "expected", "scaled") {
+case Row(expectedVec: Vector, actualVec: Vector) =>
+assert(expectedVec === actualVec,
+s"MaxAbsScaler error: Expected $expectedVec but computed $actualVec")
}

MLTestingUtils.checkCopyAndUids(scaler, model)
@@ -17,14 +17,13 @@

package org.apache.spark.ml.feature

-import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamsSuite
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.Dataset
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
+import org.apache.spark.sql.{Dataset, Row}

-class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {

+class MinHashLSHSuite extends MLTest with DefaultReadWriteTest {

@transient var dataset: Dataset[_] = _

@@ -167,4 +166,20 @@ class MinHashLSHSuite extends SparkFunSuite with MLlibTestSparkContext with Defa
assert(precision == 1.0)
assert(recall >= 0.7)
}

test("MinHashLSHModel.transform should work with Structured Streaming") {
val localSpark = spark
import localSpark.implicits._

val model = new MinHashLSHModel("mh", randCoefficients = Array((1, 0)))
model.set(model.inputCol, "keys")
testTransformer[Tuple1[Vector]](dataset.toDF(), model, "keys", model.getOutputCol) {
case Row(_: Vector, output: Seq[_]) =>
assert(output.length === model.randCoefficients.length)
// no AND-amplification yet: SPARK-18450, so each hash output is of length 1
output.foreach {
case hashOutput: Vector => assert(hashOutput.size === 1)
}
}
}
}
@@ -17,13 +17,11 @@

package org.apache.spark.ml.feature

-import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.linalg.{Vector, Vectors}
-import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils}
-import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTest, MLTestingUtils}
import org.apache.spark.sql.Row

-class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with DefaultReadWriteTest {
+class MinMaxScalerSuite extends MLTest with DefaultReadWriteTest {

import testImplicits._

@@ -48,9 +46,9 @@ class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
.setMax(5)

val model = scaler.fit(df)
-model.transform(df).select("expected", "scaled").collect()
-.foreach { case Row(vector1: Vector, vector2: Vector) =>
-assert(vector1.equals(vector2), "Transformed vector is different with expected.")
+testTransformer[(Vector, Vector)](df, model, "expected", "scaled") {
+case Row(vector1: Vector, vector2: Vector) =>
+assert(vector1 === vector2, "Transformed vector is different with expected.")
}

MLTestingUtils.checkCopyAndUids(scaler, model)
@@ -114,7 +112,7 @@ class MinMaxScalerSuite extends SparkFunSuite with MLlibTestSparkContext with De
val model = scaler.fit(df)
model.transform(df).select("expected", "scaled").collect()
.foreach { case Row(vector1: Vector, vector2: Vector) =>
-assert(vector1.equals(vector2), "Transformed vector is different with expected.")
+assert(vector1 === vector2, "Transformed vector is different with expected.")
}
}
}
@@ -84,7 +84,7 @@ class NGramSuite extends MLTest with DefaultReadWriteTest {

def testNGram(t: NGram, dataFrame: DataFrame): Unit = {
testTransformer[(Seq[String], Seq[String])](dataFrame, t, "nGrams", "wantedNGrams") {
-case Row(actualNGrams : Seq[String], wantedNGrams: Seq[String]) =>
+case Row(actualNGrams : Seq[_], wantedNGrams: Seq[_]) =>
assert(actualNGrams === wantedNGrams)
}
}
