Skip to content

Commit

Permalink
HADOOP-17131. Refactor S3A Listing code for better isolation. (#2148)
Browse files Browse the repository at this point in the history
Contributed by Mukund Thakur.

Change-Id: I79160b236a92fdd67565a4b4974f1862e600c210
  • Loading branch information
mukund-thakur authored and steveloughran committed Aug 4, 2020
1 parent b5d7122 commit 251d2d1
Show file tree
Hide file tree
Showing 7 changed files with 521 additions and 106 deletions.
Expand Up @@ -30,11 +30,22 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -46,25 +57,31 @@
import java.util.Set;

import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX;
import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL;
import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus;
import static org.apache.hadoop.fs.s3a.S3AUtils.maybeAddTrailingSlash;
import static org.apache.hadoop.fs.s3a.S3AUtils.objectRepresentsDirectory;
import static org.apache.hadoop.fs.s3a.S3AUtils.stringify;
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
import static org.apache.hadoop.fs.s3a.auth.RoleModel.pathToKey;

/**
* Place for the S3A listing classes; keeps all the small classes under control.
*/
@InterfaceAudience.Private
public class Listing {
public class Listing extends AbstractStoreOperation {

private final S3AFileSystem owner;
private static final Logger LOG = S3AFileSystem.LOG;

static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
new AcceptAllButS3nDirs();

public Listing(S3AFileSystem owner) {
this.owner = owner;
private final ListingOperationCallbacks listingOperationCallbacks;

public Listing(ListingOperationCallbacks listingOperationCallbacks,
StoreContext storeContext) {
super(storeContext);
this.listingOperationCallbacks = listingOperationCallbacks;
}

/**
Expand Down Expand Up @@ -156,6 +173,145 @@ TombstoneReconcilingIterator createTombstoneReconcilingIterator(
return new TombstoneReconcilingIterator(iterator, tombstones);
}


/**
* List files under a path assuming the path to be a directory.
* @param path input path.
* @param recursive recursive listing?
* @param acceptor file status filter
* @param collectTombstones should tombstones be collected from S3Guard?
* @param forceNonAuthoritativeMS forces metadata store to act like non
* authoritative. This is useful when
* listFiles output is used by import tool.
* @return an iterator over listing.
* @throws IOException any exception.
*/
public RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
Path path,
boolean recursive, Listing.FileStatusAcceptor acceptor,
boolean collectTombstones,
boolean forceNonAuthoritativeMS) throws IOException {

String key = maybeAddTrailingSlash(pathToKey(path));
String delimiter = recursive ? null : "/";
LOG.debug("Requesting all entries under {} with delimiter '{}'",
key, delimiter);
final RemoteIterator<S3AFileStatus> cachedFilesIterator;
final Set<Path> tombstones;
boolean allowAuthoritative = listingOperationCallbacks
.allowAuthoritative(path);
if (recursive) {
final PathMetadata pm = getStoreContext()
.getMetadataStore()
.get(path, true);
if (pm != null) {
if (pm.isDeleted()) {
OffsetDateTime deletedAt = OffsetDateTime
.ofInstant(Instant.ofEpochMilli(
pm.getFileStatus().getModificationTime()),
ZoneOffset.UTC);
throw new FileNotFoundException("Path " + path + " is recorded as " +
"deleted by S3Guard at " + deletedAt);
}
}
MetadataStoreListFilesIterator metadataStoreListFilesIterator =
new MetadataStoreListFilesIterator(
getStoreContext().getMetadataStore(),
pm,
allowAuthoritative);
tombstones = metadataStoreListFilesIterator.listTombstones();
// if all of the below is true
// - authoritative access is allowed for this metadatastore
// for this directory,
// - all the directory listings are authoritative on the client
// - the caller does not force non-authoritative access
// return the listing without any further s3 access
if (!forceNonAuthoritativeMS &&
allowAuthoritative &&
metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
metadataStoreListFilesIterator, tombstones);
cachedFilesIterator = createProvidedFileStatusIterator(
statuses, ACCEPT_ALL, acceptor);
return createLocatedFileStatusIterator(cachedFilesIterator);
}
cachedFilesIterator = metadataStoreListFilesIterator;
} else {
DirListingMetadata meta =
S3Guard.listChildrenWithTtl(
getStoreContext().getMetadataStore(),
path,
listingOperationCallbacks.getUpdatedTtlTimeProvider(),
allowAuthoritative);
if (meta != null) {
tombstones = meta.listTombstones();
} else {
tombstones = null;
}
cachedFilesIterator = createProvidedFileStatusIterator(
S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
// metadata listing is authoritative, so return it directly
return createLocatedFileStatusIterator(cachedFilesIterator);
}
}
return createTombstoneReconcilingIterator(
createLocatedFileStatusIterator(
createFileStatusListingIterator(path,
listingOperationCallbacks
.createListObjectsRequest(key, delimiter),
ACCEPT_ALL,
acceptor,
cachedFilesIterator)),
collectTombstones ? tombstones : null);
}

/**
* Generate list located status for a directory.
* Also performing tombstone reconciliation for guarded directories.
* @param dir directory to check.
* @param filter a path filter.
* @return an iterator that traverses statuses of the given dir.
* @throws IOException in case of failure.
*/
public RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
Path dir, PathFilter filter) throws IOException {
final String key = maybeAddTrailingSlash(pathToKey(dir));
final Listing.FileStatusAcceptor acceptor =
new Listing.AcceptAllButSelfAndS3nDirs(dir);
boolean allowAuthoritative = listingOperationCallbacks
.allowAuthoritative(dir);
DirListingMetadata meta =
S3Guard.listChildrenWithTtl(getStoreContext().getMetadataStore(),
dir,
listingOperationCallbacks
.getUpdatedTtlTimeProvider(),
allowAuthoritative);
Set<Path> tombstones = meta != null
? meta.listTombstones()
: null;
final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
createProvidedFileStatusIterator(
S3Guard.dirMetaToStatuses(meta), filter, acceptor);
return (allowAuthoritative && meta != null
&& meta.isAuthoritative())
? createLocatedFileStatusIterator(
cachedFileStatusIterator)
: createTombstoneReconcilingIterator(
createLocatedFileStatusIterator(
createFileStatusListingIterator(dir,
listingOperationCallbacks
.createListObjectsRequest(key, "/"),
filter,
acceptor,
cachedFileStatusIterator)),
tombstones);
}

public S3ListRequest createListObjectsRequest(String key, String delimiter) {
return listingOperationCallbacks.createListObjectsRequest(key, delimiter);
}

/**
* Interface to implement by the logic deciding whether to accept a summary
* entry or path as a valid file or directory.
Expand Down Expand Up @@ -193,9 +349,9 @@ interface FileStatusAcceptor {
* value.
*
* If the status value is null, the iterator declares that it has no data.
* This iterator is used to handle {@link S3AFileSystem#listStatus} calls
* where the path handed in refers to a file, not a directory: this is the
* iterator returned.
* This iterator is used to handle {@link S3AFileSystem#listStatus(Path)}
* calls where the path handed in refers to a file, not a directory:
* this is the iterator returned.
*/
static final class SingleStatusRemoteIterator
implements RemoteIterator<S3ALocatedFileStatus> {
Expand Down Expand Up @@ -465,14 +621,15 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
// objects
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
String key = summary.getKey();
Path keyPath = owner.keyToQualifiedPath(key);
Path keyPath = getStoreContext().getContextAccessors().keyToPath(key);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: {}", keyPath, stringify(summary));
}
// Skip over keys that are ourselves and old S3N _$folder$ files
if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) {
S3AFileStatus status = createFileStatus(keyPath, summary,
owner.getDefaultBlockSize(keyPath), owner.getUsername(),
listingOperationCallbacks.getDefaultBlockSize(keyPath),
getStoreContext().getUsername(),
summary.getETag(), null);
LOG.debug("Adding: {}", status);
stats.add(status);
Expand All @@ -485,10 +642,12 @@ private boolean buildNextStatusBatch(S3ListResult objects) {

// prefixes: always directories
for (String prefix : objects.getCommonPrefixes()) {
Path keyPath = owner.keyToQualifiedPath(prefix);
Path keyPath = getStoreContext()
.getContextAccessors()
.keyToPath(prefix);
if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) {
S3AFileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath,
owner.getUsername());
getStoreContext().getUsername());
LOG.debug("Adding directory: {}", status);
added++;
stats.add(status);
Expand Down Expand Up @@ -573,8 +732,8 @@ class ObjectListingIterator implements RemoteIterator<S3ListResult> {
Path listPath,
S3ListRequest request) throws IOException {
this.listPath = listPath;
this.maxKeys = owner.getMaxKeys();
this.objects = owner.listObjects(request);
this.maxKeys = listingOperationCallbacks.getMaxKeys();
this.objects = listingOperationCallbacks.listObjects(request);
this.request = request;
}

Expand Down Expand Up @@ -616,7 +775,8 @@ public S3ListResult next() throws IOException {
// need to request a new set of objects.
LOG.debug("[{}], Requesting next {} objects under {}",
listingCount, maxKeys, listPath);
objects = owner.continueListObjects(request, objects);
objects = listingOperationCallbacks
.continueListObjects(request, objects);
listingCount++;
LOG.debug("New listing status: {}", this);
} catch (AmazonClientException e) {
Expand Down Expand Up @@ -716,7 +876,8 @@ public boolean hasNext() throws IOException {

@Override
public S3ALocatedFileStatus next() throws IOException {
return owner.toLocatedFileStatus(statusIterator.next());
return listingOperationCallbacks
.toLocatedFileStatus(statusIterator.next());
}
}

Expand Down

0 comments on commit 251d2d1

Please sign in to comment.