[SW-1368] Preparation for scoring package #1393

Merged
7 commits merged on Jul 31, 2019
6 changes: 4 additions & 2 deletions build.gradle
@@ -42,7 +42,8 @@ ext {
project(':sparkling-water-examples'),
project(':sparkling-water-ml'),
project(':sparkling-water-package'),
project(":sparkling-water-doc")
project(":sparkling-water-doc"),
project(':sparkling-water-utils')
]
// Projects with integration tests
integTestProjects = [
@@ -65,7 +66,8 @@ ext {
project(':sparkling-water-app-streaming'),
project(':sparkling-water-package'),
project(':sparkling-water-benchmarks'),
-project(':sparkling-water-macros')
+project(':sparkling-water-macros'),
+project(':sparkling-water-utils')
]
javaProjects = [
project(':sparkling-water-extension-stack-trace')
1 change: 1 addition & 0 deletions core/build.gradle
@@ -25,6 +25,7 @@ jar {
}

dependencies {
+compile project(':sparkling-water-utils')
compile("ai.h2o:h2o-ext-jython-cfunc:${h2oVersion}")

// Generic model support
@@ -17,10 +17,11 @@

package ai.h2o.sparkling.bench

+import ai.h2o.sparkling.ml.utils.SchemaUtils
import ai.h2o.sparkling.utils.schemas._
import org.apache.spark.SparkContext
import org.apache.spark.h2o.testdata.{DenseVectorHolder, SparseVectorHolder}
-import org.apache.spark.h2o.utils.{H2OSchemaUtils, SharedH2OTestContext, TestFrameUtils}
+import org.apache.spark.h2o.utils.{SharedH2OTestContext, TestFrameUtils}
import org.apache.spark.ml.linalg.{DenseVector, SparseVector}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
@@ -83,17 +84,17 @@ class DataFrameConverterBenchSuite extends BenchSuite with SharedH2OTestContext

def testflattenOnlyPerSchema(schemaHolder: TestFrameUtils.SchemaHolder): Unit = {
val df = TestFrameUtils.generateDataFrame(spark, schemaHolder, settings)
-H2OSchemaUtils.flattenDataFrame(df).foreach(_ => {})
+SchemaUtils.flattenDataFrame(df).foreach(_ => {})
}

def testflattenSchema(schemaHolder: TestFrameUtils.SchemaHolder): Unit = {
val df = TestFrameUtils.generateDataFrame(spark, schemaHolder, settings)
-H2OSchemaUtils.flattenSchema(df)
+SchemaUtils.flattenSchema(df)
}

def rowToSchema(schemaHolder: TestFrameUtils.SchemaHolder): Unit = {
val df = TestFrameUtils.generateDataFrame(spark, schemaHolder, settings)
-H2OSchemaUtils.rowsToRowSchemas(df).foreach(_ => {})
+SchemaUtils.rowsToRowSchemas(df).foreach(_ => {})
}

benchTest("Measure performance of conversion to H2OFrame on a data frame with wide sparse vectors") {
@@ -17,8 +17,9 @@

package water.sparkling.itest.local

+import ai.h2o.sparkling.ml.utils.SchemaUtils
import ai.h2o.sparkling.utils.schemas.ComplexSchema
-import org.apache.spark.h2o.utils.{H2OSchemaUtils, SparkTestContext, TestFrameUtils}
+import org.apache.spark.h2o.utils.{SparkTestContext, TestFrameUtils}
import org.apache.spark.sql.Row
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.runner.RunWith
@@ -58,7 +59,7 @@ class H2OSchemaUtilsIntegrationTestSuite extends FunSuite with Matchers with SparkTestContext
def testFlatteningOnComplexType(settings: TestFrameUtils.GenerateDataFrameSettings, expectedNumberOfColumns: Int) = {
trackTime {
val complexDF = TestFrameUtils.generateDataFrame(spark, ComplexSchema, settings)
-val flattened = H2OSchemaUtils.flattenDataFrame(complexDF)
+val flattened = SchemaUtils.flattenDataFrame(complexDF)

val fieldTypeNames = flattened.schema.fields.map(_.dataType.typeName)
val numberOfFields = fieldTypeNames.length
@@ -78,7 +78,8 @@ class DefaultSource extends RelationProvider
case SaveMode.Append =>
sys.error("Appending to H2O Frame is not supported.")
case SaveMode.Overwrite =>
-DataSourceUtils.overwrite(key, originalFrame, data)
+originalFrame.remove()
+h2oContext.asH2OFrame(data, key)
case SaveMode.ErrorIfExists =>
sys.error(s"Frame with key '$key' already exists, if you want to override it set the save mode to override.")
case SaveMode.Ignore => // do nothing
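
Review note: the Overwrite path no longer routes through the deleted DataSourceUtils helper; the relation now removes the existing frame and converts the incoming DataFrame under the same key directly. A minimal caller-side sketch, assuming this DefaultSource is registered under the "h2o" format name and takes the frame key via a `key` option (both are assumptions here, not verified against the registration code):

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
val df = spark.range(10).toDF("x")

// Overwrite after this change: any H2O frame already stored under the key
// is removed, then `df` is converted into a fresh frame with that key.
df.write
  .format("h2o")              // assumed short name for this DefaultSource
  .option("key", "my_frame")  // assumed option name carrying the frame key
  .mode(SaveMode.Overwrite)
  .save()
```
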
@@ -20,9 +20,9 @@ package org.apache.spark.h2o.converters
import org.apache.spark.h2o.H2OContext
import org.apache.spark.h2o.backends.external.{ExternalBackendUtils, ExternalWriteConverterCtx}
import org.apache.spark.h2o.converters.WriteConverterCtxUtils.UploadPlan
-import org.apache.spark.h2o.utils.{H2OSchemaUtils, ReflectionUtils}
+import org.apache.spark.h2o.utils.ReflectionUtils
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, TimestampType, _}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, TimestampType, _}
import org.apache.spark.sql.{DataFrame, H2OFrameRelation, Row}
import org.apache.spark.{mllib, _}
import water.Key
@@ -50,7 +50,7 @@ private[h2o] object SparkDataFrameConverter extends Logging {

/** Transform Spark's DataFrame into H2O Frame */
def toH2OFrame(hc: H2OContext, dataFrame: DataFrame, frameKeyName: Option[String]): H2OFrame = {
-import H2OSchemaUtils._
+import ai.h2o.sparkling.ml.utils.SchemaUtils._

val flatDataFrame = flattenDataFrame(dataFrame)
val dfRdd = flatDataFrame.rdd
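
Worth noting for reviewers: toH2OFrame flattens the DataFrame before any rows are written, so the import swap above only changes where flattenDataFrame lives, not when it runs. A minimal sketch of the path through the public API, assuming a running H2OContext:

```scala
import org.apache.spark.h2o.H2OContext
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val hc = H2OContext.getOrCreate(spark)

// The nested struct is flattened first (into flat columns such as
// "person.name" and "person.age"), then the flat rows become the H2O frame.
val df = spark.sql("SELECT named_struct('name', 'a', 'age', 1) AS person")
val frame = hc.asH2OFrame(df, Some("person_frame"))
```
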

This file was deleted.

@@ -19,11 +19,11 @@ package org.apache.spark.sql

import org.apache.spark.h2o.H2OContext
import org.apache.spark.h2o.converters.H2ODataFrame
-import org.apache.spark.h2o.utils.H2OSchemaUtils
+import org.apache.spark.h2o.utils.ReflectionUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.{BaseRelation, PrunedScan, TableScan}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
import water.fvec.Frame

/**
@@ -50,11 +50,52 @@ case class H2OFrameRelation[T <: Frame](@transient h2oFrame: T,

override val needConversion = false

-override val schema: StructType = H2OSchemaUtils.createSchema(h2oFrame, copyMetadata)
+override val schema: StructType = createSchema(h2oFrame, copyMetadata)

override def buildScan(): RDD[Row] =
new H2ODataFrame(h2oFrame)(h2oContext).asInstanceOf[RDD[Row]]

override def buildScan(requiredColumns: Array[String]): RDD[Row] =
new H2ODataFrame(h2oFrame, requiredColumns)(h2oContext).asInstanceOf[RDD[Row]]

+  private def createSchema(f: T, copyMetadata: Boolean): StructType = {
+    import ReflectionUtils._
+
+    val types = new Array[StructField](f.numCols())
+    val vecs = f.vecs()
+    val names = f.names()
+    for (i <- 0 until f.numCols()) {
+      val vec = vecs(i)
+      types(i) = if (copyMetadata) {
+        var metadata = (new MetadataBuilder).
+          putLong("count", vec.length()).
+          putLong("naCnt", vec.naCnt())
+
+        if (vec.isCategorical) {
+          metadata = metadata.putStringArray("vals", vec.domain()).
+            putLong("cardinality", vec.cardinality().toLong)
+        } else if (vec.isNumeric) {
+          metadata = metadata.
+            putDouble("min", vec.min()).
+            putDouble("mean", vec.mean()).
+            putDoubleArray("percentiles", vec.pctiles()).
+            putDouble("max", vec.max()).
+            putDouble("std", vec.sigma()).
+            putDouble("sparsity", vec.nzCnt() / vec.length().toDouble)
+        }
+        StructField(
+          names(i), // Name of column
+          dataTypeFor(vec), // Catalyst type of column
+          vec.naCnt() > 0,
+          metadata.build())
+      } else {
+        StructField(
+          names(i), // Name of column
+          dataTypeFor(vec), // Catalyst type of column
+          vec.naCnt() > 0)
+      }
+    }
+    StructType(types)
+  }

}
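
The inlined createSchema keeps the old behavior of publishing per-vector statistics as Spark column metadata when copyMetadata is set. A short sketch of reading those entries back on the Spark side, assuming a DataFrame obtained from an H2OFrameRelation with copyMetadata = true (the helper and column name are illustrative):

```scala
import org.apache.spark.sql.DataFrame

// Prints the statistics createSchema attaches to each column's metadata.
def describeColumn(df: DataFrame, column: String): Unit = {
  val meta = df.schema(column).metadata
  println(s"rows    = ${meta.getLong("count")}")  // vec.length()
  println(s"missing = ${meta.getLong("naCnt")}")  // vec.naCnt()
  if (meta.contains("min")) {                     // numeric vecs only
    println(s"min/max = ${meta.getDouble("min")} / ${meta.getDouble("max")}")
    println(s"pctiles = ${meta.getDoubleArray("percentiles").mkString(", ")}")
  }
  if (meta.contains("vals")) {                    // categorical vecs only
    println(s"domain  = ${meta.getStringArray("vals").mkString(", ")}")
  }
}
```
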
@@ -16,8 +16,9 @@
*/
package org.apache.spark.h2o

+import ai.h2o.sparkling.ml.utils.SchemaUtils
import org.apache.spark.SparkContext
-import org.apache.spark.h2o.utils.{H2OSchemaUtils, SparkTestContext, TestFrameUtils}
+import org.apache.spark.h2o.utils.{SparkTestContext, TestFrameUtils}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{ArrayType, DoubleType, IntegerType, MapType, StringType, StructField, StructType}
import org.apache.spark.sql.functions._
@@ -39,7 +40,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
StructField("b", IntegerType, false)
:: Nil
)
-val flatSchema = H2OSchemaUtils.flattenStructsInSchema(expSchema)
+val flatSchema = SchemaUtils.flattenStructsInSchema(expSchema)
val expected = Seq(
(StructField("a", IntegerType, true), "a"),
(StructField("b", IntegerType, false), "b"))
@@ -58,7 +59,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
), false)
:: Nil
)
-val flatSchema = H2OSchemaUtils.flattenStructsInSchema(expSchema)
+val flatSchema = SchemaUtils.flattenStructsInSchema(expSchema)
val expected = Seq(
(StructField("a.a1", DoubleType, true), "a.a1"),
(StructField("a.a2", StringType, true), "a.a2"),
@@ -79,7 +80,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
(1, 2, 3, 4, 5, 6)
).toDF("arr.0._1", "arr.0._2", "arr.1._1", "arr.1._2", "arr.2._1", "arr.2._2")

-val result = H2OSchemaUtils.flattenDataFrame(input)
+val result = SchemaUtils.flattenDataFrame(input)

TestFrameUtils.assertFieldNamesAreEqual(expected, result)
TestFrameUtils.assertDataFramesAreIdentical(expected, result)
@@ -97,7 +98,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
(1, 2, null, null, 5, 6)
).toDF("struct.arr1.0", "struct.arr1.1", "struct.arr2.0", "struct.arr2.1", "struct.arr3.0", "struct.arr3.1")

-val result = H2OSchemaUtils.flattenDataFrame(input)
+val result = SchemaUtils.flattenDataFrame(input)

TestFrameUtils.assertFieldNamesAreEqual(expected, result)
TestFrameUtils.assertDataFramesAreIdentical(expected, result)
@@ -115,7 +116,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
(1, 2, 3, 4, 5, 6, null, "extra")
).toDF("arr.0.0", "arr.0.1", "arr.1.0", "arr.1.1", "arr.2.0", "arr.2.1", "arr.2.2", "extra")

-val result = H2OSchemaUtils.flattenDataFrame(input)
+val result = SchemaUtils.flattenDataFrame(input)

TestFrameUtils.assertFieldNamesAreEqual(expected, result)
TestFrameUtils.assertDataFramesAreIdentical(expected, result)
@@ -133,7 +134,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
(1, 2, null, 4)
).toDF("struct.struct1._1", "struct.struct1._2", "struct.struct2._1", "struct.struct2._2")

-val result = H2OSchemaUtils.flattenDataFrame(input)
+val result = SchemaUtils.flattenDataFrame(input)

TestFrameUtils.assertFieldNamesAreEqual(expected, result)
TestFrameUtils.assertDataFramesAreIdentical(expected, result)
@@ -151,7 +152,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
(null, 2, 3, 4, "extra")
).toDF("arr.0.a", "arr.0.b", "arr.0.c", "arr.1.a", "extra")

-val result = H2OSchemaUtils.flattenDataFrame(input)
+val result = SchemaUtils.flattenDataFrame(input)

TestFrameUtils.assertFieldNamesAreEqual(expected, result)
TestFrameUtils.assertDataFramesAreIdentical(expected, result)
@@ -169,7 +170,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
(null, null, null, 4, 5, 6, "extra")
).toDF("map.a.0", "map.a.1", "map.b.0", "map.b.1", "map.c.0", "map.c.1", "extra")

-val result = H2OSchemaUtils.flattenDataFrame(input)
+val result = SchemaUtils.flattenDataFrame(input)

TestFrameUtils.assertFieldNamesAreEqual(expected, result)
TestFrameUtils.assertDataFramesAreIdentical(expected, result)
@@ -200,7 +201,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
StructField("arr.2.b", IntegerType, true) ::
Nil)

-val result = H2OSchemaUtils.flattenSchema(df)
+val result = SchemaUtils.flattenSchema(df)

result shouldEqual expectedSchema
}
@@ -232,7 +233,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
StructField("struct.d.1", IntegerType, true) ::
Nil)

-val result = H2OSchemaUtils.flattenSchema(df)
+val result = SchemaUtils.flattenSchema(df)

result shouldEqual expectedSchema
}
@@ -264,7 +265,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
StructField("map.d.b", IntegerType, true) ::
Nil)

-val result = H2OSchemaUtils.flattenSchema(df)
+val result = SchemaUtils.flattenSchema(df)

result shouldEqual expectedSchema
}
@@ -297,7 +298,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestContext
StructField("struct.d.i", IntegerType, true) ::
Nil)

-val result = H2OSchemaUtils.flattenSchema(df)
+val result = SchemaUtils.flattenSchema(df)

result shouldEqual expectedSchema
}
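
Taken together, these renamed calls pin down the flattening contract the suite already tested: structs contribute dot-separated field names, arrays contribute positional indices, maps contribute their keys, and rows missing an element are padded with nulls. A compact illustration under those rules (the expected column names come from the tests above; the snippet itself is a sketch):

```scala
import ai.h2o.sparkling.ml.utils.SchemaUtils
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
val input = spark.sql(
  "SELECT named_struct('a1', 1.0, 'a2', 'x') AS a, array(1, 2) AS arr")

// Expected flat column names: "a.a1", "a.a2", "arr.0", "arr.1"
SchemaUtils.flattenDataFrame(input).columns.foreach(println)
```
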
@@ -22,11 +22,12 @@ import java.sql.Timestamp
import java.util
import java.util.UUID

+import ai.h2o.sparkling.ml.utils.SchemaUtils
import hex.splitframe.ShuffleSplitFrame
import org.apache.spark.SparkContext
import org.apache.spark.h2o.testdata._
import org.apache.spark.h2o.utils.H2OAsserts._
-import org.apache.spark.h2o.utils.{H2OSchemaUtils, SharedH2OTestContext}
+import org.apache.spark.h2o.utils.SharedH2OTestContext
import org.apache.spark.h2o.utils.TestFrameUtils._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.types._
@@ -464,9 +465,9 @@ class DataFrameConverterTest extends FunSuite with SharedH2OTestContext {
))
val df = spark.createDataFrame(rdd, schema)

-val flattenDF = H2OSchemaUtils.flattenDataFrame(df)
-val maxElementSizes = H2OSchemaUtils.collectMaxElementSizes(flattenDF)
-val expandedSchema = H2OSchemaUtils.expandedSchema(H2OSchemaUtils.flattenSchema(df), maxElementSizes)
+val flattenDF = SchemaUtils.flattenDataFrame(df)
+val maxElementSizes = SchemaUtils.collectMaxElementSizes(flattenDF)
+val expandedSchema = SchemaUtils.expandedSchema(SchemaUtils.flattenSchema(df), maxElementSizes)
val expected: Vector[StructField] = Vector(
StructField("a.n", IntegerType, nullable = false),
StructField("a.name", StringType, nullable = true),
@@ -807,13 +808,13 @@ class DataFrameConverterTest extends FunSuite with SharedH2OTestContext {

def assertH2OFrameInvariants(inputDF: DataFrame, df: H2OFrame): Unit = {
assert(inputDF.count == df.numRows(), "Number of rows has to match")
-assert(df.numCols() == H2OSchemaUtils.flattenSchema(inputDF).length, "Number columns should match")
+assert(df.numCols() == SchemaUtils.flattenSchema(inputDF).length, "Number columns should match")
}

def getSchemaInfo(df: DataFrame): (DataFrame, Array[Int], Seq[StructField]) = {
-val flattenDF = H2OSchemaUtils.flattenDataFrame(df)
-val maxElementSizes = H2OSchemaUtils.collectMaxElementSizes(flattenDF)
-val expandedSchema = H2OSchemaUtils.expandedSchema(H2OSchemaUtils.flattenSchema(df), maxElementSizes)
+val flattenDF = SchemaUtils.flattenDataFrame(df)
+val maxElementSizes = SchemaUtils.collectMaxElementSizes(flattenDF)
+val expandedSchema = SchemaUtils.expandedSchema(SchemaUtils.flattenSchema(df), maxElementSizes)
(flattenDF, maxElementSizes, expandedSchema)
}
}
@@ -18,7 +18,6 @@ package ai.h2o.sparkling.ml.algos

import ai.h2o.sparkling.ml.params.H2OCommonParams
import org.apache.spark.h2o.H2OContext
-import org.apache.spark.h2o.utils.H2OSchemaUtils
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Dataset, SparkSession}
import water.Key
@@ -19,8 +19,8 @@ package ai.h2o.sparkling.ml.models

import ai.h2o.automl.targetencoding.TargetEncoderModel
import ai.h2o.sparkling.ml.features.{H2OTargetEncoderBase, H2OTargetEncoderHoldoutStrategy}
+import ai.h2o.sparkling.ml.utils.SchemaUtils
import org.apache.spark.h2o.H2OContext
-import org.apache.spark.h2o.utils.H2OSchemaUtils
import org.apache.spark.ml.Model
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.{MLWritable, MLWriter}
@@ -51,7 +51,7 @@ class H2OTargetEncoderModel(
val h2oContext = H2OContext.getOrCreate(SparkSession.builder().getOrCreate())
val temporaryColumn = getClass.getSimpleName + "_temporary_id"
val withIdDF = dataset.withColumn(temporaryColumn, monotonically_increasing_id)
-val flatDF = H2OSchemaUtils.flattenDataFrame(withIdDF)
+val flatDF = SchemaUtils.flattenDataFrame(withIdDF)
val relevantColumns = getInputCols() ++ Array(getLabelCol(), getFoldCol(), temporaryColumn).flatMap(Option(_))
val relevantColumnsDF = flatDF.select(relevantColumns.map(col(_)): _*)
val input = h2oContext.asH2OFrame(relevantColumnsDF)
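
The transform's temporary monotonically_increasing_id column is what lets the encoded projection be matched back to the caller's rows once flattening has rearranged the columns. The same pattern in isolation (the names and the join-back step are illustrative, not this model's exact code):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("b", 2)).toDF("category", "label")

// Tag every row, transform only the columns a stage needs,
// then re-attach the result by the temporary id and drop it.
val tmp = "temporary_id"
val withId = df.withColumn(tmp, monotonically_increasing_id)
val projection = withId
  .select(tmp, "category")
  .withColumnRenamed("category", "category_encoded") // stand-in for encoder output
val rejoined = withId.join(projection, Seq(tmp)).drop(tmp)
rejoined.show()
```
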