Skip to content

Commit

Permalink
Shutdown CloudWatch reporter when query completes
Browse files Browse the repository at this point in the history
Add delta streamer tests

Only shutdown in non-continuous mode
  • Loading branch information
junyuc25 committed Aug 22, 2022
1 parent 3adb571 commit 3fd2cb6
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,15 @@ private Metrics(HoodieWriteConfig metricConfig) {
}
reporter.start();

Runtime.getRuntime().addShutdownHook(new Thread(this::reportAndCloseReporter));
Runtime.getRuntime().addShutdownHook(new Thread(Metrics::shutdown));
}

private void reportAndCloseReporter() {
try {
registerHoodieCommonMetrics();
reporter.report();
if (getReporter() != null) {
LOG.info("Closing metrics reporter...");
getReporter().close();
}
} catch (Exception e) {
Expand Down Expand Up @@ -139,4 +140,8 @@ public MetricRegistry getRegistry() {
/**
 * Returns the underlying metrics reporter as a {@link java.io.Closeable} so callers
 * can release its resources. NOTE(review): may return null — the null check in
 * reportAndCloseReporter suggests some reporter types expose no Closeable; confirm.
 */
public Closeable getReporter() {
return reporter.getReporter();
}

/**
 * Reports whether the Metrics singleton is currently initialized, backed by the
 * static {@code initialized} flag. Used by callers (e.g. tests) to verify that
 * the metrics system was torn down after Metrics.shutdown() was invoked.
 */
public static boolean isInitialized() {
return initialized;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ class DefaultSource extends RelationProvider
HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
} else {
HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
HoodieSparkSqlWriter.cleanup()
}
new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.metrics.Metrics
import org.apache.hudi.sync.common.HoodieSyncConfig
import org.apache.hudi.sync.common.util.SyncUtilHelpers
import org.apache.hudi.table.BulkInsertPartitioner
Expand Down Expand Up @@ -593,6 +594,10 @@ object HoodieSparkSqlWriter {
(syncHiveSuccess, common.util.Option.ofNullable(instantTime))
}

/**
 * Tears down the Hudi metrics system after a write completes, delegating to
 * [[org.apache.hudi.metrics.Metrics.shutdown]].
 * NOTE(review): per the commit message this exists so the metrics reporter
 * (e.g. CloudWatch) does not outlive a non-continuous query — confirm callers
 * only invoke this when no further writes will report metrics in this JVM.
 */
def cleanup() : Unit = {
Metrics.shutdown()
}

private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
operation: WriteOperationType, fs: FileSystem): Unit = {
if (mode == SaveMode.Append && tableExists) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.{HoodieException, HoodieUpsertException}
import org.apache.hudi.keygen._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
import org.apache.hudi.metrics.Metrics
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.util.JFunction
import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
Expand Down Expand Up @@ -738,6 +739,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
.option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime1)
.load(basePath)
assertEquals(N + 1, hoodieIncViewDF1.count())
assertEquals(false, Metrics.isInitialized)
}

@Test def testSchemaEvolution(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.utilities.HiveIncrementalPuller;
import org.apache.hudi.utilities.IdentitySplitter;
import org.apache.hudi.utilities.UtilHelpers;
Expand Down Expand Up @@ -208,6 +209,7 @@ public void sync() throws Exception {
throw ex;
} finally {
deltaSyncService.ifPresent(DeltaSyncService::close);
Metrics.shutdown();
LOG.info("Shut down delta streamer");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveSyncClient;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.metrics.Metrics;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
import org.apache.hudi.utilities.HoodieIndexer;
Expand Down Expand Up @@ -739,30 +740,36 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,

@Test
public void testUpsertsCOWContinuousMode() throws Exception {
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
testUpserts(HoodieTableType.COPY_ON_WRITE, "continuous_cow", false, true);
}

// Verifies COPY_ON_WRITE upserts with continuous mode disabled; the shared
// testUpserts helper additionally asserts (via Metrics.isInitialized()) that the
// metrics system is shut down once the non-continuous run finishes.
@Test
public void testUpsertsCOW_ContinuousModeDisabled() throws Exception {
testUpserts(HoodieTableType.COPY_ON_WRITE, "non_continuous_cow", false, false);
}

@Test
public void testUpsertsCOWContinuousModeShutdownGracefully() throws Exception {
testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true);
testUpserts(HoodieTableType.COPY_ON_WRITE, "continuous_cow_shutdown_gracefully", true, true);
}

@Test
public void testUpsertsMORContinuousMode() throws Exception {
testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
testUpserts(HoodieTableType.MERGE_ON_READ, "continuous_mor", false, true);
}

private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
testUpsertsContinuousMode(tableType, tempDir, false);
// Verifies MERGE_ON_READ upserts with continuous mode disabled; the shared
// testUpserts helper additionally asserts (via Metrics.isInitialized()) that the
// metrics system is shut down once the non-continuous run finishes.
@Test
public void testUpsertsMOR_ContinuousModeDisabled() throws Exception {
testUpserts(HoodieTableType.MERGE_ON_READ, "non_continuous_mor", false, false);
}

private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully) throws Exception {
private void testUpserts(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully, boolean continuousMode) throws Exception {
String tableBasePath = dfsBasePath + "/" + tempDir;
// Keep it higher than batch-size to test continuous mode
int totalRecords = 3000;
// Initial bulk insert
HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
cfg.continuousMode = true;
cfg.continuousMode = continuousMode;
if (testShutdownGracefully) {
cfg.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName();
}
Expand All @@ -782,6 +789,10 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir
if (testShutdownGracefully) {
TestDataSource.returnEmptyBatch = true;
}

if (!cfg.continuousMode) {
assertFalse(Metrics.isInitialized());
}
return true;
});
}
Expand Down

0 comments on commit 3fd2cb6

Please sign in to comment.