Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-17131. Refactor S3A Listing code for better isolation. #2148

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -30,7 +30,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
Expand Down Expand Up @@ -68,15 +68,21 @@
* Place for the S3A listing classes; keeps all the small classes under control.
*/
@InterfaceAudience.Private
public class Listing extends AbstractStoreOperation {
public class Listing {
mukund-thakur marked this conversation as resolved.
Show resolved Hide resolved

private static final Logger LOG = S3AFileSystem.LOG;

static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
new AcceptAllButS3nDirs();

public Listing(StoreContext storeContext) {
super(storeContext);
private final ListingOperationCallbacks listingOperationCallbacks;

private final StoreContext storeContext;

public Listing(ListingOperationCallbacks listingOperationCallbacks,
StoreContext storeContext) {
this.listingOperationCallbacks = listingOperationCallbacks;
this.storeContext = storeContext;
}

/**
Expand Down Expand Up @@ -193,11 +199,10 @@ public RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
key, delimiter);
final RemoteIterator<S3AFileStatus> cachedFilesIterator;
final Set<Path> tombstones;
boolean allowAuthoritative = getStoreContext()
.getOperationCallbacks()
boolean allowAuthoritative = listingOperationCallbacks
.allowAuthoritative(path);
if (recursive) {
final PathMetadata pm = getStoreContext()
final PathMetadata pm = storeContext
.getMetadataStore()
.get(path, true);
if (pm != null) {
Expand All @@ -212,7 +217,7 @@ public RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
}
MetadataStoreListFilesIterator metadataStoreListFilesIterator =
new MetadataStoreListFilesIterator(
getStoreContext().getMetadataStore(),
storeContext.getMetadataStore(),
pm,
allowAuthoritative);
tombstones = metadataStoreListFilesIterator.listTombstones();
Expand All @@ -235,11 +240,9 @@ public RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
} else {
DirListingMetadata meta =
S3Guard.listChildrenWithTtl(
getStoreContext().getMetadataStore(),
storeContext.getMetadataStore(),
path,
getStoreContext()
.getContextAccessors()
.getUpdatedTtlTimeProvider(),
listingOperationCallbacks.getUpdatedTtlTimeProvider(),
allowAuthoritative);
if (meta != null) {
tombstones = meta.listTombstones();
Expand All @@ -256,8 +259,7 @@ public RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
return createTombstoneReconcilingIterator(
createLocatedFileStatusIterator(
createFileStatusListingIterator(path,
getStoreContext()
.getListingOperationCallbacks()
listingOperationCallbacks
.createListObjectsRequest(key, delimiter),
ACCEPT_ALL,
acceptor,
Expand All @@ -278,14 +280,12 @@ public RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
final String key = maybeAddTrailingSlash(pathToKey(dir));
final Listing.FileStatusAcceptor acceptor =
new Listing.AcceptAllButSelfAndS3nDirs(dir);
boolean allowAuthoritative = getStoreContext()
.getOperationCallbacks()
boolean allowAuthoritative = listingOperationCallbacks
.allowAuthoritative(dir);
DirListingMetadata meta =
S3Guard.listChildrenWithTtl(getStoreContext().getMetadataStore(),
S3Guard.listChildrenWithTtl(storeContext.getMetadataStore(),
dir,
getStoreContext()
.getContextAccessors()
listingOperationCallbacks
.getUpdatedTtlTimeProvider(),
allowAuthoritative);
Set<Path> tombstones = meta != null
Expand All @@ -301,15 +301,18 @@ public RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
: createTombstoneReconcilingIterator(
createLocatedFileStatusIterator(
createFileStatusListingIterator(dir,
getStoreContext()
.getListingOperationCallbacks()
listingOperationCallbacks
.createListObjectsRequest(key, "/"),
filter,
acceptor,
cachedFileStatusIterator)),
tombstones);
}

/**
 * Create a list-objects request for the given key and delimiter,
 * delegating to the listing operation callbacks.
 * @param key object key prefix for the request
 * @param delimiter any delimiter
 * @return the constructed list request
 */
public S3ListRequest createListObjectsRequest(String key, String delimiter) {
  return listingOperationCallbacks.createListObjectsRequest(key, delimiter);
}

/**
* Interface to implement by the logic deciding whether to accept a summary
* entry or path as a valid file or directory.
Expand Down Expand Up @@ -619,15 +622,15 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
// objects
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
String key = summary.getKey();
Path keyPath = getStoreContext().getContextAccessors().keyToPath(key);
Path keyPath = storeContext.getContextAccessors().keyToPath(key);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: {}", keyPath, stringify(summary));
}
// Skip over keys that are ourselves and old S3N _$folder$ files
if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) {
S3AFileStatus status = createFileStatus(keyPath, summary,
getStoreContext().getContextAccessors().getDefaultBlockSize(keyPath),
getStoreContext().getUsername(),
listingOperationCallbacks.getDefaultBlockSize(keyPath),
storeContext.getUsername(),
summary.getETag(), null);
LOG.debug("Adding: {}", status);
stats.add(status);
Expand All @@ -640,10 +643,10 @@ private boolean buildNextStatusBatch(S3ListResult objects) {

// prefixes: always directories
for (String prefix : objects.getCommonPrefixes()) {
Path keyPath = getStoreContext().getContextAccessors().keyToPath(prefix);
Path keyPath = storeContext.getContextAccessors().keyToPath(prefix);
if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) {
S3AFileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath,
getStoreContext().getUsername());
storeContext.getUsername());
LOG.debug("Adding directory: {}", status);
added++;
stats.add(status);
Expand Down Expand Up @@ -728,8 +731,8 @@ class ObjectListingIterator implements RemoteIterator<S3ListResult> {
Path listPath,
S3ListRequest request) throws IOException {
this.listPath = listPath;
this.maxKeys = getStoreContext().getContextAccessors().getMaxKeys();
this.objects = getStoreContext().getListingOperationCallbacks().listObjects(request);
this.maxKeys = listingOperationCallbacks.getMaxKeys();
this.objects = listingOperationCallbacks.listObjects(request);
this.request = request;
}

Expand Down Expand Up @@ -771,8 +774,7 @@ public S3ListResult next() throws IOException {
// need to request a new set of objects.
LOG.debug("[{}], Requesting next {} objects under {}",
listingCount, maxKeys, listPath);
objects = getStoreContext()
.getListingOperationCallbacks()
objects = listingOperationCallbacks
.continueListObjects(request, objects);
listingCount++;
LOG.debug("New listing status: {}", this);
Expand Down Expand Up @@ -873,8 +875,7 @@ public boolean hasNext() throws IOException {

@Override
public S3ALocatedFileStatus next() throws IOException {
return getStoreContext()
.getListingOperationCallbacks()
return listingOperationCallbacks
.toLocatedFileStatus(statusIterator.next());
}
}
Expand Down
Expand Up @@ -149,7 +149,6 @@
import org.apache.hadoop.fs.s3a.select.SelectBinding;
import org.apache.hadoop.fs.s3a.select.SelectConstants;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
Expand Down Expand Up @@ -294,8 +293,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private final S3AFileSystem.OperationCallbacksImpl
operationCallbacks = new OperationCallbacksImpl();

private final S3AFileSystem.ListingOperationCallbacksImpl
listingOperationCallbacks = new ListingOperationCallbacksImpl();
private final ListingOperationCallbacks listingOperationCallbacks =
new ListingOperationCallbacksImpl();

/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -458,7 +457,7 @@ public void initialize(URI name, Configuration originalConf)

pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
listing = new Listing(createStoreContext());
listing = new Listing(listingOperationCallbacks, createStoreContext());
} catch (AmazonClientException e) {
// amazon client exception: stop all services then throw the translation
stopAllServices();
Expand Down Expand Up @@ -593,6 +592,14 @@ public S3AInstrumentation getInstrumentation() {
return instrumentation;
}

/**
 * Get the listing instance bound to this filesystem.
 * @return this instance's listing.
 */
public Listing getListing() {
  return listing;
}

/**
* Set up the client bindings.
* If delegation tokens are enabled, the FS first looks for a DT
Expand Down Expand Up @@ -1631,6 +1638,26 @@ public S3ALocatedFileStatus toLocatedFileStatus(S3AFileStatus status) throws IOE
public S3ListRequest createListObjectsRequest(String key, String delimiter) {
return S3AFileSystem.this.createListObjectsRequest(key, delimiter);
}

/** Delegates to the owning {@code S3AFileSystem}'s default block size lookup. */
@Override
public long getDefaultBlockSize(Path path) {
  return S3AFileSystem.this.getDefaultBlockSize(path);
}

/** Delegates to the owning {@code S3AFileSystem}'s configured max key count. */
@Override
public int getMaxKeys() {
  return S3AFileSystem.this.getMaxKeys();
}

/** Returns the owning {@code S3AFileSystem}'s current TTL time provider. */
@Override
public ITtlTimeProvider getUpdatedTtlTimeProvider() {
  return S3AFileSystem.this.ttlTimeProvider;
}

/** Delegates the client-side authoritative-path check to the owning filesystem. */
@Override
public boolean allowAuthoritative(final Path p) {
  return S3AFileSystem.this.allowAuthoritative(p);
}
}

/**
Expand Down Expand Up @@ -4765,8 +4792,6 @@ public StoreContext createStoreContext() {
.setUseListV1(useListV1)
.setContextAccessors(new ContextAccessorsImpl())
.setTimeProvider(getTtlTimeProvider())
.setOperationCallbacks(operationCallbacks)
.setListingOperationCallbacks(listingOperationCallbacks)
.build();
}

Expand Down Expand Up @@ -4801,19 +4826,5 @@ public Path makeQualified(final Path path) {
return S3AFileSystem.this.makeQualified(path);
}

mukund-thakur marked this conversation as resolved.
Show resolved Hide resolved
@Override
public long getDefaultBlockSize(Path path) {
return S3AFileSystem.this.getDefaultBlockSize(path);
}

@Override
public int getMaxKeys() {
return S3AFileSystem.this.getMaxKeys();
}

@Override
public ITtlTimeProvider getUpdatedTtlTimeProvider() {
return S3AFileSystem.this.ttlTimeProvider;
}
}
}
Expand Up @@ -82,26 +82,4 @@ public interface ContextAccessors {
* @return possibly new path.
*/
Path makeQualified(Path path);

/**
* Return the number of bytes that large input files should be optimally
* be split into to minimize I/O time. The given path will be used to
* locate the actual filesystem. The full path does not have to exist.
* @param path path of file
* @return the default block size for the path's filesystem
*/
long getDefaultBlockSize(Path path);

/**
* Get the maximum key count.
* @return a value, valid after initialization
*/
int getMaxKeys();

/**
* Get the updated time provider for the current fs instance.
* @return implementation of {@link ITtlTimeProvider}
*/
ITtlTimeProvider getUpdatedTtlTimeProvider();

}
Expand Up @@ -3,11 +3,13 @@
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3ALocatedFileStatus;
import org.apache.hadoop.fs.s3a.S3ListRequest;
import org.apache.hadoop.fs.s3a.S3ListResult;
import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider;

/**
* These are all the callbacks which
Expand Down Expand Up @@ -56,12 +58,43 @@ S3ALocatedFileStatus toLocatedFileStatus(
throws IOException;
/**
* Create a {@code ListObjectsRequest} request against this bucket,
* with the maximum keys returned in a query set by {@link ContextAccessors#getMaxKeys()}.
* with the maximum keys returned in a query set by {@link #getMaxKeys()}.
* @param key key for request
* @param delimiter any delimiter
* @return the request
*/
public S3ListRequest createListObjectsRequest(
S3ListRequest createListObjectsRequest(
String key,
String delimiter);


/**
 * Return the number of bytes that large input files should optimally
 * be split into to minimize I/O time. The given path will be used to
 * locate the actual filesystem. The full path does not have to exist.
 * @param path path of file
 * @return the default block size for the path's filesystem
 */
long getDefaultBlockSize(Path path);

/**
 * Get the maximum key count used when constructing list requests.
 * @return a value, valid after initialization
 */
int getMaxKeys();

/**
 * Get the updated TTL time provider for the current filesystem instance.
 * @return implementation of {@link ITtlTimeProvider}
 */
ITtlTimeProvider getUpdatedTtlTimeProvider();

/**
 * Is the path for this instance considered authoritative on the client,
 * that is: will listing/status operations only be handled by the metastore,
 * with no fallback to S3?
 * @param p path
 * @return true iff the path is authoritative on the client.
 */
boolean allowAuthoritative(Path p);
}
Expand Up @@ -40,8 +40,7 @@
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;

/**
* These are all the callbacks which the {@link RenameOperation},
* {@link org.apache.hadoop.fs.s3a.Listing }
* These are all the callbacks which the {@link RenameOperation}
* and {@link DeleteOperation } operations need,
* derived from the appropriate S3AFileSystem methods.
*/
Expand Down