Skip to content

Commit

Permalink
Merge pull request #10215 from ifed01/wip-bluestore-blob-release-fix
Browse files Browse the repository at this point in the history
os/bluestore: refactor dirty blob tracking along with some related fixes

Reviewed-by: Sage Weil <sage@redhat.com>
  • Loading branch information
liewegas committed Aug 11, 2016
2 parents 5a84b78 + 6beade7 commit 3111eb6
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 72 deletions.
109 changes: 72 additions & 37 deletions src/os/bluestore/BlueStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1144,11 +1144,27 @@ bool BlueStore::OnodeSpace::map_any(std::function<bool(OnodeRef)> f)
void BlueStore::Blob::discard_unallocated()
{
  // Drop buffer-cache contents that back pextents which are no longer
  // allocated (released back to the allocator).
  if (blob.is_compressed()) {
    // A compressed blob is written and released as a single unit, so either
    // every pextent is invalid (blob fully released) or none is.
    bool discard = false;
    bool all_invalid = true;
    for (const auto& e : blob.extents) {
      if (!e.is_valid()) {
        discard = true;
      } else {
        all_invalid = false;
      }
    }
    assert(discard == all_invalid); //in case of compressed blob all or none pextents are invalid.
    if (discard) {
      // The cache holds the *uncompressed* payload; discard its full
      // original length starting at offset 0.
      bc.discard(0, blob.get_compressed_payload_original_length());
    }
  } else {
    // Uncompressed blob: cache offsets map 1:1 onto blob offsets, so walk
    // the pextents and discard each invalid range individually.
    size_t pos = 0;
    for (const auto& e : blob.extents) {
      if (!e.is_valid()) {
        bc.discard(pos, e.length);
      }
      pos += e.length;
    }
  }
}

Expand Down Expand Up @@ -1177,6 +1193,7 @@ void BlueStore::BlobMap::decode(bufferlist::iterator& p, Cache *c)
::decode(id, p);
Blob *b = new Blob(id, c);
::decode(b->blob, p);
b->get();
blob_map.insert(*b);
}
}
Expand Down Expand Up @@ -3545,6 +3562,35 @@ int BlueStore::read(
return r;
}

// --------------------------------------------------------
// intermediate data structures used while reading
struct region_t {
  uint64_t logical_offset; ///< offset of the region in the onode's logical space
  uint64_t blob_xoffset;   ///< region offset within the blob
  uint64_t length;         ///< region length in bytes

  region_t(uint64_t offset, uint64_t b_offs, uint64_t len)
    : logical_offset(offset),
      blob_xoffset(b_offs),
      length(len) {}
  // Rule of Zero: the memberwise copy is exactly what we want, so default
  // it instead of hand-writing an identical copy constructor.
  region_t(const region_t& from) = default;

  // Prints as "0x<logical_offset>:<blob_xoffset>~<length>" in hex.
  friend std::ostream& operator<<(std::ostream& out, const region_t& r) {
    return out << "0x" << std::hex << r.logical_offset << ":"
               << r.blob_xoffset << "~" << r.length << std::dec;
  }
};

// Compare two BlobRefs by the Blob objects they reference (via Blob's
// operator<) rather than by pointer value.
bool less(const BlueStore::BlobRef& a, const BlueStore::BlobRef& b)
{
  const auto& lhs = *a;
  const auto& rhs = *b;
  return lhs < rhs;
}
typedef list<region_t> regions2read_t;
typedef map<BlueStore::BlobRef, regions2read_t> blobs2read_t;

