Skip to content

Commit

Permalink
Add tests for metrics reporter
Browse files Browse the repository at this point in the history
  • Loading branch information
junyuc25 authored and yihua committed Sep 6, 2022
1 parent d3f7897 commit 1ba0c71
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1033,7 +1033,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
def testInsertMetricsReporterEnabled(): Unit = {
val dataGenerator = new QuickstartUtils.DataGenerator()
val records = convertToStringList(dataGenerator.generateInserts( 10))
// println("Printing Records: " + records)
val recordsRDD = spark.sparkContext.parallelize(records, 2)
val inputDF = spark.read.json(sparkSession.createDataset(recordsRDD)(Encoders.STRING))
inputDF.write.format("hudi")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE;
import static org.apache.hudi.config.metrics.HoodieMetricsConfig.TURN_METRICS_ON;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
Expand Down Expand Up @@ -740,36 +742,58 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,

@Test
public void testUpsertsCOWContinuousMode() throws Exception {
testUpserts(HoodieTableType.COPY_ON_WRITE, "continuous_cow", false, true);
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
}

@Test
public void testUpsertsCOW_ContinuousModeDisabled() throws Exception {
testUpserts(HoodieTableType.COPY_ON_WRITE, "non_continuous_cow", false, false);
String tableBasePath = dfsBasePath + "/non_continuous_cow";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), "JMX"));
cfg.continuousMode = false;
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
}

@Test
public void testUpsertsCOWContinuousModeShutdownGracefully() throws Exception {
testUpserts(HoodieTableType.COPY_ON_WRITE, "continuous_cow_shutdown_gracefully", true, true);
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true);
}

@Test
public void testUpsertsMORContinuousMode() throws Exception {
testUpserts(HoodieTableType.MERGE_ON_READ, "continuous_mor", false, true);
testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
}

@Test
public void testUpsertsMOR_ContinuousModeDisabled() throws Exception {
testUpserts(HoodieTableType.MERGE_ON_READ, "non_continuous_mor", false, false);
String tableBasePath = dfsBasePath + "/non_continuous_mor";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), "CONSOLE"));
cfg.continuousMode = false;
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
}

private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
testUpsertsContinuousMode(tableType, tempDir, false);
}

private void testUpserts(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully, boolean continuousMode) throws Exception {
private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully) throws Exception {
String tableBasePath = dfsBasePath + "/" + tempDir;
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.continuousMode = continuousMode;
cfg.continuousMode = true;
if (testShutdownGracefully) {
cfg.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName();
}
Expand All @@ -789,10 +813,6 @@ private void testUpserts(HoodieTableType tableType, String tempDir, boolean test
if (testShutdownGracefully) {
TestDataSource.returnEmptyBatch = true;
}

if (!cfg.continuousMode) {
assertFalse(Metrics.isInitialized());
}
return true;
});
}
Expand Down

0 comments on commit 1ba0c71

Please sign in to comment.