Skip to content
Permalink
Browse files
MDEV-23399 fixup: Interleaved doublewrite batches
Author: Vladislav Vaintroub
  • Loading branch information
dr-m committed Oct 26, 2020
1 parent 8cb01c5 commit ef3f71f
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 62 deletions.
@@ -53,8 +53,8 @@ inline buf_block_t *buf_dblwr_trx_sys_get(mtr_t *mtr)
@param header doublewrite page header in the TRX_SYS page */
inline void buf_dblwr_t::init(const byte *header)
{
ut_ad(!first_free);
ut_ad(!reserved);
ut_ad(!active_slot->first_free);
ut_ad(!active_slot->reserved);
ut_ad(!batch_running);

mysql_mutex_init(buf_dblwr_mutex_key, &mutex, nullptr);
@@ -63,10 +63,14 @@ inline void buf_dblwr_t::init(const byte *header)
block2= page_id_t(0, mach_read_from_4(header + TRX_SYS_DOUBLEWRITE_BLOCK2));

const uint32_t buf_size= 2 * block_size();
write_buf= static_cast<byte*>(aligned_malloc(buf_size << srv_page_size_shift,
srv_page_size));
buf_block_arr= static_cast<element*>
(ut_zalloc_nokey(buf_size * sizeof(element)));
for (int i= 0; i < 2; i++)
{
slots[i].write_buf= static_cast<byte*>
(aligned_malloc(buf_size << srv_page_size_shift, srv_page_size));
slots[i].buf_block_arr= static_cast<element*>
(ut_zalloc_nokey(buf_size * sizeof(element)));
}
active_slot= &slots[0];
}

/** Create or restore the doublewrite buffer in the TRX_SYS page.
@@ -272,6 +276,7 @@ dberr_t buf_dblwr_t::init_or_load_pages(pfs_os_file_t file, const char *path)
TRX_SYS_DOUBLEWRITE + read_buf) !=
TRX_SYS_DOUBLEWRITE_SPACE_ID_STORED_N;

auto write_buf= active_slot->write_buf;
/* Read the pages from the doublewrite buffer to memory */
err= os_file_read(IORequestRead, file, write_buf,
block1.page_no() << srv_page_size_shift,
@@ -443,16 +448,20 @@ void buf_dblwr_t::close()
return;

/* Free the double write data structures. */
ut_ad(!reserved);
ut_ad(!first_free);
ut_ad(!active_slot->reserved);
ut_ad(!active_slot->first_free);
ut_ad(!batch_running);

mysql_cond_destroy(&cond);
aligned_free(write_buf);
ut_free(buf_block_arr);
for (int i= 0; i < 2; i++)
{
aligned_free(slots[i].write_buf);
ut_free(slots[i].buf_block_arr);
}
mysql_mutex_destroy(&mutex);

memset((void*) this, 0, sizeof *this);
active_slot= &slots[0];
}

/** Update the doublewrite buffer on write completion. */
@@ -466,18 +475,19 @@ void buf_dblwr_t::write_completed()
mysql_mutex_lock(&mutex);

ut_ad(batch_running);
ut_ad(reserved);
ut_ad(reserved <= first_free);
slot *flush_slot= active_slot == &slots[0] ? &slots[1] : &slots[0];
ut_ad(flush_slot->reserved);
ut_ad(flush_slot->reserved <= flush_slot->first_free);

if (!--reserved)
if (!--flush_slot->reserved)
{
mysql_mutex_unlock(&mutex);
/* This will finish the batch. Sync data files to the disk. */
fil_flush_file_spaces();
mysql_mutex_lock(&mutex);

/* We can now reuse the doublewrite memory buffer: */
first_free= 0;
flush_slot->first_free= 0;
batch_running= false;
mysql_cond_broadcast(&cond);
}
@@ -552,25 +562,30 @@ bool buf_dblwr_t::flush_buffered_writes(const ulint size)

for (;;)
{
if (!first_free)
if (!active_slot->first_free)
return false;
if (!batch_running)
break;
mysql_cond_wait(&cond, &mutex);
}

ut_ad(reserved == first_free);
/* Disallow anyone else to post to doublewrite buffer or to
start another batch of flushing. */
ut_ad(active_slot->reserved == active_slot->first_free);

/* Disallow anyone else to start another batch of flushing. */
slot *flush_slot= active_slot;
/* Switch the active slot */
active_slot= active_slot == &slots[0] ? &slots[1] : &slots[0];
ut_a(active_slot->first_free == 0);
batch_running= true;
const ulint old_first_free= first_free;
const ulint old_first_free= flush_slot->first_free;
auto write_buf= flush_slot->write_buf;

