Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] rgw: sync plugin to AWS S3 #14165

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
898a89a
rgw: sync plugin for aws (the humble beginnings!)
theanalyst Feb 21, 2017
3440919
rgw: sync aws module compiles
theanalyst Feb 21, 2017
ea1203f
rgw: make RGWSyncModule classes for AWS
theanalyst Feb 22, 2017
896973d
rgw: sync_module: register aws module
theanalyst Feb 22, 2017
d550752
non working download client
theanalyst Feb 22, 2017
c0c26c4
allow cr http client to deal with non json types
theanalyst Mar 27, 2017
7b353c9
well more hacks to get my previous hack working
theanalyst Feb 22, 2017
d2db788
rgw: RGWRestConn allow keys to be initialized
theanalyst Feb 23, 2017
a5b396a
rgw: aws sync module: set keys for rest client
theanalyst Feb 23, 2017
67bb52e
rgw_cr_rest: allow sending of raw put requests
theanalyst Mar 27, 2017
144f8ba
rgw_aws_module: lets call put object and see what happens
theanalyst Feb 23, 2017
6672282
rgw_cr_rest: send content length on raw requests
theanalyst Mar 27, 2017
095623f
delete-me: log path for requests
theanalyst Feb 23, 2017
f8859f7
aws: ws fix
theanalyst Feb 23, 2017
a99b7a8
rgw: sync_module_aws implement rm object
theanalyst Feb 23, 2017
2c6642a
rgw_aws_sync: untested code for bucket create
theanalyst Feb 23, 2017
8de3c1c
rgw: aws sync: make bucket name more url friendly
theanalyst Feb 24, 2017
9e58d3c
set_content_length for streamrw requests
theanalyst Mar 27, 2017
6a80eab
sync_module_aws: minor logging improvements
theanalyst Feb 27, 2017
b9c05f1
rgw_cr_rest: have a raw variant of read resource
theanalyst Mar 27, 2017
f90e671
rgw_sync_module_aws: use the readrawrestresource cr
theanalyst Feb 27, 2017
ba65996
rgw_sync_module_aws: make conn a unique ptr
theanalyst Feb 27, 2017
3dd4391
aws: use slashes in obj name
theanalyst Feb 27, 2017
043441e
aws: remove comment
theanalyst Feb 27, 2017
dde7945
rgw_rest_client: clarify comment on set content length
theanalyst Feb 27, 2017
f00a377
rgw: typo fix while rebasing (squash)
theanalyst Mar 27, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/rgw/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ set(rgw_a_srcs
rgw_sync.cc
rgw_data_sync.cc
rgw_sync_module.cc
rgw_sync_module_aws.cc
rgw_sync_module_es.cc
rgw_sync_module_log.cc
rgw_period_history.cc
Expand Down
155 changes: 118 additions & 37 deletions src/rgw/rgw_cr_rest.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,30 @@
#include "rgw_coroutine.h"
#include "rgw_rest_conn.h"

template <class T>
class RGWReadRESTResourceCR : public RGWSimpleCoroutine {
class RGWReadRawRESTResourceCR : public RGWSimpleCoroutine{
bufferlist *result;
protected:
RGWRESTConn *conn;
RGWHTTPManager *http_manager;
string path;
param_vec_t params;
T *result;

public:
boost::intrusive_ptr<RGWRESTReadResource> http_op;
RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
RGWHTTPManager *_http_manager, const string& _path,
rgw_http_param_pair *params, bufferlist *_result)
: RGWSimpleCoroutine(_cct), result(_result), conn(_conn), http_manager(_http_manager),
path(_path), params(make_param_list(params))
{}

public:
RGWReadRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
RGWHTTPManager *_http_manager, const string& _path,
rgw_http_param_pair *params, T *_result)
: RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
path(_path), params(make_param_list(params)), result(_result)
RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
RGWHTTPManager *_http_manager, const string& _path,
rgw_http_param_pair *params)
: RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
path(_path), params(make_param_list(params))
{}

