Skip to content

Commit

Permalink
Merge pull request #11494 from aclamk/wip-rgw-compression
Browse files Browse the repository at this point in the history
[RGW] Wip rgw compression
Reviewed-by: Orit Wasserman <owasserm@redhat.com>
Reviewed-by: Casey Bodley <cbodley@redhat.com>
  • Loading branch information
oritwas committed Nov 3, 2016
2 parents 6d6d4b3 + 144b031 commit 40ba752
Show file tree
Hide file tree
Showing 23 changed files with 1,032 additions and 282 deletions.
10 changes: 10 additions & 0 deletions doc/radosgw/config-ref.rst
Expand Up @@ -380,6 +380,16 @@ Ceph configuration file, the default value will be set automatically.
:Default: ``-1``


``rgw compression type``

:Description: The compression plugin to use when writing object data. Each
compressed object remembers which plugin was used, so changing
this setting does not hinder the ability to decompress existing
objects, not does it force existing objects to be recompressed.
:Type: String
:Default: ``none``


Regions
=======

Expand Down
9 changes: 7 additions & 2 deletions src/cls/rgw/cls_rgw.cc
Expand Up @@ -536,6 +536,7 @@ static int check_index(cls_method_context_t hctx, struct rgw_bucket_dir_header *
stats.num_entries++;
stats.total_size += entry.meta.accounted_size;
stats.total_size_rounded += cls_rgw_get_rounded_size(entry.meta.accounted_size);
stats.actual_size += entry.meta.size;

start_obj = kiter->first;
}
Expand Down Expand Up @@ -652,7 +653,7 @@ int rgw_bucket_set_tag_timeout(cls_method_context_t hctx, bufferlist *in, buffer
struct rgw_bucket_dir_header header;
int rc = read_bucket_header(hctx, &header);
if (rc < 0) {
CLS_LOG(1, "ERROR: rgw_bucket_complete_op(): failed to read header\n");
CLS_LOG(1, "ERROR: rgw_bucket_set_tag_timeout(): failed to read header\n");
return rc;
}

Expand Down Expand Up @@ -713,7 +714,7 @@ int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist
struct rgw_bucket_dir_header header;
rc = read_bucket_header(hctx, &header);
if (rc < 0) {
CLS_LOG(1, "ERROR: rgw_bucket_complete_op(): failed to read header\n");
CLS_LOG(1, "ERROR: rgw_bucket_prepare_op(): failed to read header\n");
return rc;
}

Expand All @@ -740,6 +741,7 @@ static void unaccount_entry(struct rgw_bucket_dir_header& header, struct rgw_buc
stats.num_entries--;
stats.total_size -= entry.meta.accounted_size;
stats.total_size_rounded -= cls_rgw_get_rounded_size(entry.meta.accounted_size);
stats.actual_size -= entry.meta.size;
}

static void log_entry(const char *func, const char *str, struct rgw_bucket_dir_entry *entry)
Expand Down Expand Up @@ -925,6 +927,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist
stats.num_entries++;
stats.total_size += meta.accounted_size;
stats.total_size_rounded += cls_rgw_get_rounded_size(meta.accounted_size);
stats.actual_size += meta.size;
bufferlist new_key_bl;
::encode(entry, new_key_bl);
int ret = cls_cxx_map_set_val(hctx, idx, &new_key_bl);
Expand Down Expand Up @@ -1955,6 +1958,7 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
old_stats.num_entries--;
old_stats.total_size -= cur_disk.meta.accounted_size;
old_stats.total_size_rounded -= cls_rgw_get_rounded_size(cur_disk.meta.accounted_size);
old_stats.actual_size -= cur_disk.meta.size;
header_changed = true;
}
struct rgw_bucket_category_stats& stats =
Expand Down Expand Up @@ -1982,6 +1986,7 @@ int rgw_dir_suggest_changes(cls_method_context_t hctx, bufferlist *in, bufferlis
stats.num_entries++;
stats.total_size += cur_change.meta.accounted_size;
stats.total_size_rounded += cls_rgw_get_rounded_size(cur_change.meta.accounted_size);
stats.actual_size += cur_change.meta.size;
header_changed = true;
cur_change.index_ver = header.ver;
bufferlist cur_state_bl;
Expand Down
2 changes: 2 additions & 0 deletions src/cls/rgw/cls_rgw_types.cc
Expand Up @@ -468,6 +468,7 @@ void rgw_bucket_category_stats::generate_test_instances(list<rgw_bucket_category
s->total_size = 1024;
s->total_size_rounded = 4096;
s->num_entries = 2;
s->actual_size = 1024;
o.push_back(s);
o.push_back(new rgw_bucket_category_stats);
}
Expand All @@ -477,6 +478,7 @@ void rgw_bucket_category_stats::dump(Formatter *f) const
f->dump_unsigned("total_size", total_size);
f->dump_unsigned("total_size_rounded", total_size_rounded);
f->dump_unsigned("num_entries", num_entries);
f->dump_unsigned("actual_size", actual_size);
}

