[SW-2449] asH2OFrame Method Could Fail on a String Column Having More Than 10 Million Distinct Values (#2341)

* [SW-2449] asH2OFrame Method Could Fail on a String Column Having More Than 10 Million Distinct Values

* spotlessApply

* Fix SupportedRDDConverterTestSuite tests

* Revert changes in tests

* Fix OOM in tests

* Escape names

* Move the whole conversion logic to H2O backend

* Use ExpectedType.Categorical

* typo

* spotlessApply

* Remove DataTypeConverterTestSuite

* fix DataFrameConverterTestSuite

* fix calculation of the ratio

* fix ConvertCategoricalToStringColumnsTask

* fix empty frames

* conversion logic to separate methods

* Add more tests

* condition for unique columns

* Add tests on one partition

* Use PreviewParseWriter

* spotless

* fix categorical preview writer

* remove irrelevant column

* Virtual ice hash map

* spotlessApply

* Adding DKV.put and disabling tests with big datasets on external backend

* spotlessApply

* change test for external backend in test
mn-mikke committed Oct 14, 2020
1 parent b46cec2 commit b2f2d55
Showing 12 changed files with 441 additions and 204 deletions.
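For context, here is a minimal sketch of the scenario the commit addresses, mirroring the new test suite added below. It assumes an existing SparkSession (spark), an existing H2OContext (hc), and enough driver memory to materialize the generated values; it is an illustration, not part of the commit itself.

import ai.h2o.sparkling.H2OContext
import org.apache.spark.sql.SparkSession
import water.parser.Categorical

val spark = SparkSession.builder().getOrCreate() // assumed to exist already
import spark.implicits._
val hc = H2OContext.getOrCreate()

// Build a string column with more distinct values than H2O's categorical
// limit (Categorical.MAX_CATEGORICAL_COUNT), i.e. more than 10 million of them.
val values = (1 to (Categorical.MAX_CATEGORICAL_COUNT * 1.1).toInt).map(_.toHexString)
val df = spark.sparkContext.parallelize(values, 100).toDF("strings")

// Before this change asH2OFrame could fail while building the categorical
// domain; with it, such a column ends up as a plain string (T_STR) column.
val h2oFrame = hc.asH2OFrame(df)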
8 changes: 2 additions & 6 deletions core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala
@@ -143,12 +143,8 @@ private[backend] object Writer {
       case _: DecimalType => con.put(row.getDecimal(idxField).doubleValue())
       case DoubleType => con.put(row.getDouble(idxField))
       case StringType =>
-        metadata.expectedTypes(idxField) match {
-          case ExpectedTypes.String => con.put(row.getString(idxField))
-          case ExpectedTypes.Categorical =>
-            val valueIndex = domainBuilder.addStringToDomain(row.getString(idxField), idxField)
-            con.put(valueIndex)
-        }
+        val valueIndex = domainBuilder.addStringToDomain(row.getString(idxField), idxField)
+        con.put(valueIndex)
       case TimestampType =>
         con.put(timeZoneConverter.fromSparkTimeZoneToUTC(row.getAs[java.sql.Timestamp](idxField)))
       case DateType => con.put(timeZoneConverter.fromSparkTimeZoneToUTC(row.getAs[java.sql.Date](idxField)))
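With this change the writer no longer branches on the expected type for StringType columns: every string value goes through domainBuilder.addStringToDomain, which returns the index of the value within the column's domain, and that index is written to the chunk. Per the commit messages, whether the column finally stays categorical or is turned back into a plain string column is decided later on the H2O backend. Below is a rough sketch of what such an index-assigning domain builder does, with hypothetical names and details, not the actual Sparkling Water implementation.

import scala.collection.mutable

// Illustrative only: interns string values per column and hands back a
// stable index, similar in spirit to domainBuilder.addStringToDomain above.
class SimpleDomainBuilder(numColumns: Int) {
  private val domains = Array.fill(numColumns)(mutable.LinkedHashMap.empty[String, Int])

  def addStringToDomain(value: String, columnIndex: Int): Int = {
    val domain = domains(columnIndex)
    domain.getOrElseUpdate(value, domain.size) // existing index or the next free one
  }

  // Categorical levels collected for one column, in insertion order.
  def domain(columnIndex: Int): Array[String] = domains(columnIndex).keys.toArray
}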

This file was deleted.

@@ -21,77 +21,22 @@ import ai.h2o.sparkling.backend.utils.SupportedTypes
 import ai.h2o.sparkling.extensions.serde.ExpectedTypes
 import ai.h2o.sparkling.extensions.serde.ExpectedTypes.ExpectedType
 import org.apache.spark.ExposeUtils
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.functions._
-import water.fvec.Vec
-import water.parser.{BufferedString, PreviewParseWriter}
+import water.parser.{BufferedString, Categorical, PreviewParseWriter}

 private[backend] object DataTypeConverter {

-  private def stringTypesToExpectedTypes(rdd: RDD[Row], schema: StructType): Map[Int, ExpectedType] = {
-    val stringTypeIndices = for {
-      (field, index) <- schema.fields.zipWithIndex
-      if field.dataType == StringType
-    } yield index
-
-    val types = if (rdd.getNumPartitions > 0) {
-      val serializedPreview = rdd
-        .mapPartitions[Array[Byte]](createPartitionPreview(_, stringTypeIndices))
-        .reduce(mergePartitionPreview)
-
-      val preview = CategoricalPreviewWriter.deserialize(serializedPreview)
-      preview.guessTypes().map {
-        case Vec.T_CAT => ExpectedTypes.Categorical
-        case _ => ExpectedTypes.String
-      }
-    } else {
-      stringTypeIndices.map(_ => ExpectedTypes.String)
-    }
-
-    stringTypeIndices.zip(types).toMap
-  }
-
-  private def createPartitionPreview(rows: Iterator[Row], stringTypeIndices: Array[Int]): Iterator[Array[Byte]] = {
-    val previewParseWriter = new CategoricalPreviewWriter(stringTypeIndices.length)
-    val bufferedString = new BufferedString()
-    var rowId = 0
-    while (rows.hasNext && rowId < CategoricalPreviewWriter.MAX_PREVIEW_RECORDS) {
-      val row = rows.next()
-      var i = 0
-      while (i < stringTypeIndices.length) {
-        val colId = stringTypeIndices(i)
-        val string = row.getString(colId)
-        if (string == null) {
-          previewParseWriter.addInvalidCol(i)
-        } else {
-          bufferedString.set(string)
-          previewParseWriter.addStrCol(i, bufferedString)
-        }
-        i += 1
-      }
-      rowId += 1
-    }
-    Iterator.single(CategoricalPreviewWriter.serialize(previewParseWriter))
-  }
-
-  private def mergePartitionPreview(first: Array[Byte], second: Array[Byte]): Array[Byte] = {
-    val firstObject = CategoricalPreviewWriter.deserialize(first)
-    val secondObject = CategoricalPreviewWriter.deserialize(second)
-    val result =
-      PreviewParseWriter.unifyColumnPreviews(firstObject, secondObject).asInstanceOf[CategoricalPreviewWriter]
-    CategoricalPreviewWriter.serialize(result)
-  }

-  def determineExpectedTypes(rdd: RDD[Row], schema: StructType): Array[ExpectedType] = {
-    val stringTypes = stringTypesToExpectedTypes(rdd, schema)
-    schema.zipWithIndex.map {
-      case (field, index) =>
+  def determineExpectedTypes(schema: StructType): Array[ExpectedType] = {
+    schema.map {
+      case field =>
         field.dataType match {
           case n if n.isInstanceOf[DecimalType] & n.getClass.getSuperclass != classOf[DecimalType] =>
             ExpectedTypes.Double
+          case StringType => ExpectedTypes.Categorical
           case v if ExposeUtils.isAnyVectorUDT(v) => ExpectedTypes.Vector
-          case StringType => stringTypes(index)
           case dt: DataType => SupportedTypes.bySparkType(dt).expectedType
         }
     }.toArray
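The partition-preview sampling removed above used to guess, from a bounded sample of rows, whether a string column should be sent as ExpectedTypes.String or ExpectedTypes.Categorical. After this change every string column is announced as ExpectedTypes.Categorical up front, and, per the commit messages (ConvertCategoricalToStringColumnsTask, "condition for unique columns"), the H2O backend converts a column back to a string column once the full domain is known. A rough sketch of that kind of post-hoc decision follows; the names and the exact rule are illustrative, not the actual backend API.

import water.parser.Categorical

// Illustrative only: keep the column categorical when the collected domain
// fits H2O's limit and the values actually repeat; otherwise fall back to a
// plain string column. The real backend logic and thresholds may differ.
def keepCategorical(domainSize: Long, rowCount: Long): Boolean = {
  val withinH2OLimit = domainSize <= Categorical.MAX_CATEGORICAL_COUNT
  val valuesRepeat = domainSize < rowCount // all-unique columns stay strings
  withinH2OLimit && valuesRepeat
}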
@@ -44,15 +44,16 @@ object SparkDataFrameConverter extends Logging {
     val df = dataFrame.toDF() // Because of PySparkling, we can receive Dataset[Primitive] in this method, ensure that
     // we are dealing with Dataset[Row]
     val flatDataFrame = flattenDataFrame(df)
+    val schema = flatDataFrame.schema
+    val rdd = flatDataFrame.rdd // materialized the data frame

-    val elemMaxSizes = collectMaxElementSizes(flatDataFrame)
-    val vecIndices = collectVectorLikeTypes(flatDataFrame.schema).toArray
-    val flattenSchema = expandedSchema(flatDataFrame.schema, elemMaxSizes)
+    val elemMaxSizes = collectMaxElementSizes(rdd, schema)
+    val vecIndices = collectVectorLikeTypes(schema).toArray
+    val flattenSchema = expandedSchema(schema, elemMaxSizes)
     val colNames = flattenSchema.map(_.name).toArray
     val maxVecSizes = vecIndices.map(elemMaxSizes(_))

-    val rdd = flatDataFrame.rdd
-    val expectedTypes = DataTypeConverter.determineExpectedTypes(rdd, flatDataFrame.schema)
+    val expectedTypes = DataTypeConverter.determineExpectedTypes(schema)

     val uniqueFrameId = frameKeyName.getOrElse("frame_rdd_" + rdd.id + scala.util.Random.nextInt())
     val metadata = WriterMetadata(hc.getConf, uniqueFrameId, expectedTypes, maxVecSizes, SparkTimeZone.current())
@@ -0,0 +1,130 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package ai.h2o.sparkling.backend.converters

import ai.h2o.sparkling.ml.utils.SchemaUtils
import ai.h2o.sparkling.{H2OFrame, SharedH2OTestContext, TestUtils}
import org.apache.spark.sql.types.{StringType, StructField}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.FunSuite
import water.parser.Categorical

@RunWith(classOf[JUnitRunner])
class DataFrameConverterCategoricalTestSuite extends FunSuite with SharedH2OTestContext {

  override def createSparkSession(): SparkSession = sparkSession("local[*]")
  import spark.implicits._

test("PUBDEV-766 H2OFrame[T_ENUM] to DataFrame[StringType]") {
val df = spark.sparkContext.parallelize(Array("ONE", "ZERO", "ZERO", "ONE")).toDF("C0")
val h2oFrame = hc.asH2OFrame(df)
h2oFrame.convertColumnsToCategorical(Array(0))
assert(h2oFrame.columns(0).isCategorical())

val dataFrame = hc.asSparkFrame(h2oFrame)
assert(dataFrame.count == h2oFrame.numberOfRows)
assert(dataFrame.take(4)(3)(0) == "ONE")
assert(dataFrame.schema.fields(0) match {
case StructField("C0", StringType, false, _) => true
case _ => false
})

h2oFrame.delete()
}

test("DataFrame[String] to H2OFrame[T_STRING] and back") {
val df = Seq("one", "two", "three", "four", "five", "six", "seven").toDF("Strings").repartition(3)
val h2oFrame = hc.asH2OFrame(df)

assertH2OFrameInvariants(df, h2oFrame)
assert(h2oFrame.columns(0).isString())

val resultDF = hc.asSparkFrame(h2oFrame)
TestUtils.assertDataFramesAreIdentical(df, resultDF)
h2oFrame.delete()
}

test("DataFrame[String] to H2OFrame[T_CAT] and back") {
val df = Seq("one", "two", "three", "one", "two", "three", "one").toDF("Strings").repartition(3)
val h2oFrame = hc.asH2OFrame(df)

assertH2OFrameInvariants(df, h2oFrame)
assert(h2oFrame.columns(0).isCategorical())

val resultDF = hc.asSparkFrame(h2oFrame)
TestUtils.assertDataFramesAreIdentical(df, resultDF)
h2oFrame.delete()
}

  // The external backend can go OOM in the testing Docker image
  if (sys.props.getOrElse("spark.ext.h2o.backend.cluster.mode", "internal") == "internal") {
    test("DataFrame[String] with more than 10M unique values in one partition to H2OFrame[T_STR] and back") {
      testDataFrameConversionWithHighNumberOfCategoricalLevels(1)
    }

    test("DataFrame[String] with more than 10M unique values in 100 partitions to H2OFrame[T_STR] and back") {
      testDataFrameConversionWithHighNumberOfCategoricalLevels(100)
    }

    def testDataFrameConversionWithHighNumberOfCategoricalLevels(numPartitions: Int) {
      val uniqueValues = 1 to (Categorical.MAX_CATEGORICAL_COUNT * 1.1).toInt
      val values = uniqueValues.map(i => (i % (Categorical.MAX_CATEGORICAL_COUNT + 1)).toHexString)
      val rdd = sc.parallelize(values, numPartitions)

      val df = rdd.toDF("strings")
      val h2oFrame = hc.asH2OFrame(df)

      assertH2OFrameInvariants(df, h2oFrame)
      assert(h2oFrame.columns(0).isString())

      val resultDF = hc.asSparkFrame(h2oFrame)
      TestUtils.assertDataFramesAreIdentical(df, resultDF)
      h2oFrame.delete()
    }
  }

test("DataFrame[String] with only unique values with in one partition to H2OFrame[T_STR] and back") {
testDataFrameConversionWithOnlyUniqueValues(1)
}

test("DataFrame[String] with only unique values with in 100 partitions to H2OFrame[T_STR] and back") {
testDataFrameConversionWithOnlyUniqueValues(100)
}

def testDataFrameConversionWithOnlyUniqueValues(numPartitions: Int) {
val uniqueValues = (1 to (Categorical.MAX_CATEGORICAL_COUNT / 10)).map(_.toHexString)
val rdd = sc.parallelize(uniqueValues, numPartitions)

val df = rdd.toDF("strings")
val h2oFrame = hc.asH2OFrame(df)

assertH2OFrameInvariants(df, h2oFrame)
assert(h2oFrame.columns(0).isString())

val resultDF = hc.asSparkFrame(h2oFrame)
TestUtils.assertDataFramesAreIdentical(df, resultDF)
h2oFrame.delete()
}

  private def assertH2OFrameInvariants(inputDF: DataFrame, df: H2OFrame): Unit = {
    assert(inputDF.count == df.numberOfRows, "Number of rows has to match")
    assert(df.numberOfColumns == SchemaUtils.flattenSchema(inputDF).length, "Number of columns should match")
  }
}
