rgwlc: optionally support notifications on object expiration #39192
Conversation
This pull request can no longer be automatically merged: a rebase is needed and changes have to be manually resolved
Force-pushed 3c5cee4 to e24ac38 (Compare)
Force-pushed e24ac38 to 1b738d5 (Compare)
src/common/options.cc
Outdated
.set_description(
    "Optional support for delete notifications in lifecycle processing")
.set_long_description(
    "Optional support for S3 bucket notifications in lifecycle processing "
S3 bucket delete notifications?
src/rgw/rgw_lc.cc
Outdated
bucket.get(), lc_id, const_cast<std::string&>(oc.bucket->get_tenant()),
lc_req_id, null_yield);

/* XXX do we agree that lifecycle expiration can be allowed to
Yeah, I can agree with that.
src/rgw/rgw_notify.h
Outdated
store(_store), s(_s), object(_object) {}
const DoutPrefixProvider* dpp;
RGWObjectCtx* obj_ctx;
rgw::sal::RGWObject* object;
So, all this is going to conflict with zipper 10, which wraps reservation_t. Do we want to hold this until zipper 10 goes in and re-work, or do we want to hold zipper 10 until this goes in and re-work?
What do you think will be easier?
Probably better to do zipper 10 first.
I guess thinking about it, I'm unclear why reservation_t would be subsumed under zipper, though. RADOS is currently used as a queuing back-end, but even that could at some point be plugged out. The notification itself is platform agnostic.
reservation_t itself is more or less okay, but only because it's not a real class. publish_reserve() and publish_commit() are rados-specific, and so need a RGWRadosStore pointer, which is provided by reservation_t. If those methods were actually part of reservation_t, it would also be rados-specific.
It seems like we would ideally want these APIs to be platform agnostic, but have the reservation step be specialized for rados; before Yuval added persistent queues, I think there was no rados dependency.
(unlike pub-sub originally, which was rados-specific)
zipper 10 provides a platform agnostic version of these APIs here: https://github.com/dang/ceph/blob/wip-dang-zipper-10/src/rgw/rgw_sal_rados.h#L526
I'm open to a better solution, if there is one.
we can abstract the queue API - originally we didn't think it has to be a RADOS backed queue (thought of using some shared memory queue for better perf with less persistency guarantees).
@@ -902,7 +931,7 @@ int publish_commit(rgw::sal::RGWObject* obj,
   return 0;
 }

-int publish_abort(reservation_t& res) {
+extern int publish_abort(reservation_t& res) {
what does "extern" mean here?
src/rgw/rgw_lc.cc
Outdated
  }
}

ret = obj->delete_object(dpp, &oc.rctx, obj_owner, bucket_owner, meta.mtime,
if operation fails, we should not "publish_commit"
sorry, should be in the condition
src/rgw/rgw_lc.cc
Outdated
@@ -590,8 +598,42 @@ static int remove_expired_obj(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool
   ACLOwner bucket_owner;
   bucket_owner.set_id(bucket_info.owner);

-  return obj->delete_object(dpp, &oc.rctx, obj_owner, bucket_owner, meta.mtime, false, 0,
-                            version_id, null_yield);
+  if (oc.cct->_conf->rgw_lc_notify) {
why make it conf based and not based on bucket topics/notifications?
- if no notifications are provisioned for the bucket, the cost of this "no-op" would be:
  1. in publish_reserve() we read the bucket topics object; if it does not exist, it would be cached (see: "rgw: add negative cache to the system object" #35777)
  2. in publish_commit() we iterate over the reservations done in (1); if the list is empty we do nothing
- if we decide to keep that flag, we should also use it for rejecting the notification creation REST call (can probably do that in a separate PR)
src/rgw/rgw_notify.h
Outdated
@@ -43,23 +43,67 @@ int remove_persistent_topic(const std::string& topic_name, optional_yield y);
 // then used to commit or abort the reservation
 struct reservation_t {
   struct topic_t {
-    topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg, cls_2pc_reservation::id_t _res_id) :
-      configurationId(_configurationId), cfg(_cfg), res_id(_res_id) {}
+    topic_t(std::string& _configurationId, rgw_pubsub_topic& _cfg,
why remove constness?
@@ -357,7 +357,8 @@ class RGWPSCreateNotif_ObjStore : public RGWPSCreateNotifOp {
   std::string events_str = s->info.args.get("events", &exists);
   if (!exists) {
     // if no events are provided, we notify on all of them
-    events_str = "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE";
+    events_str =
this is not really needed - this is only for pubsub
    case ObjectDeleteMarkerExpiration:
      return "s3:ObjectLifecycle:DeleteMarkerExpiration";
    case UnknownEvent:
      return "s3:UnknownEvent";
  }
  return "s3:UnknownEvent";
}

std::string to_ceph_string(EventType t) {
no need to change - only for pubsub
This pull request can no longer be automatically merged: a rebase is needed and changes have to be manually resolved

This pull request has been automatically marked as stale because it has not had any activity for 60 days. It will be closed if no further activity occurs for another 30 days.
Force-pushed 1b738d5 to 7013839 (Compare)
Force-pushed 7013839 to 01d1846 (Compare)
jenkins retest this please
Force-pushed 01d1846 to b6514b8 (Compare)
src/rgw/rgw_lc.cc
Outdated
    "ERROR: publishing notification failed, with error: " << ret << dendl;
} else {
  // send request to notification manager
  ret = rgw::notify::publish_commit(
don't think you want to return an error if the publish_commit() call fails;
the delete operation is already done. I think it's OK to just log
ok, it's not fatal anyway, but I agree
src/rgw/rgw_notify.cc
Outdated
-                  EventType event_type,
-                  reservation_t& res,
-                  const DoutPrefixProvider *dpp)
+extern int publish_commit(rgw::sal::Object* obj,
what does "extern" mean here?
merely that it's external--that's the default anyway, I was going to remove that
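A minimal sketch of why the keyword is redundant (stand-in function names, not the actual rgw code): functions at namespace scope have external linkage by default, so writing `extern` on a definition changes nothing.

```cpp
#include <cassert>

// "extern" on a function definition is allowed but redundant:
// external linkage is already the default for functions.
extern int abort_stub_extern(int rc) { return rc; }

// exactly equivalent definition without the keyword
int abort_stub_plain(int rc) { return rc; }
```

Both compile to the same thing; removing the keyword, as suggested, is purely cosmetic.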
src/rgw/rgw_notify.cc
Outdated
-  const auto src_obj = get_object_with_atttributes(s, obj);
+static inline void metadata_from_attributes(
+  const reservation_t& res, rgw::sal::Object* obj) {
+  auto metadata = res.x_meta_map;
shouldn't this be a reference to the metadata in "res"?
otherwise, this is just updating the local variable.
I'll check, was a merge thing
ok, right; I think I was going to remove the alias, but it's a bit nicer to read with it
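A hedged sketch of the copy-vs-alias distinction being discussed (illustrative types, not the rgw ones): plain `auto` deduces a value type and copies the map, so writes through the local never reach the reservation, while `auto&` aliases the member.

```cpp
#include <cassert>
#include <map>
#include <string>

// stand-in for reservation_t with a metadata map member
struct fake_reservation {
  std::map<std::string, std::string> x_meta_map;
};

// auto deduces std::map<...>, so this copies: the write is lost
void update_via_copy(fake_reservation& res) {
  auto metadata = res.x_meta_map;
  metadata["x-amz-meta-color"] = "blue";
}

// auto& aliases the member: the write persists in res
void update_via_reference(fake_reservation& res) {
  auto& metadata = res.x_meta_map;
  metadata["x-amz-meta-color"] = "blue";
}
```

When the alias is only read, as the thread concludes, the copy costs an allocation but is not a correctness bug; it only matters if anyone writes through it expecting `res` to change.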
src/rgw/rgw_notify.cc
Outdated
 event_entry.event.configurationId = topic.configurationId;
 event_entry.event.opaque_data = topic.cfg.opaque_data;
 if (topic.cfg.dest.persistent) {
   event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint);
-  event_entry.push_endpoint_args = std::move(topic.cfg.dest.push_endpoint_args);
+  event_entry.push_endpoint_args =
+    std::move(topic.cfg.dest.push_endpoint_args);
   event_entry.arn_topic = std::move(topic.cfg.dest.arn_topic);
Suggested change:
-  event_entry.arn_topic = std::move(topic.cfg.dest.arn_topic);
+  event_entry.arn_topic = topic.cfg.dest.arn_topic;
this is an existing bug of "use after move". not sure how it was working before :-(
but now it is failing the persistent notifications test
ouch, thanks, will change
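A small sketch of the use-after-move hazard behind the suggestion (illustrative types, not the rgw ones): after `std::move`, the source string is left in a valid but unspecified state, so any later read of it is a bug; copying instead keeps the source intact for subsequent reads.

```cpp
#include <cassert>
#include <string>
#include <utility>

// stand-in for topic.cfg.dest
struct fake_dest { std::string arn_topic; };

// buggy shape when the caller still reads d.arn_topic afterwards
std::string take_by_move(fake_dest& d) {
  return std::move(d.arn_topic);  // d.arn_topic now unspecified
}

// safe shape when the source object is reused later
std::string take_by_copy(const fake_dest& d) {
  return d.arn_topic;             // d unchanged
}
```

This is why the suggested fix drops the `std::move` on `arn_topic`: a later code path still reads the field.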
src/rgw/rgw_notify.h
Outdated
-    topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg, cls_2pc_reservation::id_t _res_id) :
-      configurationId(_configurationId), cfg(_cfg), res_id(_res_id) {}
+    topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg,
+      const cls_2pc_reservation::id_t _res_id) :
Suggested change:
-      const cls_2pc_reservation::id_t _res_id) :
+      cls_2pc_reservation::id_t _res_id) :
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRemoved:DeleteMarkerCreated``       | Supported       |                                           |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectLifecycle:Expiration:Current``      | Supported, Ceph extension |
do we want to support s3:ObjectLifecycle:* ?
and/or s3:ObjectLifecycle:Expiration:* and s3:ObjectLifecycle:Transition:* ?
will it be common to ask for more than one type at once?
yes, I've thought about this for a while, and think it makes sense; note that if you want the distinction, you have it; all you're saving by omitting the grouping is some characters in the event names; (oh, are you responding to the fact that I didn't mention the ObjectLifecycle grouping in the commit message? I wrote it before finalizing this and didn't think it needed to be exact)
do we have an example of how to set lifecycle policy on a bucket, so we can test the new feature? I tried the following inside src/test/rgw/bucket_notification/test_bn.py:

response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name,
    LifecycleConfiguration={'Rules': [
        {
            'Expiration': {'Date': datetime.datetime(2021, 12, 29), 'ExpiredObjectDeleteMarker': False},
            'Filter': {'Prefix': obj_prefix},
            'Status': 'Enabled',
        }
    ]}
)

and saw in the rgw log:

s3:put_lifecycle read len=289 data=<LifecycleConfiguration xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Rule><Expiration><Date>2021-12-29T00:00:00Z</Date><ExpiredObjectDeleteMarker>false</ExpiredObjectDeleteMarker></Expiration><Filter><Prefix>ooo</Prefix></Filter><Status>Enabled</Status></Rule></LifecycleConfiguration>
s3:put_lifecycle Bad lifecycle configuration: LifecycleConfiguration: Rule: Expiration: bad Expiration section
I tested with the following golang program:
package main
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/sns"
)
func main() {
fmt.Println("started")
access_key := "0555b35654ad1656d804"
secret_key := "h7GhxuBLTrlhVUyxSPUKUV8r/2EI4ngqJxD7iBdBYLhwluN30JaT3Q=="
region := "us-east-1"
host := "lemon:8000"
force_pathstyle := true
disable_ssl := true
loglevel := aws.LogDebug
config := &aws.Config{
Region: &region,
Endpoint: &host,
Credentials: credentials.NewStaticCredentials(
access_key, secret_key, "" /* token */),
S3ForcePathStyle: &force_pathstyle,
DisableSSL: &disable_ssl,
LogLevel: &loglevel,
}
// create an S3 handle
session := session.New(config)
conn := s3.New(session)
// do it
var params *s3.ListBucketsInput
rsp, err := conn.ListBuckets(params)
if err != nil {
fmt.Println(err.Error())
} else {
fmt.Println(rsp)
}
topic_bucket := "bundeswehr"
// create bucket
cbp := &s3.CreateBucketInput{
Bucket: aws.String(topic_bucket),
}
cbp_rsp, err := conn.CreateBucket(cbp)
if err != nil {
fmt.Println(err.Error())
} else {
fmt.Println(cbp_rsp)
}
// add kafka topic
/*
[&Attributes.entry.1.key=amqp-exchange&Attributes.entry.1.value=<exchange>]
[&Attributes.entry.2.key=amqp-ack-level&Attributes.entry.2.value=none|broker|routable]
[&Attributes.entry.3.key=verify-ssl&Attributes.entry.3.value=true|false]
[&Attributes.entry.4.key=kafka-ack-level&Attributes.entry.4.value=none|broker]
[&Attributes.entry.5.key=use-ssl&Attributes.entry.5.value=true|false]
[&Attributes.entry.6.key=ca-location&Attributes.entry.6.value=<file path>]
[&Attributes.entry.7.key=OpaqueData&Attributes.entry.7.value=<opaque data>]
[&Attributes.entry.8.key=push-endpoint&Attributes.entry.8.value=<endpoint>]
[&Attributes.entry.9.key=persistent&Attributes.entry.9.value=true|false]` */
sconn := sns.New(session)
ct_rsp, err := sconn.CreateTopic(&sns.CreateTopicInput{
Name: aws.String(topic_bucket),
Attributes: map[string]*string{
"verify-ssl": aws.String("false"),
"use-ssl": aws.String("false"),
"kafka-ack-level": aws.String("none"),
"push-endpoint": aws.String("kafka://lemon:9092"),
"persistent": aws.String("false"), // for now!
},
})
if err != nil {
fmt.Println(err.Error())
}
fmt.Println(*ct_rsp.TopicArn)
pbn_input := &s3.PutBucketNotificationConfigurationInput{
Bucket: aws.String(topic_bucket),
NotificationConfiguration: &s3.NotificationConfiguration{
TopicConfigurations: []*s3.TopicConfiguration{
{
Id: aws.String(fmt.Sprintf("%s_%d", topic_bucket, 1)),
Events: []*string{
aws.String("s3:ObjectCreated:*"),
aws.String("s3:ObjectRemoved:*"),
aws.String("s3:ObjectLifecycle:*"), // RGW extension
},
TopicArn: aws.String(fmt.Sprintf("arn:aws:sns:default::%s", topic_bucket)),
},
},
},
}
fmt.Println(pbn_input)
pbnc_rsp, err := conn.PutBucketNotificationConfiguration(pbn_input)
if err != nil {
fmt.Println(err.Error())
}
fmt.Println(pbnc_rsp)
} /* main */
but i dont see that you set the lifecycle configuration on the bucket
oops, I missed one of your comments from 8/31 about eventName, I'm about to repush this
oh, yeah, I did that in some python I think, just a sec
sorry, I should have done more in the golang practice, just skipped to running the code, there's so many ways to set lifecycle
#!/usr/bin/python
import boto3
import logging
import sys
import os
import time
import datetime
import pytz
import re
import pdb
rgw_host = os.environ['RGW_HOST']
rgw_port = int(os.environ['RGW_PORT'])
access_key = os.environ['RGW_ACCESS_KEY']
secret_key = os.environ['RGW_SECRET_KEY']
endpoint='http://%s:%d' % (rgw_host, rgw_port)
client = boto3.client('s3',
endpoint_url=endpoint,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key)
nobjects = 100
bname = "bundeswehr"
bucket = client.create_bucket(Bucket=bname)
print(bname)
for o in range(0,nobjects):
oname = "%s_o%d" % (bname, o)
print(oname)
oval = "value for me=%s" % oname
client.put_object(Bucket=bname, Key=oname, Body=oval)
try:
client.delete_bucket_lifecycle(Bucket=bname)
except:
pass
rules=[{'ID': 'rule1', 'Expiration': {'Days': 1}, 'Prefix': '', 'Status':'Enabled'}]
lifecycle = {'Rules': rules}
res = client.put_bucket_lifecycle_configuration(
Bucket=bname,
LifecycleConfiguration=lifecycle)
so I see, you're having issues with the Expiration Date form; nothing's changed there.
I'm pretty sure I've used this in the past:
rules=[{'ID': 'rule1', 'Expiration': { "Date": "20200101"}, 'Prefix': 'days0/',
'Status':'Enabled'}]
lifecycle = {'Rules': rules}
res = client.put_bucket_lifecycle_configuration(
Bucket=bucket_name,
LifecycleConfiguration=lifecycle)
i didn't look at the code, i did not see that it is not in the documentation
Force-pushed 9db1cee to a0a7694 (Compare)
@mattbenjamin i wrote an integration test for the new feature. could you please add to this PR?

of course--thank you very much Matt
Most of the work is to remove direct knowledge of req_state from methods in rgw_notify. I've chosen to create new notification types matching the different expire actions (but not transition). The new event types are not nested under Delete. Notifications are sent iff rgw_lc_notify is true (default false).

Adjusted per comments in initial review; in particular, notification from lifecycle is no longer conditional on a config setting, and constness is restored.

Fixes: https://tracker.ceph.com/issues/49068
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
After working with notification configurations, it seemed to make more sense to organize the different lifecycle notification sub-types along the same lines as the official AWS ones. So, there are now ObjectExpirationCurrent and ObjectExpirationNoncurrent types, and an ObjectExpiration wildcard. Similarly, ObjectTransition contains Current and Noncurrent sub-types. Note that I have defined an ObjectExpirationAbortMPU notification sub-type, but do not currently generate it--this is to avoid changing the rgw::sal interface, but most likely we would support this in future.

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
Force-pushed 64a221c to 5721361 (Compare)
  if (user) {
    bucket->set_owner(user.get());
  }
}
Right here, the owner pointer is now junk, since user went out of scope. At this point you can't use the owner pointer anymore. It will be non-null, but point to freed memory.
oh my
fixed
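A hedged sketch of the lifetime bug described above (simplified stand-in types, not the rgw ones): storing a raw pointer taken from a `unique_ptr` that then goes out of scope leaves the stored pointer dangling; the fix is to keep the owning `unique_ptr` alive at least as long as anything reads the raw pointer.

```cpp
#include <cassert>
#include <memory>
#include <string>

struct fake_user { std::string id; };

struct fake_bucket {
  const fake_user* owner = nullptr;
  void set_owner(const fake_user* u) { owner = u; }
};

// Buggy shape:
//   {
//     auto user = std::make_unique<fake_user>();
//     bucket.set_owner(user.get());
//   }                 // user destroyed here
//   bucket.owner->id; // dangling: reads freed memory
//
// Safe shape: declare the unique_ptr in a scope that encloses
// every use of bucket.owner, as in the test below.
```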
…) always nullptr (Acked by Daniel.) Now with less use-after-free. Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
Signed-off-by: yuval Lifshitz <ylifshit@redhat.com>
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
Force-pushed 5721361 to 16c8b64 (Compare)
this didn't run any explicit object lifecycle checks, but did exercise all the lifecycle code paths in rgw lc processing, and did pass

the 4 failures are the same as the baseline
Most of the work is to remove direct knowledge of req_state from
methods in rgw_notify.
I've chosen to create new notification types matching the different
expire actions (but not transition). The new event types are not
nested under Delete. Notifications are sent iff rgw_lc_notify is true
(default false).
Fixes: https://tracker.ceph.com/issues/49068
Signed-off-by: Matt Benjamin mbenjamin@redhat.com