Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blk/aio: fix long batch (64+K entries) submission. #56352

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
50 changes: 30 additions & 20 deletions src/blk/aio/aio.cc
Expand Up @@ -16,7 +16,7 @@ std::ostream& operator<<(std::ostream& os, const aio_t& aio)
}

int aio_queue_t::submit_batch(aio_iter begin, aio_iter end,
uint16_t aios_size, void *priv,
void *priv,
int *retries)
{
// 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds
Expand All @@ -25,33 +25,43 @@ int aio_queue_t::submit_batch(aio_iter begin, aio_iter end,
int r;

aio_iter cur = begin;
struct aio_t *piocb[aios_size];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem, as I understand it, is very simple: aios_size overflowed.
I think the solution should be to either:

  1. change uint16_t to uint32_t (preferred)
  2. count elements between begin and end; use alloca to allocate piocb.

The only benefit of the current solution is that it minimizes stack allocation, but at the cost of added complexity. I do not think it's worth it.
Note that for every entry in piocb we will be reading at least 4K from the drive. I think we can afford having the entire piocb on the stack.

Copy link
Contributor Author

@ifed01 ifed01 Apr 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aclamk - you might be missing one more aspect - the number of chunks (and hence the number of IOs issued in a single batch) in an object is determined (to some degree) by the cluster's users. So it becomes possible for them to use "bad" objects which could DDoS OSDs or do something inappropriate.
And this patch provides a sort of protection against that - at the aio level, at least.

Given that I'd prefer not to [ab]use stack for this task.

int left = 0;
while (cur != end) {
cur->priv = priv;
*(piocb+left) = &(*cur);
++left;
++cur;
}
ceph_assert(aios_size >= left);
#if defined(HAVE_LIBAIO)
struct aio_t *piocb[max_iodepth];
#endif
int done = 0;
while (left > 0) {
int pushed = 0; //used for LIBAIO only
int pulled = 0;
while (cur != end || pushed < pulled) {
#if defined(HAVE_LIBAIO)
r = io_submit(ctx, std::min(left, max_iodepth), (struct iocb**)(piocb + done));
while (cur != end && pulled < max_iodepth) {
cur->priv = priv;
piocb[pulled] = &(*cur);
++pulled;
++cur;
}
int toSubmit = pulled - pushed;
r = io_submit(ctx, toSubmit, (struct iocb**)(piocb + pushed));
if (r >= 0 && r < toSubmit) {
pushed += r;
done += r;
r = -EAGAIN;
}
#elif defined(HAVE_POSIXAIO)
if (piocb[done]->n_aiocb == 1) {
cur->priv = priv;
if ((cur->n_aiocb == 1) {
// TODO: consider batching multiple reads together with lio_listio
piocb[done]->aio.aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
piocb[done]->aio.aiocb.aio_sigevent.sigev_notify_kqueue = ctx;
piocb[done]->aio.aiocb.aio_sigevent.sigev_value.sival_ptr = piocb[done];
r = aio_read(&piocb[done]->aio.aiocb);
cur->aio.aiocb.aio_sigevent.sigev_notify = SIGEV_KEVENT;
cur->aio.aiocb.aio_sigevent.sigev_notify_kqueue = ctx;
cur->aio.aiocb.aio_sigevent.sigev_value.sival_ptr = &(*cur);
r = aio_write(&cur->aio.aiocb);
} else {
struct sigevent sev;
sev.sigev_notify = SIGEV_KEVENT;
sev.sigev_notify_kqueue = ctx;
sev.sigev_value.sival_ptr = piocb[done];
r = lio_listio(LIO_NOWAIT, &piocb[done]->aio.aiocbp, piocb[done]->n_aiocb, &sev);
sev.sigev_value.sival_ptr = &(*cur);
r = lio_listio(LIO_NOWAIT, &cur->aio.aiocbp, cur->n_aiocb, &sev);
}
++cur;
#endif
if (r < 0) {
if (r == -EAGAIN && attempts-- > 0) {
Expand All @@ -64,9 +74,9 @@ int aio_queue_t::submit_batch(aio_iter begin, aio_iter end,
}
ceph_assert(r > 0);
done += r;
left -= r;
attempts = 16;
delay = 125;
pushed = pulled = 0;
}
return done;
}
Expand Down
4 changes: 2 additions & 2 deletions src/blk/aio/aio.h
Expand Up @@ -100,7 +100,7 @@ struct io_queue_t {

virtual int init(std::vector<int> &fds) = 0;
virtual void shutdown() = 0;
virtual int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
virtual int submit_batch(aio_iter begin, aio_iter end,
void *priv, int *retries) = 0;
virtual int get_next_completed(int timeout_ms, aio_t **paio, int max) = 0;
};
Expand Down Expand Up @@ -153,7 +153,7 @@ struct aio_queue_t final : public io_queue_t {
}
}

int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
int submit_batch(aio_iter begin, aio_iter end,
void *priv, int *retries) final;
int get_next_completed(int timeout_ms, aio_t **paio, int max) final;
};
8 changes: 3 additions & 5 deletions src/blk/kernel/KernelDevice.cc
Expand Up @@ -344,11 +344,11 @@ void KernelDevice::close()
extblkdev::release_device(ebd_impl);

for (int i = 0; i < WRITE_LIFE_MAX; i++) {
assert(fd_directs[i] >= 0);
ceph_assert(fd_directs[i] >= 0);
VOID_TEMP_FAILURE_RETRY(::close(fd_directs[i]));
fd_directs[i] = -1;

assert(fd_buffereds[i] >= 0);
ceph_assert(fd_buffereds[i] >= 0);
VOID_TEMP_FAILURE_RETRY(::close(fd_buffereds[i]));
fd_buffereds[i] = -1;
}
Expand Down Expand Up @@ -910,10 +910,8 @@ void KernelDevice::aio_submit(IOContext *ioc)

void *priv = static_cast<void*>(ioc);
int r, retries = 0;
// num of pending aios should not overflow when passed to submit_batch()
assert(pending <= std::numeric_limits<uint16_t>::max());
r = io_queue->submit_batch(ioc->running_aios.begin(), e,
pending, priv, &retries);
priv, &retries);

if (retries)
derr << __func__ << " retries " << retries << dendl;
Expand Down
3 changes: 1 addition & 2 deletions src/blk/kernel/io_uring.cc
Expand Up @@ -176,10 +176,9 @@ void ioring_queue_t::shutdown()
}

int ioring_queue_t::submit_batch(aio_iter beg, aio_iter end,
uint16_t aios_size, void *priv,
void *priv,
int *retries)
{
(void)aios_size;
(void)retries;

pthread_mutex_lock(&d->sq_mutex);
Expand Down
2 changes: 1 addition & 1 deletion src/blk/kernel/io_uring.h
Expand Up @@ -27,7 +27,7 @@ struct ioring_queue_t final : public io_queue_t {
int init(std::vector<int> &fds) final;
void shutdown() final;

int submit_batch(aio_iter begin, aio_iter end, uint16_t aios_size,
int submit_batch(aio_iter begin, aio_iter end,
void *priv, int *retries) final;
int get_next_completed(int timeout_ms, aio_t **paio, int max) final;
};
149 changes: 149 additions & 0 deletions src/test/objectstore/store_test.cc
Expand Up @@ -7734,6 +7734,155 @@ TEST_P(StoreTestSpecificAUSize, BlobReuseOnOverwrite) {
}
}

ifed01 marked this conversation as resolved.
Show resolved Hide resolved
TEST_P(StoreTestSpecificAUSize, ManyManyExtents) {

  if (string(GetParam()) != "bluestore")
    return;

  // Run the store with a 4K allocation unit.
  size_t block_size = 4096;
  StartDeferred(block_size);

  int ret;
  coll_t cid;
  ghobject_t hoid(hobject_t("test", "", CEPH_NOSNAP, 0, -1, ""));

  const PerfCounters* logger = store->get_perf_counters();

  auto ch = store->create_new_collection(cid);
  {
    ObjectStore::Transaction t;
    t.create_collection(cid, 0);
    ret = queue_transaction(store, ch, std::move(t));
    ASSERT_EQ(ret, 0);
  }

  // One 1-byte write per 4K block: 129 * 512 = 66048 sparse extents,
  // i.e. more than 2^16 logical extents on a single object.
  const size_t max_iterations = 129;
  const size_t max_txn_ops = 512;
  bufferlist bl;
  for (size_t i = 0; i < max_iterations; i++) {
    ObjectStore::Transaction t;
    for (size_t j = 0; j < max_txn_ops; j++) {
      bl.clear();
      bl.append(std::string(1, 'a' + j % 26));
      t.write(cid, hoid, (i * max_txn_ops + j) * 4096, bl.length(), bl,
              CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
    }
    ret = queue_transaction(store, ch, std::move(t));
    ASSERT_EQ(ret, 0);
    cerr << "iter " << i << "/" << max_iterations - 1 << std::endl;
  }

  // Remount so the object is read back through freshly loaded metadata.
  ch.reset();
  store->umount();
  store->mount();
  ch = store->open_collection(cid);
  {
    bl.clear();
    size_t len = (max_iterations * max_txn_ops) * 4096 - 4095;
    cerr << "reading in a single chunk, size =" << len << std::endl;
    ret = store->read(ch, hoid,
                      0, len,
                      bl, CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
    ASSERT_EQ(ret, len);
    ASSERT_EQ(ret, bl.length());
    // Check the payload byte at the start of every 4K block.
    size_t pos = 0;
    for (size_t i = 0; i < max_iterations; i++) {
      for (size_t j = 0; j < max_txn_ops; j++) {
        ASSERT_EQ(bl[pos], 'a' + j % 26);
        pos += 4096;
      }
    }
  }

  // Remount again and read back the same data extent-by-extent via readv().
  ch.reset();
  store->umount();
  store->mount();
  ch = store->open_collection(cid);
  {
    cerr << "reading in multiple chunks..." << std::endl;
    bl.clear();
    store->fiemap(ch, hoid, 0, 1ull << 31, bl);
    map<uint64_t, uint64_t> extent_map;
    auto it = bl.cbegin();
    decode(extent_map, it);

    bl.clear();
    interval_set<uint64_t> to_read(std::move(extent_map));
    ret = store->readv(ch, hoid, to_read, bl, 0);
    // readv returns just the written bytes, densely packed.
    ASSERT_EQ(ret, max_txn_ops * max_iterations);
    ASSERT_EQ(ret, bl.length());
    size_t pos = 0;
    for (size_t i = 0; i < max_iterations; i++) {
      for (size_t j = 0; j < max_txn_ops; j++) {
        ASSERT_EQ(bl[pos++], 'a' + j % 26);
      }
    }
  }

  store->refresh_perf_counters();
  cerr << "blobs = " << logger->get(l_bluestore_blobs)
       << " extents = " << logger->get(l_bluestore_extents)
       << std::endl;

  {
    ObjectStore::Transaction t;
    t.remove(cid, hoid);
    t.remove_collection(cid);
    cerr << "Cleaning" << std::endl;
    ret = queue_transaction(store, ch, std::move(t));
    ASSERT_EQ(ret, 0);
  }
}

TEST_P(StoreTestSpecificAUSize, ManyManyExtents2) {

  if (string(GetParam()) != "bluestore")
    return;

  // Run the store with a 4K allocation unit.
  size_t block_size = 4096;
  StartDeferred(block_size);

  int ret;
  coll_t cid;
  ghobject_t hoid(hobject_t("test", "", CEPH_NOSNAP, 0, -1, ""));

  auto ch = store->create_new_collection(cid);
  {
    ObjectStore::Transaction t;
    t.create_collection(cid, 0);
    ret = queue_transaction(store, ch, std::move(t));
    ASSERT_EQ(ret, 0);
  }
  // Lay down a single contiguous 1MB object.
  {
    ObjectStore::Transaction t;
    bufferlist bl;
    bl.append(std::string(1024 * 1024, 'a'));
    t.write(cid, hoid, 0, bl.length(), bl, 0);
    ret = queue_transaction(store, ch, std::move(t));
    ASSERT_EQ(ret, 0);
  }
  // Remount so the read below goes through freshly loaded metadata.
  ch.reset();
  store->umount();
  store->mount();
  ch = store->open_collection(cid);
  {
    cerr << "reading in multiple chunks..." << std::endl;
    bufferlist bl;
    // 100000 disjoint 1-byte intervals at every even offset: a single
    // readv request with far more than 2^16 entries.
    interval_set<uint64_t> to_read;
    for (int i = 0; i < 100000; i++) {
      to_read.insert(i * 2, 1);
    }
    ret = store->readv(ch, hoid, to_read, bl, 0);
    ASSERT_EQ(ret, 100000);
    ASSERT_EQ(ret, bl.length());
  }
  store->refresh_perf_counters();
  {
    ObjectStore::Transaction t;
    t.remove(cid, hoid);
    t.remove_collection(cid);
    cerr << "Cleaning" << std::endl;
    ret = queue_transaction(store, ch, std::move(t));
    ASSERT_EQ(ret, 0);
  }
}

TEST_P(StoreTestSpecificAUSize, ZeroBlockDetectionSmallAppend) {
CephContext *cct = (new CephContext(CEPH_ENTITY_TYPE_CLIENT))->get();
if (string(GetParam()) != "bluestore" || !cct->_conf->bluestore_zero_block_detection) {
Expand Down