/* Now safe to release the mutex. */
mysql_mutex_unlock(&mutex);
#ifdef UNIV_DEBUG
for (ulint len2= 0, i= 0; i < old_first_free; len2 += srv_page_size, i++)
{
buf_page_t *bpage= buf_block_arr[i].request.bpage;
buf_page_t *bpage= flush_slot->buf_block_arr[i].request.bpage;

if (bpage->zip.data)
/* No simple validate for ROW_FORMAT=COMPRESSED pages exists. */
@@ -602,7 +617,7 @@ bool buf_dblwr_t::flush_buffered_writes(const ulint size)
}

/* increment the doublewrite flushed pages counter */
srv_stats.dblwr_pages_written.add(first_free);
srv_stats.dblwr_pages_written.add(flush_slot->first_free);
srv_stats.dblwr_writes.inc();

/* Now flush the doublewrite buffer data to disk */
@@ -612,20 +627,13 @@ bool buf_dblwr_t::flush_buffered_writes(const ulint size)
and in recovery we will find them in the doublewrite buffer
blocks. Next do the writes to the intended positions. */

/* Up to this point old_first_free == first_free because we have set
the batch_running flag disallowing any other thread to post any
request but we can't safely access first_free in the loop below.
This is so because it is possible that after we are done with the
last iteration and before we terminate the loop, the batch gets
finished in the IO helper thread and another thread posts a new
batch setting first_free to a higher value. If this happens and we
are using first_free in the loop termination condition then we'll
end up dispatching the same block twice from two different
threads. */
ut_ad(old_first_free == first_free);

ut_ad(active_slot != flush_slot);
ut_ad(flush_slot->first_free == old_first_free);

for (ulint i= 0; i < old_first_free; i++)
{
auto e= buf_block_arr[i];
auto e= flush_slot->buf_block_arr[i];
buf_page_t* bpage= e.request.bpage;
ut_ad(bpage->in_file());

@@ -696,30 +704,29 @@ void buf_dblwr_t::add_to_batch(fil_space_t *space, const IORequest &request,

for (;;)
{
while (batch_running)
mysql_cond_wait(&cond, &mutex);

ut_ad(first_free <= buf_size);
if (first_free != buf_size)
ut_ad(active_slot->first_free <= buf_size);
if (active_slot->first_free != buf_size)
break;

if (flush_buffered_writes(buf_size / 2))
mysql_mutex_lock(&mutex);
}

byte *p= write_buf + srv_page_size * first_free;
byte *p= active_slot->write_buf + srv_page_size * active_slot->first_free;

/* We request frame here to get correct buffer in case of
encryption and/or page compression */
void *frame= buf_page_get_frame(request.bpage);

memcpy_aligned<OS_FILE_LOG_BLOCK_SIZE>(p, frame, size);
ut_ad(!request.bpage->zip_size() || request.bpage->zip_size() == size);
ut_ad(reserved == first_free);
ut_ad(reserved < buf_size);
new (buf_block_arr + first_free++) element{space, request, size};
reserved= first_free;

if (first_free != buf_size || !flush_buffered_writes(buf_size / 2))
ut_ad(active_slot->reserved == active_slot->first_free);
ut_ad(active_slot->reserved < buf_size);
new (active_slot->buf_block_arr + active_slot->first_free++)
element{space, request, size};
active_slot->reserved= active_slot->first_free;

if (active_slot->first_free != buf_size ||
!flush_buffered_writes(buf_size / 2))
mysql_mutex_unlock(&mutex);
}
@@ -32,6 +32,29 @@ Created 2011/12/19 Inaam Rana
/** Doublewrite control struct */
class buf_dblwr_t
{
struct element
{
/** tablespace */
fil_space_t *space;
/** asynchronous write request */
IORequest request;
/** payload size in bytes */
size_t size;
};

struct slot
{
/** first free position in write_buf measured in units of
* srv_page_size */
ulint first_free;
/** number of slots reserved for the current write batch */
ulint reserved;
/** the doublewrite buffer, aligned to srv_page_size */
byte* write_buf;
/** buffer blocks to be written via write_buf */
element* buf_block_arr;
};

/** the page number of the first doublewrite block (block_size() pages) */
page_id_t block1= page_id_t(0, 0);
/** the page number of the second doublewrite block (block_size() pages) */
@@ -43,25 +66,10 @@ class buf_dblwr_t
mysql_cond_t cond;
/** whether a batch is being written from the doublewrite buffer */
bool batch_running;
/** first free position in write_buf measured in units of srv_page_size */
ulint first_free;
/** number of slots reserved for the current write batch */
ulint reserved;
/** the doublewrite buffer, aligned to srv_page_size */
byte *write_buf;

struct element
{
/** tablespace */
fil_space_t *space;
/** asynchronous write request */
IORequest request;
/** payload size in bytes */
size_t size;
};
slot slots[2];
slot *active_slot=&slots[0];

/** buffer blocks to be written via write_buf */
element *buf_block_arr;

/** Initialize the doublewrite buffer data structure.
@param header doublewrite page header in the TRX_SYS page */

0 comments on commit ef3f71f

Please sign in to comment.