Commit

Adding tests for replace actions with archival
nsivabalan committed Dec 10, 2021
1 parent 7c3f077 commit e50f0d6
Showing 3 changed files with 79 additions and 6 deletions.
@@ -193,7 +193,6 @@ object DataSourceWriteOptions {
"Use bulkinsert to load new data into a table, and there on use upsert/insert. " +
"bulk insert uses a disk based write path to scale to load large inputs without need to cache it.")


val COW_TABLE_TYPE_OPT_VAL = HoodieTableType.COPY_ON_WRITE.name
val MOR_TABLE_TYPE_OPT_VAL = HoodieTableType.MERGE_ON_READ.name
val TABLE_TYPE: ConfigProperty[String] = ConfigProperty
@@ -19,7 +19,7 @@ package org.apache.hudi.functional

import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings, recordsToStrings}
@@ -34,7 +34,7 @@ import org.apache.spark.sql.functions.{col, concat, lit, udf}
import org.apache.spark.sql.types._
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource}
@@ -21,18 +21,23 @@ package org.apache.hudi.functional

import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, lit}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{Arguments, CsvSource, ValueSource}

+import java.util
+import java.util.Arrays
+import java.util.stream.Stream
import scala.collection.JavaConversions._


@@ -170,4 +175,73 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
.load(basePath)
assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled
}

@ParameterizedTest
@ValueSource(strings = Array("insert_overwrite", "delete_partition"))
def testArchivalWithReplaceCommitActions(writeOperation: String): Unit = {

val dataGen = new HoodieTestDataGenerator()
// use these to generate records only for specific partitions.
val dataGenPartition1 = new HoodieTestDataGenerator(Array[String](HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH))
val dataGenPartition2 = new HoodieTestDataGenerator(Array[String](HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH))

// do one bulk insert to all partitions
val records = recordsToStrings(dataGen.generateInserts("%05d".format(1), 100)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
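// partition1's records will be overwritten or deleted later, so capture how many landed there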
val partition1RecordCount = inputDF.filter(row => row.getAs("partition_path")
.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).count()
inputDF.write.format("hudi")
.options(commonOpts)
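// keep a very small active timeline (min 2 / max 3 commits, 1 retained by the cleaner) so archival kicks in after only a few commits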
.option("hoodie.keep.min.commits", "2")
.option("hoodie.keep.max.commits", "3")
.option("hoodie.cleaner.commits.retained", "1")
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)

assertRecordCount(basePath, 100)

// issue the replace action (insert_overwrite or delete_partition, depending on the parameter) against partition1
writeRecords(2, dataGenPartition1, writeOperation, basePath)

val expectedRecCount = if (writeOperation.equals(DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL)) {
  // insert_overwrite replaces partition1's original records with 100 fresh inserts
  200 - partition1RecordCount
} else {
  // delete_partition drops partition1 entirely and writes no new records
  100 - partition1RecordCount
}
assertRecordCount(basePath, expectedRecCount)

// add more data to partition2.
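// five more bulk inserts of 100 records each; enough completed commits for archival to move the replace commit off the active timeline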
for (i <- 3 to 7) {
writeRecords(i, dataGenPartition2, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, basePath)
}

assertRecordCount(basePath, expectedRecCount + 500)
val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
.setLoadActiveTimelineOnLoad(true).build()
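// collect the actions of all completed instants remaining on the active timeline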
val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
.map(instant => instant.asInstanceOf[HoodieInstant].getAction)
// assert replace commit is archived and not part of active timeline.
assertFalse(commits.contains(HoodieTimeline.REPLACE_COMMIT_ACTION))
}

def writeRecords(commitTime: Int, dataGen: HoodieTestDataGenerator, writeOperation: String, basePath: String): Unit = {
val records = recordsToStrings(dataGen.generateInserts("%05d".format(commitTime), 100)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
inputDF.write.format("hudi")
.options(commonOpts)
.option("hoodie.keep.min.commits", "2")
.option("hoodie.keep.max.commits", "3")
.option("hoodie.cleaner.commits.retained", "1")
.option(DataSourceWriteOptions.OPERATION.key, writeOperation)
.mode(SaveMode.Append)
.save(basePath)
}

def assertRecordCount(basePath: String, expectedRecordCount: Long) : Unit = {
val snapshotDF = spark.read.format("org.apache.hudi")
.load(basePath + "/*/*/*/*")
assertEquals(expectedRecordCount, snapshotDF.count())
}
}
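
A natural follow-up check, not part of this commit, would be to assert that the replace commit actually shows up in the archived timeline rather than only disappearing from the active one. A minimal sketch, meant to be appended at the end of testArchivalWithReplaceCommitActions and reusing the metaClient built there; it assumes HoodieTableMetaClient.getArchivedTimeline exposes the archived instants through getInstants:

// hypothetical extra assertion: the replace commit should now be among the archived instants
val archivedActions = metaClient.getArchivedTimeline.getInstants.toArray
  .map(instant => instant.asInstanceOf[HoodieInstant].getAction)
assertTrue(archivedActions.contains(HoodieTimeline.REPLACE_COMMIT_ACTION))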
