[HUDI-2970] Adding tests for archival of replace commit actions (#4268)
nsivabalan committed Dec 19, 2021
1 parent 478f9f3 commit 03f71ef
Showing 1 changed file with 83 additions and 3 deletions.
@@ -21,18 +21,23 @@ package org.apache.hudi.functional

import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, lit}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Tag
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.params.provider.{Arguments, CsvSource, ValueSource}

import java.util
import java.util.Arrays
import java.util.stream.Stream
import scala.collection.JavaConversions._


@@ -170,4 +175,79 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
.load(basePath)
assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled
}

@ParameterizedTest
@ValueSource(strings = Array("insert_overwrite", "delete_partition"))
def testArchivalWithReplaceCommitActions(writeOperation: String): Unit = {

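// Test outline: bulk insert into all partitions, issue a replace commit
// (insert_overwrite or delete_partition) against the first partition, then add
// enough commits to the second partition that the aggressive archival settings
// move the replace commit from the active timeline into the archived timeline.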
val dataGen = new HoodieTestDataGenerator()
// use these to generate records only for specific partitions.
val dataGenPartition1 = new HoodieTestDataGenerator(Array[String](HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH))
val dataGenPartition2 = new HoodieTestDataGenerator(Array[String](HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH))

// do one bulk insert to all partitions
val records = recordsToStrings(dataGen.generateInserts("%05d".format(1), 100)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
val partition1RecordCount = inputDF.filter(row => row.getAs("partition_path")
.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).count()
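// The archival-related options below keep the active timeline small (between
// 2 and 3 completed commits) so archival kicks in after only a few writes;
// the cleaner retains just 1 commit and the metadata table is disabled for this test.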
inputDF.write.format("hudi")
.options(commonOpts)
.option("hoodie.keep.min.commits", "2")
.option("hoodie.keep.max.commits", "3")
.option("hoodie.cleaner.commits.retained", "1")
.option("hoodie.metadata.enable","false")
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)

assertRecordCount(basePath, 100)

// issue a replace commit (insert_overwrite or delete_partition) against partition1
writeRecords(2, dataGenPartition1, writeOperation, basePath)

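// insert_overwrite replaces partition1's original records with the 100 freshly
// written ones (200 - partition1RecordCount in total), while delete_partition
// drops partition1 entirely (100 - partition1RecordCount).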
val expectedRecCount = if (writeOperation.equals(DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL)) {
200 - partition1RecordCount
} else {
100 - partition1RecordCount
}
assertRecordCount(basePath, expectedRecCount)

// add more data to partition2.
for (i <- 3 to 7) {
writeRecords(i, dataGenPartition2, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, basePath)
}

assertRecordCount(basePath, expectedRecCount + 500)
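// With at most 3 completed commits retained in the active timeline, the earlier
// replace commit should by now have been moved to the archived timeline.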
val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
.setLoadActiveTimelineOnLoad(true).build()
val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
.map(instant => instant.asInstanceOf[HoodieInstant].getAction)
// assert replace commit is archived and not part of active timeline.
assertFalse(commits.contains(HoodieTimeline.REPLACE_COMMIT_ACTION))
// assert that the archived timeline contains the replace commit action.
val archivedTimeline = metaClient.getArchivedTimeline()
assertTrue(archivedTimeline.getInstants.toArray.map(instant => instant.asInstanceOf[HoodieInstant].getAction)
.filter(action => action.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)).size > 0)
}

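// Helper that writes 100 freshly generated records with the given operation,
// reusing the same aggressive archival settings as the initial bulk insert.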
def writeRecords(commitTime: Int, dataGen: HoodieTestDataGenerator, writeOperation: String, basePath: String): Unit = {
val records = recordsToStrings(dataGen.generateInserts("%05d".format(commitTime), 100)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
inputDF.write.format("hudi")
.options(commonOpts)
.option("hoodie.keep.min.commits", "2")
.option("hoodie.keep.max.commits", "3")
.option("hoodie.cleaner.commits.retained", "1")
.option("hoodie.metadata.enable","false")
.option(DataSourceWriteOptions.OPERATION.key, writeOperation)
.mode(SaveMode.Append)
.save(basePath)
}

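// Helper that reads the table snapshot via a path glob and asserts the total record count.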
def assertRecordCount(basePath: String, expectedRecordCount: Long) : Unit = {
val snapshotDF = spark.read.format("org.apache.hudi")
.load(basePath + "/*/*/*/*")
assertEquals(expectedRecordCount, snapshotDF.count())
}
}