void rgw_bucket_dir_header::generate_test_instances(list<rgw_bucket_dir_header*>& o)
Expand Down
11 changes: 9 additions & 2 deletions src/cls/rgw/cls_rgw_types.h
Expand Up @@ -554,21 +554,28 @@ struct rgw_bucket_category_stats {
uint64_t total_size;
uint64_t total_size_rounded;
uint64_t num_entries;
uint64_t actual_size{0}; //< account for compression, encryption

rgw_bucket_category_stats() : total_size(0), total_size_rounded(0), num_entries(0) {}

void encode(bufferlist &bl) const {
ENCODE_START(2, 2, bl);
ENCODE_START(3, 2, bl);
::encode(total_size, bl);
::encode(total_size_rounded, bl);
::encode(num_entries, bl);
::encode(actual_size, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::iterator &bl) {
DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl);
DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl);
::decode(total_size, bl);
::decode(total_size_rounded, bl);
::decode(num_entries, bl);
if (struct_v >= 3) {
::decode(actual_size, bl);
} else {
actual_size = total_size;
}
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
Expand Down
1 change: 1 addition & 0 deletions src/common/config_opts.h
Expand Up @@ -1529,6 +1529,7 @@ OPTION(mon_mgr_beacon_grace, OPT_INT, 30) // How long to wait to failover
OPTION(rgw_list_bucket_min_readahead, OPT_INT, 1000) // minimum number of entries to read from rados for bucket listing

OPTION(rgw_rest_getusage_op_compat, OPT_BOOL, false) // dump description of total stats for s3 GetUsage API
OPTION(rgw_compression_type, OPT_STR, "none") // type of compressor, none to not use

OPTION(mutex_perf_counter, OPT_BOOL, false) // enable/disable mutex perf counter
OPTION(throttler_perf_counter, OPT_BOOL, true) // enable/disable throttler perf counter
Expand Down
20 changes: 6 additions & 14 deletions src/compressor/snappy/SnappyCompressor.h
Expand Up @@ -76,23 +76,15 @@ class SnappyCompressor : public Compressor {
int decompress(bufferlist::iterator &p,
size_t compressed_len,
bufferlist &dst) override {
size_t res_len = 0;
// Trick, decompress only need first few bytes of buffer. note
// that 4 bytes is *not* enough.. it works for small buffers (a
// few MB), but not large ones (~260 MB), because the 32-bit value
// is encoded as a varint. From snappy.cc:
// if (Varint::Parse32WithLimit(start, limit, &v) != NULL) {
// 8 bytes is enough.
bufferlist::const_iterator ptmp = p;
bufferlist tmp;
ptmp.copy(std::min(8lu, compressed_len), tmp);
if (!snappy::GetUncompressedLength(tmp.c_str(), tmp.length(), &res_len)) {
snappy::uint32 res_len = 0;
BufferlistSource source_1(p, compressed_len);
if (!snappy::GetUncompressedLength(&source_1, &res_len)) {
return -1;
}
BufferlistSource source(p, compressed_len);
BufferlistSource source_2(p, compressed_len);
bufferptr ptr(res_len);
if (snappy::RawUncompress(&source, ptr.c_str())) {
p = source.get_pos();
if (snappy::RawUncompress(&source_2, ptr.c_str())) {
p = source_2.get_pos();
dst.append(ptr);
return 0;
}
Expand Down
1 change: 1 addition & 0 deletions src/rgw/CMakeLists.txt
Expand Up @@ -29,6 +29,7 @@ set(rgw_a_srcs
rgw_cache.cc
rgw_client_io.cc
rgw_common.cc
rgw_compression.cc
rgw_cors.cc
rgw_cors_s3.cc
rgw_dencoder.cc
Expand Down
6 changes: 3 additions & 3 deletions src/rgw/rgw_admin.cc
Expand Up @@ -1170,7 +1170,7 @@ int check_min_obj_stripe_size(RGWRados *store, RGWBucketInfo& bucket_info, rgw_o
read_op.params.attrs = &attrs;
read_op.params.obj_size = &obj_size;

int ret = read_op.prepare(NULL, NULL);
int ret = read_op.prepare();
if (ret < 0) {
lderr(store->ctx()) << "ERROR: failed to stat object, returned error: " << cpp_strerror(-ret) << dendl;
return ret;
Expand Down Expand Up @@ -1232,7 +1232,7 @@ int check_obj_locator_underscore(RGWBucketInfo& bucket_info, rgw_obj& obj, rgw_o
RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
RGWRados::Object::Read read_op(&op_target);

int ret = read_op.prepare(NULL, NULL);
int ret = read_op.prepare();
bool needs_fixing = (ret == -ENOENT);

f->dump_bool("needs_fixing", needs_fixing);
Expand Down Expand Up @@ -5106,7 +5106,7 @@ int main(int argc, char **argv)
read_op.params.attrs = &attrs;
read_op.params.obj_size = &obj_size;

ret = read_op.prepare(NULL, NULL);
ret = read_op.prepare();
if (ret < 0) {
cerr << "ERROR: failed to stat object, returned error: " << cpp_strerror(-ret) << std::endl;
return 1;
Expand Down
6 changes: 1 addition & 5 deletions src/rgw/rgw_bucket.cc
Expand Up @@ -976,11 +976,7 @@ static void dump_bucket_usage(map<RGWObjCategory, RGWStorageStats>& stats, Forma
RGWStorageStats& s = iter->second;
const char *cat_name = rgw_obj_category_name(iter->first);
formatter->open_object_section(cat_name);
formatter->dump_int("size", s.size);
formatter->dump_int("size_actual", s.size_rounded);
formatter->dump_int("size_kb", rgw_rounded_kb(s.size));
formatter->dump_int("size_kb_actual", rgw_rounded_kb(s.size_rounded));
formatter->dump_int("num_objects", s.num_objects);
s.dump(formatter);
formatter->close_section();
}
formatter->close_section();
Expand Down
8 changes: 6 additions & 2 deletions src/rgw/rgw_common.h
Expand Up @@ -109,6 +109,8 @@ using ceph::crypto::MD5;
#define RGW_ATTR_OLH_ID_TAG RGW_ATTR_OLH_PREFIX "idtag"
#define RGW_ATTR_OLH_PENDING_PREFIX RGW_ATTR_OLH_PREFIX "pending."

#define RGW_ATTR_COMPRESSION RGW_ATTR_PREFIX "compression"

/* RGW File Attributes */
#define RGW_ATTR_UNIX_KEY1 RGW_ATTR_PREFIX "unix-key1"
#define RGW_ATTR_UNIX1 RGW_ATTR_PREFIX "unix1"
Expand Down Expand Up @@ -1117,6 +1119,7 @@ struct RGWStorageStats
RGWObjCategory category;
uint64_t size;
uint64_t size_rounded;
uint64_t size_utilized{0}; //< size after compression, encryption
uint64_t num_objects;

RGWStorageStats()
Expand Down Expand Up @@ -1374,15 +1377,16 @@ struct RGWObjEnt {
std::string ns;
rgw_user owner;
std::string owner_display_name;
uint64_t size;
uint64_t size{0};
uint64_t accounted_size{0};
ceph::real_time mtime;
string etag;
string content_type;
string tag;
uint32_t flags;
uint64_t versioned_epoch;

RGWObjEnt() : size(0), flags(0), versioned_epoch(0) {}
RGWObjEnt() : flags(0), versioned_epoch(0) {}

void dump(Formatter *f) const;

Expand Down

0 comments on commit 40ba752

Please sign in to comment.