From 3fd2cb61fe8ae7f0b260a517383e6fe37233ec44 Mon Sep 17 00:00:00 2001 From: Junyu Chen <10862251+junyuc25@users.noreply.github.com> Date: Mon, 22 Aug 2022 10:41:36 +0000 Subject: [PATCH] Shutdown CloudWatch reporter when query completes Add delta streamer tests Only shutdown in non-continuous mode --- .../java/org/apache/hudi/metrics/Metrics.java | 7 +++++- .../scala/org/apache/hudi/DefaultSource.scala | 1 + .../apache/hudi/HoodieSparkSqlWriter.scala | 5 ++++ .../hudi/functional/TestCOWDataSource.scala | 2 ++ .../deltastreamer/HoodieDeltaStreamer.java | 2 ++ .../functional/TestHoodieDeltaStreamer.java | 25 +++++++++++++------ 6 files changed, 34 insertions(+), 8 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java index b570f512f371d..d9f22bca01ed4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java @@ -53,7 +53,7 @@ private Metrics(HoodieWriteConfig metricConfig) { } reporter.start(); - Runtime.getRuntime().addShutdownHook(new Thread(this::reportAndCloseReporter)); + Runtime.getRuntime().addShutdownHook(new Thread(Metrics::shutdown)); } private void reportAndCloseReporter() { @@ -61,6 +61,7 @@ private void reportAndCloseReporter() { registerHoodieCommonMetrics(); reporter.report(); if (getReporter() != null) { + LOG.info("Closing metrics reporter..."); getReporter().close(); } } catch (Exception e) { @@ -139,4 +140,8 @@ public MetricRegistry getRegistry() { public Closeable getReporter() { return reporter.getReporter(); } + + public static boolean isInitialized() { + return initialized; + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala index d5cbf020ed3e5..630d1366bca68 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala @@ -181,6 +181,7 @@ class DefaultSource extends RelationProvider HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols) } else { HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols) + HoodieSparkSqlWriter.cleanup() } new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema) } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index c07024810017a..bd898dfa205d4 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -44,6 +44,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper} import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} +import org.apache.hudi.metrics.Metrics import org.apache.hudi.sync.common.HoodieSyncConfig import org.apache.hudi.sync.common.util.SyncUtilHelpers import org.apache.hudi.table.BulkInsertPartitioner @@ -593,6 +594,10 @@ object HoodieSparkSqlWriter { (syncHiveSuccess, common.util.Option.ofNullable(instantTime)) } + def cleanup() : Unit = { + Metrics.shutdown() + } + private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String, operation: WriteOperationType, fs: FileSystem): Unit = { if (mode == SaveMode.Append && tableExists) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index 6697ec1514cdd..cdfc2d4b5831e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -31,6 +31,7 @@ import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.{HoodieException, HoodieUpsertException} import org.apache.hudi.keygen._ import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config +import org.apache.hudi.metrics.Metrics import org.apache.hudi.testutils.HoodieClientTestBase import org.apache.hudi.util.JFunction import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils} @@ -738,6 +739,7 @@ class TestCOWDataSource extends HoodieClientTestBase { .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime1) .load(basePath) assertEquals(N + 1, hoodieIncViewDF1.count()) + assertEquals(false, Metrics.isInitialized) } @Test def testSchemaEvolution(): Unit = { diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java index fe7576cc803c4..68e64f541705c 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java @@ -53,6 +53,7 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.hive.HiveSyncTool; +import org.apache.hudi.metrics.Metrics; import org.apache.hudi.utilities.HiveIncrementalPuller; import org.apache.hudi.utilities.IdentitySplitter; import org.apache.hudi.utilities.UtilHelpers; @@ -208,6 +209,7 @@ public void sync() throws Exception { throw ex; } finally { deltaSyncService.ifPresent(DeltaSyncService::close); + Metrics.shutdown(); LOG.info("Shut down delta streamer"); } } diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java index 88948b03850ac..26ed2b48ec3a5 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java @@ -56,6 +56,7 @@ import org.apache.hudi.hive.HiveSyncConfig; import org.apache.hudi.hive.HoodieHiveSyncClient; import org.apache.hudi.keygen.SimpleKeyGenerator; +import org.apache.hudi.metrics.Metrics; import org.apache.hudi.utilities.DummySchemaProvider; import org.apache.hudi.utilities.HoodieClusteringJob; import org.apache.hudi.utilities.HoodieIndexer; @@ -739,30 +740,36 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema, @Test public void testUpsertsCOWContinuousMode() throws Exception { - testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow"); + testUpserts(HoodieTableType.COPY_ON_WRITE, "continuous_cow", false, true); + } + + @Test + public void testUpsertsCOW_ContinuousModeDisabled() throws Exception { + testUpserts(HoodieTableType.COPY_ON_WRITE, "non_continuous_cow", false, false); } @Test public void testUpsertsCOWContinuousModeShutdownGracefully() throws Exception { - testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true); + testUpserts(HoodieTableType.COPY_ON_WRITE, "continuous_cow_shutdown_gracefully", true, true); } @Test public void testUpsertsMORContinuousMode() throws Exception { - testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor"); + testUpserts(HoodieTableType.MERGE_ON_READ, "continuous_mor", false, true); } - private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception { - testUpsertsContinuousMode(tableType, tempDir, false); + @Test + public void testUpsertsMOR_ContinuousModeDisabled() throws Exception { + testUpserts(HoodieTableType.MERGE_ON_READ, "non_continuous_mor", false, false); } - private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully) throws Exception { + private void testUpserts(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully, boolean continuousMode) throws Exception { String tableBasePath = dfsBasePath + "/" + tempDir; // Keep it higher than batch-size to test continuous mode int totalRecords = 3000; // Initial bulk insert HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT); - cfg.continuousMode = true; + cfg.continuousMode = continuousMode; if (testShutdownGracefully) { cfg.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName(); } @@ -782,6 +789,10 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir if (testShutdownGracefully) { TestDataSource.returnEmptyBatch = true; } + + if (!cfg.continuousMode) { + assertFalse(Metrics.isInitialized()); + } return true; }); }