~RGWReadRESTResourceCR() override {
~RGWReadRawRESTResourceCR() override {
request_cleanup();
}

Expand All @@ -46,8 +51,16 @@ class RGWReadRESTResourceCR : public RGWSimpleCoroutine {
return 0;
}

virtual int wait_result() {
return http_op->wait_bl(result);
}

int request_complete() override {
int ret = http_op->wait(result);

int ret;

ret = wait_result();

auto op = std::move(http_op); // release ref on return
if (ret < 0) {
error_stream << "http operation failed: " << op->to_str()
Expand All @@ -65,48 +78,84 @@ class RGWReadRESTResourceCR : public RGWSimpleCoroutine {
http_op = NULL;
}
}

};

template <class S, class T>
class RGWSendRESTResourceCR : public RGWSimpleCoroutine {

template <class T>
class RGWReadRESTResourceCR : public RGWReadRawRESTResourceCR {
T *result;
public:
RGWReadRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
RGWHTTPManager *_http_manager, const string& _path,
rgw_http_param_pair *params, T *_result)
: RGWReadRawRESTResourceCR(_cct, _conn, _http_manager, _path, params), result(_result)
{}

int wait_result() override {
return http_op->wait(result);
}

};

template <class T>
class RGWSendRawRESTResourceCR: public RGWSimpleCoroutine {
protected:
RGWRESTConn *conn;
RGWHTTPManager *http_manager;
string method;
string path;
param_vec_t params;
T *result;
S input;

bufferlist input_bl;
bool send_content_length=false;
boost::intrusive_ptr<RGWRESTSendResource> http_op;

public:
RGWSendRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
RGWHTTPManager *_http_manager,
const string& _method, const string& _path,
rgw_http_param_pair *_params, S& _input, T *_result)
: RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
method(_method), path(_path), params(make_param_list(_params)), result(_result),
input(_input)
{}
public:
RGWSendRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
RGWHTTPManager *_http_manager,
const string& _method, const string& _path,
rgw_http_param_pair *_params, bufferlist& _input, T *_result, bool _send_content_length)
: RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
method(_method), path(_path), params(make_param_list(_params)), result(_result),
input_bl(_input), send_content_length(_send_content_length)
{}

RGWSendRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
RGWHTTPManager *_http_manager,
const string& _method, const string& _path,
rgw_http_param_pair *_params, T *_result)
: RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
method(_method), path(_path), params(make_param_list(_params)), result(_result)
{}

~RGWSendRESTResourceCR() override {


~RGWSendRawRESTResourceCR() override {
request_cleanup();
}

void set_input_bl(bufferlist bl){
input_bl = std::move(bl);
}

int send_request() override {


param_vec_t p;
if (send_content_length){
lsubdout(cct, rgw, 0) << "abhi: sending content length of " << input_bl.length() << dendl;
string content_length = to_string(input_bl.length());
p.push_back(param_pair_t("CONTENT_LENGTH",content_length));
}


auto op = boost::intrusive_ptr<RGWRESTSendResource>(
new RGWRESTSendResource(conn, method, path, params, NULL, http_manager));
new RGWRESTSendResource(conn, method, path, params, nullptr, http_manager));

op->set_user_info((void *)stack);

JSONFormatter jf;
encode_json("data", input, &jf);
std::stringstream ss;
jf.flush(ss);
bufferlist bl;
bl.append(ss.str());

int ret = op->aio_send(bl);
int ret = op->aio_send(input_bl);
if (ret < 0) {
lsubdout(cct, rgw, 0) << "ERROR: failed to send request" << dendl;
op->put();
Expand Down Expand Up @@ -145,6 +194,26 @@ class RGWSendRESTResourceCR : public RGWSimpleCoroutine {
}
};

template <class S, class T>
class RGWSendRESTResourceCR : public RGWSendRawRESTResourceCR<T> {
public:
RGWSendRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
RGWHTTPManager *_http_manager,
const string& _method, const string& _path,
rgw_http_param_pair *_params,S& _input, T *_result)
: RGWSendRawRESTResourceCR<T>(_cct, _conn, _http_manager, _method, _path, _params, _result){

JSONFormatter jf;
encode_json("data", _input, &jf);
std::stringstream ss;
jf.flush(ss);
//bufferlist bl;
this->input_bl.append(ss.str());
//set_input_bl(std::move(bl));
}

};

