From 1ba0c71be8f3ba9f0e3f663bc299b60b5360b409 Mon Sep 17 00:00:00 2001
From: Junyu Chen <10862251+junyuc25@users.noreply.github.com>
Date: Thu, 25 Aug 2022 15:02:02 +0000
Subject: [PATCH] Add tests for metrics reporter

---
 .../hudi/functional/TestCOWDataSource.scala   |  1 -
 .../functional/TestHoodieDeltaStreamer.java   | 42 ++++++++++++++-----
 2 files changed, 31 insertions(+), 12 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 51f0d89cc11fb..b964b9cd84e10 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -1033,7 +1033,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
   def testInsertMetricsReporterEnabled(): Unit = {
     val dataGenerator = new QuickstartUtils.DataGenerator()
     val records = convertToStringList(dataGenerator.generateInserts(10))
-//    println("Printing Records: " + records)
     val recordsRDD = spark.sparkContext.parallelize(records, 2)
     val inputDF = spark.read.json(sparkSession.createDataset(recordsRDD)(Encoders.STRING))
     inputDF.write.format("hudi")
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index dc91605f35b4d..2f6bd9c15d931 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -135,6 +135,8 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE;
+import static org.apache.hudi.config.metrics.HoodieMetricsConfig.TURN_METRICS_ON;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
 import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
@@ -740,36 +742,58 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,
   @Test
   public void testUpsertsCOWContinuousMode() throws Exception {
-    testUpserts(HoodieTableType.COPY_ON_WRITE, "continuous_cow", false, true);
+    testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
   }
 
   @Test
   public void testUpsertsCOW_ContinuousModeDisabled() throws Exception {
-    testUpserts(HoodieTableType.COPY_ON_WRITE, "non_continuous_cow", false, false);
+    String tableBasePath = dfsBasePath + "/non_continuous_cow";
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
+    cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+    cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
+    cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), "JMX"));
+    cfg.continuousMode = false;
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    ds.sync();
+    TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
+    assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
   }
 
   @Test
   public void testUpsertsCOWContinuousModeShutdownGracefully() throws Exception {
-    testUpserts(HoodieTableType.COPY_ON_WRITE, "continuous_cow_shutdown_gracefully", true, true);
+    testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true);
   }
 
   @Test
   public void testUpsertsMORContinuousMode() throws Exception {
-    testUpserts(HoodieTableType.MERGE_ON_READ, "continuous_mor", false, true);
+    testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
   }
 
   @Test
   public void testUpsertsMOR_ContinuousModeDisabled() throws Exception {
-    testUpserts(HoodieTableType.MERGE_ON_READ, "non_continuous_mor", false, false);
+    String tableBasePath = dfsBasePath + "/non_continuous_mor";
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
+    cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
+    cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
+    cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), "CONSOLE"));
+    cfg.continuousMode = false;
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    ds.sync();
+    TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
+    assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
+  }
+
+  private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
+    testUpsertsContinuousMode(tableType, tempDir, false);
   }
 
-  private void testUpserts(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully, boolean continuousMode) throws Exception {
+  private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully) throws Exception {
     String tableBasePath = dfsBasePath + "/" + tempDir;
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 3000;
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
-    cfg.continuousMode = continuousMode;
+    cfg.continuousMode = true;
     if (testShutdownGracefully) {
       cfg.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName();
     }
@@ -789,10 +813,6 @@ private void testUpserts(HoodieTableType tableType, String tempDir, boolean test
       if (testShutdownGracefully) {
         TestDataSource.returnEmptyBatch = true;
       }
-
-      if (!cfg.continuousMode) {
-        assertFalse(Metrics.isInitialized());
-      }
       return true;
     });
   }