MDEV-31273: Refactor MYSQL_BIN_LOG::write_cache()
Preparatory patch for pre-computing binlog checksums outside of holding
LOCK_log.

The existing code for MYSQL_BIN_LOG::write_cache() was needlessly complex,
very hard to understand, and difficult to modify to handle the new case where
pre-computed checksums are already present in the IO_CACHE.

Greatly simplify the logic by replacing the (implicit) state machine with
direct code that pulls the events one by one from the IO_CACHE. This removes
a lot of state flags and avoids duplicate code for handling full vs. split
headers.
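
To make the new control flow concrete, here is a minimal, self-contained
sketch of the per-event loop shape, written against a plain byte buffer
instead of the server's IO_CACHE. The two constants match the binlog event
header layout (19-byte header, 4-byte event length at offset 9); everything
else (write_events, uint4korr_le) is an illustrative name invented for this
sketch, not the actual patch.

    // Illustrative sketch only: pull events one by one from a byte buffer.
    #include <cstdint>
    #include <cstdio>
    #include <cstring>
    #include <vector>

    static const size_t LOG_EVENT_HEADER_LEN= 19;
    static const size_t EVENT_LEN_OFFSET= 9;

    // Little-endian 4-byte read, what uint4korr() does in the server.
    static uint32_t uint4korr_le(const unsigned char *p)
    {
      return (uint32_t)p[0] | ((uint32_t)p[1] << 8) |
             ((uint32_t)p[2] << 16) | ((uint32_t)p[3] << 24);
    }

    // Read events one at a time; false means a truncated or malformed stream.
    static bool write_events(const std::vector<unsigned char> &cache)
    {
      size_t pos= 0;
      while (pos < cache.size())               // empty at a boundary: done
      {
        if (cache.size() - pos < LOG_EVENT_HEADER_LEN)
          return false;                        // empty mid-event: error
        unsigned char header[LOG_EVENT_HEADER_LEN];
        memcpy(header, &cache[pos], LOG_EVENT_HEADER_LEN);
        uint32_t ev_len= uint4korr_le(header + EVENT_LEN_OFFSET);
        if (ev_len < LOG_EVENT_HEADER_LEN || cache.size() - pos < ev_len)
          return false;                        // malformed length: error
        // The real code would now fix up length/end_log_pos for the checksum,
        // write the header, then stream the remaining ev_len - 19 body bytes.
        printf("event of %u bytes at offset %zu\n", (unsigned)ev_len, pos);
        pos+= ev_len;
      }
      return true;
    }

    int main()
    {
      // One dummy "event" that is nothing but a bare 19-byte header.
      std::vector<unsigned char> cache(LOG_EVENT_HEADER_LEN, 0);
      cache[EVENT_LEN_OFFSET]= (unsigned char)LOG_EVENT_HEADER_LEN;
      return write_events(cache) ? 0 : 1;
    }

The real write_cache() additionally adjusts the length and end_log_pos fields
of each header for the trailing checksum and streams the event body through
Log_event_writer, as shown in the diff below.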

This also removes the need for the CacheWriter class. As a bonus, this fixes
the bug that CacheWriter::write() was completely ignoring write errors. No
other functional changes are made by this patch, only code cleanup.
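
For context on that error-handling fix, here is a hedged, self-contained
contrast of the two shapes; the *_stub functions merely stand in for
Log_event_writer::write_header()/write_data() and are not the real API.

    #include <cstddef>

    static int write_header_stub(const unsigned char *, size_t) { return 0; }
    static int write_data_stub(const unsigned char *, size_t)   { return 1; } // simulated failure

    // Old shape: the writer's results were dropped, so callers always saw 0.
    static int write_old(const unsigned char *pos, size_t len, bool first)
    {
      if (first)
        write_header_stub(pos, len);   // return value ignored
      else
        write_data_stub(pos, len);     // return value ignored
      return 0;
    }

    // New shape: every writer call is checked and the failure is propagated.
    static int write_new(const unsigned char *pos, size_t len, bool first)
    {
      if (first ? write_header_stub(pos, len) : write_data_stub(pos, len))
        return 1;                      // surfaces as ER_ERROR_ON_WRITE upstream
      return 0;
    }

    int main()
    {
      unsigned char buf[1]= {0};
      // The old shape hides the simulated failure; the new shape reports it.
      return (write_old(buf, 1, false) == 0 && write_new(buf, 1, false) == 1) ? 0 : 1;
    }

In the refactored write_cache() the same effect is achieved by checking each
writer call and jumping to error_in_write on failure, as visible in the diff.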

Reviewed-by: Monty <monty@mariadb.org>
Signed-off-by: Kristian Nielsen <knielsen@knielsen-hq.org>
knielsen committed Oct 26, 2023
1 parent 8eee980 commit 24c923d
Showing 1 changed file with 63 additions and 150 deletions.
213 changes: 63 additions & 150 deletions sql/log.cc
@@ -7713,40 +7713,6 @@ uint MYSQL_BIN_LOG::next_file_id()
return res;
}

class CacheWriter: public Log_event_writer
{
public:
size_t remains;

CacheWriter(THD *thd_arg, IO_CACHE *file_arg,
enum_binlog_checksum_alg checksum_alg,
Binlog_crypt_data *cr)
: Log_event_writer(file_arg, 0, checksum_alg, cr), remains(0), thd(thd_arg),
first(true)
{
}

~CacheWriter()
{ status_var_add(thd->status_var.binlog_bytes_written, bytes_written); }

int write(uchar* pos, size_t len)
{
DBUG_ENTER("CacheWriter::write");
if (first)
write_header(pos, len);
else
write_data(pos, len);

remains -= len;
if ((first= !remains))
write_footer();
DBUG_RETURN(0);
}
private:
THD *thd;
bool first;
};

static int binlog_online_alter_end_trans(THD *thd, bool all, bool commit)
{
DBUG_ENTER("binlog_online_alter_end_trans");
@@ -7916,13 +7882,15 @@ int Event_log::write_cache(THD *thd, IO_CACHE *cache)
mysql_mutex_assert_owner(&LOCK_log);
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
DBUG_RETURN(ER_ERROR_ON_WRITE);
size_t length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
size_t val;
/* Amount of remaining bytes in the IO_CACHE read buffer. */
size_t group;
size_t end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t
uchar header[LOG_EVENT_HEADER_LEN];
CacheWriter writer(thd, get_log_file(),
(enum_binlog_checksum_alg)binlog_checksum_options,
&crypto);
uchar header_buf[LOG_EVENT_HEADER_LEN];
Log_event_writer writer(get_log_file(), 0,
(enum_binlog_checksum_alg)binlog_checksum_options,
&crypto);
uint checksum_len= writer.checksum_len;
int err= 0;

if (crypto.scheme)
{
@@ -7948,128 +7916,73 @@ int Event_log::write_cache(THD *thd, IO_CACHE *cache)
*/

group= (size_t)my_b_tell(get_log_file());
hdr_offs= carry= 0;

do
for (;;)
{
/*
if we only got a partial header in the last iteration,
get the other half now and process a full header.
Empty cache at an event boundary means we are done (but empty cache
elsewhere is an error).
*/
if (unlikely(carry > 0))
{
DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN);
size_t tail= LOG_EVENT_HEADER_LEN - carry;

/* assemble both halves */
memcpy(&header[carry], (char *)cache->read_pos, tail);

uint32 len= uint4korr(header + EVENT_LEN_OFFSET);
writer.remains= len;

/* fix end_log_pos */
end_log_pos_inc += writer.checksum_len;
val= uint4korr(header + LOG_POS_OFFSET) + group + end_log_pos_inc;
int4store(header + LOG_POS_OFFSET, val);

/* fix len */
len+= writer.checksum_len;
int4store(header + EVENT_LEN_OFFSET, len);

if (writer.write(header, LOG_EVENT_HEADER_LEN))
DBUG_RETURN(ER_ERROR_ON_WRITE);

cache->read_pos+= tail;
length-= tail;
carry= 0;

/* next event header at ... */
hdr_offs= len - LOG_EVENT_HEADER_LEN - writer.checksum_len;
}

/* if there is anything to write, process it. */
if (my_b_tell(cache) == cache->end_of_file)
break;

if (likely(length > 0))
{
DBUG_EXECUTE_IF("fail_binlog_write_1",
errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE););
/*
process all event-headers in this (partial) cache.
if next header is beyond current read-buffer,
we'll get it later (though not necessarily in the
very next iteration, just "eventually").
*/
DBUG_EXECUTE_IF("fail_binlog_write_1",
{
errno= 28;
goto error_in_write;
});

if (hdr_offs >= length)
if (my_b_read(cache, header_buf, LOG_EVENT_HEADER_LEN))
goto error_in_read;

/* Adjust the length and end_log_pos appropriately. */
uint ev_len= uint4korr(&header_buf[EVENT_LEN_OFFSET]); // netto len
DBUG_ASSERT(ev_len >= LOG_EVENT_HEADER_LEN);
if (unlikely(ev_len < LOG_EVENT_HEADER_LEN))
goto error_in_read;
int4store(&header_buf[EVENT_LEN_OFFSET], ev_len + checksum_len);
end_log_pos_inc += checksum_len;
size_t val= uint4korr(&header_buf[LOG_POS_OFFSET]) + group + end_log_pos_inc;
int4store(&header_buf[LOG_POS_OFFSET], val);

/* Write the header to the binlog. */
if (writer.write_header(header_buf, LOG_EVENT_HEADER_LEN))
goto error_in_write;
ev_len-= LOG_EVENT_HEADER_LEN;

/* Write the rest of the event. */
size_t length= my_b_bytes_in_cache(cache);
while (ev_len > 0)
{
if (length == 0)
{
if (writer.write(cache->read_pos, length))
DBUG_RETURN(ER_ERROR_ON_WRITE);
if (!(length= my_b_fill(cache)))
goto error_in_read;
}
uint chunk= MY_MIN(ev_len, (uint)length);
if (writer.write_data(cache->read_pos, chunk))
goto error_in_write;
cache->read_pos+= chunk;
length-= chunk;
ev_len-= chunk;
}

while (hdr_offs < length)
{
/*
finish off with remains of the last event that crawls
from previous into the current buffer
*/
if (writer.remains != 0)
{
if (writer.write(cache->read_pos, hdr_offs))
DBUG_RETURN(ER_ERROR_ON_WRITE);
}

/*
partial header only? save what we can get, process once
we get the rest.
*/
if (hdr_offs + LOG_EVENT_HEADER_LEN > length)
{
carry= length - hdr_offs;
memcpy(header, (char *)cache->read_pos + hdr_offs, carry);
length= hdr_offs;
}
else
{
/* we've got a full event-header, and it came in one piece */
uchar *ev= (uchar *)cache->read_pos + hdr_offs;
uint ev_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len
uchar *log_pos= ev + LOG_POS_OFFSET;

end_log_pos_inc += writer.checksum_len;
/* fix end_log_pos */
val= uint4korr(log_pos) + group + end_log_pos_inc;
int4store(log_pos, val);

/* fix length */
int4store(ev + EVENT_LEN_OFFSET, ev_len + writer.checksum_len);

writer.remains= ev_len;
if (writer.write(ev, MY_MIN(ev_len, length - hdr_offs)))
DBUG_RETURN(ER_ERROR_ON_WRITE);

/* next event header at ... */
hdr_offs += ev_len; // incr by the netto len
if (writer.write_footer())
goto error_in_write;

DBUG_ASSERT(!writer.checksum_len || writer.remains == 0 || hdr_offs >= length);
}
}
}
goto end; // All OK

/*
Adjust hdr_offs. Note that it may still point beyond the segment
read in the next iteration; if the current event is very long,
it may take a couple of read-iterations (and subsequent adjustments
of hdr_offs) for it to point into the then-current segment.
If we have a split header (!carry), hdr_offs will be set at the
beginning of the next iteration, overwriting the value we set here:
*/
hdr_offs -= length;
}
} while ((length= my_b_fill(cache)));
error_in_write:
err= ER_ERROR_ON_WRITE;
goto end;

DBUG_ASSERT(carry == 0);
DBUG_ASSERT(!writer.checksum_len || writer.remains == 0);
error_in_read:
err= ER_ERROR_ON_READ;
goto end;

DBUG_RETURN(0); // All OK
end:
status_var_add(thd->status_var.binlog_bytes_written, writer.bytes_written);
DBUG_RETURN(err);
}

/*