diff --git a/src/common/options/global.yaml.in b/src/common/options/global.yaml.in index 52d306e4c11ca..4a8403f6b98cc 100644 --- a/src/common/options/global.yaml.in +++ b/src/common/options/global.yaml.in @@ -4493,8 +4493,11 @@ options: desc: Default policy for using compression when pool does not specify long_desc: '''none'' means never use compression. ''passive'' means use compression when clients hint that data is compressible. ''aggressive'' means use compression - unless clients hint that data is not compressible. This option is used when the - per-pool property for the compression mode is not present.' + unless clients hint that data is not compressible. ''*_lazy'' counterparts instruct + to apply compression as per above during deep-scrubbing only. Which has to be + additionally enabled at per-pool level using ''deep_scrub_recompression'' pool + setting. + This option is used when the per-pool property for the compression mode is not present.' fmt_desc: The default policy for using compression if the per-pool property ``compression_mode`` is not set. ``none`` means never use compression. ``passive`` means use compression when @@ -4502,16 +4505,24 @@ options: compressible. ``aggressive`` means use compression unless clients hint that data is not compressible. ``force`` means use compression under all circumstances even if the clients hint that - the data is not compressible. + the data is not compressible. ''*_lazy'' modes are similar to their + counterpart ones but compression to be performed during deep + scrubbing only. Which has to be additionally enabled on per-pool basis + using ''deep_scrub_recompress'' pool setting. default: none enum_values: - none - passive - aggressive - force + - passive_lazy + - aggressive_lazy + - force_lazy flags: - runtime with_legacy: true + see_also: + - bluestore_prefer_deferred_size - name: bluestore_compression_algorithm type: str level: advanced diff --git a/src/compressor/Compressor.cc b/src/compressor/Compressor.cc index a13dfb30ddc78..451f58c63a960 100644 --- a/src/compressor/Compressor.cc +++ b/src/compressor/Compressor.cc @@ -54,6 +54,10 @@ const char *Compressor::get_comp_mode_name(int m) { case COMP_PASSIVE: return "passive"; case COMP_AGGRESSIVE: return "aggressive"; case COMP_FORCE: return "force"; + case COMP_PASSIVE_LAZY: return "passive_lazy"; + case COMP_AGGRESSIVE_LAZY: return "aggressive_lazy"; + case COMP_FORCE_LAZY: return "force_lazy"; + default: return "???"; } } @@ -65,6 +69,12 @@ Compressor::get_comp_mode_type(std::string_view s) { return COMP_AGGRESSIVE; if (s == "passive") return COMP_PASSIVE; + if (s == "force_lazy") + return COMP_FORCE_LAZY; + if (s == "aggressive_lazy") + return COMP_AGGRESSIVE_LAZY; + if (s == "passive_lazy") + return COMP_PASSIVE_LAZY; if (s == "none") return COMP_NONE; return {}; diff --git a/src/compressor/Compressor.h b/src/compressor/Compressor.h index 11f020a0dd247..b24774e8d8d14 100644 --- a/src/compressor/Compressor.h +++ b/src/compressor/Compressor.h @@ -64,7 +64,10 @@ class Compressor { COMP_NONE, ///< compress never COMP_PASSIVE, ///< compress if hinted COMPRESSIBLE COMP_AGGRESSIVE, ///< compress unless hinted INCOMPRESSIBLE - COMP_FORCE ///< compress always + COMP_FORCE, ///< compress always + COMP_PASSIVE_LAZY, ///< delayed compression during reformatting if hinted COMPRESSIBLE + COMP_AGGRESSIVE_LAZY, ///< delayed_compression during reformatting unless hinted INCOMPRESSIBLE + COMP_FORCE_LAZY ///< unconditional delayed_compression during reformatting }; static const char* 
get_comp_alg_name(int a); diff --git a/src/include/rados.h b/src/include/rados.h index 1fb86be947e60..96429555eb590 100644 --- a/src/include/rados.h +++ b/src/include/rados.h @@ -492,6 +492,7 @@ enum { CEPH_OSD_OP_FLAG_FADVISE_NOCACHE = 0x40, /* data will be accessed only once by this client */ CEPH_OSD_OP_FLAG_WITH_REFERENCE = 0x80, /* need reference couting */ CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE = 0x100, /* bypass ObjectStore cache, mainly for deep-scrub */ + CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING = 0x200, /*ObjectStore can optimize data layout afterwards*/ }; #define EOLDSNAPC 85 /* ORDERSNAP flag set; writer has old snapc*/ diff --git a/src/mon/MonCommands.h b/src/mon/MonCommands.h index b2a678dff53c5..3b9f445c11ee1 100644 --- a/src/mon/MonCommands.h +++ b/src/mon/MonCommands.h @@ -1141,11 +1141,11 @@ COMMAND("osd pool rename " "rename to ", "osd", "rw") COMMAND("osd pool get " "name=pool,type=CephPoolname " - "name=var,type=CephChoices,strings=size|min_size|pg_num|pgp_num|crush_rule|hashpspool|nodelete|nopgchange|nosizechange|write_fadvise_dontneed|noscrub|nodeep-scrub|hit_set_type|hit_set_period|hit_set_count|hit_set_fpp|use_gmt_hitset|target_max_objects|target_max_bytes|cache_target_dirty_ratio|cache_target_dirty_high_ratio|cache_target_full_ratio|cache_min_flush_age|cache_min_evict_age|erasure_code_profile|min_read_recency_for_promote|all|min_write_recency_for_promote|fast_read|hit_set_grade_decay_rate|hit_set_search_last_n|scrub_min_interval|scrub_max_interval|deep_scrub_interval|recovery_priority|recovery_op_priority|scrub_priority|compression_mode|compression_algorithm|compression_required_ratio|compression_max_blob_size|compression_min_blob_size|csum_type|csum_min_block|csum_max_block|allow_ec_overwrites|fingerprint_algorithm|pg_autoscale_mode|pg_autoscale_bias|pg_num_min|pg_num_max|target_size_bytes|target_size_ratio|dedup_tier|dedup_chunk_algorithm|dedup_cdc_chunk_size|eio|bulk|read_ratio", + "name=var,type=CephChoices,strings=size|min_size|pg_num|pgp_num|crush_rule|hashpspool|nodelete|nopgchange|nosizechange|write_fadvise_dontneed|noscrub|nodeep-scrub|hit_set_type|hit_set_period|hit_set_count|hit_set_fpp|use_gmt_hitset|target_max_objects|target_max_bytes|cache_target_dirty_ratio|cache_target_dirty_high_ratio|cache_target_full_ratio|cache_min_flush_age|cache_min_evict_age|erasure_code_profile|min_read_recency_for_promote|all|min_write_recency_for_promote|fast_read|hit_set_grade_decay_rate|hit_set_search_last_n|scrub_min_interval|scrub_max_interval|deep_scrub_interval|recovery_priority|recovery_op_priority|scrub_priority|compression_mode|compression_algorithm|compression_required_ratio|compression_max_blob_size|compression_min_blob_size|csum_type|csum_min_block|csum_max_block|allow_ec_overwrites|fingerprint_algorithm|pg_autoscale_mode|pg_autoscale_bias|pg_num_min|pg_num_max|target_size_bytes|target_size_ratio|dedup_tier|dedup_chunk_algorithm|dedup_cdc_chunk_size|eio|bulk|read_ratio|deep_scrub_defragment|deep_scrub_recompress", "get pool parameter ", "osd", "r") COMMAND("osd pool set " "name=pool,type=CephPoolname " - 
"name=var,type=CephChoices,strings=size|min_size|pg_num|pgp_num|pgp_num_actual|crush_rule|hashpspool|nodelete|nopgchange|nosizechange|write_fadvise_dontneed|noscrub|nodeep-scrub|hit_set_type|hit_set_period|hit_set_count|hit_set_fpp|use_gmt_hitset|target_max_bytes|target_max_objects|cache_target_dirty_ratio|cache_target_dirty_high_ratio|cache_target_full_ratio|cache_min_flush_age|cache_min_evict_age|min_read_recency_for_promote|min_write_recency_for_promote|fast_read|hit_set_grade_decay_rate|hit_set_search_last_n|scrub_min_interval|scrub_max_interval|deep_scrub_interval|recovery_priority|recovery_op_priority|scrub_priority|compression_mode|compression_algorithm|compression_required_ratio|compression_max_blob_size|compression_min_blob_size|csum_type|csum_min_block|csum_max_block|allow_ec_overwrites|fingerprint_algorithm|pg_autoscale_mode|pg_autoscale_bias|pg_num_min|pg_num_max|target_size_bytes|target_size_ratio|dedup_tier|dedup_chunk_algorithm|dedup_cdc_chunk_size|eio|bulk|read_ratio " + "name=var,type=CephChoices,strings=size|min_size|pg_num|pgp_num|pgp_num_actual|crush_rule|hashpspool|nodelete|nopgchange|nosizechange|write_fadvise_dontneed|noscrub|nodeep-scrub|hit_set_type|hit_set_period|hit_set_count|hit_set_fpp|use_gmt_hitset|target_max_bytes|target_max_objects|cache_target_dirty_ratio|cache_target_dirty_high_ratio|cache_target_full_ratio|cache_min_flush_age|cache_min_evict_age|min_read_recency_for_promote|min_write_recency_for_promote|fast_read|hit_set_grade_decay_rate|hit_set_search_last_n|scrub_min_interval|scrub_max_interval|deep_scrub_interval|recovery_priority|recovery_op_priority|scrub_priority|compression_mode|compression_algorithm|compression_required_ratio|compression_max_blob_size|compression_min_blob_size|csum_type|csum_min_block|csum_max_block|allow_ec_overwrites|fingerprint_algorithm|pg_autoscale_mode|pg_autoscale_bias|pg_num_min|pg_num_max|target_size_bytes|target_size_ratio|dedup_tier|dedup_chunk_algorithm|dedup_cdc_chunk_size|eio|bulk|read_ratio|deep_scrub_defragment|deep_scrub_recompress " "name=val,type=CephString " "name=yes_i_really_mean_it,type=CephBool,req=false", "set pool parameter to ", "osd", "rw") diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index b70fcc064dd80..0dc9faf020d27 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -5405,7 +5405,8 @@ namespace { CSUM_TYPE, CSUM_MAX_BLOCK, CSUM_MIN_BLOCK, FINGERPRINT_ALGORITHM, PG_AUTOSCALE_MODE, PG_NUM_MIN, TARGET_SIZE_BYTES, TARGET_SIZE_RATIO, PG_AUTOSCALE_BIAS, DEDUP_TIER, DEDUP_CHUNK_ALGORITHM, - DEDUP_CDC_CHUNK_SIZE, POOL_EIO, BULK, PG_NUM_MAX, READ_RATIO }; + DEDUP_CDC_CHUNK_SIZE, POOL_EIO, BULK, PG_NUM_MAX, READ_RATIO, + DEEP_SCRUB_DEFRAGMENT, DEEP_SCRUB_RECOMPRESS }; std::set subtract_second_from_first(const std::set& first, @@ -6156,7 +6157,9 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op) {"dedup_chunk_algorithm", DEDUP_CHUNK_ALGORITHM}, {"dedup_cdc_chunk_size", DEDUP_CDC_CHUNK_SIZE}, {"bulk", BULK}, - {"read_ratio", READ_RATIO} + {"read_ratio", READ_RATIO}, + {"deep_scrub_defragment", DEEP_SCRUB_DEFRAGMENT}, + {"deep_scrub_recompress", DEEP_SCRUB_RECOMPRESS}, }; typedef std::set choices_set_t; @@ -6403,6 +6406,8 @@ bool OSDMonitor::preprocess_command(MonOpRequestRef op) case DEDUP_CHUNK_ALGORITHM: case DEDUP_CDC_CHUNK_SIZE: case READ_RATIO: + case DEEP_SCRUB_DEFRAGMENT: + case DEEP_SCRUB_RECOMPRESS: pool_opts_t::key_t key = pool_opts_t::get_opt_desc(i->first).key; if (p->opts.is_set(key)) { if(*it == CSUM_TYPE) { @@ -6567,6 +6572,8 @@ bool 
OSDMonitor::preprocess_command(MonOpRequestRef op) case DEDUP_CHUNK_ALGORITHM: case DEDUP_CDC_CHUNK_SIZE: case READ_RATIO: + case DEEP_SCRUB_DEFRAGMENT: + case DEEP_SCRUB_RECOMPRESS: for (i = ALL_CHOICES.begin(); i != ALL_CHOICES.end(); ++i) { if (i->second == *it) break; diff --git a/src/os/bluestore/BlueStore.cc b/src/os/bluestore/BlueStore.cc index 2f6cf3efb899b..49f93e0282596 100644 --- a/src/os/bluestore/BlueStore.cc +++ b/src/os/bluestore/BlueStore.cc @@ -2987,13 +2987,13 @@ ostream& operator<<(ostream& out, const BlueStore::Extent& e) } // OldExtent -BlueStore::OldExtent* BlueStore::OldExtent::create(CollectionRef c, +BlueStore::OldExtent* BlueStore::OldExtent::create(Collection* c, uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) { OldExtent* oe = new OldExtent(lo, o, l, b); - b->put_ref(c.get(), o, l, &(oe->r)); + b->put_ref(c, o, l, &(oe->r)); oe->blob_empty = !b->is_referenced(); return oe; } @@ -4388,7 +4388,7 @@ int BlueStore::ExtentMap::compress_extent_map( } void BlueStore::ExtentMap::punch_hole( - CollectionRef &c, + Collection* c, uint64_t offset, uint64_t length, old_extent_map_t *old_extents) @@ -4459,7 +4459,7 @@ BlueStore::Extent *BlueStore::ExtentMap::set_lextent( b->get_ref(onode->c, blob_offset, length); if (old_extents) { - punch_hole(c, logical_offset, length, old_extents); + punch_hole(c.get(), logical_offset, length, old_extents); } Extent *le = new Extent(logical_offset, blob_offset, length, b); @@ -4825,6 +4825,25 @@ void BlueStore::Onode::decode_omap_key(const string& key, string *user_key) // ======================================================= // WriteContext +void BlueStore::WriteContext::setup_prealloc(PExtentVector&& from, + uint32_t total_bytes) +{ + prealloc_slicer.setup(std::move(from), total_bytes); +} + +void BlueStore::WriteContext::dispose_remaining_prealloc(Allocator* alloc) +{ + PExtentVector to_release; + if (!prealloc_slicer.end() && prealloc_slicer.slice(to_release) > 0) { + alloc->release(to_release); + } +} + +void BlueStore::WriteContext::rewind_prealloc() +{ + prealloc_slicer.rewind(); +} + /// Checks for writes to the same pextent within a blob bool BlueStore::WriteContext::has_conflict( BlobRef b, @@ -5616,7 +5635,6 @@ bufferlist BlueStore::OmapIteratorImpl::value() return it->value(); } - // ===================================== #undef dout_prefix @@ -6413,6 +6431,37 @@ void BlueStore::_init_logger() "srwc", PerfCountersBuilder::PRIO_USEFUL); + // reformatting counters + //**************************************** + b.add_time_avg(l_bluestore_reformat_lat, "reformat_lat", + "Average reformatting latency", + "rf_l", PerfCountersBuilder::PRIO_CRITICAL); + b.add_u64_counter(l_bluestore_reformat_compress_attempted, + "reformat_compress_attempted", + "Recompression attempts done", + "rfca", + PerfCountersBuilder::PRIO_USEFUL); + b.add_u64_counter(l_bluestore_reformat_compress_omitted, + "reformat_compress_omitted", + "Recompression attempts omitted", + "rfco", + PerfCountersBuilder::PRIO_USEFUL); + b.add_u64_counter(l_bluestore_reformat_defragment_attempted, + "reformat_defragment_attempted", + "Defragmentation attempts done", + "rfda", + PerfCountersBuilder::PRIO_USEFUL); + b.add_u64_counter(l_bluestore_reformat_defragment_omitted, + "reformat_defragment_omitted", + "Defragmentation attempts omitted", + "rfdo", + PerfCountersBuilder::PRIO_USEFUL); + b.add_u64_counter(l_bluestore_reformat_issued, + "reformat_issued", + "Reformatting requests issued", + "rfi", + PerfCountersBuilder::PRIO_USEFUL); + // Resulting size axis configuration for 
op histograms, values are in bytes PerfHistogramCommon::axis_config_d alloc_hist_x_axis_config{ "Given size (bytes)", @@ -11741,7 +11790,7 @@ int BlueStore::set_collection_opts( } int BlueStore::read( - CollectionHandle &c_, + CollectionHandle &ch, const ghobject_t& oid, uint64_t offset, size_t length, @@ -11749,20 +11798,46 @@ int BlueStore::read( uint32_t op_flags) { auto start = mono_clock::now(); - Collection *c = static_cast(c_.get()); + Collection *c = static_cast(ch.get()); const coll_t &cid = c->get_cid(); dout(15) << __func__ << " " << cid << " " << oid - << " 0x" << std::hex << offset << "~" << length << std::dec - << dendl; + << " 0x" << std::hex << offset << "~" << length + << " flags 0x" << op_flags + << std::dec << dendl; if (!c->exists) return -ENOENT; bl.clear(); int r; + OnodeRef o; + bool might_need_reformatting = + op_flags & CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING; + if (might_need_reformatting) { + // for the sake of simplicity do not apply data reformatting to unaligned + // reads, hopefylly we'll never get them as the primary user for + // the feature is scrubbing. + might_need_reformatting = p2nphase(offset, min_alloc_size) == 0 && + p2nphase(offset + length, min_alloc_size) == 0; + if (!might_need_reformatting) { + dout(5) << __func__ + << " data reformatting skipped due to unaligned read bounds " + << p2nphase(offset, min_alloc_size) << " " + << p2nphase(offset + length, min_alloc_size) + << dendl; + } + } + span_stat_t span_stat; { - std::shared_lock l(c->lock); + std::shared_lock slock(c->lock, std::defer_lock); + std::unique_lock ulock(c->lock, std::defer_lock); + if (might_need_reformatting) { + ulock.lock(); + } else { + slock.lock(); + } + auto start1 = mono_clock::now(); - OnodeRef o = c->get_onode(oid, false); + o = c->get_onode(oid, false); log_latency("get_onode@read", l_bluestore_read_onode_meta_lat, mono_clock::now() - start1, @@ -11776,9 +11851,22 @@ int BlueStore::read( if (offset == length && offset == 0) length = o->onode.size; - r = _do_read(c, o, offset, length, bl, op_flags); + r = _do_read(c, o, offset, length, bl, op_flags, 0, + might_need_reformatting ? &span_stat : nullptr); if (r == -EIO) { logger->inc(l_bluestore_read_eio); + log_latency(__func__, + l_bluestore_read_lat, + mono_clock::now() - start, + cct->_conf->bluestore_log_op_age); + } + if (might_need_reformatting) { + auto start2 = mono_clock::now(); + _maybe_reformat_object(c, o, offset, length, bl, op_flags, span_stat); + log_latency(__func__, + l_bluestore_reformat_lat, + mono_clock::now() - start2, + cct->_conf->bluestore_log_op_age); } } @@ -11796,10 +11884,6 @@ int BlueStore::read( dout(10) << __func__ << " " << cid << " " << oid << " 0x" << std::hex << offset << "~" << length << std::dec << " = " << r << dendl; - log_latency(__func__, - l_bluestore_read_lat, - mono_clock::now() - start, - cct->_conf->bluestore_log_op_age); return r; } @@ -11809,11 +11893,15 @@ void BlueStore::_read_cache( size_t length, int read_cache_policy, ready_regions_t& ready_regions, - blobs2read_t& blobs2read) + blobs2read_t& blobs2read, + span_stat_t* res_span_stat) { // build blob-wise list to of stuff read (that isn't cached) unsigned left = length; uint64_t pos = offset; + span_stat_t dummy_span_stat; + span_stat_t& span_stat = res_span_stat ? 
*res_span_stat : dummy_span_stat; + span_stat.stored = length; auto lp = o->extent_map.seek_lextent(offset); while (left > 0 && lp != o->extent_map.extent_map.end()) { if (pos < lp->logical_offset) { @@ -11825,7 +11913,9 @@ void BlueStore::_read_cache( << std::dec << dendl; pos += hole; left -= hole; + span_stat.stored -= hole; } + span_stat.extents++; BlobRef& bptr = lp->blob; unsigned l_off = pos - lp->logical_offset; unsigned b_off = l_off + lp->blob_offset; @@ -11849,7 +11939,8 @@ void BlueStore::_read_cache( pc->first == b_off) { l = pc->second.length(); ready_regions[pos] = std::move(pc->second); - dout(30) << __func__ << " use cache 0x" << std::hex << pos << ": 0x" + span_stat.cached += l; + dout(30) << __func__ << " use cache 0x" << std::hex << pos << ": 0x" << b_off << "~" << l << std::dec << dendl; ++pc; } else { @@ -11898,18 +11989,38 @@ void BlueStore::_read_cache( } ++lp; } + span_stat.stored -= left; // finally adjust if we haven't seen the full length } int BlueStore::_prepare_read_ioc( blobs2read_t& blobs2read, vector* compressed_blob_bls, - IOContext* ioc) -{ + IOContext* ioc, + span_stat_t* res_span_stat) +{ + span_stat_t dummy_span_stat; + span_stat_t& span_stat = res_span_stat ? *res_span_stat : dummy_span_stat; + interval_set pintervals; // need to accumulate pextents in this set + // to get them sorted by offset and merged + // into larger intervals if possible. for (auto& p : blobs2read) { const BlobRef& bptr = p.first; regions2read_t& r2r = p.second; dout(20) << __func__ << " blob " << *bptr << " need " << r2r << dendl; + SharedBlobRef sb; + bool has_shared = false; + if (bptr->get_blob().is_shared() && res_span_stat) { + sb = bptr->get_shared_blob(); + bptr->collection->load_shared_blob(sb); + has_shared = true; + } + auto shared_cb = [&](uint64_t o, uint32_t len, uint32_t refs) { + if (refs > 1) { + span_stat.allocated_shared += len; + } + return 0; + }; if (bptr->get_blob().is_compressed()) { // read the whole thing if (compressed_blob_bls->empty()) { @@ -11918,9 +12029,18 @@ int BlueStore::_prepare_read_ioc( } compressed_blob_bls->push_back(bufferlist()); bufferlist& bl = compressed_blob_bls->back(); + auto on_disk_len = bptr->get_blob().get_ondisk_length(); + span_stat.stored_compressed += bptr->get_blob().get_logical_length(); + span_stat.allocated_compressed += on_disk_len; auto r = bptr->get_blob().map( - 0, bptr->get_blob().get_ondisk_length(), + 0, on_disk_len, [&](uint64_t offset, uint64_t length) { + if (res_span_stat) { + pintervals.insert(offset, length); + if (has_shared) { + sb->map_fn(offset, length, shared_cb); + } + } int r = bdev->aio_read(offset, length, &bl, ioc); if (r < 0) return r; @@ -11945,10 +12065,17 @@ int BlueStore::_prepare_read_ioc( << dendl; // read it - auto r = bptr->get_blob().map( + span_stat.allocated += req.r_len; + auto r = bptr->get_blob().map( req.r_off, req.r_len, [&](uint64_t offset, uint64_t length) { - int r = bdev->aio_read(offset, length, &req.bl, ioc); + if (res_span_stat) { + pintervals.insert(offset, length); + if (has_shared) { + sb->map_fn(offset, length, shared_cb); + } + } + int r = bdev->aio_read(offset, length, &req.bl, ioc); if (r < 0) return r; return 0; @@ -11966,6 +12093,7 @@ int BlueStore::_prepare_read_ioc( } } } + span_stat.frags += pintervals.num_intervals(); return 0; } @@ -12069,7 +12197,8 @@ int BlueStore::_do_read( size_t length, bufferlist& bl, uint32_t op_flags, - uint64_t retry_count) + uint64_t retry_count, + span_stat_t* span_stat) { FUNCTRACE(cct); int r = 0; @@ -12120,7 +12249,7 @@ int 
BlueStore::_do_read( // build blob-wise list to of stuff read (that isn't cached) ready_regions_t ready_regions; blobs2read_t blobs2read; - _read_cache(o, offset, length, read_cache_policy, ready_regions, blobs2read); + _read_cache(o, offset, length, read_cache_policy, ready_regions, blobs2read, span_stat); // read raw blob data. @@ -12129,7 +12258,7 @@ int BlueStore::_do_read( // The error isn't that much... vector compressed_blob_bls; IOContext ioc(cct, NULL, !cct->_conf->bluestore_fail_eio); - r = _prepare_read_ioc(blobs2read, &compressed_blob_bls, &ioc); + r = _prepare_read_ioc(blobs2read, &compressed_blob_bls, &ioc, span_stat); // we always issue aio for reading, so errors other than EIO are not allowed if (r < 0) return r; @@ -12484,9 +12613,10 @@ int BlueStore::_do_readv( for (auto p = m.begin(); p != m.end(); p++, i++) { raw_results.push_back({}); _read_cache(o, p.get_start(), p.get_len(), read_cache_policy, - std::get<0>(raw_results[i]), std::get<2>(raw_results[i])); - r = _prepare_read_ioc(std::get<2>(raw_results[i]), &std::get<1>(raw_results[i]), &ioc); - // we always issue aio for reading, so errors other than EIO are not allowed + std::get<0>(raw_results[i]), std::get<2>(raw_results[i]), + nullptr); + r = _prepare_read_ioc(std::get<2>(raw_results[i]), &std::get<1>(raw_results[i]), + &ioc, nullptr); if (r < 0) return r; } @@ -14855,6 +14985,35 @@ int BlueStore::queue_transactions( txc->bytes += (*p).get_num_bytes(); _txc_add_transaction(txc, &(*p)); } + _txc_exec(txc, handle); + + // we're immediately readable (unlike FileStore) + for (auto c : on_applied_sync) { + c->complete(0); + } + if (!on_applied.empty()) { + if (c->commit_queue) { + c->commit_queue->queue(on_applied); + } else { + finisher.queue(on_applied); + } + } + +#ifdef WITH_BLKIN + if (txc->trace) { + txc->trace.event("txc applied"); + } +#endif + + log_latency("submit_transact", + l_bluestore_submit_lat, + mono_clock::now() - start, + cct->_conf->bluestore_log_op_age); + return 0; +} + +void BlueStore::_txc_exec(TransContext* txc, ThreadPool::TPHandle* handle) +{ _txc_calc_cost(txc); _txc_write_nodes(txc, txc->t); @@ -14883,12 +15042,12 @@ int BlueStore::queue_transactions( auto tstart = mono_clock::now(); if (!throttle.try_start_transaction( - *db, - *txc, - tstart)) { + *db, + *txc, + tstart)) { // ensure we do not block here because of deferred writes dout(10) << __func__ << " failed get throttle_deferred_bytes, aggressive" - << dendl; + << dendl; ++deferred_aggressive; deferred_try_submit(); { @@ -14908,37 +15067,13 @@ int BlueStore::queue_transactions( handle->reset_tp_timeout(); logger->inc(l_bluestore_txc); - - // execute (start) - _txc_state_proc(txc); - - // we're immediately readable (unlike FileStore) - for (auto c : on_applied_sync) { - c->complete(0); - } - if (!on_applied.empty()) { - if (c->commit_queue) { - c->commit_queue->queue(on_applied); - } else { - finisher.queue(on_applied); - } - } - -#ifdef WITH_BLKIN - if (txc->trace) { - txc->trace.event("txc applied"); - } -#endif - - log_latency("submit_transact", - l_bluestore_submit_lat, - mono_clock::now() - start, - cct->_conf->bluestore_log_op_age); log_latency("throttle_transact", l_bluestore_throttle_lat, tend - tstart, cct->_conf->bluestore_log_op_age); - return 0; + + // execute (start) + _txc_state_proc(txc); } void BlueStore::_txc_aio_submit(TransContext *txc) @@ -15329,7 +15464,34 @@ void BlueStore::_txc_add_transaction(TransContext *txc, Transaction *t) } } +void BlueStore::_txc_exec_reformat_write(TransContext* txc, + Collection* c, 
OnodeRef o, + uint64_t offset, size_t length, const bufferlist& bl, + WriteContext& wctx) +{ + dout(10) << __func__ << " " << o->oid + << std::hex << " 0x" << offset << "~" << length + << std::dec << dendl; + // initialize osd_pool_id and do a smoke test that all collections belong + // to the same pool + ceph_assert(!!c); + spg_t pgid; + if (c->cid.is_pg(&pgid) ) { + txc->osd_pool_id = pgid.pool(); + } + // object operations + ceph_assert(o->exists); + int r = _do_write(txc, txc->ch, o, offset, length, bl, wctx); + if (r < 0) { + dout(5) << __func__ << " got an error: " << cpp_strerror(r) << dendl; + wctx.rewind_prealloc(); + txc->osr->undo_queue(txc); + delete txc; + } else { + _txc_exec(txc, nullptr); + } +} // ----------------- // write operations @@ -15413,7 +15575,7 @@ void BlueStore::_do_write_small( CollectionRef &c, OnodeRef& o, uint64_t offset, uint64_t length, - bufferlist::iterator& blp, + bufferlist::const_iterator& blp, WriteContext *wctx) { dout(10) << __func__ << " 0x" << std::hex << offset << "~" << length @@ -15661,7 +15823,7 @@ void BlueStore::_do_write_small( // due to existent extents uint64_t b_off = offset - bstart; uint64_t b_off0 = b_off; - o->extent_map.punch_hole(c, offset, length, &wctx->old_extents); + o->extent_map.punch_hole(c.get(), offset, length, &wctx->old_extents); // Zero detection -- small block if (!cct->_conf->bluestore_zero_block_detection || !bl.is_zero()) { @@ -15723,7 +15885,7 @@ void BlueStore::_do_write_small( uint64_t b_off = offset - bstart; uint64_t b_off0 = b_off; - o->extent_map.punch_hole(c, offset, length, &wctx->old_extents); + o->extent_map.punch_hole(c.get(), offset, length, &wctx->old_extents); // Zero detection -- small block if (!cct->_conf->bluestore_zero_block_detection || !bl.is_zero()) { @@ -15779,7 +15941,7 @@ void BlueStore::_do_write_small( } uint64_t b_off = p2phase(offset, alloc_len); uint64_t b_off0 = b_off; - o->extent_map.punch_hole(c, offset, length, &wctx->old_extents); + o->extent_map.punch_hole(c.get(), offset, length, &wctx->old_extents); // Zero detection -- small block if (!cct->_conf->bluestore_zero_block_detection || !bl.is_zero()) { @@ -15862,7 +16024,7 @@ void BlueStore::_do_write_big_apply_deferred( CollectionRef& c, OnodeRef& o, BlueStore::BigDeferredWriteContext& dctx, - bufferlist::iterator& blp, + bufferlist::const_iterator& blp, WriteContext* wctx) { bufferlist bl; @@ -15925,7 +16087,7 @@ void BlueStore::_do_write_big( CollectionRef &c, OnodeRef& o, uint64_t offset, uint64_t length, - bufferlist::iterator& blp, + bufferlist::const_iterator& blp, WriteContext *wctx) { dout(10) << __func__ << " 0x" << std::hex << offset << "~" << length @@ -15934,6 +16096,10 @@ void BlueStore::_do_write_big( << dendl; logger->inc(l_bluestore_write_big); logger->inc(l_bluestore_write_big_bytes, length); + if (wctx->precompressed) { + dout(20) << __func__ << " has been precompressed, omitting." << dendl; + return; + } auto max_bsize = std::max(wctx->target_blob_size, min_alloc_size); uint64_t prefer_deferred_size_snapshot = prefer_deferred_size.load(); while (length > 0) { @@ -15943,7 +16109,7 @@ void BlueStore::_do_write_big( uint32_t l = 0; //attempting to reuse existing blob - if (!wctx->compress) { + if (!wctx->compress && !wctx->preallocated()) { // enforce target blob alignment with max_bsize l = max_bsize - p2phase(offset, max_bsize); l = std::min(uint64_t(l), length); @@ -16040,7 +16206,7 @@ void BlueStore::_do_write_big( } dout(20) << __func__ << " lookup for blocks to reuse..." 
<< dendl; - o->extent_map.punch_hole(c, offset, l, &wctx->old_extents); + o->extent_map.punch_hole(c.get(), offset, l, &wctx->old_extents); // seek again as punch_hole could invalidate ep auto ep = o->extent_map.seek_lextent(offset); @@ -16096,10 +16262,10 @@ void BlueStore::_do_write_big( } } while (b == nullptr && any_change); } else { - // trying to utilize as longer chunk as permitted in case of compression. + // trying to utilize as longer chunk as permitted in case of compression/reformatting. l = std::min(max_bsize, length); - o->extent_map.punch_hole(c, offset, l, &wctx->old_extents); - } // if (!wctx->compress) + o->extent_map.punch_hole(c.get(), offset, l, &wctx->old_extents); + } // if (!wctx->compress && !wctx->preallocated) if (b == nullptr) { b = c->new_blob(); @@ -16146,44 +16312,6 @@ int BlueStore::_do_alloc_write( return 0; } - CompressorRef c; - double crr = 0; - if (wctx->compress) { - c = select_option( - "compression_algorithm", - compressor, - [&]() { - string val; - if (coll->pool_opts.get(pool_opts_t::COMPRESSION_ALGORITHM, &val)) { - CompressorRef cp = compressor; - if (!cp || cp->get_type_name() != val) { - cp = Compressor::create(cct, val); - if (!cp) { - if (_set_compression_alert(false, val.c_str())) { - derr << __func__ << " unable to initialize " << val.c_str() - << " compressor" << dendl; - } - } - } - return std::optional(cp); - } - return std::optional(); - } - ); - - crr = select_option( - "compression_required_ratio", - cct->_conf->bluestore_compression_required_ratio, - [&]() { - double val; - if (coll->pool_opts.get(pool_opts_t::COMPRESSION_REQUIRED_RATIO, &val)) { - return std::optional(val); - } - return std::optional(); - } - ); - } - // checksum int64_t csum = csum_type.load(); csum = select_option( @@ -16214,18 +16342,21 @@ int BlueStore::_do_alloc_write( auto max_bsize = std::max(wctx->target_blob_size, min_alloc_size); for (auto& wi : wctx->writes) { - if (c && wi.blob_length > min_alloc_size) { + if (wctx->compressor && wi.blob_length > min_alloc_size) { auto start = mono_clock::now(); - // compress ceph_assert(wi.b_off == 0); ceph_assert(wi.blob_length == wi.bl.length()); // FIXME: memory alignment here is bad bufferlist t; - std::optional compressor_message; - int r = c->compress(wi.bl, t, compressor_message); - uint64_t want_len_raw = wi.blob_length * crr; + int r = 0; + if (!wctx->precompressed) { + r = wctx->compressor->compress(wi.bl, t, wi.compressor_message); + } else { + std::swap(t, wi.compressed_bl); + } + uint64_t want_len_raw = wi.blob_length * wctx->crr; uint64_t want_len = p2roundup(want_len_raw, min_alloc_size); bool rejected = false; uint64_t compressed_len = t.length(); @@ -16234,9 +16365,9 @@ int BlueStore::_do_alloc_write( uint64_t result_len = p2roundup(compressed_len, min_alloc_size); if (r == 0 && result_len <= want_len && result_len < wi.blob_length) { bluestore_compression_header_t chdr; - chdr.type = c->get_type(); - chdr.length = t.length(); - chdr.compressor_message = compressor_message; + chdr.type = wctx->compressor->get_type(); + chdr.length = compressed_len; + chdr.compressor_message = wi.compressor_message; encode(chdr, wi.compressed_bl); wi.compressed_bl.claim_append(t); @@ -16251,7 +16382,7 @@ int BlueStore::_do_alloc_write( logger->inc(l_bluestore_write_pad_bytes, result_len - compressed_len); dout(20) << __func__ << std::hex << " compressed 0x" << wi.blob_length << " -> 0x" << compressed_len << " => 0x" << result_len - << " with " << c->get_type() + << " with " << wctx->compressor->get_type() << std::dec << 
dendl; txc->statfs_delta.compressed() += compressed_len; txc->statfs_delta.compressed_original() += wi.blob_length; @@ -16264,7 +16395,7 @@ int BlueStore::_do_alloc_write( } } else if (r != 0) { dout(5) << __func__ << std::hex << " 0x" << wi.blob_length - << " bytes compressed using " << c->get_type_name() + << " bytes compressed using " << wctx->compressor->get_type_name() << std::dec << " failed with errcode = " << r << ", leaving uncompressed" @@ -16279,7 +16410,7 @@ int BlueStore::_do_alloc_write( if (rejected) { dout(20) << __func__ << std::hex << " 0x" << wi.blob_length << " compressed to 0x" << compressed_len << " -> 0x" << result_len - << " with " << c->get_type() + << " with " << wctx->compressor->get_type() << ", which is more than required 0x" << want_len_raw << " -> 0x" << want_len << ", leaving uncompressed" @@ -16287,44 +16418,72 @@ int BlueStore::_do_alloc_write( logger->inc(l_bluestore_compress_rejected_count); need += wi.blob_length; data_size += wi.bl.length(); - } - log_latency("compress@_do_alloc_write", - l_bluestore_compress_lat, - mono_clock::now() - start, - cct->_conf->bluestore_log_op_age ); + wi.compressed_bl.clear(); + wi.compressed_len = 0; + wi.compressed = false; + } + if (!wctx->precompressed) { + log_latency("compress@_do_alloc_write", + l_bluestore_compress_lat, + mono_clock::now() - start, + cct->_conf->bluestore_log_op_age ); + } } else { need += wi.blob_length; data_size += wi.bl.length(); } } - PExtentVector prealloc; - prealloc.reserve(2 * wctx->writes.size()); - int64_t prealloc_left = 0; - auto start = mono_clock::now(); - prealloc_left = alloc->allocate( - need, min_alloc_size, need, - 0, &prealloc); - log_latency("allocator@_do_alloc_write", - l_bluestore_allocator_lat, - mono_clock::now() - start, - cct->_conf->bluestore_log_op_age); - if (prealloc_left < 0 || prealloc_left < (int64_t)need) { - derr << __func__ << " failed to allocate 0x" << std::hex << need - << " allocated 0x " << (prealloc_left < 0 ? 0 : prealloc_left) - << " min_alloc_size 0x" << min_alloc_size - << " available 0x " << alloc->get_free() + auto need0 = need; + PExtentVector pextents; + pextents.reserve(2 * wctx->writes.size()); + uint64_t preallocated = 0; + if (!wctx->prealloc_slicer.end()) { + preallocated = wctx->prealloc_slicer.slice(pextents, need); + dout(20) << __func__ << " using wxtx prealloc, consumed 0x" + << std::hex << preallocated + << ", needed 0x " << need + << std::dec << dendl; + ceph_assert(preallocated <= need); + need -= preallocated; + } + + int64_t allocated = 0; + if (need > 0) { + auto start = mono_clock::now(); + allocated = alloc->allocate( + need, min_alloc_size, need, + 0, &pextents); + log_latency("allocator@_do_alloc_write", + l_bluestore_allocator_lat, + mono_clock::now() - start, + cct->_conf->bluestore_log_op_age); + if (allocated < (int64_t)need) { + derr << __func__ << " failed to allocate 0x" << std::hex << need + << " allocated 0x " << (allocated < 0 ? 
0 : allocated) + << " min_alloc_size 0x" << min_alloc_size + << " available 0x " << alloc->get_free() + << std::dec << dendl; + allocated = 0; + } + } + if (preallocated + allocated < need0) { + derr << __func__ << " failed to get 0x" << std::hex << need0 + << ", preallocated = 0x" << preallocated + << ", allocated = 0x" << allocated << std::dec << dendl; - if (prealloc.size()) { - alloc->release(prealloc); + if (pextents.size()) { + alloc->release(pextents); } return -ENOSPC; } - _collect_allocation_stats(need, min_alloc_size, prealloc); + _collect_allocation_stats(need, min_alloc_size, pextents); dout(20) << __func__ << std::hex << " need=0x" << need << " data=0x" << data_size - << " prealloc " << prealloc << dendl; - auto prealloc_pos = prealloc.begin(); - ceph_assert(prealloc_pos != prealloc.end()); + << " prealloc " << pextents << dendl; + + int64_t prealloc_left = allocated + preallocated; + auto prealloc_pos = pextents.begin(); + ceph_assert(prealloc_pos != pextents.end()); for (auto& wi : wctx->writes) { bluestore_blob_t& dblob = wi.b->dirty_blob(); @@ -16466,11 +16625,141 @@ int BlueStore::_do_alloc_write( } } } - ceph_assert(prealloc_pos == prealloc.end()); + ceph_assert(prealloc_pos == pextents.end()); ceph_assert(prealloc_left == 0); return 0; } +void BlueStore::_maybe_reformat_object(Collection* c, OnodeRef& o, + uint64_t offset, size_t length, const bufferlist& bl, uint32_t op_flags, + const span_stat_t& span_stat) +{ + int64_t opt_defragment = 0; + int64_t opt_recompress = 0; + c->pool_opts.get(pool_opts_t::DEEP_SCRUB_DEFRAGMENT, &opt_defragment); + c->pool_opts.get(pool_opts_t::DEEP_SCRUB_RECOMPRESS, &opt_recompress); + dout(10) << __func__ << " defragment = " << opt_defragment + << " recompress = " << opt_recompress + << " span stat {" << span_stat << "}" + << dendl; + _dump_onode<30>(cct, *o); + + bool might_need_defragment = false; + bool might_need_recompress = false; + auto need = length; + WriteContext wctx; + + if (span_stat.cached == 0 && span_stat.allocated_shared == 0) { + if (opt_defragment || opt_recompress) { + // will probably need write context + _choose_write_options(c, o, op_flags, &wctx); + } + // do reformat if + // - object isn't cached (meaning it's not being written at the moment), + // - and there are no shared blobs withing the span. + // as this might result is used space increase. + if (opt_recompress) { + if (wctx.compress && + span_stat.allocated > 0) { + ceph_assert(wctx.compressor); + logger->inc(l_bluestore_reformat_compress_attempted); + need = 0; + auto bl_it = bl.begin(); + uint64_t offs = offset; + while (bl_it != bl.end()) { + BlobRef blob = c->new_blob(); + + bufferlist from_bl; + size_t l = std::min(wctx.target_blob_size, uint64_t(bl_it.get_remaining())); + bl_it.copy(l, from_bl); + //FIXME: add zero detection + auto& wi = wctx.write(offs, blob, l, 0, from_bl, 0, l, false, true); + + uint64_t res_len = from_bl.length(); + std::optional compressor_message; + bufferlist cbl; + if (l > min_alloc_size && + wctx.compressor->compress(from_bl, cbl, compressor_message) == 0) { + res_len = cbl.length(); + wi.compressor_message = compressor_message; + std::swap(wi.compressed_bl, cbl); + // don't set wi.compress_len and wi.compressed as this is s redundant + // at this point, to be assigned in _do_alloc_write if needed. 
+ } + need += p2roundup(res_len, min_alloc_size); + offs += l; + dout(20) << __func__ << " precompress : 0x" + << std::hex << offs << "~" << l << "->" << res_len + << std::dec << " " << *blob + << dendl; + } + uint64_t allocated = span_stat.allocated + span_stat.allocated_compressed; + dout(10) << __func__ + << " recompress info, 0x" + << std::hex << need << " vs. 0x" << allocated + << std::dec << dendl; + if (need < allocated) { + might_need_recompress = true; + wctx.precompressed = true; + } + else if (need == allocated) { + wctx.precompressed = true; // may be do compression if defragmenting below + } + else { + // don't even try compression if defragmenting below + logger->inc(l_bluestore_reformat_compress_omitted); + wctx.reset(); + need = length; + } + } + } + if (!might_need_recompress && opt_defragment) { + if (span_stat.frags > 1) { + logger->inc(l_bluestore_reformat_defragment_attempted); + PExtentVector prealloc; + prealloc.reserve(need / min_alloc_size + 1); + auto start = mono_clock::now(); + int64_t preallocated = alloc->allocate( + need, min_alloc_size, need, + 0, &prealloc); + log_latency("allocator@_prepare_reformat", + l_bluestore_allocator_lat, + mono_clock::now() - start, + cct->_conf->bluestore_log_op_age); + might_need_defragment = preallocated >= (int64_t)need && prealloc.size() < span_stat.frags; + size_t frags = 0; + if (might_need_defragment) { + frags = prealloc.size(); + wctx.setup_prealloc(std::move(prealloc), preallocated); + } + else { + logger->inc(l_bluestore_reformat_defragment_omitted); + } + dout(10) << __func__ << " preallocated: 0x" + << std::hex << preallocated << std::dec + << " new frags:" << frags + << " defragment: " << might_need_defragment + << dendl; + } + } + } + + if (might_need_defragment || might_need_recompress) { + TransContext* txc = + _txc_create(c, c->osr.get(), nullptr); + txc->bytes += length; + if (wctx.precompressed) { + o->extent_map.punch_hole(c, offset, length, &wctx.old_extents); + } + logger->inc(l_bluestore_reformat_issued); + _txc_exec_reformat_write(txc, c, o, offset, length, bl, wctx); + } + else if (wctx.precompressed) { + logger->inc(l_bluestore_reformat_compress_omitted); + } + wctx.dispose_remaining_prealloc(alloc); +} + void BlueStore::_wctx_finish( TransContext *txc, CollectionRef& c, @@ -16548,11 +16837,11 @@ void BlueStore::_do_write_data( OnodeRef& o, uint64_t offset, uint64_t length, - bufferlist& bl, + const bufferlist& bl, WriteContext *wctx) { uint64_t end = offset + length; - bufferlist::iterator p = bl.begin(); + bufferlist::const_iterator p = bl.cbegin(); if (offset / min_alloc_size == (end - 1) / min_alloc_size && (length != min_alloc_size)) { @@ -16585,7 +16874,7 @@ void BlueStore::_do_write_data( } void BlueStore::_choose_write_options( - CollectionRef& c, + Collection* c, OnodeRef& o, uint32_t fadvise_flags, WriteContext *wctx) @@ -16623,7 +16912,16 @@ void BlueStore::_choose_write_options( (cm == Compressor::COMP_AGGRESSIVE && (alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE) == 0) || (cm == Compressor::COMP_PASSIVE && - (alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE))); + (alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE)) || + (cm == Compressor::COMP_FORCE_LAZY && + (fadvise_flags & CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING)) || + (cm == Compressor::COMP_AGGRESSIVE_LAZY && + (fadvise_flags & CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING) && + (alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE) == 0) || + (cm == Compressor::COMP_PASSIVE_LAZY && + (fadvise_flags & 
CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING) && + (alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_COMPRESSIBLE)) + ); if ((alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_READ) && (alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_READ) == 0 && @@ -16681,6 +16979,41 @@ void BlueStore::_choose_write_options( wctx->target_blob_size < min_alloc_size * 2) { wctx->target_blob_size = min_alloc_size * 2; } + if (wctx->compress) { + wctx->compressor = select_option( + "compression_algorithm", + compressor, + [&]() { + string val; + if (c->pool_opts.get(pool_opts_t::COMPRESSION_ALGORITHM, &val)) { + CompressorRef cp = compressor; + if (!cp || cp->get_type_name() != val) { + cp = Compressor::create(cct, val); + if (!cp) { + if (_set_compression_alert(false, val.c_str())) { + derr << __func__ << " unable to initialize " << val.c_str() + << " compressor" << dendl; + } + } + } + return std::optional(cp); + } + return std::optional(); + } + ); + + wctx->crr = select_option( + "compression_required_ratio", + cct->_conf->bluestore_compression_required_ratio, + [&]() { + double val; + if (c->pool_opts.get(pool_opts_t::COMPRESSION_REQUIRED_RATIO, &val)) { + return std::optional(val); + } + return std::optional(); + } + ); + } dout(20) << __func__ << " prefer csum_order " << wctx->csum_order << " target_blob_size 0x" << std::hex << wctx->target_blob_size @@ -16750,8 +17083,8 @@ int BlueStore::_do_write( OnodeRef& o, uint64_t offset, uint64_t length, - bufferlist& bl, - uint32_t fadvise_flags) + const bufferlist& bl, + WriteContext& wctx) { int r = 0; @@ -16761,7 +17094,6 @@ int BlueStore::_do_write( << " - have 0x" << o->onode.size << " (" << std::dec << o->onode.size << ")" << " bytes" << std::hex - << " fadvise_flags 0x" << fadvise_flags << " alloc_hint 0x" << o->onode.alloc_hint_flags << " expected_object_size " << o->onode.expected_object_size << " expected_write_size " << o->onode.expected_write_size @@ -16780,8 +17112,6 @@ int BlueStore::_do_write( auto dirty_start = offset; auto dirty_end = end; - WriteContext wctx; - _choose_write_options(c, o, fadvise_flags, &wctx); o->extent_map.fault_range(db, offset, length); _do_write_data(txc, c, o, offset, length, bl, &wctx); r = _do_alloc_write(txc, c, o, &wctx); @@ -16843,18 +17173,22 @@ int BlueStore::_write(TransContext *txc, CollectionRef& c, OnodeRef& o, uint64_t offset, size_t length, - bufferlist& bl, + const bufferlist& bl, uint32_t fadvise_flags) { dout(15) << __func__ << " " << c->cid << " " << o->oid - << " 0x" << std::hex << offset << "~" << length << std::dec - << dendl; + << std::hex << " 0x" << offset << "~" << length + << " fadvise_flags 0x" << fadvise_flags + << std::dec << dendl; int r = 0; if (offset + length >= OBJECT_MAX_SIZE) { r = -E2BIG; } else { _assign_nid(txc, o); - r = _do_write(txc, c, o, offset, length, bl, fadvise_flags); + + WriteContext wctx; + _choose_write_options(c.get(), o, fadvise_flags, &wctx); + r = _do_write(txc, c, o, offset, length, bl, wctx); txc->write_onode(o); } dout(10) << __func__ << " " << c->cid << " " << o->oid @@ -16898,7 +17232,7 @@ int BlueStore::_do_zero(TransContext *txc, WriteContext wctx; o->extent_map.fault_range(db, offset, length); - o->extent_map.punch_hole(c, offset, length, &wctx.old_extents); + o->extent_map.punch_hole(c.get(), offset, length, &wctx.old_extents); o->extent_map.dirty_range(offset, length); _wctx_finish(txc, c, o, &wctx); @@ -16931,7 +17265,7 @@ void BlueStore::_do_truncate( if (offset < o->onode.size) { uint64_t length = o->onode.size - offset; o->extent_map.fault_range(db, offset, length); 
- o->extent_map.punch_hole(c, offset, length, &wctx.old_extents); + o->extent_map.punch_hole(c.get(), offset, length, &wctx.old_extents); o->extent_map.dirty_range(offset, length); _wctx_finish(txc, c, o, &wctx, maybe_unshared_blobs); @@ -17456,7 +17790,9 @@ int BlueStore::_clone(TransContext *txc, r = _do_read(c.get(), oldo, 0, oldo->onode.size, bl, 0); if (r < 0) goto out; - r = _do_write(txc, c, newo, 0, oldo->onode.size, bl, 0); + WriteContext wctx; + _choose_write_options(c.get(), newo, 0, &wctx); + r = _do_write(txc, c, newo, 0, oldo->onode.size, bl, wctx); if (r < 0) goto out; } @@ -17575,7 +17911,9 @@ int BlueStore::_clone_range(TransContext *txc, r = _do_read(c.get(), oldo, srcoff, length, bl, 0); if (r < 0) goto out; - r = _do_write(txc, c, newo, dstoff, bl.length(), bl, 0); + WriteContext wctx; + _choose_write_options(c.get(), newo, 0, &wctx); + r = _do_write(txc, c, newo, dstoff, bl.length(), bl, wctx); if (r < 0) goto out; } diff --git a/src/os/bluestore/BlueStore.h b/src/os/bluestore/BlueStore.h index a9b510e162e00..c5c1e03dc31c1 100644 --- a/src/os/bluestore/BlueStore.h +++ b/src/os/bluestore/BlueStore.h @@ -228,6 +228,17 @@ enum { l_bluestore_slow_read_onode_meta_count, l_bluestore_slow_read_wait_aio_count, //**************************************** + + // reformatting counters + //**************************************** + l_bluestore_reformat_lat, + l_bluestore_reformat_compress_attempted, + l_bluestore_reformat_compress_omitted, + l_bluestore_reformat_defragment_attempted, + l_bluestore_reformat_defragment_omitted, + l_bluestore_reformat_issued, + //**************************************** + l_bluestore_last }; @@ -238,6 +249,8 @@ class BlueStore : public ObjectStore, public md_config_obs_t { // ----------------------------------------------------- // types + struct WriteContext; + public: // config observer const char** get_tracked_conf_keys() const override; @@ -575,7 +588,11 @@ class BlueStore : public ObjectStore, inline bool is_loaded() const { return loaded; } - + template + int map_fn(uint32_t x_off, uint32_t x_len, F&& f) const { + ceph_assert(loaded && persistent); + return persistent->ref_map.map_fn(x_off, x_len, f); + } }; typedef boost::intrusive_ptr SharedBlobRef; @@ -935,7 +952,7 @@ class BlueStore : public ObjectStore, OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) : e(lo, o, l, b), blob_empty(false) { } - static OldExtent* create(CollectionRef c, + static OldExtent* create(Collection* c, uint32_t lo, uint32_t o, uint32_t l, @@ -1163,7 +1180,7 @@ class BlueStore : public ObjectStore, int compress_extent_map(uint64_t offset, uint64_t length); /// punch a logical hole. add lextents to deref to target list. 
- void punch_hole(CollectionRef &c, + void punch_hole(Collection* c, uint64_t offset, uint64_t length, old_extent_map_t *old_extents); @@ -1969,6 +1986,7 @@ class BlueStore : public ObjectStore, } private: state_t state = STATE_PREPARE; + }; class BlueStoreThrottle { @@ -2798,8 +2816,14 @@ class BlueStore : public ObjectStore, TransContext *_txc_create(Collection *c, OpSequencer *osr, std::list *on_commits, TrackedOpRef osd_op=TrackedOpRef()); + void _txc_exec(TransContext* txc, ThreadPool::TPHandle* handle); void _txc_update_store_statfs(TransContext *txc); void _txc_add_transaction(TransContext *txc, Transaction *t); + void _txc_exec_reformat_write(TransContext* txc, + Collection* c, OnodeRef o, + uint64_t offset, size_t length, + const bufferlist& bl, + WriteContext& wctx); void _txc_calc_cost(TransContext *txc); void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t); void _txc_state_proc(TransContext *txc); @@ -3174,13 +3198,15 @@ class BlueStore : public ObjectStore, size_t length, int read_cache_policy, ready_regions_t& ready_regions, - blobs2read_t& blobs2read); + blobs2read_t& blobs2read, + span_stat_t* span_stat); int _prepare_read_ioc( blobs2read_t& blobs2read, std::vector* compressed_blob_bls, - IOContext* ioc); + IOContext* ioc, + span_stat_t* span_stat); int _generate_read_result_bl( OnodeRef& o, @@ -3200,7 +3226,8 @@ class BlueStore : public ObjectStore, size_t len, ceph::buffer::list& bl, uint32_t op_flags = 0, - uint64_t retry_count = 0); + uint64_t retry_count = 0, + span_stat_t* span_stat = nullptr); int _do_readv( Collection *c, @@ -3417,10 +3444,11 @@ class BlueStore : public ObjectStore, OnodeRef& o, uint32_t off, uint32_t len) { - BlueStore::TransContext txc(cct, c.get(), nullptr, nullptr); + BlueStore::TransContext* txc = new TransContext(cct, c.get(), nullptr, nullptr); BlueStore::WriteContext wctx; - o->extent_map.punch_hole(c, off, len, &wctx.old_extents); - _wctx_finish(&txc, c, o, &wctx, nullptr); + o->extent_map.punch_hole(c.get(), off, len, &wctx.old_extents); + _wctx_finish(txc, c, o, &wctx, nullptr); + delete txc; } inline void log_latency(const char* name, @@ -3515,9 +3543,19 @@ class BlueStore : public ObjectStore, struct WriteContext { bool buffered = false; ///< buffered write - bool compress = false; ///< compressed write + PExtentVectorSlicer prealloc_slicer; ///< Prealloc vector incremental slicer + void setup_prealloc(PExtentVector&& from, uint32_t total_bytes); + void dispose_remaining_prealloc(Allocator* alloc); + void rewind_prealloc(); + + uint32_t preallocated() const { return prealloc_slicer.size(); } + bool precompressed = false; ///< reformatting write, + ///< ctx has precompressed data uint64_t target_blob_size = 0; ///< target (max) blob size unsigned csum_order = 0; ///< target checksum chunk order + bool compress = false; ///< compressed write + CompressorRef compressor; ///< effective compression engine + double crr = 0.0; ///< compression required ratio old_extent_map_t old_extents; ///< must deref these blobs interval_set extents_to_gc; ///< extents for garbage collection @@ -3537,7 +3575,7 @@ class BlueStore : public ObjectStore, bool compressed = false; ceph::buffer::list compressed_bl; size_t compressed_len = 0; - + std::optional compressor_message; write_item( uint64_t logical_offs, BlobRef b, @@ -3568,17 +3606,16 @@ class BlueStore : public ObjectStore, target_blob_size = other.target_blob_size; csum_order = other.csum_order; } - void write( - uint64_t loffs, - BlobRef b, - uint64_t blob_len, - uint64_t o, - 
ceph::buffer::list& bl, - uint64_t o0, - uint64_t len0, - bool _mark_unused, - bool _new_blob) { - writes.emplace_back(loffs, + write_item& write(uint64_t loffs, + BlobRef b, + uint64_t blob_len, + uint64_t o, + ceph::buffer::list& bl, + uint64_t o0, + uint64_t len0, + bool _mark_unused, + bool _new_blob) { + return writes.emplace_back(loffs, b, blob_len, o, @@ -3594,33 +3631,45 @@ class BlueStore : public ObjectStore, uint64_t loffs, uint64_t loffs_end, uint64_t min_alloc_size); + void reset() { + WriteContext wctx0; + std::swap(*this, wctx0); + } }; void _do_write_small( TransContext *txc, CollectionRef &c, OnodeRef& o, uint64_t offset, uint64_t length, - ceph::buffer::list::iterator& blp, + ceph::buffer::list::const_iterator& blp, WriteContext *wctx); void _do_write_big_apply_deferred( TransContext* txc, CollectionRef& c, OnodeRef& o, BigDeferredWriteContext& dctx, - bufferlist::iterator& blp, + bufferlist::const_iterator& blp, WriteContext* wctx); void _do_write_big( TransContext *txc, CollectionRef &c, OnodeRef& o, uint64_t offset, uint64_t length, - ceph::buffer::list::iterator& blp, + ceph::buffer::list::const_iterator& blp, WriteContext *wctx); int _do_alloc_write( TransContext *txc, CollectionRef c, OnodeRef& o, WriteContext *wctx); + void _maybe_reformat_object( + Collection* c, + OnodeRef& o, + uint64_t offset, + size_t length, + const bufferlist& bl, + uint32_t op_flags, + const span_stat_t& span_stat); void _wctx_finish( TransContext *txc, CollectionRef& c, @@ -3632,12 +3681,12 @@ class BlueStore : public ObjectStore, CollectionRef& c, OnodeRef& o, uint64_t offset, size_t len, - ceph::buffer::list& bl, + const ceph::buffer::list& bl, uint32_t fadvise_flags); void _pad_zeros(ceph::buffer::list *bl, uint64_t *offset, uint64_t chunk_size); - void _choose_write_options(CollectionRef& c, + void _choose_write_options(Collection* c, OnodeRef& o, uint32_t fadvise_flags, WriteContext *wctx); @@ -3653,14 +3702,14 @@ class BlueStore : public ObjectStore, CollectionRef &c, OnodeRef& o, uint64_t offset, uint64_t length, - ceph::buffer::list& bl, - uint32_t fadvise_flags); + const ceph::buffer::list& bl, + WriteContext& wctx); void _do_write_data(TransContext *txc, CollectionRef& c, OnodeRef& o, uint64_t offset, uint64_t length, - ceph::buffer::list& bl, + const ceph::buffer::list& bl, WriteContext *wctx); int _touch(TransContext *txc, diff --git a/src/os/bluestore/bluestore_types.cc b/src/os/bluestore/bluestore_types.cc index 3c8dc84810c3b..558b7f5b60a43 100644 --- a/src/os/bluestore/bluestore_types.cc +++ b/src/os/bluestore/bluestore_types.cc @@ -1380,3 +1380,11 @@ bool shared_blob_2hash_tracker_t::test_all_zero_range( } return true; } + +std::ostream& operator<<(std::ostream& out, const span_stat_t& s) +{ + return out << "s=" << s.stored << " ch=" << s.cached << " a=" << s.allocated + << " sc=" << s.stored_compressed << " ac=" << s.allocated_compressed + << " sh=" << s.allocated_shared + << " e=" << s.extents << " f=" << s.frags; +} diff --git a/src/os/bluestore/bluestore_types.h b/src/os/bluestore/bluestore_types.h index 7032ae904e930..2abd1b990e828 100644 --- a/src/os/bluestore/bluestore_types.h +++ b/src/os/bluestore/bluestore_types.h @@ -115,6 +115,96 @@ std::ostream& operator<<(std::ostream& out, const bluestore_pextent_t& o); typedef mempool::bluestore_cache_other::vector PExtentVector; +class PExtentVectorSlicer +{ + size_t idx = 0; + uint32_t entry_pos = 0; + uint32_t pos = 0; + uint32_t total = 0; + PExtentVector pext; +public: + uint32_t size() const { + return total; + } + 
void setup(PExtentVector&& _p, uint32_t _total) { + reset(); + std::swap(pext, _p); + total = _total; + } + void reset() { + idx = 0; + entry_pos = pos = 0; + total = 0; + pext.clear(); + } + inline bool begin() const { + return idx == 0 && pos == 0; + } + inline bool end() const { + return idx >= pext.size(); + } + + void rewind() { + idx = 0; + entry_pos = pos = 0; + } + void fast_forward(uint32_t seek_pos) { + pos = 0; + entry_pos = 0; + if (seek_pos >= total) { + idx = pext.size(); + } else { + size_t i = 0; + for (i = 0; i < pext.size(); i++) { + uint32_t delta = seek_pos - pos; + if (delta < pext[i].length) { + entry_pos = pos; + pos += delta; + break; + } else { + pos += pext[i].length; + } + } + idx = i; + } + } + + uint32_t slice(PExtentVector& res, + uint32_t len = std::numeric_limits::max()) { + res.clear(); + if (end()) { + return 0; + } else if (begin() && len == total ) { + // fast track + res = pext; + idx = pext.size(); + pos = entry_pos = 0; + return total; + } + size_t i; + uint32_t ret = 0; // amount of bytes returned + for (i = idx; i < pext.size() && len > 0; i++) { + ceph_assert(pos >= entry_pos); + uint32_t delta = pos - entry_pos; + uint32_t remaining = pext[i].length - delta; + uint32_t l; + if (len < remaining) { + l = len; + pos += l; + } else { + l = remaining; + entry_pos += pext[i].length; + pos = entry_pos; + idx = i + 1; + } + res.emplace_back(bluestore_pextent_t(pext[i].offset + delta, l)); + ret += l; + len -= l; + } + return ret; + } +}; + template<> struct denc_traits { static constexpr bool supported = true; @@ -232,6 +322,32 @@ struct bluestore_extent_ref_map_t { } } } + template + int map_fn(uint64_t offset, uint32_t len, F&& f) const { + auto p = ref_map.lower_bound(offset); + auto pend = ref_map.end(); + auto end = offset + len; + if ((p == pend || p->first > offset) && p != ref_map.begin()) { + --p; + if (p->first + p->second.length > offset) { + uint32_t l = std::min(uint32_t(p->first + p->second.length - offset), len); + int r = f(offset, l, p->second.refs); + if (r < 0) + return r; + offset +=l; + } + ++p; + } + while (p != pend && end > offset && end > p->first) { + uint32_t l = std::min(p->second.length, uint32_t(end - p->first)); + int r = f(p->first, l, p->second.refs); + if (r < 0) + return r; + offset = p->first + p->second.length; + ++p; + } + return 0; + } void dump(ceph::Formatter *f) const; static void generate_test_instances(std::list& o); @@ -1440,4 +1556,16 @@ struct sb_info_space_efficient_map_t { } }; +struct span_stat_t { + int64_t stored = 0; // total bytes stored within a span + int64_t cached = 0; // bytes from span kept in cache, not included in the analysis below + int64_t allocated = 0; + int64_t stored_compressed = 0; + int64_t allocated_compressed = 0; + int64_t allocated_shared = 0; + size_t extents = 0; + size_t frags = 0; +}; +std::ostream& operator<<(std::ostream& out, const span_stat_t&); + #endif diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc index 74f58520d46f3..6100eeadcebfd 100644 --- a/src/osd/ECBackend.cc +++ b/src/osd/ECBackend.cc @@ -1727,7 +1727,8 @@ int ECBackend::be_deep_scrub( uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL | CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | - CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE; + CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE | + CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING; utime_t sleeptime; sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep); diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc index 8fd2d2022f81f..6c1ccdfbd6362 100644 --- 
diff --git a/src/osd/ReplicatedBackend.cc b/src/osd/ReplicatedBackend.cc
index 8fd2d2022f81f..6c1ccdfbd6362 100644
--- a/src/osd/ReplicatedBackend.cc
+++ b/src/osd/ReplicatedBackend.cc
@@ -640,7 +640,8 @@ int ReplicatedBackend::be_deep_scrub(
   int r;
   uint32_t fadvise_flags = CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL |
                            CEPH_OSD_OP_FLAG_FADVISE_DONTNEED |
-                           CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE;
+                           CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE |
+                           CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING;
 
   utime_t sleeptime;
   sleeptime.set_from_double(cct->_conf->osd_debug_deep_scrub_sleep);
diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc
index 05d50def0a77a..36510cf1109e0 100644
--- a/src/osd/osd_types.cc
+++ b/src/osd/osd_types.cc
@@ -147,6 +147,9 @@ const char * ceph_osd_op_flag_name(unsigned flag)
   case CEPH_OSD_OP_FLAG_BYPASS_CLEAN_CACHE:
     name = "bypass_clean_cache";
     break;
+  case CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING:
+    name = "allow_data_reformatting";
+    break;
   default:
     name = "???";
   };
@@ -1378,7 +1381,11 @@ static opt_mapping_t opt_mapping = boost::assign::map_list_of
   ("pg_num_max", pool_opts_t::opt_desc_t(
     pool_opts_t::PG_NUM_MAX, pool_opts_t::INT))
   ("read_ratio", pool_opts_t::opt_desc_t(
-    pool_opts_t::READ_RATIO, pool_opts_t::INT));
+    pool_opts_t::READ_RATIO, pool_opts_t::INT))
+  ("deep_scrub_defragment", pool_opts_t::opt_desc_t(
+    pool_opts_t::DEEP_SCRUB_DEFRAGMENT, pool_opts_t::INT))
+  ("deep_scrub_recompress", pool_opts_t::opt_desc_t(
+    pool_opts_t::DEEP_SCRUB_RECOMPRESS, pool_opts_t::INT));
 
 bool pool_opts_t::is_opt_name(const std::string& name) {
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index a82b167b571ca..07adfbc08cbaa 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -1106,6 +1106,8 @@ class pool_opts_t {
     DEDUP_CDC_CHUNK_SIZE,
     PG_NUM_MAX, // max pg_num
     READ_RATIO, // read ratio for the read balancer work [0-100]
+    DEEP_SCRUB_DEFRAGMENT, // perform defragmentation when deep-scrubbing
+    DEEP_SCRUB_RECOMPRESS, // perform recompression when deep-scrubbing
   };
 
   enum type_t {
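Judging by the opt_mapping entries above and the tests later in this patch, both knobs are plain integer pool options where 0 disables and a non-zero value enables the corresponding reformatting pass during deep scrub, so they should be togglable with the usual pool-option commands, e.g.:

    ceph osd pool set <pool> deep_scrub_defragment 1
    ceph osd pool set <pool> deep_scrub_recompress 1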
diff --git a/src/test/objectstore/store_test.cc b/src/test/objectstore/store_test.cc
index 4974fae125bdf..8e7c68478916b 100644
--- a/src/test/objectstore/store_test.cc
+++ b/src/test/objectstore/store_test.cc
@@ -89,7 +89,13 @@ static bool bl_eq(bufferlist& expected, bufferlist& actual)
   }
   return false;
 }
-
+std::unique_ptr<char[]> gen_buffer(uint64_t size)
+{
+  std::unique_ptr<char[]> buffer = std::make_unique<char[]>(size);
+  std::independent_bits_engine<std::default_random_engine, CHAR_BIT, unsigned char> e;
+  std::generate(buffer.get(), buffer.get() + size, std::ref(e));
+  return buffer;
+}
 void dump_bluefs_stats()
 {
   AdminSocket* admin_socket = g_ceph_context->get_admin_socket();
@@ -11166,6 +11172,753 @@ TEST_P(StoreTestOmapUpgrade, LargeLegacyToPG) {
   }
 }
 
+TEST_P(StoreTest, BasicReformattingTest) {
+  int r;
+  coll_t cid;
+  ghobject_t obj(hobject_t(sobject_t("Object 1", CEPH_NOSNAP)));
+  ghobject_t obj_clone = obj;
+  obj_clone.hobj.snap = 1;
+
+  auto ch = store->create_new_collection(cid);
+  const PerfCounters* logger = store->get_perf_counters();
+
+  pool_opts_t popts;
+  popts.set(pool_opts_t::DEEP_SCRUB_DEFRAGMENT, static_cast<int64_t>(1));
+  store->set_collection_opts(ch, popts);
+
+  cerr << "Creating collection " << cid << std::endl;
+  {
+    ObjectStore::Transaction t;
+    t.create_collection(cid, 0);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+  // queue a no-op touch and wait for it to complete, letting any
+  // transactions still in flight (e.g. a pending reformat) settle
+  auto wait_fn = [&]() {
+    C_SaferCond c;
+    ObjectStore::Transaction t;
+    t.touch(cid, obj);
+    t.register_on_complete(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  };
+  cerr << "Making object " << cid << " " << obj << std::endl;
+  bufferlist bl;
+  bufferlist expected_bl;
+  uint64_t len = 512 * 1024;
+  bl.append(std::string(len, 'a'));
+  {
+    C_SaferCond c;
+    ObjectStore::Transaction t;
+    t.write(cid, obj, 0, len, bl, 0);
+    t.register_on_commit(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_issued));
+  }
+  cerr << "Fragmenting object " << std::endl;
+  {
+    C_SaferCond c;
+    bufferlist bl1;
+    uint64_t pos = 0;
+    uint64_t len1 = 4096;
+    bl1.append(std::string(len1, 'c'));
+    ObjectStore::Transaction t;
+    auto p = bl.begin();
+    while (pos < len) {
+      t.write(cid, obj, pos, len1, bl1, 0);
+      expected_bl.append(bl1);
+      p.copy(len1, expected_bl);
+      p += len1;
+      pos += 2 * len1;
+    }
+    t.register_on_commit(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_defragment_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_omitted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_issued));
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_defragment_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_omitted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_issued));
+  }
+  // object with shared blobs
+  cerr << "Making and fragmenting shared object " << std::endl;
+  {
+    expected_bl.clear();
+    C_SaferCond c;
+    ObjectStore::Transaction t;
+    t.write(cid, obj, 0, len, bl, 0);
+    {
+      bufferlist bl1;
+      uint64_t pos = 0;
+      uint64_t len1 = 4096;
+      bl1.append(std::string(len1, 'c'));
+      auto p = bl.begin();
+      while (pos < len) {
+        t.write(cid, obj, pos, len1, bl1, 0);
+        expected_bl.append(bl1);
+        p.copy(len1, expected_bl);
+        p += len1;
+        pos += 2 * len1;
+      }
+    }
+    t.clone(cid, obj, obj_clone);
+    t.register_on_commit(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_defragment_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_omitted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_issued));
+  }
+  {
+    // remove the clone, hence enabling reformatting
+    ObjectStore::Transaction t;
+    t.remove(cid, obj_clone);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(2, logger->get(l_bluestore_reformat_defragment_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_omitted));
+    ASSERT_EQ(2, logger->get(l_bluestore_reformat_issued));
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(2, logger->get(l_bluestore_reformat_defragment_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_omitted));
+    ASSERT_EQ(2, logger->get(l_bluestore_reformat_issued));
+  }
+
+  // object with mostly contiguous allocation but having a small gap
+  cerr << "Making and fragmenting object, single small gap" << std::endl;
+  {
+    uint64_t o = 4096;
+    uint64_t l = 4096;
+    expected_bl.clear();
+    auto p = bl.begin();
+    p.copy(o, expected_bl);
+    expected_bl.append_zero(l);
+    p += l;
+    p.copy_all(expected_bl);
+
+    C_SaferCond c;
+    ObjectStore::Transaction t;
+    t.remove(cid, obj);
+    t.write(cid, obj, 0, len, bl, 0);
+    t.zero(cid, obj, 4096, 4096);
+    t.register_on_commit(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(3, logger->get(l_bluestore_reformat_defragment_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_omitted));
+    ASSERT_EQ(3, logger->get(l_bluestore_reformat_issued));
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(3, logger->get(l_bluestore_reformat_defragment_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_omitted));
+    ASSERT_EQ(3, logger->get(l_bluestore_reformat_issued));
+  }
+  // check that reformatting stays suppressed once the pool option is reset to 0
+  popts.set(pool_opts_t::DEEP_SCRUB_DEFRAGMENT, static_cast<int64_t>(0));
+  store->set_collection_opts(ch, popts);
+  cerr << "Making and fragmenting object, single small gap, no reformatting" << std::endl;
+  {
+    // reuse existing expected_bl
+    C_SaferCond c;
+    ObjectStore::Transaction t;
+    t.remove(cid, obj);
+    t.write(cid, obj, 0, len, bl, 0);
+    t.zero(cid, obj, 4096, 4096);
+    t.register_on_commit(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(3, logger->get(l_bluestore_reformat_defragment_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_omitted));
+    ASSERT_EQ(3, logger->get(l_bluestore_reformat_issued));
+  }
+  wait_fn();
+  {
+    ObjectStore::Transaction t;
+    t.remove(cid, obj);
+    t.remove(cid, obj_clone);
+    t.remove_collection(cid);
+    cerr << "Cleaning" << std::endl;
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+}
+
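Note how the shared-blob case above expects reformatting to be withheld while a clone still references the object's blobs, and to proceed once the clone is removed. A plausible building block for such a check is the bluestore_extent_ref_map_t::map_fn() visitor added earlier in this patch; the helper below is an illustrative sketch only, not code from the patch:

    // Sketch: true if any byte of [offset, offset + len) is referenced
    // more than once; a negative callback return aborts the scan early,
    // which is the contract map_fn() implements.
    bool is_shared_range(const bluestore_extent_ref_map_t& refs,
                         uint64_t offset, uint32_t len)
    {
      int r = refs.map_fn(offset, len,
        [](uint64_t, uint32_t, uint32_t nrefs) {
          return nrefs > 1 ? -1 : 0;
        });
      return r < 0;
    }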
+TEST_P(StoreTest, CompressedReformattingTest) {
+  int r;
+  coll_t cid;
+
+  SetVal(g_conf(), "bluestore_compression_algorithm", "lz4");
+  SetVal(g_conf(), "bluestore_compression_mode", "force");
+  g_ceph_context->_conf.apply_changes(nullptr);
+
+  ghobject_t obj(hobject_t(sobject_t("Object 1", CEPH_NOSNAP)));
+  ghobject_t objw(hobject_t(sobject_t("Object 2", CEPH_NOSNAP)));
+  auto ch = store->create_new_collection(cid);
+  const PerfCounters* logger = store->get_perf_counters();
+
+  pool_opts_t popts;
+  popts.set(pool_opts_t::DEEP_SCRUB_RECOMPRESS, static_cast<int64_t>(1));
+  store->set_collection_opts(ch, popts);
+
+  cerr << "Creating collection " << cid << std::endl;
+  {
+    ObjectStore::Transaction t;
+    t.create_collection(cid, 0);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+  cerr << "Making object " << cid << " " << obj << std::endl;
+  auto wait_fn = [&]() {
+    C_SaferCond c;
+    ObjectStore::Transaction t;
+    t.touch(cid, obj);
+    t.register_on_complete(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  };
+  bufferlist bl;
+  bufferlist expected_bl;
+  uint64_t len = 512 * 1024;
+  bl.append(std::string(len, 'a'));
+  {
+    C_SaferCond c;
+    ObjectStore::Transaction t;
+    t.write(cid, obj, 0, len, bl, CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+    t.register_on_complete(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_compress_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_issued));
+  }
+  cerr << "Fragmenting object " << std::endl;
+  {
+    C_SaferCond c;
+    bufferlist bl1;
+    uint64_t pos = 0;
+    uint64_t len1 = 4096;
+    bl1.append(std::string(len1, 'b'));
+    ObjectStore::Transaction t;
+    auto p = bl.begin();
+    while (pos < len) {
+      t.write(cid, obj, pos, len1, bl1, CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+      expected_bl.append(bl1);
+      p.copy(len1, expected_bl);
+      p += len1;
+      pos += 2 * len1;
+    }
+    t.register_on_complete(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_compress_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_compress_omitted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_issued));
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_compress_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_compress_omitted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_issued));
+  }
+  // now let's write non-compressible data
+  cerr << "Fragmenting non-compressible object " << std::endl;
+  {
+    C_SaferCond c;
+    uint64_t pos = 0;
+    uint64_t len1 = 4096;
+    ObjectStore::Transaction t;
+    expected_bl.clear();
+    bufferlist bl1;
+    bl1.append(gen_buffer(len).get(), len);
+    auto p = bl.begin();
+    auto p1 = bl1.begin();
+    while (pos < len) {
+      bufferlist b;
+      p1.copy(len1, b);
+      t.write(cid, obj, pos, len1, b, CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+      expected_bl.append(b);
+      p.copy(len1, expected_bl);
+      p += len1;
+      pos += 2 * len1;
+    }
+    t.register_on_complete(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(2, logger->get(l_bluestore_reformat_compress_attempted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_compress_omitted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_issued));
+  }
+  wait_fn();
+
+  // check both reformatting options enabled, data is compressible
+  popts.set(pool_opts_t::DEEP_SCRUB_DEFRAGMENT, static_cast<int64_t>(1));
+  popts.set(pool_opts_t::DEEP_SCRUB_RECOMPRESS, static_cast<int64_t>(1));
+  store->set_collection_opts(ch, popts);
+  cerr << "Making and fragmenting compressible object, 'both' reformatting mode" << std::endl;
+  {
+    C_SaferCond c;
+    ObjectStore::Transaction t;
+    t.write(cid, obj, 0, len, bl, CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+    t.register_on_complete(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  {
+    C_SaferCond c;
+    ObjectStore::Transaction t;
+    bufferlist bl1;
+    uint64_t pos = 0;
+    uint64_t len1 = 4096;
+    expected_bl.clear();
+    bl1.append(std::string(len1, 'b'));
+    auto p = bl.begin();
+    while (pos < len) {
+      t.write(cid, obj, pos, len1, bl1, CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+      expected_bl.append(bl1);
+      p.copy(len1, expected_bl);
+      p += len1;
+      pos += 2 * len1;
+    }
+    t.register_on_complete(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(3, logger->get(l_bluestore_reformat_compress_attempted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_compress_omitted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_omitted));
+    ASSERT_EQ(2, logger->get(l_bluestore_reformat_issued));
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(3, logger->get(l_bluestore_reformat_compress_attempted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_compress_omitted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_omitted));
+    ASSERT_EQ(2, logger->get(l_bluestore_reformat_issued));
+  }
+  // now write non-compressible data; recompression will be performed anyway
+  // due to defragmentation
+  cerr << "Making and fragmenting non-compressible object, 'both' reformatting mode" << std::endl;
+  {
+    C_SaferCond c;
+    ObjectStore::Transaction t;
+    t.write(cid, obj, 0, len, bl, CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+    t.register_on_complete(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  {
+    C_SaferCond c;
+    uint64_t pos = 0;
+    uint64_t len1 = 4096;
+    ObjectStore::Transaction t;
+    expected_bl.clear();
+    bufferlist bl1;
+    bl1.append(gen_buffer(len).get(), len);
+    auto p = bl.begin();
+    auto p1 = bl1.begin();
+    while (pos < len) {
+      bufferlist b;
+      p1.copy(len1, b);
+      t.write(cid, obj, pos, len1, b, CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+      expected_bl.append(b);
+      p.copy(len1, expected_bl);
+      p += len1;
+      pos += 2 * len1;
+    }
+    t.register_on_complete(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(4, logger->get(l_bluestore_reformat_compress_attempted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_compress_omitted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_defragment_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_omitted));
+    ASSERT_EQ(3, logger->get(l_bluestore_reformat_issued));
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(4, logger->get(l_bluestore_reformat_compress_attempted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_compress_omitted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_defragment_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_omitted));
+    ASSERT_EQ(3, logger->get(l_bluestore_reformat_issued));
+  }
+  wait_fn();
+
+  {
+    ObjectStore::Transaction t;
+    t.remove(cid, obj);
+    t.remove_collection(cid);
+    cerr << "Cleaning" << std::endl;
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+}
+
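The last test exercises the lazy path end to end: the global conf only selects the compression algorithm, while the pool opts both request recompression at deep scrub (DEEP_SCRUB_RECOMPRESS) and defer compression via COMPRESSION_MODE "force_lazy". As the counters below show, the initial write lands uncompressed and compression is first attempted when a read carrying CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING (as deep scrub now issues) visits the object.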
+TEST_P(StoreTest, LazyCompressionReformattingTest) {
+  int r;
+  coll_t cid;
+
+  SetVal(g_conf(), "bluestore_compression_algorithm", "lz4");
+  g_ceph_context->_conf.apply_changes(nullptr);
+
+  ghobject_t obj(hobject_t(sobject_t("Object 1", CEPH_NOSNAP)));
+  ghobject_t objw(hobject_t(sobject_t("Object 2", CEPH_NOSNAP)));
+  auto ch = store->create_new_collection(cid);
+  const PerfCounters* logger = store->get_perf_counters();
+
+  pool_opts_t popts;
+  popts.set(pool_opts_t::DEEP_SCRUB_RECOMPRESS, static_cast<int64_t>(1));
+  popts.set(pool_opts_t::COMPRESSION_MODE, "force_lazy");
+
+  store->set_collection_opts(ch, popts);
+
+  cerr << "Creating collection " << cid << std::endl;
+  {
+    ObjectStore::Transaction t;
+    t.create_collection(cid, 0);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+  cerr << "Making object " << cid << " " << obj << std::endl;
+  auto wait_fn = [&]() {
+    C_SaferCond c;
+    ObjectStore::Transaction t;
+    t.touch(cid, obj);
+    t.register_on_complete(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  };
+  bufferlist bl;
+  bufferlist expected_bl;
+  uint64_t len = 512 * 1024;
+  bl.append(std::string(len, 'a'));
+  expected_bl = bl;
+  {
+    C_SaferCond c;
+    ObjectStore::Transaction t;
+    t.write(cid, obj, 0, len, bl, CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+    t.register_on_complete(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  cerr << "Lazy object compression" << std::endl;
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_compress_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_compress_omitted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_issued));
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_compress_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_compress_omitted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_issued));
+  }
+  wait_fn();
+  cerr << "Fragmenting object " << std::endl;
+  {
+    expected_bl.clear();
+    C_SaferCond c;
+    bufferlist bl1;
+    uint64_t pos = 0;
+    uint64_t len1 = 4096;
+    bl1.append(std::string(len1, 'b'));
+    ObjectStore::Transaction t;
+    auto p = bl.begin();
+    while (pos < len) {
+      t.write(cid, obj, pos, len1, bl1, CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+      expected_bl.append(bl1);
+      p.copy(len1, expected_bl);
+      p += len1;
+      pos += 2 * len1;
+    }
+    t.register_on_complete(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(2, logger->get(l_bluestore_reformat_compress_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_compress_omitted));
+    ASSERT_EQ(2, logger->get(l_bluestore_reformat_issued));
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(2, logger->get(l_bluestore_reformat_compress_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_compress_omitted));
+    ASSERT_EQ(2, logger->get(l_bluestore_reformat_issued));
+  }
+  // now let's write non-compressible data
+  cerr << "Writing non-compressible object " << std::endl;
+  {
+    C_SaferCond c;
+    uint64_t pos = 0;
+    uint64_t len1 = 4096;
+    ObjectStore::Transaction t;
+    expected_bl.clear();
+    bufferlist bl1;
+    bl1.append(gen_buffer(len).get(), len);
+    auto p1 = bl1.begin();
+    while (pos < len) {
+      bufferlist b;
+      p1.copy(len1, b);
+      t.write(cid, obj, pos, len1, b, CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+      expected_bl.claim_append(b);
+      pos += len1;
+    }
+    t.register_on_complete(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(3, logger->get(l_bluestore_reformat_compress_attempted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_compress_omitted));
+    ASSERT_EQ(2, logger->get(l_bluestore_reformat_issued));
+  }
+  wait_fn();
+
+  // check both reformatting options enabled, data is compressible
+  popts.set(pool_opts_t::DEEP_SCRUB_DEFRAGMENT, static_cast<int64_t>(1));
+  popts.set(pool_opts_t::DEEP_SCRUB_RECOMPRESS, static_cast<int64_t>(1));
+  store->set_collection_opts(ch, popts);
+  cerr << "Making and fragmenting compressible object, 'both' reformatting mode" << std::endl;
+  {
+    C_SaferCond c;
+    ObjectStore::Transaction t;
+    t.write(cid, obj, 0, len, bl, CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+    t.register_on_complete(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  {
+    C_SaferCond c;
+    ObjectStore::Transaction t;
+    bufferlist bl1;
+    uint64_t pos = 0;
+    uint64_t len1 = 4096;
+    expected_bl.clear();
+    bl1.append(std::string(len1, 'b'));
+    auto p = bl.begin();
+    while (pos < len) {
+      t.write(cid, obj, pos, len1, bl1, CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
+      expected_bl.append(bl1);
+      p.copy(len1, expected_bl);
+      p += len1;
+      pos += 2 * len1;
+    }
+    t.register_on_complete(&c);
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+    c.wait();
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(4, logger->get(l_bluestore_reformat_compress_attempted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_compress_omitted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_omitted));
+    ASSERT_EQ(3, logger->get(l_bluestore_reformat_issued));
+  }
+  wait_fn();
+  {
+    bufferlist bl;
+    int r = store->read(ch, obj, 0, len, bl,
+      CEPH_OSD_OP_FLAG_FADVISE_DONTNEED | CEPH_OSD_OP_FLAG_ALLOW_DATA_REFORMATTING);
+    ASSERT_EQ(r, (int)len);
+    ASSERT_TRUE(bl_eq(expected_bl, bl));
+    ASSERT_EQ(4, logger->get(l_bluestore_reformat_compress_attempted));
+    ASSERT_EQ(1, logger->get(l_bluestore_reformat_compress_omitted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_attempted));
+    ASSERT_EQ(0, logger->get(l_bluestore_reformat_defragment_omitted));
+    ASSERT_EQ(3, logger->get(l_bluestore_reformat_issued));
+  }
+  {
+    ObjectStore::Transaction t;
+    t.remove(cid, obj);
+    t.remove_collection(cid);
+    cerr << "Cleaning" << std::endl;
+    r = queue_transaction(store, ch, std::move(t));
+    ASSERT_EQ(r, 0);
+  }
+}
+
 #endif // WITH_BLUESTORE
 
 int main(int argc, char **argv) {