Skip to content

Commit

Permalink
Narrow the scope of retrying on getting object
Browse files Browse the repository at this point in the history
  • Loading branch information
huylenq committed Oct 1, 2018
1 parent fe85732 commit 0edaf4b
Showing 1 changed file with 11 additions and 8 deletions.
Expand Up @@ -43,6 +43,7 @@
import java.util.Iterator;
import java.util.List;

import static java.lang.String.format;
import static org.embulk.spi.util.RetryExecutor.retryExecutor;

public abstract class AbstractS3FileInputPlugin
Expand Down Expand Up @@ -402,10 +403,10 @@ public S3InputStreamReopener(AmazonS3 client, GetObjectRequest request, long con
@Override
public InputStream reopen(final long offset, final Exception closedCause) throws IOException
{
log.warn(String.format("S3 read failed. Retrying GET request with %,d bytes offset", offset), closedCause);
log.warn(format("S3 read failed. Retrying GET request with %,d bytes offset", offset), closedCause);
request.setRange(offset, contentLength - 1); // [first, last]

return new DefaultRetryable<S3ObjectInputStream>("Opening the file") {
return new DefaultRetryable<S3ObjectInputStream>(format("Getting object '%s'", request.getKey())) {
@Override
public S3ObjectInputStream call()
{
Expand Down Expand Up @@ -464,16 +465,18 @@ public InputStream openNext() throws IOException
}
final String key = iterator.next();
final GetObjectRequest request = new GetObjectRequest(bucket, key);
return new DefaultRetryable<ResumableInputStream>("Opening the file") {

S3Object object = new DefaultRetryable<S3Object>(format("Getting object '%s'", request.getKey())) {
@Override
public ResumableInputStream call() throws Exception
public S3Object call()
{
S3Object obj = client.getObject(request);
long objectSize = obj.getObjectMetadata().getContentLength();
LOGGER.info("Open S3Object with bucket [{}], key [{}], with size [{}]", bucket, key, objectSize);
return new ResumableInputStream(obj.getObjectContent(), new S3InputStreamReopener(client, request, objectSize, retryExec));
return client.getObject(request);
}
}.executeWithCheckedException(retryExec, IOException.class);

long objectSize = object.getObjectMetadata().getContentLength();
LOGGER.info("Open S3Object with bucket [{}], key [{}], with size [{}]", bucket, key, objectSize);
return new ResumableInputStream(object.getObjectContent(), new S3InputStreamReopener(client, request, objectSize, retryExec));
}

@Override
Expand Down

0 comments on commit 0edaf4b

Please sign in to comment.