int BlueStore::_do_read(
Collection *c,
OnodeRef o,
Expand Down Expand Up @@ -3603,7 +3649,7 @@ int BlueStore::_do_read(
pos += hole;
left -= hole;
}
Blob *bptr = c->get_blob(o, lp->second.blob);
BlobRef bptr = c->get_blob(o, lp->second.blob);
if (bptr == nullptr) {
dout(20) << __func__ << " missed blob " << lp->second.blob << dendl;
assert(bptr != nullptr);
Expand Down Expand Up @@ -3651,7 +3697,7 @@ int BlueStore::_do_read(
//enumerate and read/decompress desired blobs
blobs2read_t::iterator b2r_it = blobs2read.begin();
while (b2r_it != blobs2read.end()) {
Blob* bptr = b2r_it->first;
BlobRef bptr = b2r_it->first;
dout(20) << __func__ << " blob " << *bptr << std::hex
<< " need 0x" << b2r_it->second << std::dec << dendl;
if (bptr->blob.has_flag(bluestore_blob_t::FLAG_COMPRESSED)) {
Expand Down Expand Up @@ -4681,20 +4727,10 @@ void BlueStore::_txc_state_proc(TransContext *txc)
//assert(txc->osr->qlock.is_locked()); // see _txc_finish_io
txc->log_state_latency(logger, l_bluestore_state_io_done_lat);
txc->state = TransContext::STATE_KV_QUEUED;
// FIXME: use a per-txc dirty blob list?

if (txc->first_collection) {
(txc->first_collection)->lock.get_read();
for (auto& b : txc->blobs) {
b->bc.finish_write(txc->seq);
}
for (auto& o : txc->onodes) {
for (auto& p : o->blob_map.blob_map) {
p.bc.finish_write(txc->seq);
}
}
if (txc->first_collection) {
(txc->first_collection)->lock.put_read();
}

txc->blobs.clear();
if (!g_conf->bluestore_sync_transaction) {
if (g_conf->bluestore_sync_submit_transaction) {
_txc_finalize_kv(txc, txc->t);
Expand Down Expand Up @@ -5257,7 +5293,7 @@ int BlueStore::queue_transactions(

// delayed csum calculation?
for (auto& d : txc->deferred_csum) {
Blob *b = d.onode->get_blob(d.blob);
BlobRef b = d.onode->get_blob(d.blob);
dout(20) << __func__ << " deferred csum calc blob " << d.blob
<< " b_off 0x" << std::hex << d.b_off << std::dec
<< " on " << d.onode->oid << dendl;
Expand Down Expand Up @@ -5821,7 +5857,7 @@ void BlueStore::_do_write_small(
blp.copy(length, bl);

// look for an existing mutable blob we can use
Blob *b = 0;
BlobRef b = 0;
map<uint64_t,bluestore_lextent_t>::iterator ep = o->onode.seek_lextent(offset);
if (ep != o->onode.extent_map.begin()) {
--ep;
Expand Down Expand Up @@ -5893,8 +5929,8 @@ void BlueStore::_do_write_small(
<< " pad 0x" << head_pad << " + 0x" << tail_pad
<< std::dec << " of mutable " << blob << ": " << b << dendl;
assert(b->blob.is_unreferenced(b_off, b_len));
b->bc.write(txc->seq, b_off, padded,
wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
_buffer_cache_write(txc, b, b_off, padded, wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);

b->blob.map_bl(
b_off, padded,
[&](uint64_t offset, uint64_t length, bufferlist& t) {
Expand Down Expand Up @@ -5961,8 +5997,8 @@ void BlueStore::_do_write_small(
b->blob.is_allocated(b_off, b_len)) {
bluestore_wal_op_t *op = _get_wal_op(txc, o);
op->op = bluestore_wal_op_t::OP_WRITE;
b->bc.write(txc->seq, b_off, padded,
wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
_buffer_cache_write(txc, b, b_off, padded, wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);

b->blob.map(
b_off, b_len,
[&](uint64_t offset, uint64_t length) {
Expand Down Expand Up @@ -5992,7 +6028,7 @@ void BlueStore::_do_write_small(
b = o->blob_map.new_blob(c->cache);
unsigned alloc_len = min_alloc_size;
uint64_t b_off = P2PHASE(offset, alloc_len);
b->bc.write(txc->seq, b_off, bl, wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
_buffer_cache_write(txc, b, b_off, bl, wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
_pad_zeros(&bl, &b_off, block_size);
bluestore_lextent_t lex(b->id, P2PHASE(offset, alloc_len), length);
o->onode.set_lextent(offset, lex, &b->blob, &wctx->lex_old);
Expand Down Expand Up @@ -6021,11 +6057,11 @@ void BlueStore::_do_write_big(
<< " compress " << (int)wctx->compress
<< std::dec << dendl;
while (length > 0) {
Blob *b = o->blob_map.new_blob(c->cache);
BlobRef b = o->blob_map.new_blob(c->cache);
auto l = MIN(max_blob_len, length);
bufferlist t;
blp.copy(l, t);
b->bc.write(txc->seq, 0, t, wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
_buffer_cache_write(txc, b, 0, t, wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
wctx->write(b, l, 0, t, false);
bluestore_lextent_t lex(b->id, 0, l);
o->onode.set_lextent(offset, lex, &b->blob, &wctx->lex_old);
Expand Down Expand Up @@ -6059,7 +6095,7 @@ int BlueStore::_do_alloc_write(

uint64_t hint = 0;
for (auto& wi : wctx->writes) {
Blob *b = wi.b;
BlobRef b = wi.b;
uint64_t b_off = wi.b_off;
bufferlist *l = &wi.bl;
uint64_t final_length = wi.blob_length;
Expand Down Expand Up @@ -6182,10 +6218,10 @@ void BlueStore::_wctx_finish(
WriteContext *wctx)
{
dout(10) << __func__ << " lex_old " << wctx->lex_old << dendl;
set<pair<bool, Blob*> > blobs2remove;
set<pair<bool, BlobRef> > blobs2remove;
for (auto &lo : wctx->lex_old) {
bluestore_lextent_t& l = lo.second;
Blob *b = c->get_blob(o, l.blob);
BlobRef b = c->get_blob(o, l.blob);
vector<bluestore_pextent_t> r;
bool compressed = b->blob.is_compressed();
if (o->onode.deref_lextent(lo.first, l, &b->blob, min_alloc_size, &r)) {
Expand Down Expand Up @@ -6215,13 +6251,12 @@ void BlueStore::_wctx_finish(
}
}
for (auto br : blobs2remove) {
Blob* b = br.second;
dout(20) << __func__ << " rm blob " << *b << dendl;
txc->statfs_delta.compressed() -= b->blob.get_compressed_payload_length();
dout(20) << __func__ << " rm blob " << *br.second << dendl;
txc->statfs_delta.compressed() -= br.second->blob.get_compressed_payload_length();
if (br.first) {
o->blob_map.erase(b);
o->blob_map.erase(br.second);
} else {
o->bnode->blob_map.erase(b);
o->bnode->blob_map.erase(br.second);
}
}

Expand Down Expand Up @@ -6767,7 +6802,7 @@ int BlueStore::_clone(TransContext *txc,
map<int64_t,int64_t> moved_blobs;
for (auto& p : oldo->onode.extent_map) {
if (!p.second.is_shared()) {
Blob *b;
BlobRef b;
if (moved_blobs.count(p.second.blob) == 0) {
b = oldo->blob_map.get(p.second.blob);
oldo->blob_map.erase(b);
Expand Down

0 comments on commit 3111eb6

Please sign in to comment.