[HUDI-4731] Shutdown CloudWatch reporter when query completes (#6468)
junyuc25 committed Sep 7, 2022
1 parent 8ffcb2f commit dbb044b
Showing 6 changed files with 73 additions and 2 deletions.
Metrics.java
@@ -53,14 +53,15 @@ private Metrics(HoodieWriteConfig metricConfig) {
}
reporter.start();

Runtime.getRuntime().addShutdownHook(new Thread(this::reportAndCloseReporter));
Runtime.getRuntime().addShutdownHook(new Thread(Metrics::shutdown));
}

private void reportAndCloseReporter() {
try {
registerHoodieCommonMetrics();
reporter.report();
if (getReporter() != null) {
LOG.info("Closing metrics reporter...");
getReporter().close();
}
} catch (Exception e) {
@@ -139,4 +140,8 @@ public MetricRegistry getRegistry() {
public Closeable getReporter() {
return reporter.getReporter();
}

public static boolean isInitialized() {
return initialized;
}
}
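
The hunks above repoint the JVM shutdown hook at a static Metrics::shutdown entry point and expose an isInitialized() flag, but the body of shutdown() itself is outside the shown context. A minimal sketch of what it plausibly does, assuming the class also keeps a static Metrics singleton named metrics and a static initialized flag (both implied by isInitialized() and the method reference in the hook):

public static synchronized void shutdown() {
  // Hypothetical sketch -- the real body is not part of this diff.
  // Flush pending metrics and close the reporter (e.g. the CloudWatch
  // reporter) so its background threads do not keep running after the
  // query or sync completes, then mark the metrics system as torn down.
  if (initialized) {
    metrics.reportAndCloseReporter();
    initialized = false;
  }
}

The Spark datasource and DeltaStreamer changes below then call this entry point explicitly once a write or sync finishes, rather than relying only on the JVM shutdown hook.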
DefaultSource.scala
@@ -182,6 +182,8 @@ class DefaultSource extends RelationProvider
} else {
HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
}

HoodieSparkSqlWriter.cleanup()
new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
}

HoodieSparkSqlWriter.scala
@@ -44,6 +44,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.metrics.Metrics
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
import org.apache.hudi.table.BulkInsertPartitioner
@@ -594,6 +595,10 @@ object HoodieSparkSqlWriter {
(syncHiveSuccess, common.util.Option.ofNullable(instantTime))
}

def cleanup() : Unit = {
Metrics.shutdown()
}

private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
operation: WriteOperationType, fs: FileSystem): Unit = {
if (mode == SaveMode.Append && tableExists) {
TestCOWDataSource.scala
@@ -19,6 +19,7 @@ package org.apache.hudi.functional

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.timeline.HoodieInstant
@@ -28,12 +29,14 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrin
import org.apache.hudi.common.util
import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.metrics.HoodieMetricsConfig
import org.apache.hudi.exception.{HoodieException, HoodieUpsertException}
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.metrics.Metrics
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.util.JFunction
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, QuickstartUtils}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, concat, lit, udf}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -738,6 +741,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime1)
.load(basePath)
assertEquals(N + 1, hoodieIncViewDF1.count())
assertEquals(false, Metrics.isInitialized)
}

@Test def testSchemaEvolution(): Unit = {
@@ -1024,4 +1028,26 @@
.saveAsTable("hoodie_test")
assertEquals(spark.read.format("hudi").load(basePath).count(), 9)
}

@Test
def testMetricsReporterViaDataSource(): Unit = {
val dataGenerator = new QuickstartUtils.DataGenerator()
val records = convertToStringList(dataGenerator.generateInserts( 10))
val recordsRDD = spark.sparkContext.parallelize(records, 2)
val inputDF = spark.read.json(sparkSession.createDataset(recordsRDD)(Encoders.STRING))
inputDF.write.format("hudi")
.options(getQuickstartWriteConfigs)
.option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "uuid")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partitionpath")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
.option(HoodieMetricsConfig.TURN_METRICS_ON.key(), "true")
.option(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), "CONSOLE")
.mode(SaveMode.Overwrite)
.save(basePath)

assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
assertEquals(false, Metrics.isInitialized, "Metrics should be shutdown")
}
}
HoodieDeltaStreamer.java
@@ -53,6 +53,7 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.utilities.HiveIncrementalPuller;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
@@ -208,6 +209,7 @@ public void sync() throws Exception {
throw ex;
} finally {
deltaSyncService.ifPresent(DeltaSyncService::close);
Metrics.shutdown();
LOG.info("Shut down delta streamer");
}
}
TestHoodieDeltaStreamer.java
@@ -56,6 +56,7 @@
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncClient;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieIndexer;
@@ -134,6 +135,8 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE;
import static org.apache.hudi.config.metrics.HoodieMetricsConfig.TURN_METRICS_ON;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
@@ -742,6 +745,20 @@ public void testUpsertsCOWContinuousMode() throws Exception {
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
}

@Test
public void testUpsertsCOW_ContinuousModeDisabled() throws Exception {
String tableBasePath = dfsBasePath + "/non_continuous_cow";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), "CONSOLE"));
cfg.continuousMode = false;
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
}

@Test
public void testUpsertsCOWContinuousModeShutdownGracefully() throws Exception {
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true);
@@ -752,6 +769,20 @@ public void testUpsertsMORContinuousMode() throws Exception {
testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
}

@Test
public void testUpsertsMOR_ContinuousModeDisabled() throws Exception {
String tableBasePath = dfsBasePath + "/non_continuous_mor";
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), "CONSOLE"));
cfg.continuousMode = false;
HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
ds.sync();
TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
}

private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
testUpsertsContinuousMode(tableType, tempDir, false);
}
