[Hudi] Flesh out tests and update column type support (#3339)

#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [x] Other (Hudi)

## Description

This PR fleshes out the unit tests and drops conversion support for data types that Hudi cannot represent (e.g. TIMESTAMP_NTZ, SHORT, BYTE).
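
For illustration, here is a minimal sketch of the restriction being enforced. The helper below is hypothetical; the actual change (visible in the UniversalFormat.scala hunk further down) plugs these types into the existing `SchemaUtils.findAnyTypeRecursively` check and throws `DeltaErrors.uniFormHudiSchemaCompat`:

```scala
import org.apache.spark.sql.types._

// Hypothetical, simplified sketch of the new restriction: recursively search
// the Delta schema for a type Hudi cannot represent and fail fast if found.
def assertHudiCompatible(schema: StructType): Unit = {
  def findUnsupported(dt: DataType): Option[DataType] = dt match {
    case _: NullType | _: ByteType | _: ShortType | _: TimestampNTZType => Some(dt)
    case s: StructType =>
      s.fields.iterator.map(f => findUnsupported(f.dataType)).collectFirst { case Some(t) => t }
    case a: ArrayType => findUnsupported(a.elementType)
    case m: MapType => findUnsupported(m.keyType).orElse(findUnsupported(m.valueType))
    case _ => None
  }
  findUnsupported(schema).foreach { unsupported =>
    // The committed code throws DeltaErrors.uniFormHudiSchemaCompat(unsupported).
    throw new UnsupportedOperationException(s"Hudi cannot represent type $unsupported")
  }
}
```

With a check like this in place, enabling `'delta.universalFormat.enabledFormats' = 'hudi'` on a table whose schema contains, for example, a TINYINT or TIMESTAMP_NTZ column fails up front instead of producing an Avro schema that Hudi cannot read.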

## How was this patch tested?

Unit tests (new and expanded cases in ConvertToHudiSuite).
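
In condensed form, the positive cases added to ConvertToHudiSuite follow the pattern below (`_sparkSession`, `$testTableName`, `$testTablePath`, and `verifyFilesAndSchemaMatch()` are the suite's existing fixtures shown in the diff); the negative cases wrap the same DDL in `intercept[DeltaUnsupportedOperationException]`:

```scala
// Condensed test pattern: create a UniForm-Hudi table, write through Delta,
// then assert the Hudi view of the table has matching files and schema.
_sparkSession.sql(
  s"""CREATE TABLE `$testTableName` (col1 INT, col2 STRING) USING DELTA
     |LOCATION '$testTablePath'
     |TBLPROPERTIES ('delta.universalFormat.enabledFormats' = 'hudi')""".stripMargin)
_sparkSession.sql(s"INSERT INTO `$testTableName` VALUES (1, 'hello')")
verifyFilesAndSchemaMatch()
```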

## Does this PR introduce _any_ user-facing changes?

No
anniewang-db committed Jul 8, 2024
1 parent 74d09e9 commit 0e5b856
Showing 6 changed files with 112 additions and 56 deletions.
@@ -16,19 +16,29 @@

package org.apache.spark.sql.delta.hudi

import org.apache.avro.Schema
import java.io.{IOException, UncheckedIOException}
import java.time.{Instant, LocalDateTime, ZoneId}
import java.time.format.{DateTimeFormatterBuilder, DateTimeParseException}
import java.time.temporal.{ChronoField, ChronoUnit}
import java.util
import java.util.{Collections, Properties}
import java.util.stream.Collectors

import scala.collection.JavaConverters._
import scala.collection.mutable._
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.hudi.HudiSchemaUtils._
import org.apache.spark.sql.delta.hudi.HudiTransactionUtils._
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.avro.Schema
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.avro.model.HoodieActionInstant
import org.apache.hudi.avro.model.HoodieCleanFileInfo
import org.apache.hudi.avro.model.HoodieCleanerPlan
import org.apache.hudi.avro.model.HoodieCleanFileInfo
import org.apache.hudi.client.HoodieJavaWriteClient
import org.apache.hudi.client.HoodieTimelineArchiver
import org.apache.hudi.client.WriteStatus
@@ -40,9 +50,9 @@ import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieBaseFile, HoodieCl
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieInstantTimeGenerator, HoodieTimeline, TimelineMetadataUtils}
import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.{MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH, SECS_INSTANT_ID_LENGTH, SECS_INSTANT_TIMESTAMP_FORMAT}
import org.apache.hudi.common.util.{Option => HudiOption}
import org.apache.hudi.common.util.CleanerUtils
import org.apache.hudi.common.util.ExternalFilePathUtil
import org.apache.hudi.common.util.{Option => HudiOption}
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.config.HoodieArchivalConfig
import org.apache.hudi.config.HoodieCleanConfig
@@ -53,16 +63,6 @@ import org.apache.hudi.index.HoodieIndex.IndexType.INMEMORY
import org.apache.hudi.table.HoodieJavaTable
import org.apache.hudi.table.action.clean.CleanPlanner

import java.io.{IOException, UncheckedIOException}
import java.time.{Instant, LocalDateTime, ZoneId}
import java.time.format.{DateTimeFormatterBuilder, DateTimeParseException}
import java.time.temporal.{ChronoField, ChronoUnit}
import java.util
import java.util.stream.Collectors
import java.util.{Collections, Properties}
import collection.mutable._
import scala.collection.JavaConverters._

/**
* Used to prepare (convert) and then commit a set of Delta actions into the Hudi table located
* at the same path as [[postCommitSnapshot]]
@@ -16,25 +16,30 @@

package org.apache.spark.sql.delta.hudi

import java.io.{IOException, UncheckedIOException}
import java.util.concurrent.atomic.AtomicReference
import javax.annotation.concurrent.GuardedBy

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.OptimisticTransactionImpl
import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.delta.UniversalFormatConverter
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.hooks.HudiConverterHook
import org.apache.spark.sql.delta.hudi.HudiTransactionUtils._
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hudi.common.model.{HoodieCommitMetadata, HoodieReplaceCommitMetadata}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.actions.Action
import org.apache.spark.sql.delta.hooks.HudiConverterHook
import org.apache.spark.sql.delta.hudi.HudiTransactionUtils._
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta._

import java.io.{IOException, UncheckedIOException}
import java.util.concurrent.atomic.AtomicReference
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

object HudiConverter {
/**
@@ -16,13 +16,15 @@

package org.apache.spark.sql.delta.hudi

import org.apache.avro.{LogicalTypes, Schema}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.types._

import java.util

import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.avro.{LogicalTypes, Schema}

import org.apache.spark.sql.types._

object HudiSchemaUtils extends DeltaLogging {

/////////////////
@@ -73,7 +75,8 @@ object HudiSchemaUtils extends DeltaLogging {
private def convertAtomic[E <: DataType](elem: E, isNullable: Boolean) = elem match {
case StringType => finalizeSchema(Schema.create(Schema.Type.STRING), isNullable)
case LongType => finalizeSchema(Schema.create(Schema.Type.LONG), isNullable)
case IntegerType | ShortType => finalizeSchema(Schema.create(Schema.Type.INT), isNullable)
case IntegerType => finalizeSchema(
Schema.create(Schema.Type.INT), isNullable)
case FloatType => finalizeSchema(Schema.create(Schema.Type.FLOAT), isNullable)
case DoubleType => finalizeSchema(Schema.create(Schema.Type.DOUBLE), isNullable)
case d: DecimalType => finalizeSchema(LogicalTypes.decimal(d.precision, d.scale)
@@ -84,8 +87,6 @@
LogicalTypes.date.addToSchema(Schema.create(Schema.Type.INT)), isNullable)
case TimestampType => finalizeSchema(
LogicalTypes.timestampMicros.addToSchema(Schema.create(Schema.Type.LONG)), isNullable)
case TimestampNTZType => finalizeSchema(
LogicalTypes.localTimestampMicros.addToSchema(Schema.create(Schema.Type.LONG)), isNullable)
case _ => throw new UnsupportedOperationException(s"Could not convert atomic type $elem")
}
}
@@ -16,16 +16,15 @@

package org.apache.spark.sql.delta.hudi

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.hadoop.fs.Path
import org.apache.hudi.client.WriteStatus
import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieTableType, HoodieTimelineTimeZone, HoodieDeltaWriteStat}
import org.apache.hudi.common.model.{HoodieAvroPayload, HoodieDeltaWriteStat, HoodieTableType, HoodieTimelineTimeZone}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ExternalFilePathUtil
import org.apache.hudi.exception.TableNotFoundException
import org.apache.hudi.storage.StorageConfiguration
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.metering.DeltaLogging

object HudiTransactionUtils extends DeltaLogging {

@@ -16,8 +16,21 @@

package org.apache.spark.sql.delta.hudi

import java.io.File
import java.time.Instant
import java.util.UUID
import java.util.stream.Collectors

import scala.collection.JavaConverters

import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaUnsupportedOperationException, OptimisticTransaction}
import org.apache.spark.sql.delta.DeltaOperations.Truncate
import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.engine.HoodieLocalEngineContext
import org.apache.hudi.common.fs.FSUtils
@@ -26,23 +39,12 @@ import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.metadata.HoodieMetadataFileSystemView
import org.apache.hudi.storage.StorageConfiguration
import org.apache.hudi.storage.hadoop.{HadoopStorageConfiguration, HoodieHadoopStorage}

import org.apache.spark.SparkContext
import org.apache.spark.sql.{QueryTest, SparkSession}
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.delta.DeltaOperations.Truncate
import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaUnsupportedOperationException, OptimisticTransaction}
import org.apache.spark.sql.delta.actions.{Action, AddFile, Metadata, RemoveFile}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{ByteType, IntegerType, ShortType, StructField, StructType}
import org.apache.spark.util.{ManualClock, Utils}
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

import java.io.File
import java.time.Instant
import java.util.UUID
import java.util.stream.Collectors
import scala.collection.JavaConverters

class ConvertToHudiSuite extends QueryTest with Eventually {

@@ -150,6 +152,21 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
}
}

test("Enabling Delete Vector After Hudi Enabled Already Throws Exception") {
intercept[DeltaUnsupportedOperationException] {
_sparkSession.sql(
s"""CREATE TABLE `$testTableName` (col1 INT, col2 STRING) USING DELTA
|LOCATION '$testTablePath'
|TBLPROPERTIES (
| 'delta.universalFormat.enabledFormats' = 'hudi'
|)""".stripMargin)
_sparkSession.sql(
s"""ALTER TABLE `$testTableName` SET TBLPROPERTIES (
| 'delta.enableDeletionVectors' = true
|)""".stripMargin)
}
}

test(s"Conversion behavior for lists") {
_sparkSession.sql(
s"""CREATE TABLE `$testTableName` (col1 ARRAY<INT>) USING DELTA
@@ -201,6 +218,22 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
verifyFilesAndSchemaMatch()
}

test(s"Conversion behavior for nested structs") {
_sparkSession.sql(
s"""CREATE TABLE `$testTableName` (col1 STRUCT<field1: INT, field2: STRING,
|field3: STRUCT<field4: INT, field5: INT, field6: STRING>>)
|USING DELTA
|LOCATION '$testTablePath'
|TBLPROPERTIES (
| 'delta.universalFormat.enabledFormats' = 'hudi'
|)""".stripMargin)
_sparkSession.sql(
s"INSERT INTO `$testTableName` VALUES (named_struct('field1', 1, 'field2', 'hello', " +
"'field3', named_struct('field4', 2, 'field5', 3, 'field6', 'world')))"
)
verifyFilesAndSchemaMatch()
}

test("validate Hudi timeline archival and cleaning") {
val testOp = Truncate()
withDefaultTablePropsInSQLConf(true, {
@@ -213,7 +246,8 @@
val file = AddFile(i.toString + ".parquet", Map.empty, 1, 1, true) :: Nil
val delete: Seq[Action] = if (i > 1) {
val timestamp = startTime + (System.currentTimeMillis() - actualTestStartTime)
RemoveFile((i - 1).toString + ".parquet", Some(timestamp), true) :: Nil
val prevFile = AddFile((i - 1).toString + ".parquet", Map.empty, 1, 1, true)
prevFile.removeWithTimestamp(timestamp) :: Nil
} else {
Nil
}
@@ -241,7 +275,9 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
_sparkSession.sql(
s"""CREATE TABLE `$testTableName` (col1 BIGINT, col2 BOOLEAN, col3 DATE,
| col4 DOUBLE, col5 FLOAT, col6 INT, col7 STRING, col8 TIMESTAMP,
| col9 STRUCT<field1: INT, field2: STRING>)
| col9 BINARY, col10 DECIMAL(5, 2),
| col11 STRUCT<field1: INT, field2: STRING,
| field3: STRUCT<field4: INT, field5: INT, field6: STRING>>)
| USING DELTA
|LOCATION '$testTablePath'
|TBLPROPERTIES (
Expand All @@ -250,11 +286,25 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
val nowSeconds = Instant.now().getEpochSecond
_sparkSession.sql(s"INSERT INTO `$testTableName` VALUES (123, true, "
+ s"date(from_unixtime($nowSeconds)), 32.1, 1.23, 456, 'hello world', "
+ s"timestamp(from_unixtime($nowSeconds)), "
+ s"named_struct('field1', 789, 'field2', 'hello'))")
+ s"timestamp(from_unixtime($nowSeconds)), X'1ABF', -999.99,"
+ s"STRUCT(1, 'hello', STRUCT(2, 3, 'world')))")
verifyFilesAndSchemaMatch()
}

for (invalidType <- Seq("SMALLINT", "TINYINT", "TIMESTAMP_NTZ", "VOID")) {
test(s"Unsupported Type $invalidType Throws Exception") {
intercept[DeltaUnsupportedOperationException] {
_sparkSession.sql(
s"""CREATE TABLE `$testTableName` (col1 $invalidType) USING DELTA
|LOCATION '$testTablePath'
|TBLPROPERTIES (
| 'delta.universalFormat.enabledFormats' = 'hudi'
|)""".stripMargin)
}
}
}


test("all batches of actions are converted") {
withSQLConf(
DeltaSQLConf.HUDI_MAX_COMMITS_TO_CONVERT.key -> "3"
@@ -322,6 +372,7 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
s"Files do not match.\nExpected: $expectedFiles\nActual: $paths")
// Assert schemas are equal
val expectedSchema = deltaDF.schema

assert(hudiSchemaAsStruct.equals(expectedSchema),
s"Schemas do not match.\nExpected: $expectedSchema\nActual: $hudiSchemaAsStruct")
}
@@ -352,7 +403,6 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
.appName("UniformSession")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.getOrCreate()
}
}
@@ -25,7 +25,7 @@ import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.internal.MDC
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.types.NullType
import org.apache.spark.sql.types.{ByteType, CalendarIntervalType, NullType, ShortType, TimestampNTZType}

/**
* Utils to validate the Universal Format (UniForm) Delta feature (NOT a table feature).
@@ -103,7 +103,8 @@ object UniversalFormat extends DeltaLogging {
throw DeltaErrors.uniFormHudiDeleteVectorCompat()
}
SchemaUtils.findAnyTypeRecursively(newestMetadata.schema) { f =>
f.isInstanceOf[NullType]
f.isInstanceOf[NullType] | f.isInstanceOf[ByteType] | f.isInstanceOf[ShortType] |
f.isInstanceOf[TimestampNTZType]
} match {
case Some(unsupportedType) =>
throw DeltaErrors.uniFormHudiSchemaCompat(unsupportedType)
