Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HADOOP-17131. Refactor S3A Listing code for better isolation. #2148

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -30,11 +30,22 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreListFilesIterator;
import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -46,25 +57,31 @@
import java.util.Set;

import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX;
import static org.apache.hadoop.fs.s3a.S3AUtils.ACCEPT_ALL;
import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus;
import static org.apache.hadoop.fs.s3a.S3AUtils.maybeAddTrailingSlash;
import static org.apache.hadoop.fs.s3a.S3AUtils.objectRepresentsDirectory;
import static org.apache.hadoop.fs.s3a.S3AUtils.stringify;
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
import static org.apache.hadoop.fs.s3a.auth.RoleModel.pathToKey;

/**
* Place for the S3A listing classes; keeps all the small classes under control.
*/
@InterfaceAudience.Private
public class Listing {
public class Listing extends AbstractStoreOperation {

private final S3AFileSystem owner;
private static final Logger LOG = S3AFileSystem.LOG;

static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
new AcceptAllButS3nDirs();

public Listing(S3AFileSystem owner) {
this.owner = owner;
private final ListingOperationCallbacks listingOperationCallbacks;

public Listing(ListingOperationCallbacks listingOperationCallbacks,
StoreContext storeContext) {
super(storeContext);
this.listingOperationCallbacks = listingOperationCallbacks;
}

/**
Expand Down Expand Up @@ -156,6 +173,145 @@ TombstoneReconcilingIterator createTombstoneReconcilingIterator(
return new TombstoneReconcilingIterator(iterator, tombstones);
}


/**
* List files under a path assuming the path to be a directory.
* @param path input path.
* @param recursive recursive listing?
* @param acceptor file status filter
* @param collectTombstones should tombstones be collected from S3Guard?
* @param forceNonAuthoritativeMS forces metadata store to act like non
* authoritative. This is useful when
* listFiles output is used by import tool.
* @return an iterator over listing.
* @throws IOException any exception.
*/
public RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
Path path,
boolean recursive, Listing.FileStatusAcceptor acceptor,
boolean collectTombstones,
boolean forceNonAuthoritativeMS) throws IOException {

String key = maybeAddTrailingSlash(pathToKey(path));
String delimiter = recursive ? null : "/";
LOG.debug("Requesting all entries under {} with delimiter '{}'",
key, delimiter);
final RemoteIterator<S3AFileStatus> cachedFilesIterator;
final Set<Path> tombstones;
boolean allowAuthoritative = listingOperationCallbacks
.allowAuthoritative(path);
if (recursive) {
final PathMetadata pm = getStoreContext()
.getMetadataStore()
.get(path, true);
if (pm != null) {
if (pm.isDeleted()) {
OffsetDateTime deletedAt = OffsetDateTime
.ofInstant(Instant.ofEpochMilli(
pm.getFileStatus().getModificationTime()),
ZoneOffset.UTC);
throw new FileNotFoundException("Path " + path + " is recorded as " +
"deleted by S3Guard at " + deletedAt);
}
}
MetadataStoreListFilesIterator metadataStoreListFilesIterator =
new MetadataStoreListFilesIterator(
getStoreContext().getMetadataStore(),
pm,
allowAuthoritative);
tombstones = metadataStoreListFilesIterator.listTombstones();
// if all of the below is true
// - authoritative access is allowed for this metadatastore
// for this directory,
// - all the directory listings are authoritative on the client
// - the caller does not force non-authoritative access
// return the listing without any further s3 access
if (!forceNonAuthoritativeMS &&
allowAuthoritative &&
metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
metadataStoreListFilesIterator, tombstones);
cachedFilesIterator = createProvidedFileStatusIterator(
statuses, ACCEPT_ALL, acceptor);
return createLocatedFileStatusIterator(cachedFilesIterator);
}
cachedFilesIterator = metadataStoreListFilesIterator;
} else {
DirListingMetadata meta =
S3Guard.listChildrenWithTtl(
getStoreContext().getMetadataStore(),
path,
listingOperationCallbacks.getUpdatedTtlTimeProvider(),
allowAuthoritative);
if (meta != null) {
tombstones = meta.listTombstones();
} else {
tombstones = null;
}
cachedFilesIterator = createProvidedFileStatusIterator(
S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
// metadata listing is authoritative, so return it directly
return createLocatedFileStatusIterator(cachedFilesIterator);
}
}
return createTombstoneReconcilingIterator(
createLocatedFileStatusIterator(
createFileStatusListingIterator(path,
listingOperationCallbacks
.createListObjectsRequest(key, delimiter),
ACCEPT_ALL,
acceptor,
cachedFilesIterator)),
collectTombstones ? tombstones : null);
}

/**
* Generate list located status for a directory.
* Also performing tombstone reconciliation for guarded directories.
* @param dir directory to check.
* @param filter a path filter.
* @return an iterator that traverses statuses of the given dir.
* @throws IOException in case of failure.
*/
public RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir(
Path dir, PathFilter filter) throws IOException {
final String key = maybeAddTrailingSlash(pathToKey(dir));
final Listing.FileStatusAcceptor acceptor =
new Listing.AcceptAllButSelfAndS3nDirs(dir);
boolean allowAuthoritative = listingOperationCallbacks
.allowAuthoritative(dir);
DirListingMetadata meta =
S3Guard.listChildrenWithTtl(getStoreContext().getMetadataStore(),
dir,
listingOperationCallbacks
.getUpdatedTtlTimeProvider(),
allowAuthoritative);
Set<Path> tombstones = meta != null
? meta.listTombstones()
: null;
final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
createProvidedFileStatusIterator(
S3Guard.dirMetaToStatuses(meta), filter, acceptor);
return (allowAuthoritative && meta != null
&& meta.isAuthoritative())
? createLocatedFileStatusIterator(
cachedFileStatusIterator)
: createTombstoneReconcilingIterator(
createLocatedFileStatusIterator(
createFileStatusListingIterator(dir,
listingOperationCallbacks
.createListObjectsRequest(key, "/"),
filter,
acceptor,
cachedFileStatusIterator)),
tombstones);
}

public S3ListRequest createListObjectsRequest(String key, String delimiter) {
return listingOperationCallbacks.createListObjectsRequest(key, delimiter);
}

/**
* Interface to implement by the logic deciding whether to accept a summary
* entry or path as a valid file or directory.
Expand Down Expand Up @@ -193,9 +349,9 @@ interface FileStatusAcceptor {
* value.
*
* If the status value is null, the iterator declares that it has no data.
* This iterator is used to handle {@link S3AFileSystem#listStatus} calls
* where the path handed in refers to a file, not a directory: this is the
* iterator returned.
* This iterator is used to handle {@link S3AFileSystem#listStatus(Path)}
* calls where the path handed in refers to a file, not a directory:
* this is the iterator returned.
*/
static final class SingleStatusRemoteIterator
implements RemoteIterator<S3ALocatedFileStatus> {
Expand Down Expand Up @@ -465,14 +621,15 @@ private boolean buildNextStatusBatch(S3ListResult objects) {
// objects
for (S3ObjectSummary summary : objects.getObjectSummaries()) {
String key = summary.getKey();
Path keyPath = owner.keyToQualifiedPath(key);
Path keyPath = getStoreContext().getContextAccessors().keyToPath(key);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: {}", keyPath, stringify(summary));
}
// Skip over keys that are ourselves and old S3N _$folder$ files
if (acceptor.accept(keyPath, summary) && filter.accept(keyPath)) {
S3AFileStatus status = createFileStatus(keyPath, summary,
owner.getDefaultBlockSize(keyPath), owner.getUsername(),
listingOperationCallbacks.getDefaultBlockSize(keyPath),
getStoreContext().getUsername(),
summary.getETag(), null);
LOG.debug("Adding: {}", status);
stats.add(status);
Expand All @@ -485,10 +642,12 @@ private boolean buildNextStatusBatch(S3ListResult objects) {

// prefixes: always directories
for (String prefix : objects.getCommonPrefixes()) {
Path keyPath = owner.keyToQualifiedPath(prefix);
Path keyPath = getStoreContext()
.getContextAccessors()
.keyToPath(prefix);
if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) {
S3AFileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath,
owner.getUsername());
getStoreContext().getUsername());
LOG.debug("Adding directory: {}", status);
added++;
stats.add(status);
Expand Down Expand Up @@ -573,8 +732,8 @@ class ObjectListingIterator implements RemoteIterator<S3ListResult> {
Path listPath,
S3ListRequest request) throws IOException {
this.listPath = listPath;
this.maxKeys = owner.getMaxKeys();
this.objects = owner.listObjects(request);
this.maxKeys = listingOperationCallbacks.getMaxKeys();
this.objects = listingOperationCallbacks.listObjects(request);
this.request = request;
}

Expand Down Expand Up @@ -616,7 +775,8 @@ public S3ListResult next() throws IOException {
// need to request a new set of objects.
LOG.debug("[{}], Requesting next {} objects under {}",
listingCount, maxKeys, listPath);
objects = owner.continueListObjects(request, objects);
objects = listingOperationCallbacks
.continueListObjects(request, objects);
listingCount++;
LOG.debug("New listing status: {}", this);
} catch (AmazonClientException e) {
Expand Down Expand Up @@ -716,7 +876,8 @@ public boolean hasNext() throws IOException {

@Override
public S3ALocatedFileStatus next() throws IOException {
return owner.toLocatedFileStatus(statusIterator.next());
return listingOperationCallbacks
.toLocatedFileStatus(statusIterator.next());
}
}

Expand Down