# This is a combination of 32 commits.
# This is the 1st commit message:

DO-NOT-MERGE; first commit for the integration of the s3-select engine into RGW; the request can currently only be sent by an AWS client; execution is supported on CSV files only

# This is the commit message #2:

remove debug info

# This is the commit message #3:

bug fix (aggregation); error handling

# This is the commit message #4:

fix comments (to be continued)

# This is the commit message #5:

placement-new allocator; cosmetics

# This is the commit message #6:

add namespace; memory management: the response buffer is now a class member

# This is the commit message #7:

std::list --> std::vector

# This is the commit message #8:

replace boost::split with a simple C-style CSV parser; the performance difference is large, mainly because boost::split performs many allocations and copies
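
A minimal sketch of the allocation-saving idea, with a hypothetical split_fields helper (not the engine's actual parser): each field is returned as a std::string_view into the original row buffer, so no per-field string is allocated or copied, unlike boost::split into a std::vector<std::string>.

#include <cstddef>
#include <string_view>
#include <vector>

// Split one CSV row into views over the caller's buffer.
// Illustration only: quoting and escaping are ignored here.
static std::vector<std::string_view> split_fields(std::string_view row, char delim)
{
    std::vector<std::string_view> fields;
    size_t start = 0;
    while (start <= row.size()) {
        size_t end = row.find(delim, start);
        if (end == std::string_view::npos)
            end = row.size();
        fields.emplace_back(row.substr(start, end - start)); // a view, no copy
        start = end + 1;
    }
    return fields;
}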

# This is the commit message #9:

performance improvement: upon the star operation (SELECT *), use a reusable buffer to reduce copies and allocations

# This is the commit message #10:

performance improvement: reduce allocations and copies; use the reusable buffer (std::string) for message metadata as well

# This is the commit message #11:

replace the CRC implementation with Boost's implementation; it also improves performance
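
For reference, the Boost API in question, as a minimal sketch (boost::crc_32_type is an assumption, matching how the crc32 member is used in the diff below):

#include <boost/crc.hpp>
#include <cstddef>
#include <cstdint>

// CRC32 over a buffer using Boost's table-driven implementation.
uint32_t crc32_of(const char* buf, std::size_t len)
{
    boost::crc_32_type crc;
    crc.process_bytes(buf, len); // table-driven, typically faster than a hand-rolled loop
    return crc.checksum();
}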

# This is the commit message #12:

performance improvement: reduce the number of value-object constructions on the intensive flow (eval())

# This is the commit message #13:

move from char* to std::string_view; change the csv_object interfaces, mainly for performance improvements

# This is the commit message #14:

initial commit for column-alias support; next steps are error handling (semantic, cyclic reference) and related performance improvements

# This is the commit message #15:

add a cache to column-alias: when an alias is referenced more than once, the cached result is returned instead of re-executing the referenced sub-tree; this can improve performance significantly (alias vs. non-alias)
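
A minimal sketch of the caching idea, with hypothetical names (the engine's real node types differ): the first reference to an alias on a given row evaluates the referenced sub-tree; later references on the same row return the stored result.

#include <functional>

using value = double; // stand-in for the engine's value type

struct alias_node {
    std::function<value()> referenced_subtree; // the expression the alias names
    value cached{};
    bool cache_valid = false;

    value eval()
    {
        if (!cache_valid) {                // first reference on this row:
            cached = referenced_subtree(); // execute the sub-tree once
            cache_valid = true;
        }
        return cached;                     // later references hit the cache
    }
    void new_row() { cache_valid = false; } // invalidate once per input row
};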

# This is the commit message #16:

cosmetics; aggregation semantic validation is done just after the syntax phase; error messages for failed queries

# This is the commit message #17:

add validation for cyclic alias references (an endless evaluation loop); done by validating that the call-stack depth does not cross a threshold
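
A sketch of the guard, with hypothetical names: a cyclic reference (a AS b, b AS a) would recurse forever, so each nested eval() bumps a depth counter and aborts once it crosses a fixed threshold.

#include <stdexcept>

constexpr int MAX_EVAL_DEPTH = 100; // threshold value is arbitrary here

struct depth_guard {
    static thread_local int depth;
    depth_guard()
    {
        if (++depth > MAX_EVAL_DEPTH)  // only a cycle nests this deeply
            throw std::runtime_error("cyclic alias reference detected");
    }
    ~depth_guard() { --depth; }        // RAII: unwinds even on exceptions
};
thread_local int depth_guard::depth = 0;

// each eval() would construct a depth_guard on entry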

# This is the commit message #18:

1) separate headers for the s3-select-functions framework; 2) bug fix for the copy constructor

# This is the commit message #19:

add a new basic type, timestamp (boost::posix_time); add the to_timestamp, add_date, diff_date, extract_date functions
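
A sketch of what the new basic type rests on; the function names above are the engine's, while these helper bodies are purely illustrative:

#include <boost/date_time/posix_time/posix_time.hpp>
#include <string>

// Parse a timestamp string into the new basic type.
boost::posix_time::ptime parse_ts(const std::string& s)
{
    return boost::posix_time::time_from_string(s); // e.g. "2020-06-02 10:00:00"
}

// Date arithmetic of the kind add_date performs.
boost::posix_time::ptime add_days(const boost::posix_time::ptime& t, long d)
{
    return t + boost::gregorian::days(d);
}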

# This is the commit message #20:

add yuvalif's utcnow implementation (returns the current time)

# This is the commit message #21:

add a CSV parser integrated with the AWS CLI; the upgraded parser is able to handle null columns and dynamic column/row/escape/quote character definitions. The CSV parser is implemented with a Boost state machine.

# This is the commit message #22:

fix comments

# This is the commit message #23:

add escape rules; default row delimiter

# This is the commit message #24:

*) bug fix: in case of a syntax error, send the error description back to the client.
*) when the number of runtime errors crosses 100, abort query execution with an error message.
*) the compression-type value is checked against "NONE"

# This is the commit message #25:

adding initial s3-select documentation

# This is the commit message #26:

*) indentation

*) add a table for CSV behavior

*) add alias feature description

# This is the commit message #27:

add csv-header-info handling; USE: get the CSV schema from the first line; IGNORE: skip the first line

# This is the commit message #28:

add csv-header-info feature description

# This is the commit message #29:

*) handling of broken CSV rows is done in the s3select engine (the CSV s3select reader)
*) RGW executes s3-select on the io-vec instead of calling c_str() (which might realloc)

# This is the commit message #30:

add s3-select documentation (to be continued ...); s3-select is part of the radosgw top-level link

# This is the commit message #31:

add s3select submodule (remove s3select header files from src/rgw )

# This is the commit message #32:

reshape the document, mainly user-oriented; design & architecture are out (a different document); TBD: a detailed example.
galsalomon66 committed Jun 2, 2020
1 parent cca6533 commit 6d0a233
Showing 9 changed files with 665 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
@@ -67,3 +67,6 @@
[submodule "src/pybind/mgr/rook/rook-client-python"]
path = src/pybind/mgr/rook/rook-client-python
url = https://github.com/ceph/rook-client-python.git
[submodule "s3select"]
path = src/s3select
url = https://github.com/ceph/s3select.git
1 change: 1 addition & 0 deletions doc/radosgw/index.rst
@@ -76,4 +76,5 @@ you may write data with one API and retrieve it with the other.
Manpage radosgw <../../man/8/radosgw>
Manpage radosgw-admin <../../man/8/radosgw-admin>
QAT Acceleration for Encryption and Compression <qat-accel>
S3-select <s3select>

268 changes: 268 additions & 0 deletions doc/radosgw/s3select.rst

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions src/common/Formatter.cc
@@ -75,6 +75,8 @@ FormatterAttrs::FormatterAttrs(const char *attr, ...)
va_end(ap);
}

void Formatter::write_bin_data(const char*, int){}

Formatter::Formatter() { }

Formatter::~Formatter() { }
@@ -544,6 +546,13 @@ void XMLFormatter::write_raw_data(const char *data)
m_ss << data;
}

void XMLFormatter::write_bin_data(const char *buff, int buf_len)
{
std::stringbuf *pbuf = m_ss.rdbuf();
pbuf->sputn(buff, buf_len);
m_ss.seekg(buf_len);
}

void XMLFormatter::get_attrs_str(const FormatterAttrs *attrs, std::string& attrs_str)
{
std::stringstream attrs_ss;
2 changes: 2 additions & 0 deletions src/common/Formatter.h
@@ -118,6 +118,7 @@ namespace ceph {
virtual void *get_external_feature_handler(const std::string& feature) {
return nullptr;
}
virtual void write_bin_data(const char * buff,int buf_len);
};

class copyable_sstream : public std::stringstream {
@@ -226,6 +227,7 @@ namespace ceph {
void dump_format_va(std::string_view name, const char *ns, bool quoted, const char *fmt, va_list ap) override;
int get_len() const override;
void write_raw_data(const char *data) override;
void write_bin_data(const char * buff,int len) override;

/* with attrs */
void open_array_section_with_attrs(std::string_view name, const FormatterAttrs& attrs) override;
2 changes: 2 additions & 0 deletions src/rgw/rgw_rest.cc
@@ -1894,6 +1894,8 @@ int RGWHandler_REST::read_permissions(RGWOp* op_obj)
/* is it a 'create bucket' request? */
if (op_obj->get_type() == RGW_OP_CREATE_BUCKET)
return 0;
if (op_obj->get_type() == RGW_OP_GET_OBJ) break;

only_bucket = true;
break;
case OP_DELETE:
312 changes: 312 additions & 0 deletions src/rgw/rgw_rest_s3.cc
@@ -16,6 +16,7 @@
#include <boost/algorithm/string/replace.hpp>
#include <boost/utility/string_view.hpp>
#include <boost/tokenizer.hpp>
#include <s3select/include/s3select.h>

#include <liboath/oath.h>

@@ -4516,6 +4517,9 @@ RGWOp *RGWHandler_REST_Obj_S3::op_post()

if (s->info.args.exists("uploads"))
return new RGWInitMultipart_ObjStore_S3;

if (s->info.args.exists("select-type"))
return new s3selectEngine::RGWSelectObj_ObjStore_S3;

return new RGWPostObj_ObjStore_S3;
}
@@ -5281,6 +5285,7 @@ AWSGeneralAbstractor::get_auth_data_v4(const req_state* const s,
case RGW_OP_PUT_BUCKET_PUBLIC_ACCESS_BLOCK:
case RGW_OP_GET_BUCKET_PUBLIC_ACCESS_BLOCK:
case RGW_OP_DELETE_BUCKET_PUBLIC_ACCESS_BLOCK:
case RGW_OP_GET_OBJ: // s3select: the request arrives as a POST (the payload contains the query), but the operation type is get-object
break;
default:
dout(10) << "ERROR: AWS4 completion for this operation NOT IMPLEMENTED" << dendl;
@@ -5870,3 +5875,310 @@ bool rgw::auth::s3::S3AnonymousEngine::is_applicable(

return route == AwsRoute::QUERY_STRING && version == AwsVersion::UNKNOWN;
}


using namespace s3selectEngine;
const char *RGWSelectObj_ObjStore_S3::header_name_str[3] = {":event-type", ":content-type", ":message-type"};
const char *RGWSelectObj_ObjStore_S3::header_value_str[3] = {"Records", "application/octet-stream", "event"};

RGWSelectObj_ObjStore_S3::RGWSelectObj_ObjStore_S3():
s3select_syntax(new s3select()),
m_s3_csv_object(0),
m_buff_header(static_cast<char*>(malloc(1000))),
chunk_number(0)
{
set_get_data(true);
}

RGWSelectObj_ObjStore_S3::~RGWSelectObj_ObjStore_S3()
{
if (m_s3_csv_object!=0)
delete m_s3_csv_object;

delete s3select_syntax;

if(m_buff_header)
free(m_buff_header); // allocated with malloc(), so it must be freed, not deleted
}

void RGWSelectObj_ObjStore_S3::encode_short(char & buff, short s , int & i)
{
short x = htons(s);
memcpy(&buff, &x, sizeof(s));
i += sizeof(s);
}

void RGWSelectObj_ObjStore_S3::encode_int(char & buff, u_int32_t s , int & i)
{
u_int32_t x = htonl(s);
memcpy(&buff, &x, sizeof(s));
i += sizeof(s);
}

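// Each header below is encoded as (following the AWS event-stream wire
// format, an assumption not spelled out in this commit):
// [1-byte name length][name][1-byte value type, 7 = string]
// [2-byte big-endian value length][value]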
int RGWSelectObj_ObjStore_S3::creare_header_records(char *buff)
{
int i = 0;

// header #1: :event-type = "Records"
buff[i++] = (char)strlen(header_name_str[EVENT_TYPE]);
memcpy(&buff[i], header_name_str[EVENT_TYPE], strlen(header_name_str[EVENT_TYPE]));
i += strlen(header_name_str[EVENT_TYPE]);
buff[i++] = (char)7;
encode_short(buff[i], (short)strlen(header_value_str[RECORDS]), i);
memcpy(&buff[i], header_value_str[RECORDS], strlen(header_value_str[RECORDS]));
i += strlen(header_value_str[RECORDS]);

// header #2: :content-type = "application/octet-stream"
buff[i++] = (char)strlen(header_name_str[CONTENT_TYPE]);
memcpy(&buff[i], header_name_str[CONTENT_TYPE], strlen(header_name_str[CONTENT_TYPE]));
i += strlen(header_name_str[CONTENT_TYPE]);
buff[i++] = (char)7;
encode_short(buff[i], (short)strlen(header_value_str[OCTET_STREAM]), i);
memcpy(&buff[i], header_value_str[OCTET_STREAM], strlen(header_value_str[OCTET_STREAM]));
i += strlen(header_value_str[OCTET_STREAM]);

// header #3: :message-type = "event"
buff[i++] = (char)strlen(header_name_str[MESSAGE_TYPE]);
memcpy(&buff[i], header_name_str[MESSAGE_TYPE], strlen(header_name_str[MESSAGE_TYPE]));
i += strlen(header_name_str[MESSAGE_TYPE]);
buff[i++] = (char)7;
encode_short(buff[i], (short)strlen(header_value_str[EVENT]), i);
memcpy(&buff[i], header_value_str[EVENT], strlen(header_value_str[EVENT]));
i += strlen(header_value_str[EVENT]);

return i;
}

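// Full message layout (again following the AWS event-stream wire format):
// [4-byte total length][4-byte headers length][4-byte prelude CRC]
// [headers][payload][4-byte message CRC], all lengths big-endian.
// The 16 bytes added to result_len are the two length fields plus the two
// CRCs; the 12 bytes pre-reserved at the front of m_result hold the prelude.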
int RGWSelectObj_ObjStore_S3::create_message(char * buff , u_int32_t result_len,u_int32_t header_len)
{
u_int32_t total_byte_len = 0;
u_int32_t preload_crc = 0;
u_int32_t message_crc = 0;
int i = 0;

total_byte_len = result_len + 16;

encode_int(buff[i], total_byte_len, i);
encode_int(buff[i], header_len, i);

crc32.reset();
crc32 = std::for_each( buff, buff + 8, crc32 );
preload_crc = crc32();
encode_int(buff[i], preload_crc, i);

i += result_len;

crc32.reset();
crc32 = std::for_each( buff, buff + i, crc32 );
message_crc = crc32();
encode_int(buff[i], message_crc, i);

return i;
}

#define PAYLOAD_LINE "\n<Payload>\n<Records>\n<Payload>\n"
#define END_PAYLOAD_LINE "\n</Payload></Records></Payload>"

int RGWSelectObj_ObjStore_S3::run_s3select(const char*query,const char*input,size_t input_length)
{
int status = 0;
csv_object::csv_defintions csv;

m_result = "012345678901"; //12 positions for header-crc

int header_size = 0;
s3select_syntax->parse_query(query);

if (m_s3_csv_object==0)
{
if (m_row_delimiter.size())
csv.row_delimiter = *m_row_delimiter.c_str();

if (m_column_delimiter.size())
csv.column_delimiter = *m_column_delimiter.c_str();

if (m_quot.size())
csv.quot_char = *m_quot.c_str();

if (m_escape_char.size())
csv.escape_char = *m_escape_char.c_str();

if(m_header_info.compare("IGNORE")==0)
{
csv.ignore_header_info=true;
}
else if(m_header_info.compare("USE")==0)
{
csv.use_header_info=true;
}

m_s3_csv_object = new s3selectEngine::csv_object(s3select_syntax,csv);
}

if (s3select_syntax->get_error_description().empty() == false)
{
header_size = creare_header_records(m_buff_header);

m_result.append(m_buff_header,header_size);

m_result.append(PAYLOAD_LINE);

m_result.append(s3select_syntax->get_error_description());

ldout(s->cct, 10) << "s3-select query: failed to prase query; {" << s3select_syntax->get_error_description() << "}"<< dendl;

status = -1;
}
else
{
header_size = creare_header_records(m_buff_header);

m_result.append(m_buff_header,header_size);

m_result.append(PAYLOAD_LINE);

status = m_s3_csv_object->run_s3select_on_stream(m_result,input,input_length,s->obj_size);
if(status<0)
{
m_result.append(m_s3_csv_object->get_error_description());
}
}

if (m_result.size() > strlen(PAYLOAD_LINE))
{
m_result.append(END_PAYLOAD_LINE);

int buff_len = create_message(m_result.data(), m_result.size() - 12, header_size);

s->formatter->write_bin_data(m_result.data(), buff_len);
if (op_ret < 0)
return op_ret;
}
rgw_flush_formatter_and_reset(s, s->formatter);

return status;
}

void RGWSelectObj_ObjStore_S3::convert_escape_seq(std::string & esc)
{
// convert the two-character escape sequence sent by the client
// (backslash + n/t/r) into the actual control character
if (esc.compare("\\n") == 0)
esc = '\n';
else if (esc.compare("\\t") == 0)
esc = '\t';
else if (esc.compare("\\r") == 0)
esc = '\r';
}

int RGWSelectObj_ObjStore_S3::handle_aws_cli_parameters(std::string & sql_query)
{

if(chunk_number !=0)
return 0;

#define GT "&gt;"
#define LT "&lt;"
if (m_s3select_query.find(GT) != std::string::npos)
boost::replace_all(m_s3select_query, GT, ">");
if (m_s3select_query.find(LT) != std::string::npos)
boost::replace_all(m_s3select_query, LT, "<");

//AWS cli s3select parameters
extract_by_tag("Expression", sql_query);

extract_by_tag("FieldDelimiter", m_column_delimiter);
convert_escape_seq(m_column_delimiter);

extract_by_tag("QuoteCharacter", m_quot);
convert_escape_seq(m_quot);

extract_by_tag("RecordDelimiter", m_row_delimiter);
convert_escape_seq(m_row_delimiter);
if (m_row_delimiter.size()==0)
m_row_delimiter='\n';

extract_by_tag("QuoteEscapeCharacter", m_escape_char);
convert_escape_seq(m_escape_char);

extract_by_tag("CompressionType", m_compression_type);

if (m_compression_type.length()>0 && m_compression_type.compare("NONE") != 0)
{
ldout(s->cct, 10) << "RGW supports currently only NONE option for compression type" << dendl;
return -1;
}

extract_by_tag("FileHeaderInfo",m_header_info);

return 0;
}

int RGWSelectObj_ObjStore_S3::extract_by_tag(std::string tag_name,std::string & result)
{
result = "";
size_t _qs = m_s3select_query.find("<" + tag_name + ">", 0);
if (_qs == std::string::npos)
return -1;
_qs += tag_name.size() + 2; // advance past the opening tag only after the npos check
size_t _qe = m_s3select_query.find("</" + tag_name + ">", _qs);
if (_qe == std::string::npos)
return -1;

result = m_s3select_query.substr(_qs, _qe - _qs);

return 0;
}

int RGWSelectObj_ObjStore_S3::send_response_data(bufferlist &bl, off_t ofs, off_t len)
{
if (len == 0)
return 0;

if (s->info.args.exists("select-type") && (chunk_number == 0))
{
//retrieve s3-select query from payload
bufferlist data;
int ret;
int max_size = 4096;
std::tie(ret, data) = rgw_rest_read_all_input(s, max_size, false);
if (ret != 0)
{
ldout(s->cct, 10) << "s3-select query: failed to retrieve query; ret = " << ret << dendl;
return ret;
}

m_s3select_query = data.to_str();
if (m_s3select_query.length() > 0)
{
ldout(s->cct, 10) << "s3-select query: " << m_s3select_query << dendl;
}
else
{
ldout(s->cct, 10) << "s3-select query: failed to retrieve query;" << dendl;
return -1;
}
}

if (chunk_number == 0)
{
if (op_ret < 0)
set_req_state_err(s, op_ret);
dump_errno(s);
}
// Explicitly use chunked transfer encoding so that we can stream the result
// to the user without having to wait for the full length of it.
if (chunk_number == 0)
end_header(s, this, "application/xml", CHUNKED_TRANSFER_ENCODING);

std::string sql_query;
handle_aws_cli_parameters(sql_query);

int status = 0;

for(auto & it : bl.buffers())
{
status = run_s3select(sql_query.c_str(),(char*)&(it)[0],it.length());
if(status<0)
break;
}

chunk_number++;

return status;
}
