[HUDI-2970] Adding tests for archival of replace commit actions #4268

Merged
merged 2 commits on Dec 19, 2021
@@ -21,18 +21,23 @@ package org.apache.hudi.functional

import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, lit}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.Tag
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.{Tag, Test}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{Arguments, CsvSource, ValueSource}

import java.util
import java.util.Arrays
import java.util.stream.Stream
import scala.collection.JavaConversions._


@@ -170,4 +175,79 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
.load(basePath)
assertEquals(100, timeTravelDF.count()) // 100 initial inserts must be pulled
}

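// Verify that replace commits created by insert_overwrite and delete_partition are
// eventually moved off the active timeline and into the archived timeline.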
@ParameterizedTest
@ValueSource(strings = Array("insert_overwrite", "delete_partition"))
def testArchivalWithReplaceCommitActions(writeOperation: String): Unit = {

val dataGen = new HoodieTestDataGenerator()
// use this to generate records only for certain partitions.
val dataGenPartition1 = new HoodieTestDataGenerator(Array[String](HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH))
val dataGenPartition2 = new HoodieTestDataGenerator(Array[String](HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH))

// do one bulk insert to all partitions
val records = recordsToStrings(dataGen.generateInserts("%05d".format(1), 100)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
val partition1RecordCount = inputDF.filter(row => row.getAs("partition_path")
.equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).count()
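// keep the active timeline short (min 2 / max 3 commits, 1 commit retained by the cleaner)
// so that archival is triggered after only a few commits.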
inputDF.write.format("hudi")
.options(commonOpts)
.option("hoodie.keep.min.commits", "2")
.option("hoodie.keep.max.commits", "3")
.option("hoodie.cleaner.commits.retained", "1")
.option("hoodie.metadata.enable","false")
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
.mode(SaveMode.Overwrite)
.save(basePath)

assertRecordCount(basePath, 100)

// issue a replace commit (insert_overwrite or delete_partition) against partition1
writeRecords(2, dataGenPartition1, writeOperation, basePath)

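// insert_overwrite replaces partition1 with 100 fresh records, so the table should hold
// 200 - partition1RecordCount rows; delete_partition drops partition1 entirely,
// leaving 100 - partition1RecordCount rows.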
val expectedRecCount = if (writeOperation.equals(DataSourceWriteOptions.INSERT_OVERWRITE_OPERATION_OPT_VAL)) {
  200 - partition1RecordCount
} else {
  100 - partition1RecordCount
}
assertRecordCount(basePath, expectedRecCount)

// add more data to partition2.
for (i <- 3 to 7) {
writeRecords(i, dataGenPartition2, DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL, basePath)
}

assertRecordCount(basePath, expectedRecCount + 500)
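// build a meta client and collect the actions of all completed instants on the active timeline.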
val metaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(basePath)
.setLoadActiveTimelineOnLoad(true).build()
val commits = metaClient.getActiveTimeline.filterCompletedInstants().getInstants.toArray
.map(instant => instant.asInstanceOf[HoodieInstant].getAction)
// assert replace commit is archived and not part of active timeline.
assertFalse(commits.contains(HoodieTimeline.REPLACE_COMMIT_ACTION))
// assert that archival timeline has replace commit actions.
val archivedTimeline = metaClient.getArchivedTimeline()
assertTrue(archivedTimeline.getInstants.toArray.map(instant => instant.asInstanceOf[HoodieInstant].getAction)
  .exists(action => action.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)))
}

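// Helper: generates 100 inserts for the given commit with the supplied data generator
// and appends them to the table using the given write operation.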
def writeRecords(commitTime: Int, dataGen: HoodieTestDataGenerator, writeOperation: String, basePath: String): Unit = {
val records = recordsToStrings(dataGen.generateInserts("%05d".format(commitTime), 100)).toList
val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
inputDF.write.format("hudi")
.options(commonOpts)
.option("hoodie.keep.min.commits", "2")
.option("hoodie.keep.max.commits", "3")
.option("hoodie.cleaner.commits.retained", "1")
.option("hoodie.metadata.enable","false")
.option(DataSourceWriteOptions.OPERATION.key, writeOperation)
.mode(SaveMode.Append)
.save(basePath)
}

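// Helper: reads the latest snapshot of the table and asserts the total record count.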
def assertRecordCount(basePath: String, expectedRecordCount: Long) : Unit = {
val snapshotDF = spark.read.format("org.apache.hudi")
.load(basePath + "/*/*/*/*")
assertEquals(expectedRecordCount, snapshotDF.count())
}
}