Skip to content

Commit

Permalink
Uploader refactoring and additional attempts for immediate uploaders
Browse files Browse the repository at this point in the history
  • Loading branch information
ssalinas committed Feb 20, 2018
1 parent 95cfb75 commit 20bdd4f
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 81 deletions.
Expand Up @@ -4,6 +4,7 @@
import java.util.Map;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -216,6 +217,11 @@ public Optional<String> getEncryptionKey() {
return encryptionKey;
}

@JsonIgnore
public boolean isImmediate() {
return uploadImmediately.or(false);
}

@Override
public String toString() {
return "S3UploadMetadata{" +
Expand Down
Expand Up @@ -15,16 +15,16 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -74,9 +74,8 @@ public class SingularityS3UploaderDriver extends WatchServiceHelper implements S
private final String hostname;
private final SingularityRunnerExceptionNotifier exceptionNotifier;

private final List<S3UploadMetadata> immediateUploadMetadata;
private final ReentrantLock lock;
private final ConcurrentMap<SingularityUploader, Future<Integer>> immediateUploaders;
private final Map<S3UploadMetadata, SingularityUploader> metadataToimmediateUploader;
private final Map<S3UploadMetadata, CompletableFuture<Integer>> immediateUploadersFutures;

private ScheduledFuture<?> future;

Expand Down Expand Up @@ -109,9 +108,8 @@ public SingularityS3UploaderDriver(SingularityRunnerBaseConfiguration baseConfig
this.hostname = hostname;
this.exceptionNotifier = exceptionNotifier;

this.immediateUploadMetadata = new ArrayList<>();
this.lock = new ReentrantLock();
this.immediateUploaders = Maps.newConcurrentMap();
this.immediateUploadersFutures = new ConcurrentHashMap<>();
this.metadataToimmediateUploader = new ConcurrentHashMap<>();
}

private void readInitialFiles() throws IOException {
Expand Down Expand Up @@ -142,13 +140,16 @@ public void startAndWait() {
throw new MissingConfigException("s3SecretKey not set in any s3 configs!");
}

runLock.lock();
try {
readInitialFiles();
} catch (Throwable t) {
throw Throwables.propagate(t);
throw new RuntimeException(t);
} finally {
runLock.unlock();
}

future = this.scheduler.scheduleAtFixedRate(() -> {
future = scheduler.scheduleAtFixedRate(() -> {
final long start = System.currentTimeMillis();

runLock.lock();
Expand Down Expand Up @@ -207,36 +208,53 @@ public void shutdown() {
}

private int checkUploads() {
if (metadataToUploader.isEmpty() && immediateUploaders.isEmpty()) {
if (metadataToUploader.isEmpty() && metadataToimmediateUploader.isEmpty()) {
return 0;
}

int totesUploads = 0;

// Check results of immediate uploaders
List<SingularityUploader> toRetry = new ArrayList<>();
List<SingularityUploader> toRemove = new ArrayList<>();
for (Map.Entry<SingularityUploader, Future<Integer>> entry : immediateUploaders.entrySet()) {
List<S3UploadMetadata> toRetry = new ArrayList<>();
List<S3UploadMetadata> toRemove = new ArrayList<>();
for (Map.Entry<S3UploadMetadata, CompletableFuture<Integer>> entry : immediateUploadersFutures.entrySet()) {
SingularityUploader uploader = metadataToimmediateUploader.get(entry.getKey());
if (uploader == null) {
toRemove.add(entry.getKey());
continue;
}
try {
int uploadedFiles = entry.getValue().get();
if (uploadedFiles != -1) {
List<Path> remainingFiles = uploader.filesToUpload(isFinished(uploader));
if (!remainingFiles.isEmpty() || uploadedFiles == -1) {
LOG.debug("Immediate uploader had {} remaining files, previously uploaded {}, will retry", remainingFiles.size(), uploadedFiles);
toRetry.add(entry.getKey());
} else {
totesUploads += uploadedFiles;
toRemove.add(entry.getKey());
} else {
toRetry.add(entry.getKey());
}
} catch (Throwable t) {
metrics.error();
LOG.error("Waiting on future", t);
exceptionNotifier.notify(String.format("Error waiting on uploader future (%s)", t.getMessage()), t, ImmutableMap.of("metadataPath", entry.getKey().getMetadataPath().toString()));
exceptionNotifier.notify(String.format("Error waiting on uploader future (%s)", t.getMessage()), t, ImmutableMap.of("metadataPath", uploader.getMetadataPath().toString()));
toRetry.add(entry.getKey());
}
}

for (SingularityUploader uploader : toRemove) {
for (S3UploadMetadata uploaderMetadata : toRemove) {
metrics.getImmediateUploaderCounter().dec();
immediateUploaders.remove(uploader);
immediateUploadMetadata.remove(uploader.getUploadMetadata());
SingularityUploader uploader = metadataToimmediateUploader.remove(uploaderMetadata);
CompletableFuture<Integer> uploaderFuture = immediateUploadersFutures.remove(uploaderMetadata);
if (uploaderFuture != null) {
try {
uploaderFuture.get(30, TimeUnit.SECONDS); // All uploaders reaching this point should already be finished, if it isn't done in 30s, it's stuck
} catch (Throwable t) {
LOG.error("Exception waiting for immediate uploader to complete for metadata {}", uploaderMetadata, t);
}
}
if (uploader == null) {
continue;
}
expiring.remove(uploader);

try {
Expand All @@ -250,30 +268,35 @@ private int checkUploads() {
}
}

for (SingularityUploader uploader : toRetry) {
LOG.debug("Retrying immediate uploader {}", uploader);
performImmediateUpload(uploader);
for (S3UploadMetadata uploaderMetadata : toRetry) {
SingularityUploader uploader = metadataToimmediateUploader.remove(uploaderMetadata);
if (uploader != null) {
LOG.debug("Retrying immediate uploader {}", uploaderMetadata);
performImmediateUpload(uploader);
} else {
LOG.debug("Uploader for metadata {} not found to retry, recreating", uploaderMetadata);
}
}

// Check regular uploaders
int initialExpectedSize = Math.max(metadataToUploader.size(), 1);
final Map<SingularityUploader, Future<Integer>> futures = Maps.newHashMapWithExpectedSize(initialExpectedSize);
final Map<SingularityUploader, CompletableFuture<Integer>> futures = Maps.newHashMapWithExpectedSize(initialExpectedSize);
final Map<SingularityUploader, Boolean> finishing = Maps.newHashMapWithExpectedSize(initialExpectedSize);

for (final SingularityUploader uploader : metadataToUploader.values()) {
final boolean isFinished = isFinished(uploader);
// do this here so we run at least once with isFinished = true
finishing.put(uploader, isFinished);

futures.put(uploader, executorService.submit(performUploadCallable(uploader, isFinished, false)));
futures.put(uploader, CompletableFuture.supplyAsync(performUploadSupplier(uploader, isFinished, false), executorService));
}

LOG.info("Waiting on {} future(s)", futures.size());

final long now = System.currentTimeMillis();
final Set<SingularityUploader> expiredUploaders = Sets.newHashSetWithExpectedSize(initialExpectedSize);

for (Entry<SingularityUploader, Future<Integer>> uploaderToFuture : futures.entrySet()) {
for (Entry<SingularityUploader, CompletableFuture<Integer>> uploaderToFuture : futures.entrySet()) {
final SingularityUploader uploader = uploaderToFuture.getKey();
try {
final int foundFiles = uploaderToFuture.getValue().get();
Expand Down Expand Up @@ -316,7 +339,7 @@ private int checkUploads() {
return totesUploads;
}

private Callable<Integer> performUploadCallable(final SingularityUploader uploader, final boolean finished, final boolean immediate) {
private Supplier<Integer> performUploadSupplier(final SingularityUploader uploader, final boolean finished, final boolean immediate) {
return () -> {
Integer returnValue = 0;
try {
Expand All @@ -335,7 +358,14 @@ private Callable<Integer> performUploadCallable(final SingularityUploader upload

private void performImmediateUpload(final SingularityUploader uploader) {
final boolean finished = isFinished(uploader);
immediateUploaders.put(uploader, executorService.submit(performUploadCallable(uploader, finished, true)));
if (immediateUploadersFutures.containsKey(uploader.getUploadMetadata()) && !immediateUploadersFutures.get(uploader.getUploadMetadata()).isDone()) {
LOG.debug("Immediate upload already in progress for metadata {}, will not reattempt", uploader.getUploadMetadata());
} else {
immediateUploadersFutures.put(
uploader.getUploadMetadata(),
CompletableFuture.supplyAsync(performUploadSupplier(uploader, finished, true), executorService)
);
}
}

private boolean shouldExpire(SingularityUploader uploader, boolean isFinished) {
Expand Down Expand Up @@ -389,23 +419,33 @@ private boolean handleNewOrModifiedS3Metadata(Path filename) throws IOException
final S3UploadMetadata metadata = maybeMetadata.get();

SingularityUploader existingUploader = metadataToUploader.get(metadata);
SingularityUploader existingImmediateUploader = metadataToimmediateUploader.get(metadata);

if (existingUploader != null) {
if (metadata.getUploadImmediately().isPresent() && metadata.getUploadImmediately().get()) {
if (metadata.isImmediate()) {
if (existingUploader != null) {
LOG.debug("Existing metadata {} from {} changed to be immediate, forcing upload", metadata, filename);
expiring.remove(existingUploader);
if (canCreateImmediateUploader(metadata)) {
if (existingImmediateUploader == null) {
metrics.getUploaderCounter().dec();
metrics.getImmediateUploaderCounter().inc();

metadataToimmediateUploader.put(metadata, existingUploader);
metadataToUploader.remove(existingUploader.getUploadMetadata());
uploaderLastHadFilesAt.remove(existingUploader);
performImmediateUpload(existingUploader);
return true;
} else {
performImmediateUpload(existingImmediateUploader);
return false;
}
} else if (existingUploader.getUploadMetadata().isFinished() == metadata.isFinished()) {
} else if (existingImmediateUploader != null) {
LOG.info("Already had an immediate uploader for metadata {}, triggering new upload attempt", metadata);
performImmediateUpload(existingImmediateUploader);
return false;
}
}

if (existingUploader != null) {
if (existingUploader.getUploadMetadata().isFinished() == metadata.isFinished()) {
LOG.debug("Ignoring metadata {} from {} because there was already one present", metadata, filename);
return false;
} else {
Expand Down Expand Up @@ -442,18 +482,14 @@ private boolean handleNewOrModifiedS3Metadata(Path filename) throws IOException
expiring.add(uploader);
}

LOG.info("Created new uploader {}", uploader);

if (metadata.getUploadImmediately().isPresent()
&& metadata.getUploadImmediately().get()) {
if (canCreateImmediateUploader(metadata)) {
metrics.getImmediateUploaderCounter().inc();
this.performImmediateUpload(uploader);
return true;
} else {
return false;
}
if (metadata.isImmediate()) {
LOG.info("Created new immediate uploader {}", uploader);
metadataToimmediateUploader.put(metadata, uploader);
metrics.getImmediateUploaderCounter().inc();
performImmediateUpload(uploader);
return true;
} else {
LOG.info("Created new uploader {}", uploader);
metrics.getUploaderCounter().inc();
metadataToUploader.put(metadata, uploader);
uploaderLastHadFilesAt.put(uploader, System.currentTimeMillis());
Expand Down Expand Up @@ -484,7 +520,8 @@ protected boolean processEvent(Kind<?> kind, final Path filename) throws IOExcep
final Path fullPath = Paths.get(baseConfiguration.getS3UploaderMetadataDirectory()).resolve(filename);

if (kind.equals(StandardWatchEventKinds.ENTRY_DELETE)) {
Optional<SingularityUploader> found = Iterables.tryFind(metadataToUploader.values(), input -> input.getMetadataPath().equals(fullPath));
Optional<SingularityUploader> found = Iterables.tryFind(metadataToUploader.values(), input -> input != null && input.getMetadataPath().equals(fullPath))
.or(Iterables.tryFind(metadataToimmediateUploader.values(), input -> input != null && input.getMetadataPath().equals(fullPath)));

LOG.trace("Found {} to match deleted path {}", found, filename);

Expand Down Expand Up @@ -518,27 +555,4 @@ private boolean isS3MetadataFile(Path filename) {

return true;
}

private boolean canCreateImmediateUploader(S3UploadMetadata metadata) {
try {
if (lock.tryLock(400, TimeUnit.MILLISECONDS)) {
if (this.immediateUploadMetadata.contains(metadata)) {
LOG.debug("Already have an immediate uploader for metadata {}.", metadata);
return false;
} else {
LOG.debug("Preparing to create new immediate uploader for metadata {}.", metadata);
this.immediateUploadMetadata.add(metadata);
return true;
}
} else {
LOG.debug("Could not acquire lock to create an immediate uploader for metadata {}.", metadata);
return false;
}
} catch (InterruptedException exn) {
LOG.debug("Interrupted while waiting on a lock to create an immediate uploader for metadata {}.", metadata);
return false;
} finally {
lock.unlock();
}
}
}
Expand Up @@ -11,6 +11,7 @@
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.nio.file.attribute.UserDefinedFileAttributeView;
import java.util.Collections;
import java.util.List;
import java.util.Set;

Expand Down Expand Up @@ -71,7 +72,7 @@ public abstract class SingularityUploader {

protected abstract void uploadSingle(int sequence, Path file) throws Exception;

private void uploadBatch(List<Path> toUpload) {
int uploadBatch(List<Path> toUpload) {
final long start = System.currentTimeMillis();
LOG.info("{} Uploading {} item(s)", logIdentifier, toUpload.size());

Expand Down Expand Up @@ -103,6 +104,7 @@ private void uploadBatch(List<Path> toUpload) {
}

LOG.info("{} Uploaded {} out of {} item(s) in {}", logIdentifier, success, toUpload.size(), JavaUtils.duration(start));
return toUpload.size();
}

Path getMetadataPath() {
Expand All @@ -114,27 +116,24 @@ S3UploadMetadata getUploadMetadata() {
}

int upload(boolean isFinished) throws IOException {
return uploadBatch(filesToUpload(isFinished));
}

List<Path> filesToUpload(boolean isFinished) throws IOException {
final List<Path> toUpload = Lists.newArrayList();
int found = 0;

final Path directory = Paths.get(fileDirectory);

if (!Files.exists(directory)) {
LOG.info("Path {} doesn't exist", fileDirectory);
return found;
return Collections.emptyList();
}

for (Path file : JavaUtils.iterable(directory)) {
found += handleFile(file, isFinished, toUpload);
handleFile(file, isFinished, toUpload);
}

if (toUpload.isEmpty()) {
return found;
}

uploadBatch(toUpload);

return found;
return toUpload;
}

private int handleFile(Path path, boolean isFinished, List<Path> toUpload) throws IOException {
Expand Down

0 comments on commit 20bdd4f

Please sign in to comment.