-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ability to Delete task logs and segments from S3 #9459
Conversation
* implement ability to delete all tasks logs or all task logs written before a particular date when written to S3 * implement ability to delete all segments from S3 deep storage * upgrade version of aws SDK in use
* revert back to original version of AWS SDK
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for adding support for this stuff 🤘
try { | ||
S3Utils.retryS3Operation( | ||
() -> { | ||
String bucketName = segmentPusherConfig.getBucket(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, this block is almost identical to the block in S3TaskLogs.killOlderThan
, and both partially very close to what ObjectSummaryIterator
is providing.
I think it would be nicer if this class and S3TaskLogs
could use ObjectSummaryIterator
, and if the method that does the bulk key delete can be put in a shared method in S3Utils
.
Can I recommend something like this in S3Utils
?
public static void deleteObjectsInPath(
ServerSideEncryptingAmazonS3 s3Client,
S3InputDataConfig config,
String bucket,
String prefix,
Predicate<S3ObjectSummary> filter
)
throws Exception
{
final List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>(config.getMaxListingLength());
final ObjectSummaryIterator iterator = new ObjectSummaryIterator(
s3Client,
ImmutableList.of(new CloudObjectLocation(bucket, prefix).toUri("s3")),
config.getMaxListingLength()
);
while (iterator.hasNext()) {
final S3ObjectSummary nextObject = iterator.next();
if (filter.apply(nextObject)) {
keysToDelete.add(new DeleteObjectsRequest.KeyVersion(nextObject.getKey()));
if (keysToDelete.size() == config.getMaxListingLength()) {
deleteBucketKeys(s3Client, bucket, keysToDelete);
keysToDelete.clear();
}
}
}
if (keysToDelete.size() > 0) {
deleteBucketKeys(s3Client, bucket, keysToDelete);
}
}
public static void deleteBucketKeys(
ServerSideEncryptingAmazonS3 s3Client,
String bucket,
List<DeleteObjectsRequest.KeyVersion> keysToDelete
)
throws Exception
{
DeleteObjectsRequest deleteRequest = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
S3Utils.retryS3Operation(() -> {
s3Client.deleteObjects(deleteRequest);
return null;
});
}
Then, not only does it share all the code for listing objects, it also pushes down the retry to the specific API calls to list and delete, instead of wrapping the entire loop, which I think is better.
If you make a change like this, then the kill calls become something like:
@Override
public void killAll() throws IOException
{
try {
S3Utils.deleteObjectsInPath(
s3Client,
inputDataConfig,
segmentPusherConfig.getBucket(),
segmentPusherConfig.getBaseKey(),
Predicates.alwaysTrue()
);
}
catch (Exception e) {
log.error("Error occurred while deleting segment files from s3. Error: %s", e.getMessage());
throw new IOException(e);
}
}
and
@Override
public void killOlderThan(long timestamp) throws IOException
{
try {
S3Utils.deleteObjectsInPath(
service,
inputDataConfig,
config.getS3Bucket(),
config.getS3Prefix(),
(object) -> object.getLastModified().getTime() < timestamp
);
}
catch (Exception e) {
log.error("Error occurred while deleting task log files from s3. Error: %s", e.getMessage());
throw new IOException(e);
}
}
ObjectSummaryIterator
currently will skip things it thinks are directories, if this is not desirable for deleting segments and task logs, then I would suggest maybe pushing down the predicate to the ObjectSummaryIterator
so that the existing users can filter out directories, and your usages can get everything or filter by timestamp as appropriate. In addition, if it shouldn't skip directories, it would probably be worth adding a test for what happens when one is present in the list objects response, if not present.
disclaimer: I didn't test this in real s3, but did run unit tests which passed after some modifications to expected calls, but would still recommend to review these snippets first to make sure they are correct.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point about reusing the object iterator, I actually thought about that before, not sure why I decided against. As you said its good that all the listing code is shared in this case. I believe that I would want to skip directories in this case too, so that should be good here as well. Thanks for the suggestion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
{ | ||
ListObjectsV2Request request = EasyMock.createMock(ListObjectsV2Request.class); | ||
EasyMock.expect(request.getBucketName()).andReturn(bucket); | ||
EasyMock.expectLastCall().anyTimes(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these expect
/expectLastCall
lines can be collapsed into the form of:
EasyMock.expect(request.getBucketName()).andReturn(bucket).anyTimes();
plus a handful of other places in the tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you're right, noticing it in other places in the code. I will simplify, thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
import java.util.function.Supplier; | ||
|
||
public class TimeSupplier implements Supplier<Long> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have mixed feelings about this class. It seems to exist in service of testing S3TaskLogs.killAll
, but isn't that kind of leaking what is basically a test fixture, into the production code? Since the killAll
method does nothing but delegate to the other call that takes an explicit timestamp, is this abstraction really worth having?
On the other hand, I can see an argument for using something like this to control system time, and I guess we already have similar situations sometimes when @VisibleForTesting
are only used by tests, so I'm not strictly against using this, just thinking out loud to have the discussion.
In the very least I think it should be renamed CurrentTimeMillisSupplier
to indicate what time it is supplying, and add javadocs to describe its intended usage to ease testing.
Does this need to be setup in the module so that it gets injected into S3TaskLogs
? (or did I miss that somewhere?)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right that in this case that most of the logic here lives in killOlderThan which killAll delegates to, thought I think that a supplier for current time could be useful in other cases. I will change name and add javadocs to make its purpose more clear. You didn't miss the binding, there is none. This works since it is not an interface. Should I make an interface for this and explicitly bind this implementation to the interface? What would be a good module to do this in?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You didn't miss the binding, there is none. This works since it is not an interface. Should I make an interface for this and explicitly bind this implementation to the interface? What would be a good module to do this in?
Oh yeah, my bad, I'm not a guice wizard and forget how things work sometimes 😜. No reason to make an interface if only one implementation i think as long as everything works 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be LongSupplier
instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
ListObjectsV2Result result; | ||
String continuationToken = null; | ||
do { | ||
log.info("Deleting batch of %d segment files from s3 location [bucket: %s prefix: %s].", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
log.info
is a bit too informative for this operation I think, inside the loop at least. If you need to log anything, I suggest just counting the number of keys actually deleted and reporting the total at the end outside of the loop, or just a single message before the loop happens indicating that a some deletes are going to happen.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I agree, I will log only once per invocation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm, thanks 🤘
} | ||
} | ||
|
||
public static void deleteBucketKeys( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this can be private actually (my bad)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll get this in the next change which should be coming shortly.
@@ -200,6 +204,54 @@ public static S3ObjectSummary getSingleObjectSummary(ServerSideEncryptingAmazonS | |||
return objectSummary; | |||
} | |||
|
|||
public static void deleteObjectsInPath( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: javadocs describing this method would be nice
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll get this in the next change which should be coming shortly.
written before a particular date when written to S3
This PR has: