diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
index 39a9e51ac8125..11410a24c1ad2 100644
--- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
@@ -30,7 +30,7 @@
-
+
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 0204b21780858..8cb9b0417b6f2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -21,6 +21,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import java.time.Duration;
@@ -1580,6 +1581,35 @@ private Constants() {
*/
public static final String AWS_AUTH_CLASS_PREFIX = "com.amazonaws.auth";
+ /**
+ * Input stream type: {@value}.
+ */
+ public static final String INPUT_STREAM_TYPE = "fs.s3a.input.stream.type";
+
+ /**
+ * The classic input stream: "classic".
+ */
+ public static final String INPUT_STREAM_TYPE_CLASSIC =
+ InputStreamType.Classic.getName();
+
+ /**
+ * The prefetching input stream: "prefetch".
+ */
+ public static final String INPUT_STREAM_TYPE_PREFETCH = InputStreamType.Prefetch.getName();
+
+ /**
+ * The analytics input stream: "analytics".
+ */
+ public static final String INPUT_STREAM_TYPE_ANALYTICS =
+ InputStreamType.Analytics.getName();
+
+ /**
+ * The default input stream.
+ * Currently {@link #INPUT_STREAM_TYPE_CLASSIC}.
+ */
+ public static final String INPUT_STREAM_TYPE_DEFAULT =
+ InputStreamType.DEFAULT_STREAM_TYPE.getName();
+
/**
* Controls whether the prefetching input stream is enabled.
*/
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 24ad025b8e5ff..6d62dd0bf6964 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -51,7 +51,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
-import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
@@ -59,8 +58,6 @@
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
-import software.amazon.awssdk.services.s3.model.GetObjectRequest;
-import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
import software.amazon.awssdk.services.s3.model.MultipartUpload;
@@ -132,6 +129,7 @@
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation;
import org.apache.hadoop.fs.s3a.impl.CreateFileBuilder;
+import org.apache.hadoop.fs.s3a.impl.InputStreamCallbacksImpl;
import org.apache.hadoop.fs.s3a.impl.S3AFileSystemOperations;
import org.apache.hadoop.fs.s3a.impl.CSEV1CompatibleS3AFileSystemOperations;
import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
@@ -155,7 +153,9 @@
import org.apache.hadoop.fs.s3a.impl.StoreContextFactory;
import org.apache.hadoop.fs.s3a.impl.UploadContentProviders;
import org.apache.hadoop.fs.s3a.impl.CSEUtils;
-import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamCallbacks;
+import org.apache.hadoop.fs.s3a.impl.streams.StreamThreadOptions;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
import org.apache.hadoop.fs.statistics.DurationTracker;
@@ -164,7 +164,6 @@
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
-import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.fs.store.audit.AuditEntryPoint;
@@ -308,9 +307,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private String username;
- /**
- * Store back end.
- */
private S3AStore store;
/**
@@ -348,23 +344,16 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private ExecutorService boundedThreadPool;
private ThreadPoolExecutor unboundedThreadPool;
- // S3 reads are prefetched asynchronously using this future pool.
+ /**
+ * Future pool built on the bounded thread pool.
+ */
private ExecutorServiceFuturePool futurePool;
- // If true, the prefetching input stream is used for reads.
- private boolean prefetchEnabled;
-
// If true, S3SeekableInputStream from Analytics Accelerator for Amazon S3 will be used.
private boolean analyticsAcceleratorEnabled;
private boolean analyticsAcceleratorCRTEnabled;
- // Size in bytes of a single prefetch block.
- private int prefetchBlockSize;
-
- // Size of prefetch queue (in number of blocks).
- private int prefetchBlockCount;
-
private int executorCapacity;
private long multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
@@ -372,7 +361,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
/** Log to warn of storage class configuration problems. */
private static final LogExactlyOnce STORAGE_CLASS_WARNING = new LogExactlyOnce(LOG);
- private LocalDirAllocator directoryAllocator;
private String cannedACL;
/**
@@ -685,16 +673,6 @@ public void initialize(URI name, Configuration originalConf)
dirOperationsPurgeUploads = conf.getBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS,
s3ExpressStore);
- this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
- long prefetchBlockSizeLong =
- longBytesOption(conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE, 1);
- if (prefetchBlockSizeLong > (long) Integer.MAX_VALUE) {
- throw new IOException("S3A prefatch block size exceeds int limit");
- }
- this.prefetchBlockSize = (int) prefetchBlockSizeLong;
- this.prefetchBlockCount =
- intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
-
this.analyticsAcceleratorEnabled =
conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
this.analyticsAcceleratorCRTEnabled =
@@ -712,8 +690,6 @@ public void initialize(URI name, Configuration originalConf)
// multipart copy and upload are the same; this just makes it explicit
this.isMultipartCopyEnabled = isMultipartUploadEnabled;
- initThreadPools(conf);
-
int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
if (listVersion < 1 || listVersion > 2) {
LOG.warn("Configured fs.s3a.list.version {} is invalid, forcing " +
@@ -825,12 +801,8 @@ public void initialize(URI name, Configuration originalConf)
s3AccessGrantsEnabled = conf.getBoolean(AWS_S3_ACCESS_GRANTS_ENABLED, false);
int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
- // now create the store
+ // now create and initialize the store
store = createS3AStore(clientManager, rateLimitCapacity);
- // the s3 client is created through the store, rather than
- // directly through the client manager.
- // this is to aid mocking.
- s3Client = store.getOrCreateS3Client();
if (this.analyticsAcceleratorEnabled) {
LOG.info("Using S3SeekableInputStream");
@@ -852,6 +824,14 @@ public void initialize(URI name, Configuration originalConf)
seekableInputStreamConfiguration);
}
+ // the s3 client is created through the store, rather than
+ // directly through the client manager.
+ // this is to aid mocking.
+ s3Client = getStore().getOrCreateS3Client();
+
+ // thread pool init requires store to be created
+ initThreadPools();
+
// The filesystem is now ready to perform operations against
// S3
// This initiates a probe against S3 for the bucket existing.
@@ -894,7 +874,7 @@ private S3AFileSystemOperations createFileSystemHandler() {
/**
- * Create the S3AStore instance.
+ * Create and start the S3AStore instance.
* This is protected so that tests can override it.
* @param clientManager client manager
* @param rateLimitCapacity rate limit
@@ -903,7 +883,7 @@ private S3AFileSystemOperations createFileSystemHandler() {
@VisibleForTesting
protected S3AStore createS3AStore(final ClientManager clientManager,
final int rateLimitCapacity) {
- return new S3AStoreBuilder()
+ final S3AStore st = new S3AStoreBuilder()
.withAuditSpanSource(getAuditManager())
.withClientManager(clientManager)
.withDurationTrackerFactory(getDurationTrackerFactory())
@@ -915,6 +895,9 @@ protected S3AStore createS3AStore(final ClientManager clientManager,
.withReadRateLimiter(unlimitedRate())
.withWriteRateLimiter(RateLimitingFactory.create(rateLimitCapacity))
.build();
+ st.init(getConf());
+ st.start();
+ return st;
}
/**
@@ -997,12 +980,18 @@ public Statistics getInstanceStatistics() {
}
/**
- * Initialize the thread pool.
+ * Initialize the thread pools.
+ *
* This must be re-invoked after replacing the S3Client during test
* runs.
+ *
+ * It requires the S3AStore to have been instantiated.
- * @param conf configuration.
*/
- private void initThreadPools(Configuration conf) {
+ private void initThreadPools() {
+
+ Configuration conf = getConf();
+
final String name = "s3a-transfer-" + getBucket();
int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
if (maxThreads < 2) {
@@ -1018,7 +1007,9 @@ private void initThreadPools(Configuration conf) {
TimeUnit.SECONDS,
Duration.ZERO).getSeconds();
- int numPrefetchThreads = this.prefetchEnabled ? this.prefetchBlockCount : 0;
+ final StreamThreadOptions threadRequirements =
+ getStore().threadRequirements();
+ int numPrefetchThreads = threadRequirements.sharedThreads();
int activeTasksForBoundedThreadPool = maxThreads;
int waitingTasksForBoundedThreadPool = maxThreads + totalTasks + numPrefetchThreads;
@@ -1036,7 +1027,8 @@ private void initThreadPools(Configuration conf) {
unboundedThreadPool.allowCoreThreadTimeOut(true);
executorCapacity = intOption(conf,
EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
- if (prefetchEnabled) {
+ if (threadRequirements.createFuturePool()) {
+ // create a future pool.
final S3AInputStreamStatistics s3AInputStreamStatistics =
statisticsContext.newInputStreamStatistics();
futurePool = new ExecutorServiceFuturePool(
@@ -1387,6 +1379,15 @@ public FlagSet getPerformanceFlags() {
return performanceFlags;
}
+
+ /**
+ * Get the store for low-level operations.
+ * @return the store the S3A FS is working through.
+ */
+ private S3AStore getStore() {
+ return store;
+ }
+
/**
* Implementation of all operations used by delegation tokens.
*/
@@ -1592,7 +1593,7 @@ public S3Client getAmazonS3Client(String reason) {
@Override
public S3AStore getStore() {
- return store;
+ return S3AFileSystem.this.getStore();
}
/**
@@ -1721,28 +1722,8 @@ public S3AEncryptionMethods getS3EncryptionAlgorithm() {
*/
File createTmpFileForWrite(String pathStr, long size,
Configuration conf) throws IOException {
- initLocalDirAllocatorIfNotInitialized(conf);
- Path path = directoryAllocator.getLocalPathForWrite(pathStr,
- size, conf);
- File dir = new File(path.getParent().toUri().getPath());
- String prefix = path.getName();
- // create a temp file on this directory
- return File.createTempFile(prefix, null, dir);
- }
- /**
- * Initialize dir allocator if not already initialized.
- *
- * @param conf The Configuration object.
- */
- private void initLocalDirAllocatorIfNotInitialized(Configuration conf) {
- if (directoryAllocator == null) {
- synchronized (this) {
- String bufferDir = conf.get(BUFFER_DIR) != null
- ? BUFFER_DIR : HADOOP_TMP_DIR;
- directoryAllocator = new LocalDirAllocator(bufferDir);
- }
- }
+ return getS3AInternals().getStore().createTemporaryFileForWriting(pathStr, size, conf);
}
/**
@@ -1919,8 +1900,6 @@ private FSDataInputStream executeOpen(
final Path path,
final OpenFileSupport.OpenFileInformation fileInformation)
throws IOException {
-
-
// create the input stream statistics before opening
// the file so that the time to prepare to open the file is included.
S3AInputStreamStatistics inputStreamStats =
@@ -1930,114 +1909,49 @@ private FSDataInputStream executeOpen(
final S3AFileStatus fileStatus =
trackDuration(inputStreamStats,
ACTION_FILE_OPENED.getSymbol(), () ->
- extractOrFetchSimpleFileStatus(path, fileInformation));
+ extractOrFetchSimpleFileStatus(path, fileInformation));
S3AReadOpContext readContext = createReadContext(
fileStatus,
auditSpan);
fileInformation.applyOptions(readContext);
LOG.debug("Opening '{}'", readContext);
- if (this.analyticsAcceleratorEnabled) {
- return new FSDataInputStream(
- new S3ASeekableStream(
- this.bucket,
- pathToKey(path),
- s3SeekableInputStreamFactory));
- }
-
- if (this.prefetchEnabled) {
- Configuration configuration = getConf();
- initLocalDirAllocatorIfNotInitialized(configuration);
- return new FSDataInputStream(
- new S3APrefetchingInputStream(
- readContext.build(),
- createObjectAttributes(path, fileStatus),
- createInputStreamCallbacks(auditSpan),
- inputStreamStats,
- configuration,
- directoryAllocator));
- } else {
- return new FSDataInputStream(
- new S3AInputStream(
- readContext.build(),
- createObjectAttributes(path, fileStatus),
- createInputStreamCallbacks(auditSpan),
- inputStreamStats,
- new SemaphoredDelegatingExecutor(
- boundedThreadPool,
- vectoredActiveRangeReads,
- true,
- inputStreamStats)));
- }
- }
+ // what are the stream's thread requirements?
+ final StreamThreadOptions requirements =
+ getStore().threadRequirements();
+
+ // calculate the permit count.
+ final int permitCount = requirements.streamThreads() +
+ (requirements.vectorSupported()
+ ? vectoredActiveRangeReads
+ : 0);
+ // create an executor which is a subset of the
+ // bounded thread pool.
+ final SemaphoredDelegatingExecutor pool = new SemaphoredDelegatingExecutor(
+ boundedThreadPool,
+ permitCount,
+ true,
+ inputStreamStats);
+
+ // do not validate() the parameters here, as the store
+ // completes and validates them.
+ ObjectReadParameters parameters = new ObjectReadParameters()
+ .withBoundedThreadPool(pool)
+ .withCallbacks(createInputStreamCallbacks(auditSpan))
+ .withContext(readContext.build())
+ .withObjectAttributes(createObjectAttributes(path, fileStatus))
+ .withStreamStatistics(inputStreamStats);
+ return new FSDataInputStream(getStore().readObject(parameters));
- /**
- * Override point: create the callbacks for S3AInputStream.
- * @return an implementation of the InputStreamCallbacks,
- */
- private S3AInputStream.InputStreamCallbacks createInputStreamCallbacks(
- final AuditSpan auditSpan) {
- return new InputStreamCallbacksImpl(auditSpan);
}
/**
- * Operations needed by S3AInputStream to read data.
+ * Override point: create the callbacks for ObjectInputStream.
+ * @return an implementation of the callbacks.
*/
- private final class InputStreamCallbacksImpl implements
- S3AInputStream.InputStreamCallbacks {
-
- /**
- * Audit span to activate before each call.
- */
- private final AuditSpan auditSpan;
-
- /**
- * Create.
- * @param auditSpan Audit span to activate before each call.
- */
- private InputStreamCallbacksImpl(final AuditSpan auditSpan) {
- this.auditSpan = requireNonNull(auditSpan);
- }
-
- /**
- * Closes the audit span.
- */
- @Override
- public void close() {
- auditSpan.close();
- }
-
- @Override
- public GetObjectRequest.Builder newGetRequestBuilder(final String key) {
- // active the audit span used for the operation
- try (AuditSpan span = auditSpan.activate()) {
- return getRequestFactory().newGetObjectRequestBuilder(key);
- }
- }
-
- @Override
- public ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) throws
- IOException {
- // active the audit span used for the operation
- try (AuditSpan span = auditSpan.activate()) {
- return fsHandler.getObject(store, request, getRequestFactory());
- }
- }
-
- @Override
- public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> operation) {
- CompletableFuture<T> result = new CompletableFuture<>();
- unboundedThreadPool.submit(() ->
- LambdaUtils.eval(result, () -> {
- LOG.debug("Starting submitted operation in {}", auditSpan.getSpanId());
- try (AuditSpan span = auditSpan.activate()) {
- return operation.apply();
- } finally {
- LOG.debug("Completed submitted operation in {}", auditSpan.getSpanId());
- }
- }));
- return result;
- }
+ private ObjectInputStreamCallbacks createInputStreamCallbacks(
+ final AuditSpan auditSpan) {
+ return new InputStreamCallbacksImpl(auditSpan, getStore(), fsHandler, unboundedThreadPool);
}
/**
@@ -2050,7 +1964,7 @@ private final class WriteOperationHelperCallbacksImpl
@Retries.OnceRaw
public CompleteMultipartUploadResponse completeMultipartUpload(
CompleteMultipartUploadRequest request) {
- return store.completeMultipartUpload(request);
+ return getStore().completeMultipartUpload(request);
}
@Override
@@ -2060,7 +1974,7 @@ public UploadPartResponse uploadPart(
final RequestBody body,
final DurationTrackerFactory durationTrackerFactory)
throws AwsServiceException, UncheckedIOException {
- return store.uploadPart(request, body, durationTrackerFactory);
+ return getStore().uploadPart(request, body, durationTrackerFactory);
}
}
@@ -2084,9 +1998,8 @@ protected S3AReadOpContext createReadContext(
fileStatus,
vectoredIOContext,
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator(),
- futurePool,
- prefetchBlockSize,
- prefetchBlockCount)
+ futurePool
+ )
.withAuditSpan(auditSpan);
openFileHelper.applyDefaultOptions(roc);
return roc.build();
@@ -2824,7 +2737,7 @@ public long getDefaultBlockSize(Path path) {
*/
@Override
public long getObjectSize(S3Object s3Object) throws IOException {
- return fsHandler.getS3ObjectSize(s3Object.key(), s3Object.size(), store, null);
+ return fsHandler.getS3ObjectSize(s3Object.key(), s3Object.size(), getStore(), null);
}
@Override
@@ -3055,7 +2968,7 @@ protected DurationTrackerFactory getDurationTrackerFactory() {
*/
protected DurationTrackerFactory nonNullDurationTrackerFactory(
DurationTrackerFactory factory) {
- return store.nonNullDurationTrackerFactory(factory);
+ return getStore().nonNullDurationTrackerFactory(factory);
}
/**
@@ -3093,7 +3006,7 @@ protected HeadObjectResponse getObjectMetadata(String key,
ChangeTracker changeTracker,
Invoker changeInvoker,
String operation) throws IOException {
- return store.headObject(key, changeTracker, changeInvoker, fsHandler, operation);
+ return getStore().headObject(key, changeTracker, changeInvoker, fsHandler, operation);
}
/**
@@ -3241,7 +3154,7 @@ public void incrementWriteOperations() {
protected void deleteObject(String key)
throws SdkException, IOException {
incrementWriteOperations();
- store.deleteObject(getRequestFactory()
+ getStore().deleteObject(getRequestFactory()
.newDeleteObjectRequestBuilder(key)
.build());
}
@@ -3295,7 +3208,7 @@ void deleteObjectAtPath(Path f,
private DeleteObjectsResponse deleteObjects(DeleteObjectsRequest deleteRequest)
throws MultiObjectDeleteException, SdkException, IOException {
incrementWriteOperations();
- DeleteObjectsResponse response = store.deleteObjects(deleteRequest).getValue();
+ DeleteObjectsResponse response = getStore().deleteObjects(deleteRequest).getValue();
if (!response.errors().isEmpty()) {
throw new MultiObjectDeleteException(response.errors());
}
@@ -3338,7 +3251,7 @@ public PutObjectRequest.Builder newPutObjectRequestBuilder(String key,
@Retries.OnceRaw
public UploadInfo putObject(PutObjectRequest putObjectRequest, File file,
ProgressableProgressListener listener) throws IOException {
- return store.putObject(putObjectRequest, file, listener);
+ return getStore().putObject(putObjectRequest, file, listener);
}
/**
@@ -3437,7 +3350,7 @@ UploadPartResponse uploadPart(UploadPartRequest request, RequestBody body,
* @param bytes bytes in the request.
*/
protected void incrementPutStartStatistics(long bytes) {
- store.incrementPutStartStatistics(bytes);
+ getStore().incrementPutStartStatistics(bytes);
}
/**
@@ -3448,7 +3361,7 @@ protected void incrementPutStartStatistics(long bytes) {
* @param bytes bytes in the request.
*/
protected void incrementPutCompletedStatistics(boolean success, long bytes) {
- store.incrementPutCompletedStatistics(success, bytes);
+ getStore().incrementPutCompletedStatistics(success, bytes);
}
/**
@@ -3459,7 +3372,7 @@ protected void incrementPutCompletedStatistics(boolean success, long bytes) {
* @param bytes bytes successfully uploaded.
*/
protected void incrementPutProgressStatistics(String key, long bytes) {
- store.incrementPutProgressStatistics(key, bytes);
+ getStore().incrementPutProgressStatistics(key, bytes);
}
/**
@@ -4327,7 +4240,7 @@ PutObjectResponse executePut(
ProgressableProgressListener listener =
new ProgressableProgressListener(store, key, progress);
UploadInfo info = putObject(putObjectRequest, file, listener);
- PutObjectResponse result = store.waitForUploadCompletion(key, info).response();
+ PutObjectResponse result = getStore().waitForUploadCompletion(key, info).response();
listener.uploadCompleted(info.getFileUpload());
return result;
}
@@ -4422,7 +4335,7 @@ public void close() throws IOException {
protected synchronized void stopAllServices() {
try {
trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> {
- closeAutocloseables(LOG, store, s3SeekableInputStreamFactory);
+ closeAutocloseables(LOG, getStore());
store = null;
s3Client = null;
s3AsyncClient = null;
@@ -4430,16 +4343,19 @@ protected synchronized void stopAllServices() {
// At this point the S3A client is shut down,
// now the executor pools are closed
+
+ // shut down the future pool first, as it wraps the bounded thread pool
+ if (futurePool != null) {
+ futurePool.shutdown(LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+ futurePool = null;
+ }
HadoopExecutors.shutdown(boundedThreadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
boundedThreadPool = null;
HadoopExecutors.shutdown(unboundedThreadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
unboundedThreadPool = null;
- if (futurePool != null) {
- futurePool.shutdown(LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
- futurePool = null;
- }
+
// other services are shutdown.
cleanupWithLogger(LOG,
delegationTokens.orElse(null),
@@ -4645,7 +4561,7 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
() -> {
incrementStatistic(OBJECT_COPY_REQUESTS);
- Copy copy = store.getOrCreateTransferManager().copy(
+ Copy copy = getStore().getOrCreateTransferManager().copy(
CopyRequest.builder()
.copyObjectRequest(copyRequest)
.build());
@@ -5504,15 +5420,17 @@ public boolean hasPathCapability(final Path path, final String capability)
case AWS_S3_ACCESS_GRANTS_ENABLED:
return s3AccessGrantsEnabled;
- // stream leak detection.
- case StreamStatisticNames.STREAM_LEAKS:
- return !prefetchEnabled;
-
default:
// is it a performance flag?
if (performanceFlags.hasCapability(capability)) {
return true;
}
+
+ // ask the store for what input stream capabilities it offers
+ if (getStore() != null && getStore().hasCapability(capability)) {
+ return true;
+ }
+
// fall through
}
@@ -5773,7 +5691,7 @@ public BulkDelete createBulkDelete(final Path path)
*/
protected BulkDeleteOperation.BulkDeleteOperationCallbacks createBulkDeleteCallbacks(
Path path, int pageSize, AuditSpanS3A span) {
- return new BulkDeleteOperationCallbacksImpl(store, pathToKey(path), pageSize, span);
+ return new BulkDeleteOperationCallbacksImpl(getStore(), pathToKey(path), pageSize, span);
}
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index c620ca042dc82..859eb8dfa0a4a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.fs.s3a;
import javax.annotation.Nullable;
-import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
@@ -30,7 +29,6 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntFunction;
@@ -41,7 +39,8 @@
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.impl.LeakReporter;
-import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectReadParameters;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -49,7 +48,6 @@
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.impl.CombinedFileRange;
@@ -57,17 +55,11 @@
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer;
-import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;
-import org.apache.hadoop.fs.statistics.IOStatistics;
-import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.functional.CallableRaisingIOE;
-import static java.util.Objects.requireNonNull;
-import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.hadoop.fs.VectoredReadUtils.isOrderedDisjoint;
import static org.apache.hadoop.fs.VectoredReadUtils.mergeSortedRanges;
import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges;
@@ -94,7 +86,7 @@
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class S3AInputStream extends FSInputStream implements CanSetReadahead,
+public class S3AInputStream extends ObjectInputStream implements CanSetReadahead,
CanUnbuffer, StreamCapabilities, IOStatisticsSource {
public static final String E_NEGATIVE_READAHEAD_VALUE
@@ -134,6 +126,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
* and returned in {@link #getPos()}.
*/
private long pos;
+
/**
* Closed bit. Volatile so reads are non-blocking.
* Updates must be in a synchronized block to guarantee an atomic check and
@@ -144,30 +137,12 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
* Input stream returned by a getObject call.
*/
private ResponseInputStream<GetObjectResponse> wrappedStream;
- private final S3AReadOpContext context;
- private final InputStreamCallbacks client;
-
- /**
- * Thread pool used for vectored IO operation.
- */
- private final ExecutorService boundedThreadPool;
- private final String bucket;
- private final String key;
- private final String pathStr;
-
- /**
- * Content length from HEAD or openFile option.
- */
- private final long contentLength;
/**
* Content length in format for vector IO.
*/
private final Optional<Long> fileLength;
- private final String uri;
- private final S3AInputStreamStatistics streamStatistics;
- private S3AInputPolicy inputPolicy;
private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
/** Vectored IO context. */
@@ -193,96 +168,33 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
/** change tracker. */
private final ChangeTracker changeTracker;
- /**
- * IOStatistics report.
- */
- private final IOStatistics ioStatistics;
-
/**
* Threshold for stream reads to switch to
* asynchronous draining.
*/
- private long asyncDrainThreshold;
-
- /** Aggregator used to aggregate per thread IOStatistics. */
- private final IOStatisticsAggregator threadIOStatistics;
-
- /**
- * Report of leaks.
- * with report and abort unclosed streams in finalize().
- */
- private final LeakReporter leakReporter;
+ private final long asyncDrainThreshold;
/**
* Create the stream.
* This does not attempt to open it; that is only done on the first
* actual read() operation.
- * @param ctx operation context
- * @param s3Attributes object attributes
- * @param client S3 client to use
- * @param streamStatistics stream io stats.
- * @param boundedThreadPool thread pool to use.
- */
- public S3AInputStream(S3AReadOpContext ctx,
- S3ObjectAttributes s3Attributes,
- InputStreamCallbacks client,
- S3AInputStreamStatistics streamStatistics,
- ExecutorService boundedThreadPool) {
- Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
- "No Bucket");
- Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
- long l = s3Attributes.getLen();
- Preconditions.checkArgument(l >= 0, "Negative content length");
- this.context = ctx;
- this.bucket = s3Attributes.getBucket();
- this.key = s3Attributes.getKey();
- this.pathStr = s3Attributes.getPath().toString();
- this.contentLength = l;
- this.fileLength = Optional.of(contentLength);
- this.client = client;
- this.uri = "s3a://" + this.bucket + "/" + this.key;
- this.streamStatistics = streamStatistics;
- this.ioStatistics = streamStatistics.getIOStatistics();
- this.changeTracker = new ChangeTracker(uri,
- ctx.getChangeDetectionPolicy(),
- streamStatistics.getChangeTrackerStatistics(),
- s3Attributes);
- setInputPolicy(ctx.getInputPolicy());
- setReadahead(ctx.getReadahead());
- this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
- this.boundedThreadPool = boundedThreadPool;
- this.vectoredIOContext = context.getVectoredIOContext();
- this.threadIOStatistics = requireNonNull(ctx.getIOStatisticsAggregator());
- // build the leak reporter
- this.leakReporter = new LeakReporter(
- "Stream not closed while reading " + uri,
- this::isStreamOpen,
- () -> abortInFinalizer());
- }
-
- /**
- * Finalizer.
- *
- * Verify that the inner stream is closed.
- *
- * If it is not, it means streams are being leaked in application code.
- * Log a warning, including the stack trace of the caller,
- * then abort the stream.
- *
- * This does not attempt to invoke {@link #close()} as that is
- * a more complex operation, and this method is being executed
- * during a GC finalization phase.
- *
- * Applications MUST close their streams; this is a defensive
- * operation to return http connections and warn the end users
- * that their applications are at risk of running out of connections.
*
- * {@inheritDoc}
+ * @param parameters creation parameters.
*/
- @Override
- protected void finalize() throws Throwable {
- leakReporter.close();
- super.finalize();
+ public S3AInputStream(ObjectReadParameters parameters) {
+ super(parameters);
+
+ this.fileLength = Optional.of(getContentLength());
+ S3AReadOpContext context = getContext();
+ this.changeTracker = new ChangeTracker(getUri(),
+ context.getChangeDetectionPolicy(),
+ getS3AStreamStatistics().getChangeTrackerStatistics(),
+ getObjectAttributes());
+ setReadahead(context.getReadahead());
+ this.asyncDrainThreshold = context.getAsyncDrainThreshold();
+ this.vectoredIOContext = this.getContext().getVectoredIOContext();
}
/**
@@ -290,7 +202,8 @@ protected void finalize() throws Throwable {
* Not synchronized; the flag is volatile.
* @return true if the stream is still open.
*/
- private boolean isStreamOpen() {
+ @Override
+ protected boolean isStreamOpen() {
return !closed;
}
@@ -298,10 +211,11 @@ private boolean isStreamOpen() {
* Brute force stream close; invoked by {@link LeakReporter}.
* All exceptions raised are ignored.
*/
- private void abortInFinalizer() {
+ @Override
+ protected void abortInFinalizer() {
try {
// stream was leaked: update statistic
- streamStatistics.streamLeaked();
+ getS3AStreamStatistics().streamLeaked();
// abort the stream. This merges statistics into the filesystem.
closeStream("finalize()", true, true).get();
} catch (InterruptedException | ExecutionException ignroed) {
@@ -309,32 +223,12 @@ private void abortInFinalizer() {
}
}
- /**
- * Set/update the input policy of the stream.
- * This updates the stream statistics.
- * @param inputPolicy new input policy.
- */
- private void setInputPolicy(S3AInputPolicy inputPolicy) {
- LOG.debug("Switching to input policy {}", inputPolicy);
- this.inputPolicy = inputPolicy;
- streamStatistics.inputPolicySet(inputPolicy.ordinal());
- }
-
- /**
- * Get the current input policy.
- * @return input policy.
- */
- @VisibleForTesting
- public S3AInputPolicy getInputPolicy() {
- return inputPolicy;
- }
-
/**
* If the stream is in Adaptive mode, switch to random IO at this
* point. Unsynchronized.
*/
private void maybeSwitchToRandomIO() {
- if (inputPolicy.isAdaptive()) {
+ if (getInputPolicy().isAdaptive()) {
setInputPolicy(S3AInputPolicy.Random);
}
}
@@ -355,24 +249,24 @@ private synchronized void reopen(String reason, long targetPos, long length,
closeStream("reopen(" + reason + ")", forceAbort, false);
}
- contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
- length, contentLength, readahead);
+ contentRangeFinish = calculateRequestLimit(getInputPolicy(), targetPos,
+ length, getContentLength(), readahead);
LOG.debug("reopen({}) for {} range[{}-{}], length={}," +
" streamPosition={}, nextReadPosition={}, policy={}",
- uri, reason, targetPos, contentRangeFinish, length, pos, nextReadPos,
- inputPolicy);
+ getUri(), reason, targetPos, contentRangeFinish, length, pos, nextReadPos,
+ getInputPolicy());
- GetObjectRequest request = client.newGetRequestBuilder(key)
+ GetObjectRequest request = getCallbacks().newGetRequestBuilder(getKey())
.range(S3AUtils.formatRange(targetPos, contentRangeFinish - 1))
.applyMutation(changeTracker::maybeApplyConstraint)
.build();
- long opencount = streamStatistics.streamOpened();
+ long opencount = getS3AStreamStatistics().streamOpened();
String operation = opencount == 0 ? OPERATION_OPEN : OPERATION_REOPEN;
String text = String.format("%s %s at %d",
- operation, uri, targetPos);
- wrappedStream = onceTrackingDuration(text, uri,
- streamStatistics.initiateGetRequest(), () ->
- client.getObject(request));
+ operation, getUri(), targetPos);
+ wrappedStream = onceTrackingDuration(text, getUri(),
+ getS3AStreamStatistics().initiateGetRequest(), () ->
+ getCallbacks().getObject(request));
changeTracker.processResponse(wrappedStream.response(), operation,
targetPos);
@@ -396,7 +290,7 @@ public synchronized void seek(long targetPos) throws IOException {
+ " " + targetPos);
}
- if (this.contentLength <= 0) {
+ if (this.getContentLength() <= 0) {
return;
}
@@ -414,7 +308,7 @@ private void seekQuietly(long positiveTargetPos) {
seek(positiveTargetPos);
} catch (IOException ioe) {
LOG.debug("Ignoring IOE on seek of {} to {}",
- uri, positiveTargetPos, ioe);
+ getUri(), positiveTargetPos, ioe);
}
}
@@ -449,12 +343,12 @@ private void seekInStream(long targetPos, long length) throws IOException {
&& diff < forwardSeekLimit;
if (skipForward) {
// the forward seek range is within the limits
- LOG.debug("Forward seek on {}, of {} bytes", uri, diff);
+ LOG.debug("Forward seek on {}, of {} bytes", getUri(), diff);
long skipped = wrappedStream.skip(diff);
if (skipped > 0) {
pos += skipped;
}
- streamStatistics.seekForwards(diff, skipped);
+ getS3AStreamStatistics().seekForwards(diff, skipped);
if (pos == targetPos) {
// all is well
@@ -464,15 +358,15 @@ private void seekInStream(long targetPos, long length) throws IOException {
} else {
// log a warning; continue to attempt to re-open
LOG.warn("Failed to seek on {} to {}. Current position {}",
- uri, targetPos, pos);
+ getUri(), targetPos, pos);
}
} else {
// not attempting to read any bytes from the stream
- streamStatistics.seekForwards(diff, 0);
+ getS3AStreamStatistics().seekForwards(diff, 0);
}
} else if (diff < 0) {
// backwards seek
- streamStatistics.seekBackwards(diff);
+ getS3AStreamStatistics().seekBackwards(diff);
// if the stream is in "Normal" mode, switch to random IO at this
// point, as it is indicative of columnar format IO
maybeSwitchToRandomIO();
@@ -513,8 +407,8 @@ public boolean seekToNewSource(long targetPos) throws IOException {
@Retries.RetryTranslated
private void lazySeek(long targetPos, long len) throws IOException {
- Invoker invoker = context.getReadInvoker();
- invoker.retry("lazySeek to " + targetPos, pathStr, true,
+ Invoker invoker = getContext().getReadInvoker();
+ invoker.retry("lazySeek to " + targetPos, getPathStr(), true,
() -> {
//For lazy seek
seekInStream(targetPos, len);
@@ -532,9 +426,9 @@ private void lazySeek(long targetPos, long len) throws IOException {
* @param bytesRead number of bytes read
*/
private void incrementBytesRead(long bytesRead) {
- streamStatistics.bytesRead(bytesRead);
- if (context.stats != null && bytesRead > 0) {
- context.stats.incrementBytesRead(bytesRead);
+ getS3AStreamStatistics().bytesRead(bytesRead);
+ if (getContext().stats != null && bytesRead > 0) {
+ getContext().stats.incrementBytesRead(bytesRead);
}
}
@@ -542,7 +436,7 @@ private void incrementBytesRead(long bytesRead) {
@Retries.RetryTranslated
public synchronized int read() throws IOException {
checkNotClosed();
- if (this.contentLength == 0 || (nextReadPos >= contentLength)) {
+ if (this.getContentLength() == 0 || (nextReadPos >= getContentLength())) {
return -1;
}
@@ -554,8 +448,8 @@ public synchronized int read() throws IOException {
return -1;
}
- Invoker invoker = context.getReadInvoker();
- int byteRead = invoker.retry("read", pathStr, true,
+ Invoker invoker = getContext().getReadInvoker();
+ int byteRead = invoker.retry("read", getPathStr(), true,
() -> {
int b;
// When exception happens before re-setting wrappedStream in "reopen" called
@@ -597,13 +491,13 @@ private void onReadFailure(IOException ioe, boolean forceAbort) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got exception while trying to read from stream {}, " +
"client: {} object: {}, trying to recover: ",
- uri, client, objectResponse, ioe);
+ getUri(), getCallbacks(), objectResponse, ioe);
} else {
LOG.info("Got exception while trying to read from stream {}, " +
"client: {} object: {}, trying to recover: " + ioe,
- uri, client, objectResponse);
+ getUri(), getCallbacks(), objectResponse);
}
- streamStatistics.readException();
+ getS3AStreamStatistics().readException();
closeStream("failure recovery", forceAbort, false);
}
@@ -638,7 +532,7 @@ public synchronized int read(byte[] buf, int off, int len)
return 0;
}
- if (this.contentLength == 0 || (nextReadPos >= contentLength)) {
+ if (this.getContentLength() == 0 || (nextReadPos >= getContentLength())) {
return -1;
}
@@ -649,10 +543,10 @@ public synchronized int read(byte[] buf, int off, int len)
return -1;
}
- Invoker invoker = context.getReadInvoker();
+ Invoker invoker = getContext().getReadInvoker();
- streamStatistics.readOperationStarted(nextReadPos, len);
- int bytesRead = invoker.retry("read", pathStr, true,
+ getS3AStreamStatistics().readOperationStarted(nextReadPos, len);
+ int bytesRead = invoker.retry("read", getPathStr(), true,
() -> {
int bytes;
// When exception happens before re-setting wrappedStream in "reopen" called
@@ -685,7 +579,7 @@ public synchronized int read(byte[] buf, int off, int len)
} else {
streamReadResultNegative();
}
- streamStatistics.readOperationCompleted(len, bytesRead);
+ getS3AStreamStatistics().readOperationCompleted(len, bytesRead);
return bytesRead;
}
@@ -696,7 +590,7 @@ public synchronized int read(byte[] buf, int off, int len)
*/
private void checkNotClosed() throws IOException {
if (closed) {
- throw new IOException(uri + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
+ throw new IOException(getUri() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
}
}
@@ -717,28 +611,14 @@ public synchronized void close() throws IOException {
// close or abort the stream; blocking
closeStream("close() operation", false, true);
// end the client+audit span.
- client.close();
- // this is actually a no-op
- super.close();
+ getCallbacks().close();
+
} finally {
- // merge the statistics back into the FS statistics.
- streamStatistics.close();
- // Collect ThreadLevel IOStats
- mergeThreadIOStatistics(streamStatistics.getIOStatistics());
+ super.close();
}
}
}
- /**
- * Merging the current thread's IOStatistics with the current IOStatistics
- * context.
- *
- * @param streamIOStats Stream statistics to be merged into thread
- * statistics aggregator.
- */
- private void mergeThreadIOStatistics(IOStatistics streamIOStats) {
- threadIOStatistics.aggregate(streamIOStats);
- }
/**
* Close a stream: decide whether to abort or close, based on
@@ -776,11 +656,11 @@ private CompletableFuture<Boolean> closeStream(
boolean shouldAbort = forceAbort || remaining > readahead;
CompletableFuture<Boolean> operation;
SDKStreamDrainer<ResponseInputStream<GetObjectResponse>> drainer = new SDKStreamDrainer<>(
- uri,
+ getUri(),
wrappedStream,
shouldAbort,
(int) remaining,
- streamStatistics,
+ getS3AStreamStatistics(),
reason);
if (blocking || shouldAbort || remaining <= asyncDrainThreshold) {
@@ -792,7 +672,7 @@ private CompletableFuture closeStream(
} else {
LOG.debug("initiating asynchronous drain of {} bytes", remaining);
// schedule an async drain/abort
- operation = client.submit(drainer);
+ operation = getCallbacks().submit(drainer);
}
// either the stream is closed in the blocking call or the async call is
@@ -817,7 +697,7 @@ private CompletableFuture closeStream(
@InterfaceStability.Unstable
public synchronized boolean resetConnection() throws IOException {
checkNotClosed();
- LOG.info("Forcing reset of connection to {}", uri);
+ LOG.info("Forcing reset of connection to {}", getUri());
return awaitFuture(closeStream("reset()", true, true));
}
@@ -839,7 +719,7 @@ public synchronized int available() throws IOException {
@InterfaceAudience.Private
@InterfaceStability.Unstable
public synchronized long remainingInFile() {
- return this.contentLength - this.pos;
+ return this.getContentLength() - this.pos;
}
/**
@@ -879,17 +759,17 @@ public boolean markSupported() {
@Override
@InterfaceStability.Unstable
public String toString() {
- String s = streamStatistics.toString();
+ String s = getS3AStreamStatistics().toString();
synchronized (this) {
final StringBuilder sb = new StringBuilder(
"S3AInputStream{");
- sb.append(uri);
+ sb.append(getUri());
sb.append(" wrappedStream=")
.append(isObjectStreamOpen() ? "open" : "closed");
- sb.append(" read policy=").append(inputPolicy);
+ sb.append(" read policy=").append(getInputPolicy());
sb.append(" pos=").append(pos);
sb.append(" nextReadPos=").append(nextReadPos);
- sb.append(" contentLength=").append(contentLength);
+ sb.append(" contentLength=").append(getContentLength());
sb.append(" contentRangeStart=").append(contentRangeStart);
sb.append(" contentRangeFinish=").append(contentRangeFinish);
sb.append(" remainingInCurrentRequest=")
@@ -920,7 +800,7 @@ public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException {
checkNotClosed();
validatePositionedReadArgs(position, buffer, offset, length);
- streamStatistics.readFullyOperationStarted(position, length);
+ getS3AStreamStatistics().readFullyOperationStarted(position, length);
if (length == 0) {
return;
}
@@ -971,10 +851,10 @@ public int maxReadSizeForVectorReads() {
@Override
public synchronized void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
- LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges);
+ LOG.debug("Starting vectored read on path {} for ranges {} ", getPathStr(), ranges);
checkNotClosed();
if (stopVectoredIOOperations.getAndSet(false)) {
- LOG.debug("Reinstating vectored read operation for path {} ", pathStr);
+ LOG.debug("Reinstating vectored read operation for path {} ", getPathStr());
}
// prepare to read
@@ -992,26 +872,28 @@ public synchronized void readVectored(List extends FileRange> ranges,
if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) {
LOG.debug("Not merging the ranges as they are disjoint");
- streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size());
+ getS3AStreamStatistics().readVectoredOperationStarted(sortedRanges.size(),
+ sortedRanges.size());
for (FileRange range: sortedRanges) {
ByteBuffer buffer = allocate.apply(range.getLength());
- boundedThreadPool.submit(() -> readSingleRange(range, buffer));
+ getBoundedThreadPool().submit(() -> readSingleRange(range, buffer));
}
} else {
LOG.debug("Trying to merge the ranges as they are not disjoint");
List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges,
1, minSeekForVectorReads(),
maxReadSizeForVectorReads());
- streamStatistics.readVectoredOperationStarted(sortedRanges.size(), combinedFileRanges.size());
+ getS3AStreamStatistics().readVectoredOperationStarted(sortedRanges.size(),
+ combinedFileRanges.size());
LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
ranges.size(), combinedFileRanges.size());
for (CombinedFileRange combinedFileRange: combinedFileRanges) {
- boundedThreadPool.submit(
+ getBoundedThreadPool().submit(
() -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate));
}
}
LOG.debug("Finished submitting vectored read to threadpool" +
- " on path {} for ranges {} ", pathStr, ranges);
+ " on path {} for ranges {} ", getPathStr(), ranges);
}
/**
@@ -1022,7 +904,7 @@ public synchronized void readVectored(List<? extends FileRange> ranges,
*/
private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
IntFunction<ByteBuffer> allocate) {
- LOG.debug("Start reading {} from path {} ", combinedFileRange, pathStr);
+ LOG.debug("Start reading {} from path {} ", combinedFileRange, getPathStr());
ResponseInputStream<GetObjectResponse> rangeContent = null;
try {
rangeContent = getS3ObjectInputStream("readCombinedFileRange",
@@ -1030,7 +912,7 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
combinedFileRange.getLength());
populateChildBuffers(combinedFileRange, rangeContent, allocate);
} catch (Exception ex) {
- LOG.debug("Exception while reading {} from path {} ", combinedFileRange, pathStr, ex);
+ LOG.debug("Exception while reading {} from path {} ", combinedFileRange, getPathStr(), ex);
// complete exception all the underlying ranges which have not already
// finished.
for(FileRange child : combinedFileRange.getUnderlying()) {
@@ -1041,7 +923,7 @@ private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
} finally {
IOUtils.cleanupWithLogger(LOG, rangeContent);
}
- LOG.debug("Finished reading {} from path {} ", combinedFileRange, pathStr);
+ LOG.debug("Finished reading {} from path {} ", combinedFileRange, getPathStr());
}
/**
@@ -1129,7 +1011,7 @@ private void drainUnnecessaryData(
remaining -= readCount;
}
} finally {
- streamStatistics.readVectoredBytesDiscarded(drainBytes);
+ getS3AStreamStatistics().readVectoredBytesDiscarded(drainBytes);
LOG.debug("{} bytes drained from stream ", drainBytes);
}
}
@@ -1140,7 +1022,7 @@ private void drainUnnecessaryData(
* @param buffer buffer to fill.
*/
private void readSingleRange(FileRange range, ByteBuffer buffer) {
- LOG.debug("Start reading {} from {} ", range, pathStr);
+ LOG.debug("Start reading {} from {} ", range, getPathStr());
if (range.getLength() == 0) {
// a zero byte read.
buffer.flip();
@@ -1155,12 +1037,12 @@ private void readSingleRange(FileRange range, ByteBuffer buffer) {
populateBuffer(range, buffer, objectRange);
range.getData().complete(buffer);
} catch (Exception ex) {
- LOG.warn("Exception while reading a range {} from path {} ", range, pathStr, ex);
+ LOG.warn("Exception while reading a range {} from path {} ", range, getPathStr(), ex);
range.getData().completeExceptionally(ex);
} finally {
IOUtils.cleanupWithLogger(LOG, objectRange);
}
- LOG.debug("Finished reading range {} from path {} ", range, pathStr);
+ LOG.debug("Finished reading range {} from path {} ", range, getPathStr());
}
/**
private ResponseInputStream<GetObjectResponse> getS3Object(String operationName,
long position,
int length)
throws IOException {
- final GetObjectRequest request = client.newGetRequestBuilder(key)
+ final GetObjectRequest request = getCallbacks().newGetRequestBuilder(getKey())
.range(S3AUtils.formatRange(position, position + length - 1))
.applyMutation(changeTracker::maybeApplyConstraint)
.build();
- DurationTracker tracker = streamStatistics.initiateGetRequest();
+ DurationTracker tracker = getS3AStreamStatistics().initiateGetRequest();
ResponseInputStream<GetObjectResponse> objectRange;
- Invoker invoker = context.getReadInvoker();
+ Invoker invoker = getContext().getReadInvoker();
try {
- objectRange = invoker.retry(operationName, pathStr, true,
+ objectRange = invoker.retry(operationName, getPathStr(), true,
() -> {
checkIfVectoredIOStopped();
- return client.getObject(request);
+ return getCallbacks().getObject(request);
});
} catch (IOException ex) {
@@ -1312,18 +1194,6 @@ private void checkIfVectoredIOStopped() throws InterruptedIOException {
}
}
- /**
- * Access the input stream statistics.
- * This is for internal testing and may be removed without warning.
- * @return the statistics for this input stream
- */
- @InterfaceAudience.Private
- @InterfaceStability.Unstable
- @VisibleForTesting
- public S3AInputStreamStatistics getS3AStreamStatistics() {
- return streamStatistics;
- }
-
@Override
public synchronized void setReadahead(Long readahead) {
this.readahead = validateReadahead(readahead);
@@ -1409,8 +1279,8 @@ public synchronized void unbuffer() {
stopVectoredIOOperations.set(true);
closeStream("unbuffer()", false, false);
} finally {
- streamStatistics.unbuffered();
- if (inputPolicy.isAdaptive()) {
+ getS3AStreamStatistics().unbuffered();
+ if (getInputPolicy().isAdaptive()) {
S3AInputPolicy policy = S3AInputPolicy.Random;
setInputPolicy(policy);
}
@@ -1420,15 +1290,12 @@ public synchronized void unbuffer() {
@Override
public boolean hasCapability(String capability) {
switch (toLowerCase(capability)) {
- case StreamCapabilities.IOSTATISTICS:
case StreamCapabilities.IOSTATISTICS_CONTEXT:
- case StreamStatisticNames.STREAM_LEAKS:
case StreamCapabilities.READAHEAD:
case StreamCapabilities.UNBUFFER:
- case StreamCapabilities.VECTOREDIO:
return true;
default:
- return false;
+ return super.hasCapability(capability);
}
}
@@ -1441,11 +1308,6 @@ public boolean isObjectStreamOpen() {
return wrappedStream != null;
}
- @Override
- public IOStatistics getIOStatistics() {
- return ioStatistics;
- }
-
/**
* Get the wrapped stream.
* This is for testing only.
public ResponseInputStream<GetObjectResponse> getWrappedStream() {
return wrappedStream;
}
- /**
- * Callbacks for input stream IO.
- */
- public interface InputStreamCallbacks extends Closeable {
-
- /**
- * Create a GET request builder.
- * @param key object key
- * @return the request builder
- */
- GetObjectRequest.Builder newGetRequestBuilder(String key);
-
- /**
- * Execute the request.
- * When CSE is enabled with reading of unencrypted data, The object is checked if it is
- * encrypted and if so, the request is made with encrypted S3 client. If the object is
- * not encrypted, the request is made with unencrypted s3 client.
- * @param request the request
- * @return the response
- * @throws IOException on any failure.
- */
- @Retries.OnceRaw
- ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) throws IOException;
-
- /**
- * Submit some asynchronous work, for example, draining a stream.
- * @param operation operation to invoke
- * @param <T> return type
- * @return a future.
- */
- <T> CompletableFuture<T> submit(CallableRaisingIOE<T> operation);
-
- }
-
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
index 55351f0c81396..0895e6a6c1155 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
@@ -34,7 +34,7 @@
import static java.util.Objects.requireNonNull;
/**
- * Read-specific operation context struct.
+ * Read-specific operation context structure.
*/
public class S3AReadOpContext extends S3AOpContext {
@@ -75,15 +75,11 @@ public class S3AReadOpContext extends S3AOpContext {
/** Thread-level IOStatistics aggregator. **/
private final IOStatisticsAggregator ioStatisticsAggregator;
- // S3 reads are prefetched asynchronously using this future pool.
+ /**
+ * Pool for any future IO.
+ */
private ExecutorServiceFuturePool futurePool;
- // Size in bytes of a single prefetch block.
- private final int prefetchBlockSize;
-
- // Size of prefetch queue (in number of blocks).
- private final int prefetchBlockCount;
-
/**
* Instantiate.
* @param path path of read
@@ -93,9 +89,7 @@ public class S3AReadOpContext extends S3AOpContext {
* @param dstFileStatus target file status
* @param vectoredIOContext context for vectored read operation.
* @param ioStatisticsAggregator IOStatistics aggregator for each thread.
- * @param futurePool the ExecutorServiceFuturePool instance used by async prefetches.
- * @param prefetchBlockSize the size (in number of bytes) of each prefetched block.
- * @param prefetchBlockCount maximum number of prefetched blocks.
+ * @param futurePool pool for any future IO.
*/
public S3AReadOpContext(
final Path path,
@@ -105,9 +99,7 @@ public S3AReadOpContext(
FileStatus dstFileStatus,
VectoredIOContext vectoredIOContext,
IOStatisticsAggregator ioStatisticsAggregator,
- ExecutorServiceFuturePool futurePool,
- int prefetchBlockSize,
- int prefetchBlockCount) {
+ ExecutorServiceFuturePool futurePool) {
super(invoker, stats, instrumentation,
dstFileStatus);
@@ -115,12 +107,7 @@ public S3AReadOpContext(
this.vectoredIOContext = requireNonNull(vectoredIOContext, "vectoredIOContext");
this.ioStatisticsAggregator = ioStatisticsAggregator;
this.futurePool = futurePool;
- Preconditions.checkArgument(
- prefetchBlockSize > 0, "invalid prefetchBlockSize %d", prefetchBlockSize);
- this.prefetchBlockSize = prefetchBlockSize;
- Preconditions.checkArgument(
- prefetchBlockCount > 0, "invalid prefetchBlockCount %d", prefetchBlockCount);
- this.prefetchBlockCount = prefetchBlockCount;
+
}
/**
@@ -265,23 +252,6 @@ public ExecutorServiceFuturePool getFuturePool() {
return this.futurePool;
}
- /**
- * Gets the size in bytes of a single prefetch block.
- *
- * @return the size in bytes of a single prefetch block.
- */
- public int getPrefetchBlockSize() {
- return this.prefetchBlockSize;
- }
-
- /**
- * Gets the size of prefetch queue (in number of blocks).
- *
- * @return the size of prefetch queue (in number of blocks).
- */
- public int getPrefetchBlockCount() {
- return this.prefetchBlockCount;
- }
@Override
public String toString() {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
index ab8785e01dafd..8655956cf7d03 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStore.java
@@ -45,15 +45,20 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.PathCapabilities;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.impl.ClientManager;
import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteException;
import org.apache.hadoop.fs.s3a.impl.S3AFileSystemOperations;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStreamFactory;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.service.Service;
/**
* Interface for the S3A Store;
@@ -63,10 +68,19 @@
* The {@link ClientManager} interface is used to create the AWS clients;
* the base implementation forwards to the implementation of this interface
* passed in at construction time.
+ *
+ * The interface extends the Hadoop {@link Service} interface
+ * and follows its lifecycle: it MUST NOT be used until
+ * {@link Service#init(Configuration)} has been invoked.
*/
@InterfaceAudience.LimitedPrivate("Extensions")
@InterfaceStability.Unstable
-public interface S3AStore extends IOStatisticsSource, ClientManager {
+public interface S3AStore extends
+ ClientManager,
+ IOStatisticsSource,
+ ObjectInputStreamFactory,
+ PathCapabilities,
+ Service {
/**
* Acquire write capacity for operations.
@@ -302,4 +316,26 @@ CompletedFileUpload waitForUploadCompletion(String key, UploadInfo uploadInfo)
@Retries.OnceRaw
CompleteMultipartUploadResponse completeMultipartUpload(
CompleteMultipartUploadRequest request);
+
+ /**
+ * Get the directory allocator.
+ * @return the directory allocator
+ */
+ LocalDirAllocator getDirectoryAllocator();
+
+ /**
+ * Demand create the directory allocator, then create a temporary file.
+ * This does not mark the file for deletion when a process exits.
+ * Pass in a file size of {@link LocalDirAllocator#SIZE_UNKNOWN} if the
+ * size is unknown.
+ * See {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}.
+ * @param pathStr prefix for the temporary file
+ * @param size the size of the file that is going to be written
+ * @param conf the Configuration object
+ * @return a unique temporary file
+ * @throws IOException IO problems
+ */
+ File createTemporaryFileForWriting(String pathStr,
+ long size,
+ Configuration conf) throws IOException;
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java
index ad7afc732387f..b60551088824c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.fs.s3a.impl;
-import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -26,11 +25,13 @@
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
+import org.apache.hadoop.service.Service;
+
/**
* Interface for on-demand/async creation of AWS clients
* and extension services.
*/
-public interface ClientManager extends Closeable {
+public interface ClientManager extends Service {
/**
* Get the transfer manager, creating it and any dependencies if needed.
@@ -76,8 +77,4 @@ S3TransferManager getOrCreateTransferManager()
*/
S3Client getOrCreateAsyncS3ClientUnchecked() throws UncheckedIOException;
- /**
- * Close operation is required to not raise exceptions.
- */
- void close();
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java
index 44383e381248f..2632c0820e854 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java
@@ -25,7 +25,6 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,6 +34,7 @@
import org.apache.hadoop.fs.s3a.S3ClientFactory;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import org.apache.hadoop.util.functional.LazyAutoCloseableReference;
@@ -49,11 +49,13 @@
/**
* Client manager for on-demand creation of S3 clients,
- * with parallelized close of them in {@link #close()}.
+ * with parallelized close of them in {@link #serviceStop()}.
* Updates {@link org.apache.hadoop.fs.s3a.Statistic#STORE_CLIENT_CREATION}
* to track count and duration of client creation.
*/
-public class ClientManagerImpl implements ClientManager {
+public class ClientManagerImpl
+ extends AbstractService
+ implements ClientManager {
public static final Logger LOG = LoggerFactory.getLogger(ClientManagerImpl.class);
@@ -67,11 +69,6 @@ public class ClientManagerImpl implements ClientManager {
*/
private final S3ClientFactory unencryptedClientFactory;
- /**
- * Closed flag.
- */
- private final AtomicBoolean closed = new AtomicBoolean(false);
-
/**
* Parameters to create sync/async clients.
*/
@@ -115,6 +112,7 @@ public ClientManagerImpl(
final S3ClientFactory unencryptedClientFactory,
final S3ClientFactory.S3ClientCreationParameters clientCreationParameters,
final DurationTrackerFactory durationTrackerFactory) {
+ super("ClientManager");
this.clientFactory = requireNonNull(clientFactory);
this.unencryptedClientFactory = unencryptedClientFactory;
this.clientCreationParameters = requireNonNull(clientCreationParameters);
@@ -226,26 +224,8 @@ public synchronized S3TransferManager getOrCreateTransferManager() throws IOExce
return transferManager.eval();
}
- /**
- * Check that the client manager is not closed.
- * @throws IllegalStateException if it is closed.
- */
- private void checkNotClosed() {
- checkState(!closed.get(), "Client manager is closed");
- }
-
- /**
- * Close() is synchronized to avoid race conditions between
- * slow client creation and this close operation.
- *
- * The objects are all deleted in parallel
- */
@Override
- public synchronized void close() {
- if (closed.getAndSet(true)) {
- // re-entrant close.
- return;
- }
+ protected void serviceStop() throws Exception {
// queue the closures.
List<Future<Object>> l = new ArrayList<>();
l.add(closeAsync(transferManager));
@@ -253,14 +233,18 @@ public synchronized void close() {
l.add(closeAsync(s3Client));
l.add(closeAsync(unencryptedS3Client));
- // once all are queued, await their completion
- // and swallow any exception.
- try {
- awaitAllFutures(l);
- } catch (Exception e) {
- // should never happen.
- LOG.warn("Exception in close", e);
- }
+ // once all are queued, await their completion;
+ // exceptions will be swallowed.
+ awaitAllFutures(l);
+ super.serviceStop();
+ }
+
+ /**
+ * Check that the client manager is not closed.
+ * @throws IllegalStateException if it is closed.
+ */
+ private void checkNotClosed() {
+ checkState(!isInState(STATE.STOPPED), "Client manager is closed");
}
/**
@@ -297,7 +281,7 @@ private CompletableFuture