template <class S, class T>
class RGWPostRESTResourceCR : public RGWSendRESTResourceCR<S, T> {
public:
Expand All @@ -157,6 +226,18 @@ class RGWPostRESTResourceCR : public RGWSendRESTResourceCR<S, T> {
_params, _input, _result) {}
};

template <class T>
class RGWPutRawRESTResourceCR: public RGWSendRawRESTResourceCR <T> {
public:
RGWPutRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
RGWHTTPManager *_http_manager,
const string& _path,
rgw_http_param_pair *_params, bufferlist& _input, T *_result)
: RGWSendRawRESTResourceCR<T>(_cct, _conn, _http_manager, "PUT", _path, _params, _input, _result, true){}

};


template <class S, class T>
class RGWPutRESTResourceCR : public RGWSendRESTResourceCR<S, T> {
public:
Expand All @@ -165,8 +246,8 @@ class RGWPutRESTResourceCR : public RGWSendRESTResourceCR<S, T> {
const string& _path,
rgw_http_param_pair *_params, S& _input, T *_result)
: RGWSendRESTResourceCR<S, T>(_cct, _conn, _http_manager,
"PUT", _path,
_params, _input, _result) {}
"PUT", _path,
_params, _input, _result) {}
};

class RGWDeleteRESTResourceCR : public RGWSimpleCoroutine {
Expand Down
9 changes: 9 additions & 0 deletions src/rgw/rgw_rest_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,15 @@ int RGWRESTStreamRWRequest::get_resource(RGWAccessKey& key, map<string, string>&
pmanager = mgr;
}

// Not sure if this is the place to set a send_size, curl otherwise sets
// chunked option and doesn't send content length anymore
uint64_t send_size = (size_t)(outbl.length() - write_ofs);

if (send_size > 0){
ldout(cct,20) << "Setting content length as " << send_size << dendl;
set_send_length(send_size);
}

int r = pmanager->add_request(this, new_info.method, new_url.c_str());
if (r < 0)
return r;
Expand Down
4 changes: 4 additions & 0 deletions src/rgw/rgw_rest_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ class RGWRESTConn
public:

RGWRESTConn(CephContext *_cct, RGWRados *store, const string& _remote_id, const list<string>& endpoints);

RGWRESTConn(CephContext *_cct, const string& _remote_id, const list<string>& endpoints, RGWAccessKey _cred):
cct(_cct), endpoints(endpoints.begin(),endpoints.end()), key(std::move(_cred)), remote_id(_remote_id) {}

int get_url(string& endpoint);
string get_url();
const string& get_self_zonegroup() {
Expand Down
4 changes: 4 additions & 0 deletions src/rgw/rgw_sync_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "rgw_sync_module_log.h"
#include "rgw_sync_module_es.h"
#include "rgw_sync_module_aws.h"

#define dout_subsys ceph_subsys_rgw

Expand Down Expand Up @@ -62,4 +63,7 @@ void rgw_register_sync_modules(RGWSyncModulesManager *modules_manager)

RGWSyncModuleRef es_module(std::make_shared<RGWElasticSyncModule>());
modules_manager->register_module("elasticsearch", es_module);

RGWSyncModuleRef aws_module(std::make_shared<RGWAWSSyncModule>());
modules_manager->register_module("aws", aws_module);
}