diff --git a/build.gradle b/build.gradle
index b18c291bbf..0dc82727b7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -42,7 +42,8 @@ ext {
         project(':sparkling-water-examples'),
         project(':sparkling-water-ml'),
         project(':sparkling-water-package'),
-        project(":sparkling-water-doc")
+        project(":sparkling-water-doc"),
+        project(':sparkling-water-utils')
     ]
     // Projects with integration tests
     integTestProjects = [
@@ -65,7 +66,8 @@ ext {
         project(':sparkling-water-app-streaming'),
         project(':sparkling-water-package'),
         project(':sparkling-water-benchmarks'),
-        project(':sparkling-water-macros')
+        project(':sparkling-water-macros'),
+        project(':sparkling-water-utils')
     ]
     javaProjects = [
         project(':sparkling-water-extension-stack-trace')
diff --git a/core/build.gradle b/core/build.gradle
index 4736fb56e8..3eb8c15d7c 100644
--- a/core/build.gradle
+++ b/core/build.gradle
@@ -25,6 +25,7 @@ jar {
 }
 
 dependencies {
+  compile project(':sparkling-water-utils')
   compile("ai.h2o:h2o-ext-jython-cfunc:${h2oVersion}")
 
   // Generic model support
diff --git a/core/src/bench/scala/ai.h2o.sparkling.bench/DataFrameConverterBenchSuite.scala b/core/src/bench/scala/ai.h2o.sparkling.bench/DataFrameConverterBenchSuite.scala
index 6df7d5646f..2a419a30fd 100644
--- a/core/src/bench/scala/ai.h2o.sparkling.bench/DataFrameConverterBenchSuite.scala
+++ b/core/src/bench/scala/ai.h2o.sparkling.bench/DataFrameConverterBenchSuite.scala
@@ -17,10 +17,11 @@
 
 package ai.h2o.sparkling.bench
 
+import ai.h2o.sparkling.ml.utils.SchemaUtils
 import ai.h2o.sparkling.utils.schemas._
 import org.apache.spark.SparkContext
 import org.apache.spark.h2o.testdata.{DenseVectorHolder, SparseVectorHolder}
-import org.apache.spark.h2o.utils.{H2OSchemaUtils, SharedH2OTestContext, TestFrameUtils}
+import org.apache.spark.h2o.utils.{SharedH2OTestContext, TestFrameUtils}
 import org.apache.spark.ml.linalg.{DenseVector, SparseVector}
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -83,17 +84,17 @@ class DataFrameConverterBenchSuite extends BenchSuite with SharedH2OTestContext
 
   def testflattenOnlyPerSchema(schemaHolder: TestFrameUtils.SchemaHolder): Unit = {
     val df = TestFrameUtils.generateDataFrame(spark, schemaHolder, settings)
-    H2OSchemaUtils.flattenDataFrame(df).foreach(_ => {})
+    SchemaUtils.flattenDataFrame(df).foreach(_ => {})
   }
 
   def testflattenSchema(schemaHolder: TestFrameUtils.SchemaHolder): Unit = {
     val df = TestFrameUtils.generateDataFrame(spark, schemaHolder, settings)
-    H2OSchemaUtils.flattenSchema(df)
+    SchemaUtils.flattenSchema(df)
   }
 
   def rowToSchema(schemaHolder: TestFrameUtils.SchemaHolder): Unit = {
     val df = TestFrameUtils.generateDataFrame(spark, schemaHolder, settings)
-    H2OSchemaUtils.rowsToRowSchemas(df).foreach(_ => {})
+    SchemaUtils.rowsToRowSchemas(df).foreach(_ => {})
   }
 
   benchTest("Measure performance of conversion to H2OFrame on a data frame with wide sparse vectors") {
diff --git a/core/src/integTest/scala/water/sparkling/itest/local/H2OSchemaUtilsIntegrationTestSuite.scala b/core/src/integTest/scala/water/sparkling/itest/local/H2OSchemaUtilsIntegrationTestSuite.scala
index 397bab59a0..03321b7de5 100644
--- a/core/src/integTest/scala/water/sparkling/itest/local/H2OSchemaUtilsIntegrationTestSuite.scala
+++ b/core/src/integTest/scala/water/sparkling/itest/local/H2OSchemaUtilsIntegrationTestSuite.scala
@@ -17,8 +17,9 @@
 
 package water.sparkling.itest.local
 
+import ai.h2o.sparkling.ml.utils.SchemaUtils
 import ai.h2o.sparkling.utils.schemas.ComplexSchema
-import org.apache.spark.h2o.utils.{H2OSchemaUtils, SparkTestContext, TestFrameUtils}
+import org.apache.spark.h2o.utils.{SparkTestContext, TestFrameUtils}
 import org.apache.spark.sql.Row
 import org.apache.spark.{SparkConf, SparkContext}
 import org.junit.runner.RunWith
@@ -58,7 +59,7 @@ class H2OSchemaUtilsIntegrationTestSuite extends FunSuite with Matchers with Spa
 
   def testFlatteningOnComplexType(settings: TestFrameUtils.GenerateDataFrameSettings, expectedNumberOfColumns: Int) = {
     trackTime {
       val complexDF = TestFrameUtils.generateDataFrame(spark, ComplexSchema, settings)
-      val flattened = H2OSchemaUtils.flattenDataFrame(complexDF)
+      val flattened = SchemaUtils.flattenDataFrame(complexDF)
       val fieldTypeNames = flattened.schema.fields.map(_.dataType.typeName)
       val numberOfFields = fieldTypeNames.length
diff --git a/core/src/main/scala/org/apache/spark/h2o/DefaultSource.scala b/core/src/main/scala/org/apache/spark/h2o/DefaultSource.scala
index fee5c65e19..84b4ddda90 100644
--- a/core/src/main/scala/org/apache/spark/h2o/DefaultSource.scala
+++ b/core/src/main/scala/org/apache/spark/h2o/DefaultSource.scala
@@ -78,7 +78,8 @@ class DefaultSource extends RelationProvider
       case SaveMode.Append =>
         sys.error("Appending to H2O Frame is not supported.")
       case SaveMode.Overwrite =>
-        DataSourceUtils.overwrite(key, originalFrame, data)
+        originalFrame.remove()
+        h2oContext.asH2OFrame(data, key)
       case SaveMode.ErrorIfExists =>
         sys.error(s"Frame with key '$key' already exists, if you want to override it set the save mode to override.")
       case SaveMode.Ignore => // do nothing
diff --git a/core/src/main/scala/org/apache/spark/h2o/converters/SparkDataFrameConverter.scala b/core/src/main/scala/org/apache/spark/h2o/converters/SparkDataFrameConverter.scala
index f49a5fd334..d6b7a60260 100644
--- a/core/src/main/scala/org/apache/spark/h2o/converters/SparkDataFrameConverter.scala
+++ b/core/src/main/scala/org/apache/spark/h2o/converters/SparkDataFrameConverter.scala
@@ -20,9 +20,9 @@ package org.apache.spark.h2o.converters
 import org.apache.spark.h2o.H2OContext
 import org.apache.spark.h2o.backends.external.{ExternalBackendUtils, ExternalWriteConverterCtx}
 import org.apache.spark.h2o.converters.WriteConverterCtxUtils.UploadPlan
-import org.apache.spark.h2o.utils.{H2OSchemaUtils, ReflectionUtils}
+import org.apache.spark.h2o.utils.ReflectionUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, TimestampType, _}
+import org.apache.spark.sql.types.{BooleanType, ByteType, DateType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, TimestampType, _}
 import org.apache.spark.sql.{DataFrame, H2OFrameRelation, Row}
 import org.apache.spark.{mllib, _}
 import water.Key
@@ -50,7 +50,7 @@ private[h2o] object SparkDataFrameConverter extends Logging {
 
   /** Transform Spark's DataFrame into H2O Frame */
   def toH2OFrame(hc: H2OContext, dataFrame: DataFrame, frameKeyName: Option[String]): H2OFrame = {
-    import H2OSchemaUtils._
+    import ai.h2o.sparkling.ml.utils.SchemaUtils._
     val flatDataFrame = flattenDataFrame(dataFrame)
     val dfRdd = flatDataFrame.rdd
diff --git a/core/src/main/scala/org/apache/spark/h2o/utils/DatasetShape.scala b/core/src/main/scala/org/apache/spark/h2o/utils/DatasetShape.scala
deleted file mode 100644
index 54984f6bfd..0000000000
--- a/core/src/main/scala/org/apache/spark/h2o/utils/DatasetShape.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.spark.h2o.utils
-
-object DatasetShape extends Enumeration {
-  type DatasetShape = Value
-  val Flat, StructsOnly, Nested = Value
-}
diff --git a/core/src/main/scala/org/apache/spark/sql/H2OSQLContextUtils.scala b/core/src/main/scala/org/apache/spark/sql/H2OSQLContextUtils.scala
index 8913c3f396..c16940bffb 100644
--- a/core/src/main/scala/org/apache/spark/sql/H2OSQLContextUtils.scala
+++ b/core/src/main/scala/org/apache/spark/sql/H2OSQLContextUtils.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql
 
 import org.apache.spark.h2o.H2OContext
 import org.apache.spark.h2o.converters.H2ODataFrame
-import org.apache.spark.h2o.utils.H2OSchemaUtils
+import org.apache.spark.h2o.utils.ReflectionUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.{BaseRelation, PrunedScan, TableScan}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import water.fvec.Frame
 
 /**
@@ -50,11 +50,52 @@ case class H2OFrameRelation[T <: Frame](@transient h2oFrame: T,
 
   override val needConversion = false
 
-  override val schema: StructType = H2OSchemaUtils.createSchema(h2oFrame, copyMetadata)
+  override val schema: StructType = createSchema(h2oFrame, copyMetadata)
 
   override def buildScan(): RDD[Row] =
     new H2ODataFrame(h2oFrame)(h2oContext).asInstanceOf[RDD[Row]]
 
   override def buildScan(requiredColumns: Array[String]): RDD[Row] =
     new H2ODataFrame(h2oFrame, requiredColumns)(h2oContext).asInstanceOf[RDD[Row]]
+
+  private def createSchema(f: T, copyMetadata: Boolean): StructType = {
+    import ReflectionUtils._
+
+    val types = new Array[StructField](f.numCols())
+    val vecs = f.vecs()
+    val names = f.names()
+    for (i <- 0 until f.numCols()) {
+      val vec = vecs(i)
+      types(i) = if (copyMetadata) {
+        var metadata = (new MetadataBuilder).
+          putLong("count", vec.length()).
+          putLong("naCnt", vec.naCnt())
+
+        if (vec.isCategorical) {
+          metadata = metadata.putStringArray("vals", vec.domain()).
+            putLong("cardinality", vec.cardinality().toLong)
+        } else if (vec.isNumeric) {
+          metadata = metadata.
+            putDouble("min", vec.min()).
+            putDouble("mean", vec.mean()).
+            putDoubleArray("percentiles", vec.pctiles()).
+            putDouble("max", vec.max()).
+            putDouble("std", vec.sigma()).
+ putDouble("sparsity", vec.nzCnt() / vec.length().toDouble) + } + StructField( + names(i), // Name of column + dataTypeFor(vec), // Catalyst type of column + vec.naCnt() > 0, + metadata.build()) + } else { + StructField( + names(i), // Name of column + dataTypeFor(vec), // Catalyst type of column + vec.naCnt() > 0) + } + } + StructType(types) + } + } diff --git a/core/src/test/scala/org/apache/spark/h2o/H2OSchemaUtilsTestSuite.scala b/core/src/test/scala/org/apache/spark/h2o/H2OSchemaUtilsTestSuite.scala index e826a3e570..2613771b8d 100644 --- a/core/src/test/scala/org/apache/spark/h2o/H2OSchemaUtilsTestSuite.scala +++ b/core/src/test/scala/org/apache/spark/h2o/H2OSchemaUtilsTestSuite.scala @@ -16,8 +16,9 @@ */ package org.apache.spark.h2o +import ai.h2o.sparkling.ml.utils.SchemaUtils import org.apache.spark.SparkContext -import org.apache.spark.h2o.utils.{H2OSchemaUtils, SparkTestContext, TestFrameUtils} +import org.apache.spark.h2o.utils.{SparkTestContext, TestFrameUtils} import org.apache.spark.sql.Row import org.apache.spark.sql.types.{ArrayType, DoubleType, IntegerType, MapType, StringType, StructField, StructType} import org.apache.spark.sql.functions._ @@ -39,7 +40,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestConte StructField("b", IntegerType, false) :: Nil ) - val flatSchema = H2OSchemaUtils.flattenStructsInSchema(expSchema) + val flatSchema = SchemaUtils.flattenStructsInSchema(expSchema) val expected = Seq( (StructField("a", IntegerType, true), "a"), (StructField("b", IntegerType, false), "b")) @@ -58,7 +59,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestConte ), false) :: Nil ) - val flatSchema = H2OSchemaUtils.flattenStructsInSchema(expSchema) + val flatSchema = SchemaUtils.flattenStructsInSchema(expSchema) val expected = Seq( (StructField("a.a1", DoubleType, true), "a.a1"), (StructField("a.a2", StringType, true), "a.a2"), @@ -79,7 +80,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestConte (1, 2, 3, 4, 5, 6) ).toDF("arr.0._1", "arr.0._2", "arr.1._1", "arr.1._2", "arr.2._1", "arr.2._2") - val result = H2OSchemaUtils.flattenDataFrame(input) + val result = SchemaUtils.flattenDataFrame(input) TestFrameUtils.assertFieldNamesAreEqual(expected, result) TestFrameUtils.assertDataFramesAreIdentical(expected, result) @@ -97,7 +98,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestConte (1, 2, null, null, 5, 6) ).toDF("struct.arr1.0", "struct.arr1.1", "struct.arr2.0", "struct.arr2.1", "struct.arr3.0", "struct.arr3.1") - val result = H2OSchemaUtils.flattenDataFrame(input) + val result = SchemaUtils.flattenDataFrame(input) TestFrameUtils.assertFieldNamesAreEqual(expected, result) TestFrameUtils.assertDataFramesAreIdentical(expected, result) @@ -115,7 +116,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestConte (1, 2, 3, 4, 5, 6, null, "extra") ).toDF("arr.0.0", "arr.0.1", "arr.1.0", "arr.1.1", "arr.2.0", "arr.2.1", "arr.2.2", "extra") - val result = H2OSchemaUtils.flattenDataFrame(input) + val result = SchemaUtils.flattenDataFrame(input) TestFrameUtils.assertFieldNamesAreEqual(expected, result) TestFrameUtils.assertDataFramesAreIdentical(expected, result) @@ -133,7 +134,7 @@ class H2OSchemaUtilsTestSuite extends FlatSpec with Matchers with SparkTestConte (1, 2, null, 4) ).toDF("struct.struct1._1", "struct.struct1._2", "struct.struct2._1", "struct.struct2._2") - val result = H2OSchemaUtils.flattenDataFrame(input) + val result = 
 
     TestFrameUtils.assertFieldNamesAreEqual(expected, result)
     TestFrameUtils.assertDataFramesAreIdentical(expected, result)
@@ -151,7 +152,7 @@
       (null, 2, 3, 4, "extra")
     ).toDF("arr.0.a", "arr.0.b", "arr.0.c", "arr.1.a", "extra")
 
-    val result = H2OSchemaUtils.flattenDataFrame(input)
+    val result = SchemaUtils.flattenDataFrame(input)
 
     TestFrameUtils.assertFieldNamesAreEqual(expected, result)
     TestFrameUtils.assertDataFramesAreIdentical(expected, result)
@@ -169,7 +170,7 @@
       (null, null, null, 4, 5, 6, "extra")
     ).toDF("map.a.0", "map.a.1", "map.b.0", "map.b.1", "map.c.0", "map.c.1", "extra")
 
-    val result = H2OSchemaUtils.flattenDataFrame(input)
+    val result = SchemaUtils.flattenDataFrame(input)
 
     TestFrameUtils.assertFieldNamesAreEqual(expected, result)
     TestFrameUtils.assertDataFramesAreIdentical(expected, result)
@@ -200,7 +201,7 @@
         StructField("arr.2.b", IntegerType, true) ::
         Nil)
 
-    val result = H2OSchemaUtils.flattenSchema(df)
+    val result = SchemaUtils.flattenSchema(df)
 
     result shouldEqual expectedSchema
   }
@@ -232,7 +233,7 @@
         StructField("struct.d.1", IntegerType, true) ::
         Nil)
 
-    val result = H2OSchemaUtils.flattenSchema(df)
+    val result = SchemaUtils.flattenSchema(df)
 
     result shouldEqual expectedSchema
   }
@@ -264,7 +265,7 @@
         StructField("map.d.b", IntegerType, true) ::
         Nil)
 
-    val result = H2OSchemaUtils.flattenSchema(df)
+    val result = SchemaUtils.flattenSchema(df)
 
     result shouldEqual expectedSchema
   }
@@ -297,7 +298,7 @@
         StructField("struct.d.i", IntegerType, true) ::
         Nil)
 
-    val result = H2OSchemaUtils.flattenSchema(df)
+    val result = SchemaUtils.flattenSchema(df)
 
     result shouldEqual expectedSchema
   }
diff --git a/core/src/test/scala/org/apache/spark/h2o/converters/DataFrameConverterTest.scala b/core/src/test/scala/org/apache/spark/h2o/converters/DataFrameConverterTest.scala
index 9dd58f00cc..ec4881d516 100644
--- a/core/src/test/scala/org/apache/spark/h2o/converters/DataFrameConverterTest.scala
+++ b/core/src/test/scala/org/apache/spark/h2o/converters/DataFrameConverterTest.scala
@@ -22,11 +22,12 @@ import java.sql.Timestamp
 import java.util
 import java.util.UUID
 
+import ai.h2o.sparkling.ml.utils.SchemaUtils
 import hex.splitframe.ShuffleSplitFrame
 import org.apache.spark.SparkContext
 import org.apache.spark.h2o.testdata._
 import org.apache.spark.h2o.utils.H2OAsserts._
-import org.apache.spark.h2o.utils.{H2OSchemaUtils, SharedH2OTestContext}
+import org.apache.spark.h2o.utils.SharedH2OTestContext
 import org.apache.spark.h2o.utils.TestFrameUtils._
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.sql.types._
@@ -464,9 +465,9 @@
     ))
     val df = spark.createDataFrame(rdd, schema)
 
-    val flattenDF = H2OSchemaUtils.flattenDataFrame(df)
-    val maxElementSizes = H2OSchemaUtils.collectMaxElementSizes(flattenDF)
-    val expandedSchema = H2OSchemaUtils.expandedSchema(H2OSchemaUtils.flattenSchema(df), maxElementSizes)
+    val flattenDF = SchemaUtils.flattenDataFrame(df)
+    val maxElementSizes = SchemaUtils.collectMaxElementSizes(flattenDF)
+    val expandedSchema = SchemaUtils.expandedSchema(SchemaUtils.flattenSchema(df), maxElementSizes)
     val expected: Vector[StructField] = Vector(
       StructField("a.n", IntegerType, nullable = false),
       StructField("a.name", StringType, nullable = true),
@@ -807,13 +808,13 @@
 
   def assertH2OFrameInvariants(inputDF: DataFrame, df: H2OFrame): Unit = {
     assert(inputDF.count == df.numRows(), "Number of rows has to match")
-    assert(df.numCols() == H2OSchemaUtils.flattenSchema(inputDF).length, "Number columns should match")
+    assert(df.numCols() == SchemaUtils.flattenSchema(inputDF).length, "Number columns should match")
   }
 
   def getSchemaInfo(df: DataFrame): (DataFrame, Array[Int], Seq[StructField]) = {
-    val flattenDF = H2OSchemaUtils.flattenDataFrame(df)
-    val maxElementSizes = H2OSchemaUtils.collectMaxElementSizes(flattenDF)
-    val expandedSchema = H2OSchemaUtils.expandedSchema(H2OSchemaUtils.flattenSchema(df), maxElementSizes)
+    val flattenDF = SchemaUtils.flattenDataFrame(df)
+    val maxElementSizes = SchemaUtils.collectMaxElementSizes(flattenDF)
+    val expandedSchema = SchemaUtils.expandedSchema(SchemaUtils.flattenSchema(df), maxElementSizes)
     (flattenDF, maxElementSizes, expandedSchema)
   }
 }
diff --git a/ml/src/main/scala/ai/h2o/sparkling/ml/algos/H2OAlgoCommonUtils.scala b/ml/src/main/scala/ai/h2o/sparkling/ml/algos/H2OAlgoCommonUtils.scala
index 021e53244d..1d362e419d 100644
--- a/ml/src/main/scala/ai/h2o/sparkling/ml/algos/H2OAlgoCommonUtils.scala
+++ b/ml/src/main/scala/ai/h2o/sparkling/ml/algos/H2OAlgoCommonUtils.scala
@@ -18,7 +18,6 @@ package ai.h2o.sparkling.ml.algos
 
 import ai.h2o.sparkling.ml.params.H2OCommonParams
 import org.apache.spark.h2o.H2OContext
-import org.apache.spark.h2o.utils.H2OSchemaUtils
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.{Dataset, SparkSession}
 import water.Key
diff --git a/ml/src/main/scala/ai/h2o/sparkling/ml/models/H2OTargetEncoderModel.scala b/ml/src/main/scala/ai/h2o/sparkling/ml/models/H2OTargetEncoderModel.scala
index 71db8677cc..12eb683a9e 100644
--- a/ml/src/main/scala/ai/h2o/sparkling/ml/models/H2OTargetEncoderModel.scala
+++ b/ml/src/main/scala/ai/h2o/sparkling/ml/models/H2OTargetEncoderModel.scala
@@ -19,8 +19,8 @@ package ai.h2o.sparkling.ml.models
 
 import ai.h2o.automl.targetencoding.TargetEncoderModel
 import ai.h2o.sparkling.ml.features.{H2OTargetEncoderBase, H2OTargetEncoderHoldoutStrategy}
+import ai.h2o.sparkling.ml.utils.SchemaUtils
 import org.apache.spark.h2o.H2OContext
-import org.apache.spark.h2o.utils.H2OSchemaUtils
 import org.apache.spark.ml.Model
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.ml.util.{MLWritable, MLWriter}
@@ -51,7 +51,7 @@ class H2OTargetEncoderModel(
     val h2oContext = H2OContext.getOrCreate(SparkSession.builder().getOrCreate())
     val temporaryColumn = getClass.getSimpleName + "_temporary_id"
     val withIdDF = dataset.withColumn(temporaryColumn, monotonically_increasing_id)
-    val flatDF = H2OSchemaUtils.flattenDataFrame(withIdDF)
+    val flatDF = SchemaUtils.flattenDataFrame(withIdDF)
     val relevantColumns = getInputCols() ++ Array(getLabelCol(), getFoldCol(), temporaryColumn).flatMap(Option(_))
     val relevantColumnsDF = flatDF.select(relevantColumns.map(col(_)): _*)
     val input = h2oContext.asH2OFrame(relevantColumnsDF)
diff --git a/ml/src/main/scala/org/apache/spark/ml/h2o/models/H2OMOJOFlattenedInput.scala b/ml/src/main/scala/org/apache/spark/ml/h2o/models/H2OMOJOFlattenedInput.scala
index f47fa170a4..94a731850d 100644
--- a/ml/src/main/scala/org/apache/spark/ml/h2o/models/H2OMOJOFlattenedInput.scala
+++ b/ml/src/main/scala/org/apache/spark/ml/h2o/models/H2OMOJOFlattenedInput.scala
@@ -18,7 +18,8 @@ package org.apache.spark.ml.h2o.models
 
 import org.apache.spark.h2o.converters.RowConverter
-import org.apache.spark.h2o.utils.{DatasetShape, H2OSchemaUtils}
+import ai.h2o.sparkling.ml.utils.DatasetShape
+import ai.h2o.sparkling.ml.utils.SchemaUtils
 import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.struct
@@ -32,10 +33,10 @@ trait H2OMOJOFlattenedInput {
       dataset: Dataset[_],
       udfConstructor: Array[String] => UserDefinedFunction): DataFrame = {
     val originalDF = dataset.toDF()
-    H2OSchemaUtils.getDatasetShape(dataset.schema) match {
+    DatasetShape.getDatasetShape(dataset.schema) match {
       case DatasetShape.Flat => applyPredictionUdfToFlatDataFrame(originalDF, udfConstructor, inputColumnNames)
       case DatasetShape.StructsOnly | DatasetShape.Nested =>
-        val flattenedDF = H2OSchemaUtils.appendFlattenedStructsToDataFrame(originalDF, RowConverter.temporaryColumnPrefix)
+        val flattenedDF = SchemaUtils.appendFlattenedStructsToDataFrame(originalDF, RowConverter.temporaryColumnPrefix)
         val inputs = inputColumnNames ++ inputColumnNames.map(s => RowConverter.temporaryColumnPrefix + "." + s)
         val flatWithPredictionsDF = applyPredictionUdfToFlatDataFrame(flattenedDF, udfConstructor, inputs)
         flatWithPredictionsDF.schema.foldLeft(flatWithPredictionsDF) { (df, field) =>
diff --git a/settings.gradle b/settings.gradle
index 4e34f43a3f..9e1b63eb4b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -18,6 +18,7 @@ include 'benchmarks'
 include 'dist'
 include 'macros'
 include 'jenkins'
+include 'utils'
 
 // Prefix all projects with sparkling-water name
 rootProject.children.each { project ->
diff --git a/utils/build.gradle b/utils/build.gradle
new file mode 100644
index 0000000000..4f67f4281d
--- /dev/null
+++ b/utils/build.gradle
@@ -0,0 +1,13 @@
+description = "Utilities shared across Sparkling Water sub-modules. These utilities do not depend on H2O."
+
+apply from: "$rootDir/gradle/sparkTest.gradle"
+apply from: "$rootDir/gradle/utils.gradle"
+
+dependencies {
+  compileOnly "org.apache.spark:spark-core_${scalaBaseVersion}:${sparkVersion}"
+  compileOnly "org.apache.spark:spark-mllib_${scalaBaseVersion}:${sparkVersion}"
+
+  compile "org.scala-lang:scala-compiler:${scalaVersion}"
+}
+
+defineStandardPublication().call()
diff --git a/utils/src/main/scala/ai/h2o/sparkling/ml/utils/DatasetShape.scala b/utils/src/main/scala/ai/h2o/sparkling/ml/utils/DatasetShape.scala
new file mode 100644
index 0000000000..b9079d0998
--- /dev/null
+++ b/utils/src/main/scala/ai/h2o/sparkling/ml/utils/DatasetShape.scala
@@ -0,0 +1,46 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package ai.h2o.sparkling.ml.utils
+
+import org.apache.spark.sql.types._
+
+object DatasetShape extends Enumeration {
+  type DatasetShape = Value
+  val Flat, StructsOnly, Nested = Value
+
+  def getDatasetShape(schema: StructType): DatasetShape = {
+    def mergeShape(first: DatasetShape, second: DatasetShape): DatasetShape = (first, second) match {
+      case (DatasetShape.Nested, _) => DatasetShape.Nested
+      case (_, DatasetShape.Nested) => DatasetShape.Nested
+      case (DatasetShape.StructsOnly, _) => DatasetShape.StructsOnly
+      case (_, DatasetShape.StructsOnly) => DatasetShape.StructsOnly
+      case _ => DatasetShape.Flat
+    }
+
+    def fieldToShape(field: StructField): DatasetShape = field.dataType match {
+      case _: ArrayType | _: MapType | _: BinaryType => DatasetShape.Nested
+      case s: StructType => mergeShape(DatasetShape.StructsOnly, getDatasetShape(s))
+      case _ => DatasetShape.Flat
+    }
+
+    schema.fields.foldLeft(DatasetShape.Flat) { (acc, field) =>
+      val fieldShape = fieldToShape(field)
+      mergeShape(acc, fieldShape)
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/h2o/utils/H2OSchemaUtils.scala b/utils/src/main/scala/ai/h2o/sparkling/ml/utils/SchemaUtils.scala
similarity index 76%
rename from core/src/main/scala/org/apache/spark/h2o/utils/H2OSchemaUtils.scala
rename to utils/src/main/scala/ai/h2o/sparkling/ml/utils/SchemaUtils.scala
index e1d5245633..fdb28bfb07 100644
--- a/core/src/main/scala/org/apache/spark/h2o/utils/H2OSchemaUtils.scala
+++ b/utils/src/main/scala/ai/h2o/sparkling/ml/utils/SchemaUtils.scala
@@ -15,87 +15,24 @@
  * limitations under the License.
 */
 
-package org.apache.spark.h2o.utils
+package ai.h2o.sparkling.ml.utils
 
-import org.apache.spark.h2o._
-import org.apache.spark.h2o.utils.DatasetShape.DatasetShape
 import org.apache.spark.ml.attribute.AttributeGroup
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
-import org.apache.spark.sql.types._
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.{ml, mllib}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
+import org.apache.spark.{ExposeUtils, ml, mllib}
 
 import scala.collection.mutable.ArrayBuffer
 
 /**
 * Utilities for working with Spark SQL component.
 */
-object H2OSchemaUtils {
-
-  import ReflectionUtils._
-
-  def createSchema[T <: Frame](f: T, copyMetadata: Boolean): StructType = {
-    val types = new Array[StructField](f.numCols())
-    val vecs = f.vecs()
-    val names = f.names()
-    for (i <- 0 until f.numCols()) {
-      val vec = vecs(i)
-      types(i) = if (copyMetadata) {
-        var metadata = (new MetadataBuilder).
-          putLong("count", vec.length()).
-          putLong("naCnt", vec.naCnt())
-
-        if (vec.isCategorical) {
-          metadata = metadata.putStringArray("vals", vec.domain()).
-            putLong("cardinality", vec.cardinality().toLong)
-        } else if (vec.isNumeric) {
-          metadata = metadata.
-            putDouble("min", vec.min()).
-            putDouble("mean", vec.mean()).
-            putDoubleArray("percentiles", vec.pctiles()).
-            putDouble("max", vec.max()).
-            putDouble("std", vec.sigma()).
- putDouble("sparsity", vec.nzCnt() / vec.length().toDouble) - } - StructField( - names(i), // Name of column - dataTypeFor(vec), // Catalyst type of column - vec.naCnt() > 0, - metadata.build()) - } else { - StructField( - names(i), // Name of column - dataTypeFor(vec), // Catalyst type of column - vec.naCnt() > 0) - } - } - StructType(types) - } +object SchemaUtils { - def getDatasetShape(schema: StructType): DatasetShape = { - def mergeShape(first: DatasetShape, second: DatasetShape): DatasetShape = (first, second) match { - case (DatasetShape.Nested, _) => DatasetShape.Nested - case (_, DatasetShape.Nested) => DatasetShape.Nested - case (DatasetShape.StructsOnly, _) => DatasetShape.StructsOnly - case (_, DatasetShape.StructsOnly) => DatasetShape.StructsOnly - case _ => DatasetShape.Flat - } - - def fieldToShape(field: StructField): DatasetShape = field.dataType match { - case _: ArrayType | _: MapType | _: BinaryType => DatasetShape.Nested - case s: StructType => mergeShape(DatasetShape.StructsOnly, getDatasetShape(s)) - case _ => DatasetShape.Flat - } - - schema.fields.foldLeft(DatasetShape.Flat) { (acc, field) => - val fieldShape = fieldToShape(field) - mergeShape(acc, fieldShape) - } - } - - def flattenDataFrame(df: DataFrame): DataFrame = getDatasetShape(df.schema) match { + def flattenDataFrame(df: DataFrame): DataFrame = DatasetShape.getDatasetShape(df.schema) match { case DatasetShape.Flat => df case DatasetShape.StructsOnly => flattenStructsInDataFrame(df) case DatasetShape.Nested => @@ -128,8 +65,8 @@ object H2OSchemaUtils { } private def fillBuffer - (flatSchemaIndexes: Map[String, Int], buffer: ArrayBuffer[Any]) - (qualifiedName: String, dataType: DataType, data: Any) = { + (flatSchemaIndexes: Map[String, Int], buffer: ArrayBuffer[Any]) + (qualifiedName: String, dataType: DataType, data: Any) = { if (data != null) { dataType match { case BinaryType => fillBinary(qualifiedName, ByteType, flatSchemaIndexes, buffer, data) @@ -142,11 +79,11 @@ object H2OSchemaUtils { } private def fillBinary( - qualifiedName: String, - elementType: DataType, - flatSchemaIndexes: Map[String, Int], - buffer: ArrayBuffer[Any], - data: Any): Unit = { + qualifiedName: String, + elementType: DataType, + flatSchemaIndexes: Map[String, Int], + buffer: ArrayBuffer[Any], + data: Any): Unit = { val array = data.asInstanceOf[Array[Byte]] val fillBufferPartiallyApplied = fillBuffer(flatSchemaIndexes, buffer) _ var idx = 0 @@ -158,11 +95,11 @@ object H2OSchemaUtils { } private def fillArray( - qualifiedName: String, - elementType: DataType, - flatSchemaIndexes: Map[String, Int], - buffer: ArrayBuffer[Any], - data: Any): Unit = { + qualifiedName: String, + elementType: DataType, + flatSchemaIndexes: Map[String, Int], + buffer: ArrayBuffer[Any], + data: Any): Unit = { val seq = data.asInstanceOf[Seq[Any]] val fillBufferPartiallyApplied = fillBuffer(flatSchemaIndexes, buffer) _ var idx = 0 @@ -174,11 +111,11 @@ object H2OSchemaUtils { } private def fillMap( - qualifiedName: String, - valueType: DataType, - flatSchemaIndexes: Map[String, Int], - buffer: ArrayBuffer[Any], - data: Any): Unit = { + qualifiedName: String, + valueType: DataType, + flatSchemaIndexes: Map[String, Int], + buffer: ArrayBuffer[Any], + data: Any): Unit = { val map = data.asInstanceOf[Map[Any, Any]] val fillBufferPartiallyApplied = fillBuffer(flatSchemaIndexes, buffer) _ map.foreach { case (key, value) => @@ -188,11 +125,11 @@ object H2OSchemaUtils { } private def fillStruct( - qualifiedName: String, - fields: Seq[StructField], - 
-    buffer: ArrayBuffer[Any],
-    data: Any): Unit = {
+      qualifiedName: String,
+      fields: Seq[StructField],
+      flatSchemaIndexes: Map[String, Int],
+      buffer: ArrayBuffer[Any],
+      data: Any): Unit = {
     val subRow = data.asInstanceOf[Row]
     val fillBufferPartiallyApplied = fillBuffer(flatSchemaIndexes, buffer) _
     fields.zip(subRow.toSeq).foreach { case (subField, value) =>
@@ -273,12 +210,12 @@
   private def getQualifiedName(prefix: String, name: String): String = prefix + "." + name
 
   private def flattenField(
-    qualifiedName: String,
-    dataType: DataType,
-    nullable: Boolean,
-    metadata: Metadata,
-    data: Any,
-    path: List[Any]): Seq[FieldWithOrder] = {
+      qualifiedName: String,
+      dataType: DataType,
+      nullable: Boolean,
+      metadata: Metadata,
+      data: Any,
+      path: List[Any]): Seq[FieldWithOrder] = {
     if (data != null) {
       dataType match {
         case BinaryType =>
@@ -300,12 +237,12 @@
   case class FieldWithOrder(field: StructField, order: Iterable[Any])
 
   private def flattenBinaryType(
-    qualifiedName: String,
-    elementType: DataType,
-    nullable: Boolean,
-    metadata: Metadata,
-    data: Any,
-    path: List[Any]) = {
+      qualifiedName: String,
+      elementType: DataType,
+      nullable: Boolean,
+      metadata: Metadata,
+      data: Any,
+      path: List[Any]) = {
     val values = data.asInstanceOf[Array[Byte]]
     val result = new ArrayBuffer[FieldWithOrder]()
     var idx = 0
@@ -318,12 +255,12 @@
   }
 
   private def flattenArrayType(
-    qualifiedName: String,
-    elementType: DataType,
-    nullable: Boolean,
-    metadata: Metadata,
-    data: Any,
-    path: List[Any]) = {
+      qualifiedName: String,
+      elementType: DataType,
+      nullable: Boolean,
+      metadata: Metadata,
+      data: Any,
+      path: List[Any]) = {
     val values = data.asInstanceOf[Seq[Any]]
     val result = new ArrayBuffer[FieldWithOrder]()
     var idx = 0
@@ -336,14 +273,13 @@
   }
 
   private def flattenMapType(
-    qualifiedName: String,
-    valueType: DataType,
-    nullable: Boolean,
-    metadata: Metadata,
-    data: Any,
-    path: List[Any]) = {
+      qualifiedName: String,
+      valueType: DataType,
+      nullable: Boolean,
+      metadata: Metadata,
+      data: Any,
+      path: List[Any]) = {
     val map = data.asInstanceOf[Map[Any, Any]]
-    val subRow = Row.fromSeq(map.values.toSeq)
     val result = new ArrayBuffer[FieldWithOrder]()
     map.foreach { case (key, value) =>
       val fieldQualifiedName = getQualifiedName(qualifiedName, key.toString)
@@ -353,12 +289,12 @@
   }
 
   private def flattenStructType(
-    qualifiedName: String,
-    nullableParent: Boolean,
-    metadata: Metadata,
-    fields: Seq[StructField],
-    data: Any,
-    path: List[Any]) = {
+      qualifiedName: String,
+      nullableParent: Boolean,
+      metadata: Metadata,
+      fields: Seq[StructField],
+      data: Any,
+      path: List[Any]) = {
     val subRow = data.asInstanceOf[Row]
     fields.zipWithIndex.flatMap { case (subField, idx) =>
       val StructField(name, dataType, nullable, fieldMetadata) = subField
@@ -372,10 +308,10 @@
   }
 
   def flattenStructsInSchema(
-    schema: StructType,
-    sourceColPrefix: Option[String] = None,
-    targetColPrefix: Option[String] = None,
-    nullable: Boolean = false): Seq[(StructField, String)] = {
+      schema: StructType,
+      sourceColPrefix: Option[String] = None,
+      targetColPrefix: Option[String] = None,
+      nullable: Boolean = false): Seq[(StructField, String)] = {
 
     val flattened = schema.fields.flatMap { f =>
       val escaped = if (f.name.contains(".")) "`" + f.name + "`" else f.name
@@ -399,6 +335,7 @@
   }
 
   def appendFlattenedStructsToDataFrame(df: DataFrame, prefixForNewColumns: String): DataFrame = {
+    import org.apache.spark.sql.DatasetExtensions._
     val structsOnlySchema = StructType(df.schema.fields.filter(_.dataType.isInstanceOf[StructType]))
     val flatten = flattenStructsInSchema(structsOnlySchema, targetColPrefix = Some(prefixForNewColumns))
     flatten.foldLeft(df) { case (tempDF, (field, colName)) =>
@@ -418,11 +355,11 @@
 
     val expandedSchema = flatSchema.fields.zipWithIndex.flatMap { case (field, idx) =>
       field.dataType match {
-        case _ : ml.linalg.VectorUDT | _: mllib.linalg.VectorUDT =>
+        case v if ExposeUtils.isAnyVectorUDT(v) =>
           (0 until elemMaxSizes(idx)).map { arrIdx =>
             StructField(field.name + arrIdx.toString, DoubleType, nullable = true)
           }
-        case udt: UserDefinedType[_] => throw new UnsupportedOperationException(s"User defined type is not supported: ${udt.getClass}")
+        case udt if ExposeUtils.isUDT(udt) => throw new UnsupportedOperationException(s"User defined type is not supported: ${udt.getClass}")
         case _ => Seq(field)
       }
     }
@@ -455,7 +392,7 @@
     flatDataFrame.schema.fields.zipWithIndex.flatMap { case (field, idx) =>
       field.dataType match {
-        case _: ml.linalg.VectorUDT | _: mllib.linalg.VectorUDT =>
+        case v if ExposeUtils.isAnyVectorUDT(v) =>
           Array.fill(elemMaxSizes(idx))(sparseInfoForVec(idx)).toSeq
         case _ => Seq(false)
       }
@@ -481,11 +418,11 @@
   }
 
   private def fieldSizeFromMetadata(field: StructField): Option[Int] = {
-    field.dataType match {
-      case _: ml.linalg.VectorUDT =>
-        Some(AttributeGroup.fromStructField(field).size).filter(_ != -1)
-      case _ => None
-    }
+    field.dataType match {
+      case v if ExposeUtils.isMLVectorUDT(v) =>
+        Some(AttributeGroup.fromStructField(field).size).filter(_ != -1)
+      case _ => None
+    }
   }
 
   /**
@@ -525,7 +462,7 @@
   def collectVectorLikeTypes(flatSchema: StructType): Seq[Int] = {
     flatSchema.fields.zipWithIndex.flatMap { case (field, idx) =>
       field.dataType match {
-        case _: ml.linalg.VectorUDT => Some(idx)
+        case v if ExposeUtils.isMLVectorUDT(v) => Some(idx)
         case _: mllib.linalg.VectorUDT => Some(idx)
         case _ => None
       }
@@ -563,11 +500,10 @@
     } else {
       val dataType = row.schema.fields(idx).dataType
       dataType match {
-        case _: ml.linalg.VectorUDT => row.getAs[ml.linalg.Vector](idx).size
+        case v if ExposeUtils.isMLVectorUDT(v) => row.getAs[ml.linalg.Vector](idx).size
         case _: mllib.linalg.VectorUDT => row.getAs[mllib.linalg.Vector](idx).size
-        case udt: UserDefinedType[_] => throw new UnsupportedOperationException(s"User defined type is not supported: ${udt.getClass}")
+        case udt if ExposeUtils.isUDT(udt) => throw new UnsupportedOperationException(s"User defined type is not supported: ${udt.getClass}")
       }
     }
   }
-
 }
diff --git a/core/src/main/scala/org/apache/spark/h2o/H2ORelation.scala b/utils/src/main/scala/org/apache/spark/ExposeUtils.scala
similarity index 53%
rename from core/src/main/scala/org/apache/spark/h2o/H2ORelation.scala
rename to utils/src/main/scala/org/apache/spark/ExposeUtils.scala
index 651fd8cd57..9775021df5 100644
--- a/core/src/main/scala/org/apache/spark/h2o/H2ORelation.scala
+++ b/utils/src/main/scala/org/apache/spark/ExposeUtils.scala
@@ -15,21 +15,34 @@
  * limitations under the License.
 */
 
-package org.apache.spark.h2o
+package org.apache.spark
 
-import org.apache.spark.h2o.utils.H2OSchemaUtils
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.types.StructType
-import water.DKV
+import org.apache.spark.sql.types.{DataType, UserDefinedType}
 
-object DataSourceUtils {
-  def getSparkSQLSchema(key: String, copyMetadata: Boolean = true): StructType = {
-    val frame = DKV.getGet[H2OFrame](key)
-    H2OSchemaUtils.createSchema(frame, copyMetadata)
+object ExposeUtils {
+  def classForName(className: String): Class[_] = {
+    org.apache.spark.util.Utils.classForName(className)
   }
 
-  def overwrite(key: String, originalFrame: Frame, newDataFrame: DataFrame)(implicit h2oContext: H2OContext): Unit = {
-    originalFrame.remove()
-    h2oContext.asH2OFrame(newDataFrame, key)
+  def isMLVectorUDT(dataType: DataType): Boolean = {
+    dataType match {
+      case _ : ml.linalg.VectorUDT => true
+      case _ => false
+    }
+  }
+
+  def isAnyVectorUDT(dataType: DataType): Boolean = {
+    dataType match {
+      case _ : ml.linalg.VectorUDT => true
+      case _ : mllib.linalg.VectorUDT => true
+      case _ => false
+    }
+  }
+
+  def isUDT(dataType: DataType): Boolean = {
+    dataType match {
+      case _ : UserDefinedType[_] => true
+      case _ => false
+    }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/sql/DatasetExtensions.scala b/utils/src/main/scala/org/apache/spark/sql/DatasetExtensions.scala
similarity index 86%
rename from core/src/main/scala/org/apache/spark/sql/DatasetExtensions.scala
rename to utils/src/main/scala/org/apache/spark/sql/DatasetExtensions.scala
index 4db5d95d41..236cda2459 100644
--- a/core/src/main/scala/org/apache/spark/sql/DatasetExtensions.scala
+++ b/utils/src/main/scala/org/apache/spark/sql/DatasetExtensions.scala
@@ -17,12 +17,20 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.types.Metadata
+
 object DatasetExtensions {
+
   implicit class DatasetWrapper(dataset: Dataset[_]) {
     def withColumns(colNames: Seq[String], cols: Seq[Column]): DataFrame = {
       colNames.zip(cols).foldLeft(dataset.toDF()) { case (currentDataFrame, (columnName, column)) =>
        currentDataFrame.withColumn(columnName, column)
       }
     }
+
+    def withColumn(colName: String, col: Column, metadata: Metadata): DataFrame = {
+      dataset.withColumn(colName, col, metadata)
+    }
   }
+
 }
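
A minimal sketch of how the relocated helpers are consumed from caller code once `sparkling-water-utils` is on the classpath. The local SparkSession and toy frame below are assumptions for illustration; `DatasetShape.getDatasetShape` and `SchemaUtils.flattenDataFrame` are the APIs moved in this patch:

```scala
import ai.h2o.sparkling.ml.utils.{DatasetShape, SchemaUtils}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate() // hypothetical local session
import spark.implicits._

// The nested tuple becomes a struct column, so the schema is not flat.
val df = Seq((1, ("a", 2.0)), (2, ("b", 3.0))).toDF("id", "payload")

// DatasetShape classifies the schema; flattenDataFrame is a no-op for flat input and
// otherwise rewrites nested fields into dot-qualified top-level columns (e.g. "payload._1").
val flat = DatasetShape.getDatasetShape(df.schema) match {
  case DatasetShape.Flat => df
  case _ => SchemaUtils.flattenDataFrame(df)
}
flat.printSchema()
```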