Conversation
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3TaskLogsTest.java
```diff
@@ -87,38 +92,42 @@ public Optional<InputStream> streamTaskStatus(String taskid) throws IOException
  private Optional<InputStream> streamTaskFile(final long offset, String taskKey) throws IOException
```
It might be nicer to just add a new method `streamTaskFileWithRetry`, which calls the existing `streamTaskFile`.

Something like:

```java
private Optional<InputStream> streamTaskFileWithRetry(final long offset, String taskKey)
{
  try {
    return S3Utils.retryOperation(() -> streamTaskFile(offset, taskKey));
  }
  catch (Exception e) {
    throw new IOE(e, "Failed to stream logs from: %s", taskKey);
  }
}
```

The new method can also have a javadoc to mention which failure cases are retried.
Note: the existing `streamTaskFile` method could throw some wrapped exception, e.g. `new IOE(e, "Failed to stream logs from: %s", taskKey)`; in that case the outer new method `streamTaskFileWithRetry` won't know whether it should retry or not. That means we need to refine the exception throwing in the existing `streamTaskFile` as well.
Yeah, you can modify the exception if required, as long as the overall logic remains the same. The advantage of having a separate retry method is readability and a small diff.
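The concern above is that wrapping the service exception too early hides the information the retry logic needs to classify a failure as transient. A minimal, hypothetical sketch of this idea in plain Java (the exception type, predicate, and `retryOperation` helper here are stand-ins, not the actual Druid `S3Utils` or AWS SDK API):

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

public class RetryClassificationSketch
{
  // Stand-in for a service exception such as AmazonS3Exception; only the
  // HTTP status code matters for the retry decision in this sketch.
  static class ServiceException extends RuntimeException
  {
    final int statusCode;
    ServiceException(int statusCode) { this.statusCode = statusCode; }
  }

  // Retry only while the predicate classifies the failure as transient.
  static <T> T retryOperation(Callable<T> op, Predicate<Exception> isTransient, int maxTries) throws Exception
  {
    for (int attempt = 1; ; attempt++) {
      try {
        return op.call();
      }
      catch (Exception e) {
        if (!isTransient.test(e) || attempt >= maxTries) {
          throw e; // non-transient, or retries exhausted
        }
      }
    }
  }

  public static void main(String[] args) throws Exception
  {
    // Treat 5xx responses (e.g. rate limiting) as transient; anything else is not.
    Predicate<Exception> isTransient =
        e -> e instanceof ServiceException && ((ServiceException) e).statusCode >= 500;

    // Fails once with a 503, then succeeds: the retry loop recovers.
    final int[] calls = {0};
    String result = retryOperation(
        () -> {
          if (calls[0]++ == 0) {
            throw new ServiceException(503);
          }
          return "log-contents";
        },
        isTransient,
        3
    );
    System.out.println(result + " after " + calls[0] + " calls");

    // If the inner call wraps the failure in an IOException first, the
    // predicate can no longer see the status code and gives up immediately.
    try {
      retryOperation(
          () -> { throw new IOException(new ServiceException(503)); },
          isTransient,
          3
      );
    }
    catch (IOException e) {
      System.out.println("not retried: wrapped exception hides the status code");
    }
  }
}
```

This is why the inner method should either propagate the raw service exception or wrap it in a type the retry predicate knows how to unwrap.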
```java
final String taskKey = getTaskLogKey(taskid, "log");
// this is to satisfy CodeQL scan
Preconditions.checkArgument(
    offset < Long.MAX_VALUE && offset > Long.MIN_VALUE,
```
I think we can ignore the CodeQL scan for now because I am not entirely sure whether `Long.MIN_VALUE` or `Long.MAX_VALUE` is being used somewhere on purpose to represent a special scenario.
The scan warning might also go away if we make the suggested change of adding a new method rather than updating the existing one, although I am not entirely sure that it will.
OK, though the PR can't be merged with a CodeQL scan issue, right?
kfaraz left a comment: Changes look good, left some minor comments.
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
kfaraz left a comment: Minor suggestions, otherwise looks good.
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TaskLogs.java
```java
EasyMock.reset(s3Client);
AmazonS3Exception awsError = new AmazonS3Exception("AWS Error");
awsError.setErrorCode("503");
awsError.setStatusCode(503);
EasyMock.expect(s3Client.getObjectMetadata(EasyMock.anyString(), EasyMock.anyString())).andThrow(awsError);
EasyMock.expectLastCall().once();
String logPath = TEST_PREFIX + "/" + KEY_1 + "/status.json";
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentLength(STATUS_CONTENTS.length());
EasyMock.expect(s3Client.getObjectMetadata(TEST_BUCKET, logPath)).andReturn(objectMetadata);
S3Object s3Object = new S3Object();
s3Object.setObjectContent(new ByteArrayInputStream(STATUS_CONTENTS.getBytes(StandardCharsets.UTF_8)));
GetObjectRequest getObjectRequest = new GetObjectRequest(TEST_BUCKET, logPath);
getObjectRequest.setRange(0, STATUS_CONTENTS.length() - 1);
getObjectRequest.withMatchingETagConstraint(objectMetadata.getETag());
EasyMock.expect(s3Client.getObject(getObjectRequest)).andReturn(s3Object);
EasyMock.expectLastCall().once();
replayAll();
```
Some one-line comments might be good here, or at least a logical separation using newlines.
…torage/s3/S3TaskLogs.java Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
…torage/s3/S3TaskLogs.java Co-authored-by: Kashif Faraz <kashif.faraz@gmail.com>
Description

Saw the following error when fetching task status from S3. This is due to S3 rate limiting on the query; we should retry the operation in this case.

Now changed to use `S3Utils.retryS3Operation` for the task log fetch, same as the task file push method.

Release note

Retry S3 task log fetch.

Key changed/added classes in this PR

`streamTaskFile` method retries on transient S3 errors in the `S3TaskLogs` class.

This PR has:
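The overall shape of the fix described in this PR can be sketched as a retry wrapper around the fetch. This is a hypothetical, self-contained illustration: the real code lives in `S3TaskLogs` and uses `S3Utils.retryS3Operation` and the AWS SDK, whereas the `retryS3Operation` stand-in and the simulated 503 below are invented for the example:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.Callable;

public class TaskLogFetchSketch
{
  // Stand-in for S3Utils.retryS3Operation: retries a failing operation a
  // fixed number of times. The real utility also applies backoff and only
  // retries failures it classifies as transient.
  static <T> T retryS3Operation(Callable<T> op, int maxTries) throws Exception
  {
    Exception last = null;
    for (int i = 0; i < maxTries; i++) {
      try {
        return op.call();
      }
      catch (Exception e) {
        last = e;
      }
    }
    throw last;
  }

  private int failuresRemaining = 1; // simulate one transient 503 from S3

  // Inner fetch: throws on the simulated rate-limit error, then succeeds.
  private Optional<InputStream> streamTaskFile(long offset, String taskKey) throws IOException
  {
    if (failuresRemaining-- > 0) {
      throw new IOException("503 Slow Down");
    }
    return Optional.of(new ByteArrayInputStream("task log".getBytes(StandardCharsets.UTF_8)));
  }

  /**
   * Retries transient S3 failures (e.g. rate limiting) around streamTaskFile.
   */
  Optional<InputStream> streamTaskFileWithRetry(long offset, String taskKey) throws IOException
  {
    try {
      return retryS3Operation(() -> streamTaskFile(offset, taskKey), 3);
    }
    catch (Exception e) {
      throw new IOException("Failed to stream logs from: " + taskKey, e);
    }
  }

  public static void main(String[] args) throws Exception
  {
    TaskLogFetchSketch sketch = new TaskLogFetchSketch();
    InputStream in = sketch.streamTaskFileWithRetry(0, "task/log").get();
    System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
  }
}
```

The single simulated failure is absorbed by the retry loop, so the caller sees the log contents rather than the transient error.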