[SW-1368] Preparation for scoring package (#1393)
(cherry picked from commit f9bf240)
jakubhava committed Jul 31, 2019
1 parent 456673c commit 23ccf30
Showing 19 changed files with 258 additions and 215 deletions.
6 changes: 4 additions & 2 deletions build.gradle
@@ -42,7 +42,8 @@ ext {
     project(':sparkling-water-examples'),
     project(':sparkling-water-ml'),
     project(':sparkling-water-package'),
-    project(":sparkling-water-doc")
+    project(":sparkling-water-doc"),
+    project(':sparkling-water-utils')
   ]
   // Projects with integration tests
   integTestProjects = [
@@ -65,7 +66,8 @@ ext {
     project(':sparkling-water-app-streaming'),
     project(':sparkling-water-package'),
     project(':sparkling-water-benchmarks'),
-    project(':sparkling-water-macros')
+    project(':sparkling-water-macros'),
+    project(':sparkling-water-utils')
   ]
   javaProjects = [
     project(':sparkling-water-extension-stack-trace')
1 change: 1 addition & 0 deletions core/build.gradle
@@ -25,6 +25,7 @@ jar {
 }
 
 dependencies {
+  compile project(':sparkling-water-utils')
   compile("ai.h2o:h2o-ext-jython-cfunc:${h2oVersion}")
 
   // Generic model support
@@ -17,10 +17,11 @@
 
 package ai.h2o.sparkling.bench
 
+import ai.h2o.sparkling.ml.utils.SchemaUtils
 import ai.h2o.sparkling.utils.schemas._
 import org.apache.spark.SparkContext
 import org.apache.spark.h2o.testdata.{DenseVectorHolder, SparseVectorHolder}
-import org.apache.spark.h2o.utils.{H2OSchemaUtils, SharedH2OTestContext, TestFrameUtils}
+import org.apache.spark.h2o.utils.{SharedH2OTestContext, TestFrameUtils}
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -83,17 +84,17 @@ class DataFrameConverterBenchSuite extends BenchSuite with SharedH2OTestContext
 
   def testflattenOnlyPerSchema(schemaHolder: TestFrameUtils.SchemaHolder): Unit = {
     val df = TestFrameUtils.generateDataFrame(spark, schemaHolder, settings)
-    H2OSchemaUtils.flattenDataFrame(df).foreach(_ => {})
+    SchemaUtils.flattenDataFrame(df).foreach(_ => {})
   }
 
   def testflattenSchema(schemaHolder: TestFrameUtils.SchemaHolder): Unit = {
     val df = TestFrameUtils.generateDataFrame(spark, schemaHolder, settings)
-    H2OSchemaUtils.flattenSchema(df)
+    SchemaUtils.flattenSchema(df)
   }
 
   def rowToSchema(schemaHolder: TestFrameUtils.SchemaHolder): Unit = {
     val df = TestFrameUtils.generateDataFrame(spark, schemaHolder, settings)
-    H2OSchemaUtils.rowsToRowSchemas(df).foreach(_ => {})
+    SchemaUtils.rowsToRowSchemas(df).foreach(_ => {})
   }
 
   benchTest("Measure performance of conversion to H2OFrame on a data frame with wide sparse vectors") {
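The recurring change in this commit is mechanical: call sites move from org.apache.spark.h2o.utils.H2OSchemaUtils to ai.h2o.sparkling.ml.utils.SchemaUtils. A minimal sketch of the new entry point, assuming a live SparkSession named spark (the nested input is illustrative):

    import ai.h2o.sparkling.ml.utils.SchemaUtils

    // flattenDataFrame joins nested field names with dots, so a struct column
    // "s" with fields "a" and "b" flattens to top-level columns "s.a" and "s.b".
    val nested = spark.range(3).selectExpr("struct(id as a, id * 2 as b) as s")
    val flat = SchemaUtils.flattenDataFrame(nested)
    assert(flat.columns.sameElements(Array("s.a", "s.b")))
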
@@ -17,8 +17,9 @@
 
 package water.sparkling.itest.local
 
+import ai.h2o.sparkling.ml.utils.SchemaUtils
 import ai.h2o.sparkling.utils.schemas.ComplexSchema
-import org.apache.spark.h2o.utils.{H2OSchemaUtils, SparkTestContext, TestFrameUtils}
+import org.apache.spark.h2o.utils.{SparkTestContext, TestFrameUtils}
 import org.apache.spark.sql.Row
 import org.apache.spark.{SparkConf, SparkContext}
 import org.junit.runner.RunWith
@@ -58,7 +59,7 @@ class H2OSchemaUtilsIntegrationTestSuite extends FunSuite with Matchers with SparkTestContext
   def testFlatteningOnComplexType(settings: TestFrameUtils.GenerateDataFrameSettings, expectedNumberOfColumns: Int) = {
     trackTime {
       val complexDF = TestFrameUtils.generateDataFrame(spark, ComplexSchema, settings)
-      val flattened = H2OSchemaUtils.flattenDataFrame(complexDF)
+      val flattened = SchemaUtils.flattenDataFrame(complexDF)
 
       val fieldTypeNames = flattened.schema.fields.map(_.dataType.typeName)
       val numberOfFields = fieldTypeNames.length
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/h2o/DefaultSource.scala
@@ -78,7 +78,8 @@ class DefaultSource extends RelationProvider
       case SaveMode.Append =>
         sys.error("Appending to H2O Frame is not supported.")
       case SaveMode.Overwrite =>
-        DataSourceUtils.overwrite(key, originalFrame, data)
+        originalFrame.remove()
+        h2oContext.asH2OFrame(data, key)
       case SaveMode.ErrorIfExists =>
        sys.error(s"Frame with key '$key' already exists, if you want to override it set the save mode to override.")
       case SaveMode.Ignore => // do nothing
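For context, the Overwrite branch above is reached through the standard Spark write API. A minimal sketch, assuming Sparkling Water's "h2o" data source short name and the "key" option this file already uses (the frame key is illustrative):

    import org.apache.spark.sql.SaveMode

    val df = spark.range(5).toDF("x")
    df.write
      .format("h2o")                        // resolves to this DefaultSource
      .mode(SaveMode.Overwrite)             // removes the existing frame, then re-converts df
      .option("key", "existing_frame_key")
      .save()
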
@@ -20,9 +20,9 @@ package org.apache.spark.h2o.converters
 import org.apache.spark.h2o.H2OContext
 import org.apache.spark.h2o.backends.external.{ExternalBackendUtils, ExternalWriteConverterCtx}
 import org.apache.spark.h2o.converters.WriteConverterCtxUtils.UploadPlan
-import org.apache.spark.h2o.utils.{H2OSchemaUtils, ReflectionUtils}
+import org.apache.spark.h2o.utils.ReflectionUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, TimestampType, _}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, TimestampType, _}
 import org.apache.spark.sql.{DataFrame, H2OFrameRelation, Row}
 import org.apache.spark.{mllib, _}
 import water.Key
@@ -50,7 +50,7 @@ private[h2o] object SparkDataFrameConverter extends Logging {
 
   /** Transform Spark's DataFrame into H2O Frame */
   def toH2OFrame(hc: H2OContext, dataFrame: DataFrame, frameKeyName: Option[String]): H2OFrame = {
-    import H2OSchemaUtils._
+    import ai.h2o.sparkling.ml.utils.SchemaUtils._
 
     val flatDataFrame = flattenDataFrame(dataFrame)
     val dfRdd = flatDataFrame.rdd
23 changes: 0 additions & 23 deletions core/src/main/scala/org/apache/spark/h2o/utils/DatasetShape.scala

This file was deleted.

47 changes: 44 additions & 3 deletions core/src/main/scala/org/apache/spark/sql/H2OSQLContextUtils.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql
 
 import org.apache.spark.h2o.H2OContext
 import org.apache.spark.h2o.converters.H2ODataFrame
-import org.apache.spark.h2o.utils.H2OSchemaUtils
+import org.apache.spark.h2o.utils.ReflectionUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.{BaseRelation, PrunedScan, TableScan}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import water.fvec.Frame
 
 /**
@@ -50,11 +50,52 @@ case class H2OFrameRelation[T <: Frame](@transient h2oFrame: T,
 
   override val needConversion = false
 
-  override val schema: StructType = H2OSchemaUtils.createSchema(h2oFrame, copyMetadata)
+  override val schema: StructType = createSchema(h2oFrame, copyMetadata)
 
   override def buildScan(): RDD[Row] =
     new H2ODataFrame(h2oFrame)(h2oContext).asInstanceOf[RDD[Row]]
 
   override def buildScan(requiredColumns: Array[String]): RDD[Row] =
     new H2ODataFrame(h2oFrame, requiredColumns)(h2oContext).asInstanceOf[RDD[Row]]
 
+  private def createSchema(f: T, copyMetadata: Boolean): StructType = {
+    import ReflectionUtils._
+
+    val types = new Array[StructField](f.numCols())
+    val vecs = f.vecs()
+    val names = f.names()
+    for (i <- 0 until f.numCols()) {
+      val vec = vecs(i)
+      types(i) = if (copyMetadata) {
+        var metadata = (new MetadataBuilder).
+          putLong("count", vec.length()).
+          putLong("naCnt", vec.naCnt())
+
+        if (vec.isCategorical) {
+          metadata = metadata.putStringArray("vals", vec.domain()).
+            putLong("cardinality", vec.cardinality().toLong)
+        } else if (vec.isNumeric) {
+          metadata = metadata.
+            putDouble("min", vec.min()).
+            putDouble("mean", vec.mean()).
+            putDoubleArray("percentiles", vec.pctiles()).
+            putDouble("max", vec.max()).
+            putDouble("std", vec.sigma()).
+            putDouble("sparsity", vec.nzCnt() / vec.length().toDouble)
+        }
+        StructField(
+          names(i), // Name of column
+          dataTypeFor(vec), // Catalyst type of column
+          vec.naCnt() > 0,
+          metadata.build())
+      } else {
+        StructField(
+          names(i), // Name of column
+          dataTypeFor(vec), // Catalyst type of column
+          vec.naCnt() > 0)
+      }
+    }
+    StructType(types)
+  }
+
 }
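The createSchema body appears to be moved wholesale from the deleted H2OSchemaUtils method: with copyMetadata = true, each column of the resulting Spark schema carries H2O's per-vector summary (count and naCnt, plus min/mean/max/std/percentiles/sparsity for numeric columns, vals/cardinality for categoricals). A hedged sketch of reading it back, assuming a DataFrame df converted from an H2OFrame with metadata copying enabled (column name "AGE" is illustrative):

    // Spark's Metadata API exposes the values stored by MetadataBuilder above.
    val meta = df.schema("AGE").metadata
    if (meta.contains("min")) { // present on numeric columns only
      println(s"min=${meta.getDouble("min")}, max=${meta.getDouble("max")}, missing=${meta.getLong("naCnt")}")
    }
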
@@ -16,8 +16,9 @@
  */
 package org.apache.spark.h2o
 
+import ai.h2o.sparkling.ml.utils.SchemaUtils
 import org.apache.spark.SparkContext
-import org.apache.spark.h2o.utils.{H2OSchemaUtils, SparkTestContext, TestFrameUtils}
+import org.apache.spark.h2o.utils.{SparkTestContext, TestFrameUtils}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{ArrayType, DoubleType, IntegerType, MapType, StringType, StructField, StructType}
 import org.apache.spark.sql.functions._
@@ -39,7 +40,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
       StructField("b", IntegerType, false)
         :: Nil
     )
-    val flatSchema = H2OSchemaUtils.flattenStructsInSchema(expSchema)
+    val flatSchema = SchemaUtils.flattenStructsInSchema(expSchema)
     val expected = Seq(
       (StructField("a", IntegerType, true), "a"),
       (StructField("b", IntegerType, false), "b"))
@@ -58,7 +59,7 @@
         ), false)
         :: Nil
     )
-    val flatSchema = H2OSchemaUtils.flattenStructsInSchema(expSchema)
+    val flatSchema = SchemaUtils.flattenStructsInSchema(expSchema)
     val expected = Seq(
       (StructField("a.a1", DoubleType, true), "a.a1"),
       (StructField("a.a2", StringType, true), "a.a2"),
@@ -79,7 +80,7 @@
       (1, 2, 3, 4, 5, 6)
     ).toDF("arr.0._1", "arr.0._2", "arr.1._1", "arr.1._2", "arr.2._1", "arr.2._2")
 
-    val result = H2OSchemaUtils.flattenDataFrame(input)
+    val result = SchemaUtils.flattenDataFrame(input)
 
     TestFrameUtils.assertFieldNamesAreEqual(expected, result)
     TestFrameUtils.assertDataFramesAreIdentical(expected, result)
@@ -97,7 +98,7 @@
       (1, 2, null, null, 5, 6)
     ).toDF("struct.arr1.0", "struct.arr1.1", "struct.arr2.0", "struct.arr2.1", "struct.arr3.0", "struct.arr3.1")
 
-    val result = H2OSchemaUtils.flattenDataFrame(input)
+    val result = SchemaUtils.flattenDataFrame(input)
 
     TestFrameUtils.assertFieldNamesAreEqual(expected, result)
     TestFrameUtils.assertDataFramesAreIdentical(expected, result)
@@ -115,7 +116,7 @@
       (1, 2, 3, 4, 5, 6, null, "extra")
     ).toDF("arr.0.0", "arr.0.1", "arr.1.0", "arr.1.1", "arr.2.0", "arr.2.1", "arr.2.2", "extra")
 
-    val result = H2OSchemaUtils.flattenDataFrame(input)
+    val result = SchemaUtils.flattenDataFrame(input)
 
     TestFrameUtils.assertFieldNamesAreEqual(expected, result)
     TestFrameUtils.assertDataFramesAreIdentical(expected, result)
@@ -133,7 +134,7 @@
       (1, 2, null, 4)
     ).toDF("struct.struct1._1", "struct.struct1._2", "struct.struct2._1", "struct.struct2._2")
 
-    val result = H2OSchemaUtils.flattenDataFrame(input)
+    val result = SchemaUtils.flattenDataFrame(input)
 
     TestFrameUtils.assertFieldNamesAreEqual(expected, result)
     TestFrameUtils.assertDataFramesAreIdentical(expected, result)
@@ -151,7 +152,7 @@
       (null, 2, 3, 4, "extra")
     ).toDF("arr.0.a", "arr.0.b", "arr.0.c", "arr.1.a", "extra")
 
-    val result = H2OSchemaUtils.flattenDataFrame(input)
+    val result = SchemaUtils.flattenDataFrame(input)
 
     TestFrameUtils.assertFieldNamesAreEqual(expected, result)
     TestFrameUtils.assertDataFramesAreIdentical(expected, result)
@@ -169,7 +170,7 @@
       (null, null, null, 4, 5, 6, "extra")
     ).toDF("map.a.0", "map.a.1", "map.b.0", "map.b.1", "map.c.0", "map.c.1", "extra")
 
-    val result = H2OSchemaUtils.flattenDataFrame(input)
+    val result = SchemaUtils.flattenDataFrame(input)
 
     TestFrameUtils.assertFieldNamesAreEqual(expected, result)
     TestFrameUtils.assertDataFramesAreIdentical(expected, result)
@@ -200,7 +201,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
       StructField("arr.2.b", IntegerType, true) ::
       Nil)
 
-    val result = H2OSchemaUtils.flattenSchema(df)
+    val result = SchemaUtils.flattenSchema(df)
 
     result shouldEqual expectedSchema
   }
@@ -232,7 +233,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
       StructField("struct.d.1", IntegerType, true) ::
       Nil)
 
-    val result = H2OSchemaUtils.flattenSchema(df)
+    val result = SchemaUtils.flattenSchema(df)
 
     result shouldEqual expectedSchema
   }
@@ -264,7 +265,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
       StructField("map.d.b", IntegerType, true) ::
       Nil)
 
-    val result = H2OSchemaUtils.flattenSchema(df)
+    val result = SchemaUtils.flattenSchema(df)
 
     result shouldEqual expectedSchema
   }
@@ -297,7 +298,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
       StructField("struct.d.i", IntegerType, true) ::
      Nil)
 
-    val result = H2OSchemaUtils.flattenSchema(df)
+    val result = SchemaUtils.flattenSchema(df)
 
     result shouldEqual expectedSchema
   }

@@ -22,11 +22,12 @@ import java.sql.Timestamp
 import java.util
 import java.util.UUID
 
+import ai.h2o.sparkling.ml.utils.SchemaUtils
 import hex.splitframe.ShuffleSplitFrame
 import org.apache.spark.SparkContext
 import org.apache.spark.h2o.testdata._
 import org.apache.spark.h2o.utils.H2OAsserts._
-import org.apache.spark.h2o.utils.{H2OSchemaUtils, SharedH2OTestContext}
+import org.apache.spark.h2o.utils.SharedH2OTestContext
 import org.apache.spark.h2o.utils.TestFrameUtils._
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.sql.types._
@@ -464,9 +465,9 @@ class DataFrameConverterTest extends FunSuite with SharedH2OTestContext {
     ))
     val df = spark.createDataFrame(rdd, schema)
 
-    val flattenDF = H2OSchemaUtils.flattenDataFrame(df)
-    val maxElementSizes = H2OSchemaUtils.collectMaxElementSizes(flattenDF)
-    val expandedSchema = H2OSchemaUtils.expandedSchema(H2OSchemaUtils.flattenSchema(df), maxElementSizes)
+    val flattenDF = SchemaUtils.flattenDataFrame(df)
+    val maxElementSizes = SchemaUtils.collectMaxElementSizes(flattenDF)
+    val expandedSchema = SchemaUtils.expandedSchema(SchemaUtils.flattenSchema(df), maxElementSizes)
     val expected: Vector[StructField] = Vector(
       StructField("a.n", IntegerType, nullable = false),
       StructField("a.name", StringType, nullable = true),
@@ -807,13 +808,13 @@ class DataFrameConverterTest extends FunSuite with SharedH2OTestContext {
 
   def assertH2OFrameInvariants(inputDF: DataFrame, df: H2OFrame): Unit = {
     assert(inputDF.count == df.numRows(), "Number of rows has to match")
-    assert(df.numCols() == H2OSchemaUtils.flattenSchema(inputDF).length, "Number columns should match")
+    assert(df.numCols() == SchemaUtils.flattenSchema(inputDF).length, "Number columns should match")
   }
 
   def getSchemaInfo(df: DataFrame): (DataFrame, Array[Int], Seq[StructField]) = {
-    val flattenDF = H2OSchemaUtils.flattenDataFrame(df)
-    val maxElementSizes = H2OSchemaUtils.collectMaxElementSizes(flattenDF)
-    val expandedSchema = H2OSchemaUtils.expandedSchema(H2OSchemaUtils.flattenSchema(df), maxElementSizes)
+    val flattenDF = SchemaUtils.flattenDataFrame(df)
+    val maxElementSizes = SchemaUtils.collectMaxElementSizes(flattenDF)
+    val expandedSchema = SchemaUtils.expandedSchema(SchemaUtils.flattenSchema(df), maxElementSizes)
     (flattenDF, maxElementSizes, expandedSchema)
   }
 }

@@ -18,7 +18,6 @@ package ai.h2o.sparkling.ml.algos
 
 import ai.h2o.sparkling.ml.params.H2OCommonParams
 import org.apache.spark.h2o.H2OContext
-import org.apache.spark.h2o.utils.H2OSchemaUtils
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.{Dataset, SparkSession}
 import water.Key

@@ -19,8 +19,8 @@ package ai.h2o.sparkling.ml.models
 
 import ai.h2o.automl.targetencoding.TargetEncoderModel
 import ai.h2o.sparkling.ml.features.{H2OTargetEncoderBase, H2OTargetEncoderHoldoutStrategy}
+import ai.h2o.sparkling.ml.utils.SchemaUtils
 import org.apache.spark.h2o.H2OContext
-import org.apache.spark.h2o.utils.H2OSchemaUtils
 import org.apache.spark.ml.Model
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.util.{MLWritable, MLWriter}
@@ -51,7 +51,7 @@ class H2OTargetEncoderModel(
     val h2oContext = H2OContext.getOrCreate(SparkSession.builder().getOrCreate())
     val temporaryColumn = getClass.getSimpleName + "_temporary_id"
     val withIdDF = dataset.withColumn(temporaryColumn, monotonically_increasing_id)
-    val flatDF = H2OSchemaUtils.flattenDataFrame(withIdDF)
+    val flatDF = SchemaUtils.flattenDataFrame(withIdDF)
     val relevantColumns = getInputCols() ++ Array(getLabelCol(), getFoldCol(), temporaryColumn).flatMap(Option(_))
     val relevantColumnsDF = flatDF.select(relevantColumns.map(col(_)): _*)
     val input = h2oContext.asH2OFrame(relevantColumnsDF)
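The transform above follows a tag-flatten-select pattern: tag each row with a temporary monotonically increasing id, flatten nested columns, then keep only the columns the encoder needs, so results can later be joined back to the original rows. A sketch of the same idea in isolation (DataFrame and column names illustrative):

    import ai.h2o.sparkling.ml.utils.SchemaUtils
    import org.apache.spark.sql.functions.{col, monotonically_increasing_id}

    val withId = df.withColumn("tmp_id", monotonically_increasing_id)
    val flat = SchemaUtils.flattenDataFrame(withId)
    val relevant = flat.select(Seq("label", "tmp_id").map(col(_)): _*)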
