[SW-2449] asH2OFrame Method Could Fail on a String Column Having More Than 10 Million Distinct Values #2341

Merged
28 commits merged on Oct 14, 2020
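For context: H2O caps the size of a categorical (enum) domain at `Categorical.MAX_CATEGORICAL_COUNT` (10 million, per the PR title), and `asH2OFrame` could fail once a string column exceeded that many distinct values. A minimal sketch of the behaviour this PR establishes, mirroring the tests added below (it assumes a `SparkSession` named `spark` with `spark.implicits._` in scope and a running `H2OContext` named `hc`):

```scala
import water.parser.Categorical

// Few distinct values: the column becomes a categorical (enum) vector.
val lowCardinality = Seq("one", "two", "three", "one", "two", "three", "one").toDF("strings")
assert(hc.asH2OFrame(lowCardinality).columns(0).isCategorical())

// More distinct values than the categorical limit: before this PR the
// conversion could fail; now the column ends up as a plain string vector.
val highCardinality = spark.sparkContext
  .parallelize(1 to (Categorical.MAX_CATEGORICAL_COUNT * 1.1).toInt, 100)
  .map(_.toHexString)
  .toDF("strings")
assert(hc.asH2OFrame(highCardinality).columns(0).isString())
```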
Commits
a1c4419
[SW-2449] asH2OFrame Method Could Fail on a String Column Having More…
mn-mikke Oct 2, 2020
7496fda
spotlessApply
mn-mikke Oct 2, 2020
0efa40e
Fix SupportedRDDConverterTestSuite tests
mn-mikke Oct 2, 2020
79d66e8
Revert changes in tests
mn-mikke Oct 5, 2020
bb44db3
Fix OOM in tests
mn-mikke Oct 5, 2020
7374ce9
Escape names
mn-mikke Oct 5, 2020
bc749d4
Move the whole conversion logic to H2O backend
mn-mikke Oct 8, 2020
9ff21d7
Use ExpectedType.Categorical
mn-mikke Oct 9, 2020
7b14bdf
typo
mn-mikke Oct 9, 2020
79d7d32
spotlessApply
mn-mikke Oct 9, 2020
f76389c
Remove DataTypeConverterTestSuite
mn-mikke Oct 9, 2020
a92457a
fix DataFrameConverterTestSuite
mn-mikke Oct 9, 2020
1e55627
fix calculation of the ratio
mn-mikke Oct 12, 2020
b2991b0
fix ConvertCategoricalToStringColumnsTask
mn-mikke Oct 12, 2020
d54fba5
fix empty frames
mn-mikke Oct 12, 2020
a4ce1e4
conversion logic to separate methods
mn-mikke Oct 12, 2020
af28732
Add more tests
mn-mikke Oct 12, 2020
8ca5d9c
condition for unique columns
mn-mikke Oct 12, 2020
c0889d3
Add tests on one partition
mn-mikke Oct 13, 2020
5a8ac1c
Use PreviewParseWriter
mn-mikke Oct 13, 2020
459a939
spotless
mn-mikke Oct 13, 2020
79086e3
fix categorical preview writer
mn-mikke Oct 13, 2020
02f6df1
remove irrelevant column
mn-mikke Oct 13, 2020
bf2508b
Virtual ice hash map
mn-mikke Oct 13, 2020
0f0bf7e
spotlessApply
mn-mikke Oct 14, 2020
c2794e4
Adding DKV.put and disabling tests with big datasets on external backend
mn-mikke Oct 14, 2020
72ca306
spotlessApply
mn-mikke Oct 14, 2020
e96f5ba
change test for external backend in test
mn-mikke Oct 14, 2020
8 changes: 2 additions & 6 deletions core/src/main/scala/ai/h2o/sparkling/backend/Writer.scala
@@ -143,12 +143,8 @@ private[backend] object Writer {
case _: DecimalType => con.put(row.getDecimal(idxField).doubleValue())
case DoubleType => con.put(row.getDouble(idxField))
case StringType =>
metadata.expectedTypes(idxField) match {
case ExpectedTypes.String => con.put(row.getString(idxField))
case ExpectedTypes.Categorical =>
val valueIndex = domainBuilder.addStringToDomain(row.getString(idxField), idxField)
con.put(valueIndex)
}
val valueIndex = domainBuilder.addStringToDomain(row.getString(idxField), idxField)
con.put(valueIndex)
case TimestampType =>
con.put(timeZoneConverter.fromSparkTimeZoneToUTC(row.getAs[java.sql.Timestamp](idxField)))
case DateType => con.put(timeZoneConverter.fromSparkTimeZoneToUTC(row.getAs[java.sql.Date](idxField)))
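This hunk removes the per-column branching on expected type: every `StringType` value is now routed through `domainBuilder.addStringToDomain`, i.e. the writer always stores categorical indices for string columns. A simplified, hypothetical sketch of what such a domain builder does (the real builder used by `Writer` differs in detail):

```scala
import scala.collection.mutable

// Hypothetical illustration only: maps each distinct string of a column to a
// stable index, which is what the writer stores instead of the raw string.
class SimpleDomainBuilder(numColumns: Int) {
  private val domains =
    Array.fill(numColumns)(mutable.LinkedHashMap.empty[String, Int])

  /** Index of `value` within the domain of column `columnIndex`, adding it on first sight. */
  def addStringToDomain(value: String, columnIndex: Int): Int = {
    val domain = domains(columnIndex)
    domain.getOrElseUpdate(value, domain.size)
  }

  /** Distinct values of the column, in first-seen order. */
  def domain(columnIndex: Int): Array[String] = domains(columnIndex).keys.toArray
}
```

Because string columns are always written as indices, the decision whether a column stays categorical or is converted back to a plain string vector once its domain exceeds H2O's limit no longer happens on the Spark side; the remaining changes in this PR move that decision to the H2O backend.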

This file was deleted.

@@ -21,77 +21,22 @@ import ai.h2o.sparkling.backend.utils.SupportedTypes
import ai.h2o.sparkling.extensions.serde.ExpectedTypes
import ai.h2o.sparkling.extensions.serde.ExpectedTypes.ExpectedType
import org.apache.spark.ExposeUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import water.fvec.Vec
import water.parser.{BufferedString, PreviewParseWriter}
import water.parser.{BufferedString, Categorical, PreviewParseWriter}

private[backend] object DataTypeConverter {

private def stringTypesToExpectedTypes(rdd: RDD[Row], schema: StructType): Map[Int, ExpectedType] = {
val stringTypeIndices = for {
(field, index) <- schema.fields.zipWithIndex
if field.dataType == StringType
} yield index

val types = if (rdd.getNumPartitions > 0) {
val serializedPreview = rdd
.mapPartitions[Array[Byte]](createPartitionPreview(_, stringTypeIndices))
.reduce(mergePartitionPreview)

val preview = CategoricalPreviewWriter.deserialize(serializedPreview)
preview.guessTypes().map {
case Vec.T_CAT => ExpectedTypes.Categorical
case _ => ExpectedTypes.String
}
} else {
stringTypeIndices.map(_ => ExpectedTypes.String)
}

stringTypeIndices.zip(types).toMap
}

private def createPartitionPreview(rows: Iterator[Row], stringTypeIndices: Array[Int]): Iterator[Array[Byte]] = {
val previewParseWriter = new CategoricalPreviewWriter(stringTypeIndices.length)
val bufferedString = new BufferedString()
var rowId = 0
while (rows.hasNext && rowId < CategoricalPreviewWriter.MAX_PREVIEW_RECORDS) {
val row = rows.next()
var i = 0
while (i < stringTypeIndices.length) {
val colId = stringTypeIndices(i)
val string = row.getString(colId)
if (string == null) {
previewParseWriter.addInvalidCol(i)
} else {
bufferedString.set(string)
previewParseWriter.addStrCol(i, bufferedString)
}
i += 1
}
rowId += 1
}
Iterator.single(CategoricalPreviewWriter.serialize(previewParseWriter))
}

private def mergePartitionPreview(first: Array[Byte], second: Array[Byte]): Array[Byte] = {
val firstObject = CategoricalPreviewWriter.deserialize(first)
val secondObject = CategoricalPreviewWriter.deserialize(second)
val result =
PreviewParseWriter.unifyColumnPreviews(firstObject, secondObject).asInstanceOf[CategoricalPreviewWriter]
CategoricalPreviewWriter.serialize(result)
}

def determineExpectedTypes(rdd: RDD[Row], schema: StructType): Array[ExpectedType] = {
val stringTypes = stringTypesToExpectedTypes(rdd, schema)
schema.zipWithIndex.map {
case (field, index) =>
def determineExpectedTypes(schema: StructType): Array[ExpectedType] = {
schema.map {
case field =>
field.dataType match {
case n if n.isInstanceOf[DecimalType] & n.getClass.getSuperclass != classOf[DecimalType] =>
ExpectedTypes.Double
case StringType => ExpectedTypes.Categorical
case v if ExposeUtils.isAnyVectorUDT(v) => ExpectedTypes.Vector
case StringType => stringTypes(index)
case dt: DataType => SupportedTypes.bySparkType(dt).expectedType
}
}.toArray
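The deleted code above ran an extra Spark pass (`createPartitionPreview` / `mergePartitionPreview`, sampling up to `CategoricalPreviewWriter.MAX_PREVIEW_RECORDS` rows per partition) to guess on the Spark side whether a string column should be parsed as categorical. With every `StringType` column now declared `ExpectedTypes.Categorical`, the keep-or-convert-back decision is made on the H2O backend after the data arrives (see the `ConvertCategoricalToStringColumnsTask` and ratio-calculation commits). A hedged sketch of that kind of check, not the actual backend task; the uniqueness threshold here is purely illustrative:

```scala
import water.parser.Categorical

def convertBackToString(domainSize: Long, rowCount: Long): Boolean = {
  // A domain larger than H2O's categorical limit cannot stay categorical.
  val overLimit = domainSize > Categorical.MAX_CATEGORICAL_COUNT
  // Columns whose values are (nearly) all unique behave like identifiers, so a
  // plain string vector is preferable to a huge enum domain.
  val mostlyUnique = rowCount > 0 && domainSize.toDouble / rowCount > 0.95
  overLimit || mostlyUnique
}
```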
@@ -44,15 +44,16 @@ object SparkDataFrameConverter extends Logging {
val df = dataFrame.toDF() // Because of PySparkling, we can receive Dataset[Primitive] in this method, ensure that
// we are dealing with Dataset[Row]
val flatDataFrame = flattenDataFrame(df)
val schema = flatDataFrame.schema
val rdd = flatDataFrame.rdd // materialized the data frame

val elemMaxSizes = collectMaxElementSizes(flatDataFrame)
val vecIndices = collectVectorLikeTypes(flatDataFrame.schema).toArray
val flattenSchema = expandedSchema(flatDataFrame.schema, elemMaxSizes)
val elemMaxSizes = collectMaxElementSizes(rdd, schema)
val vecIndices = collectVectorLikeTypes(schema).toArray
val flattenSchema = expandedSchema(schema, elemMaxSizes)
val colNames = flattenSchema.map(_.name).toArray
val maxVecSizes = vecIndices.map(elemMaxSizes(_))

val rdd = flatDataFrame.rdd
val expectedTypes = DataTypeConverter.determineExpectedTypes(rdd, flatDataFrame.schema)
val expectedTypes = DataTypeConverter.determineExpectedTypes(schema)

val uniqueFrameId = frameKeyName.getOrElse("frame_rdd_" + rdd.id + scala.util.Random.nextInt())
val metadata = WriterMetadata(hc.getConf, uniqueFrameId, expectedTypes, maxVecSizes, SparkTimeZone.current())
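The practical consequence of this hunk is that `determineExpectedTypes` is now a pure function of the schema, so the converter no longer launches a Spark job just to inspect string columns. A self-contained illustration of the schema-only mapping (not the private Sparkling Water API itself):

```scala
import org.apache.spark.sql.types._

def expectedTypeName(dt: DataType): String = dt match {
  case StringType     => "Categorical" // always; cardinality is handled later by H2O
  case _: DecimalType => "Double"
  case other          => s"mapped via SupportedTypes.bySparkType(${other.simpleString})"
}

val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("price", DoubleType)))

schema.fields.map(f => f.name -> expectedTypeName(f.dataType)).foreach(println)
// (name,Categorical)
// (price,mapped via SupportedTypes.bySparkType(double))
```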
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.h2o.sparkling.backend.converters

import ai.h2o.sparkling.ml.utils.SchemaUtils
import ai.h2o.sparkling.{H2OFrame, SharedH2OTestContext, TestUtils}
import org.apache.spark.sql.types.{StringType, StructField}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.FunSuite
import water.parser.Categorical

@RunWith(classOf[JUnitRunner])
class DataFrameConverterCategoricalTestSuite extends FunSuite with SharedH2OTestContext {

override def createSparkSession(): SparkSession = sparkSession("local[*]")
import spark.implicits._

test("PUBDEV-766 H2OFrame[T_ENUM] to DataFrame[StringType]") {
val df = spark.sparkContext.parallelize(Array("ONE", "ZERO", "ZERO", "ONE")).toDF("C0")
val h2oFrame = hc.asH2OFrame(df)
h2oFrame.convertColumnsToCategorical(Array(0))
assert(h2oFrame.columns(0).isCategorical())

val dataFrame = hc.asSparkFrame(h2oFrame)
assert(dataFrame.count == h2oFrame.numberOfRows)
assert(dataFrame.take(4)(3)(0) == "ONE")
assert(dataFrame.schema.fields(0) match {
case StructField("C0", StringType, false, _) => true
case _ => false
})

h2oFrame.delete()
}

test("DataFrame[String] to H2OFrame[T_STRING] and back") {
val df = Seq("one", "two", "three", "four", "five", "six", "seven").toDF("Strings").repartition(3)
val h2oFrame = hc.asH2OFrame(df)

assertH2OFrameInvariants(df, h2oFrame)
assert(h2oFrame.columns(0).isString())

val resultDF = hc.asSparkFrame(h2oFrame)
TestUtils.assertDataFramesAreIdentical(df, resultDF)
h2oFrame.delete()
}

test("DataFrame[String] to H2OFrame[T_CAT] and back") {
val df = Seq("one", "two", "three", "one", "two", "three", "one").toDF("Strings").repartition(3)
val h2oFrame = hc.asH2OFrame(df)

assertH2OFrameInvariants(df, h2oFrame)
assert(h2oFrame.columns(0).isCategorical())

val resultDF = hc.asSparkFrame(h2oFrame)
TestUtils.assertDataFramesAreIdentical(df, resultDF)
h2oFrame.delete()
}

  // The external backend can go OOM in the testing Docker image
if (sys.props.getOrElse("spark.ext.h2o.backend.cluster.mode", "internal") == "internal") {
test("DataFrame[String] with more than 10M unique values in one partition to H2OFrame[T_STR] and back") {
testDataFrameConversionWithHighNumberOfCategoricalLevels(1)
}

test("DataFrame[String] with more than 10M unique values in 100 partitions to H2OFrame[T_STR] and back") {
testDataFrameConversionWithHighNumberOfCategoricalLevels(100)
}

def testDataFrameConversionWithHighNumberOfCategoricalLevels(numPartitions: Int) {
val uniqueValues = 1 to (Categorical.MAX_CATEGORICAL_COUNT * 1.1).toInt
val values = uniqueValues.map(i => (i % (Categorical.MAX_CATEGORICAL_COUNT + 1)).toHexString)
val rdd = sc.parallelize(values, numPartitions)

val df = rdd.toDF("strings")
val h2oFrame = hc.asH2OFrame(df)

assertH2OFrameInvariants(df, h2oFrame)
assert(h2oFrame.columns(0).isString())

val resultDF = hc.asSparkFrame(h2oFrame)
TestUtils.assertDataFramesAreIdentical(df, resultDF)
h2oFrame.delete()
}
}

test("DataFrame[String] with only unique values with in one partition to H2OFrame[T_STR] and back") {
testDataFrameConversionWithOnlyUniqueValues(1)
}

test("DataFrame[String] with only unique values with in 100 partitions to H2OFrame[T_STR] and back") {
testDataFrameConversionWithOnlyUniqueValues(100)
}

def testDataFrameConversionWithOnlyUniqueValues(numPartitions: Int) {
val uniqueValues = (1 to (Categorical.MAX_CATEGORICAL_COUNT / 10)).map(_.toHexString)
val rdd = sc.parallelize(uniqueValues, numPartitions)

val df = rdd.toDF("strings")
val h2oFrame = hc.asH2OFrame(df)

assertH2OFrameInvariants(df, h2oFrame)
assert(h2oFrame.columns(0).isString())

val resultDF = hc.asSparkFrame(h2oFrame)
TestUtils.assertDataFramesAreIdentical(df, resultDF)
h2oFrame.delete()
}

private def assertH2OFrameInvariants(inputDF: DataFrame, df: H2OFrame): Unit = {
assert(inputDF.count == df.numberOfRows, "Number of rows has to match")
    assert(df.numberOfColumns == SchemaUtils.flattenSchema(inputDF).length, "Number of columns should match")
}
}