diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java index 954ee3a0e48ae..48c2a920dd8d2 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java @@ -227,6 +227,7 @@ protected boolean isClosed() { @Override protected void abortInFinalizer() { + getS3AStreamStatistics().streamLeaked(); try { close(); } catch (IOException ignored) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java index 31fe270a8b9e3..0aca12ee337e3 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java @@ -37,7 +37,6 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_LEAKS; import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs; @@ -90,10 +89,6 @@ public void setup() throws Exception { @Test public void testFinalizer() throws Throwable { Path path = methodPath(); - // Analytics accelerator currently does not support stream leak detection. This work is tracked - // in https://issues.apache.org/jira/browse/HADOOP-19451 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support leak detection"); final S3AFileSystem fs = getFileSystem(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 1eb302a0ca137..e5205d648545e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -624,7 +624,7 @@ public static void skipIfAnalyticsAcceleratorEnabled( public static boolean isAnalyticsAcceleratorEnabled(final Configuration conf) { return conf.get(INPUT_STREAM_TYPE, - INPUT_STREAM_TYPE_CLASSIC).equals(INPUT_STREAM_TYPE_ANALYTICS); + INPUT_STREAM_TYPE_ANALYTICS).equals(INPUT_STREAM_TYPE_ANALYTICS); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java index 76b18c04339c3..4ff00e0ae4436 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java @@ -54,14 +54,14 @@ public class TestStreamFactories extends AbstractHadoopTestBase { /** - * The empty string and "default" both map to the classic stream. + * The empty string and "default" both map to the analytics stream. */ @Test public void testDefaultFactoryCreation() throws Throwable { load("", DEFAULT_STREAM_TYPE, - ClassicObjectInputStreamFactory.class); + AnalyticsStreamFactory.class); load(INPUT_STREAM_TYPE_DEFAULT, DEFAULT_STREAM_TYPE, - ClassicObjectInputStreamFactory.class); + AnalyticsStreamFactory.class); } /** @@ -70,7 +70,7 @@ public void testDefaultFactoryCreation() throws Throwable { @Test public void testClassicFactoryCreation() throws Throwable { final ClassicObjectInputStreamFactory f = - load(INPUT_STREAM_TYPE_CLASSIC, DEFAULT_STREAM_TYPE, + load(INPUT_STREAM_TYPE_CLASSIC, InputStreamType.Classic, ClassicObjectInputStreamFactory.class); final StreamFactoryRequirements requirements = f.factoryRequirements(); assertThat(requirements.requiresFuturePool())