Skip to content
Permalink
Browse files
MDEV-16264 fixup: Clean up asynchronous I/O
os_aio_userdata_t: Remove. It was basically duplicating IORequest.

buf_page_write_complete(): Take only IORequest as a parameter.

os_aio_func(), pfs_os_aio_func(): Replaced with os_aio() that has
no redundant parameters. There is only one caller, so there is no
point to pass __FILE__, __LINE__ as a parameter.
  • Loading branch information
dr-m committed Oct 26, 2020
1 parent 118e258 commit 8cb01c5
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 259 deletions.
@@ -302,16 +302,28 @@ buf_flush_relocate_on_flush_list(
}

/** Complete write of a file page from buf_pool.
@param bpage written page
@param request write request
@param dblwr whether the doublewrite buffer was used */
void buf_page_write_complete(buf_page_t *bpage, const IORequest &request,
bool dblwr)
@param request write request */
void buf_page_write_complete(const IORequest &request)
{
ut_ad(request.is_write());
ut_ad(!srv_read_only_mode/* ||
request.node->space->purpose == FIL_TYPE_TEMPORARY*/);
buf_page_t *bpage= request.bpage;
ut_ad(bpage);
ut_ad(bpage->in_file());
ut_ad(bpage->io_fix() == BUF_IO_WRITE);
ut_ad(!buf_dblwr.is_inside(bpage->id()));
bool dblwr;
if (bpage->status == buf_page_t::INIT_ON_FLUSH)
{
bpage->status= buf_page_t::NORMAL;
dblwr= false;
}
else
{
ut_ad(bpage->status == buf_page_t::NORMAL);
dblwr= request.node->space->use_doublewrite();
}

/* We do not need protect io_fix here by mutex to read it because
this and buf_page_read_complete() are the only functions where we can
@@ -3423,11 +3423,8 @@ fil_io_t fil_space_t::io(const IORequest &type, os_offset_t offset, size_t len,
goto release_sync_write;
} else {
/* Queue the aio request */
err = os_aio(
IORequest(type, node),
node->name, node->handle, buf, offset, len,
purpose != FIL_TYPE_TEMPORARY && srv_read_only_mode,
node, bpage);
err = os_aio(IORequest(bpage, node, type.type),
buf, offset, len);
}

/* We an try to recover the page from the double write buffer if
@@ -3452,62 +3449,44 @@ fil_io_t fil_space_t::io(const IORequest &type, os_offset_t offset, size_t len,
#include <tpool.h>

/** Callback for AIO completion */
void fil_aio_callback(os_aio_userdata_t *data)
void fil_aio_callback(const IORequest &request)
{
ut_ad(fil_validate_skip());
ut_ad(request.node);

fil_node_t *node= data->node;

if (UNIV_UNLIKELY(!node))
if (!request.bpage)
{
ut_ad(srv_shutdown_state == SRV_SHUTDOWN_EXIT_THREADS);
return;
}

buf_page_t *bpage= static_cast<buf_page_t*>(data->message);
if (!bpage)
{
/* Asynchronous single page writes from the doublewrite buffer,
or calls from buf_flush_freed_page() don't have access to the page. */
ut_ad(data->type.is_write());
ut_ad(!srv_read_only_mode);
ut_ad(request.type == IORequest::WRITE_ASYNC);
write_completed:
node->complete_write();
request.node->complete_write();
}
else if (data->type.is_write())
else if (request.is_write())
{
ut_ad(!srv_read_only_mode || node->space->purpose == FIL_TYPE_TEMPORARY);
bool dblwr= node->space->use_doublewrite();
if (dblwr && bpage->status == buf_page_t::INIT_ON_FLUSH)
{
bpage->status= buf_page_t::NORMAL;
dblwr= false;
}
buf_page_write_complete(bpage, data->type, dblwr);
buf_page_write_complete(request);
goto write_completed;
}
else
{
ut_ad(data->type.is_read());
ut_ad(request.is_read());

/* IMPORTANT: since i/o handling for reads will read also the insert
buffer in fil_system.sys_space, we have to be very careful not to
introduce deadlocks. We never close the system tablespace (0) data
files via fil_system.LRU and we never issue asynchronous reads of
change buffer pages. */
const page_id_t id(bpage->id());
introduce deadlocks. We never close fil_system.sys_space data
files and never issue asynchronous reads of change buffer pages. */
const page_id_t id(request.bpage->id());

if (dberr_t err= buf_page_read_complete(bpage, *node))
if (dberr_t err= buf_page_read_complete(request.bpage, *request.node))
{
if (recv_recovery_is_on() && !srv_force_recovery)
recv_sys.found_corrupt_fs= true;

ib::error() << "Failed to read page " << id.page_no()
<< " from file '" << node->name << "': " << err;
<< " from file '" << request.node->name << "': " << err;
}
}

node->space->release();
request.node->space->release();
}

/** Flush to disk the writes in file spaces of the given type
@@ -72,11 +72,8 @@ buf_flush_relocate_on_flush_list(
buf_page_t* dpage); /*!< in/out: destination block */

/** Complete write of a file page from buf_pool.
@param bpage written page
@param request write request
@param dblwr whether the doublewrite buffer was used */
void buf_page_write_complete(buf_page_t *bpage, const IORequest &request,
bool dblwr);
@param request write request */
void buf_page_write_complete(const IORequest &request);

/** Assign the full crc32 checksum for non-compressed page.
@param[in,out] page page to be updated */
@@ -208,12 +208,12 @@ class IORequest
PUNCH_RANGE= WRITE_SYNC | 128,
};

constexpr IORequest(buf_page_t *bpage, fil_node_t *node, Type type) :
bpage(bpage), node(node), type(type) {}

constexpr IORequest(Type type= READ_SYNC, buf_page_t *bpage= nullptr) :
bpage(bpage), type(type) {}

constexpr IORequest(const IORequest &old, fil_node_t *node= nullptr) :
bpage(old.bpage), node(node), type(old.type) {}

bool is_read() const { return (type & READ_SYNC) != 0; }
bool is_write() const { return (type & WRITE_SYNC) != 0; }
bool is_LRU() const { return (type & (WRITE_LRU ^ WRITE_ASYNC)) != 0; }
@@ -243,7 +243,7 @@ class IORequest
buf_page_t* const bpage= nullptr;

/** File descriptor */
const fil_node_t *const node= nullptr;
fil_node_t *const node= nullptr;

/** Request type bit flags */
const Type type;
@@ -608,12 +608,6 @@ The wrapper functions have the prefix of "innodb_". */
# define os_file_close(file) \
pfs_os_file_close_func(file, __FILE__, __LINE__)

# define os_aio(type, name, file, buf, offset, \
n, read_only, message1, message2) \
pfs_os_aio_func(type, name, file, buf, offset, \
n, read_only, message1, message2, \
__FILE__, __LINE__)

# define os_file_read(type, file, buf, offset, n) \
pfs_os_file_read_func(type, file, buf, offset, n, __FILE__, __LINE__)

@@ -793,42 +787,6 @@ pfs_os_file_read_no_error_handling_func(
const char* src_file,
uint src_line);

/** NOTE! Please use the corresponding macro os_aio(), not directly this
function!
Performance schema wrapper function of os_aio() which requests
an asynchronous I/O operation.
@param[in,out] type IO request context
@param[in] name Name of the file or path as NUL terminated
string
@param[in] file Open file handle
@param[out] buf buffer where to read
@param[in] offset file offset where to read
@param[in] n number of bytes to read
@param[in] read_only if true read only mode checks are enforced
@param[in,out] m1 Message for the AIO handler, (can be used to
identify a completed AIO operation); ignored
if mode is OS_AIO_SYNC
@param[in,out] m2 message for the AIO handler (can be used to
identify a completed AIO operation); ignored
if mode is OS_AIO_SYNC
@param[in] src_file file name where func invoked
@param[in] src_line line where the func invoked
@return DB_SUCCESS if request was queued successfully, FALSE if fail */
UNIV_INLINE
dberr_t
pfs_os_aio_func(
const IORequest&type,
const char* name,
pfs_os_file_t file,
void* buf,
os_offset_t offset,
ulint n,
bool read_only,
fil_node_t* m1,
void* m2,
const char* src_file,
uint src_line);

/** NOTE! Please use the corresponding macro os_file_write(), not directly
this function!
This is the performance schema instrumented wrapper function for
@@ -950,11 +908,6 @@ to original un-instrumented file I/O APIs */

# define os_file_close(file) os_file_close_func(file)

# define os_aio(type, name, file, buf, offset, \
n, read_only, message1, message2) \
os_aio_func(type, name, file, buf, offset, \
n, read_only, message1, message2)

# define os_file_read(type, file, buf, offset, n) \
os_file_read_func(type, file, buf, offset, n)

@@ -1202,48 +1155,14 @@ os_aio_init(
Frees the asynchronous io system. */
void os_aio_free();

struct os_aio_userdata_t
{
fil_node_t* node;
IORequest type;
void* message;

os_aio_userdata_t(fil_node_t*node, IORequest type, void*message) :
node(node), type(type), message(message) {}

/** Construct from tpool::aiocb::m_userdata[] */
os_aio_userdata_t(const char *buf) { memcpy((void*)this, buf, sizeof*this); }
};
/**
NOTE! Use the corresponding macro os_aio(), not directly this function!
Requests an asynchronous i/o operation.
@param[in,out] type IO request context
@param[in] name Name of the file or path as NUL terminated
string
@param[in] file Open file handle
@param[out] buf buffer where to read
@param[in] offset file offset where to read
@param[in] n number of bytes to read
@param[in] read_only if true read only mode checks are enforced
@param[in,out] m1 Message for the AIO handler, (can be used to
identify a completed AIO operation); ignored
if mode is OS_AIO_SYNC
@param[in,out] m2 message for the AIO handler (can be used to
identify a completed AIO operation); ignored
if mode is OS_AIO_SYNC
@return DB_SUCCESS or error code */
dberr_t
os_aio_func(
const IORequest&type,
const char* name,
pfs_os_file_t file,
void* buf,
os_offset_t offset,
ulint n,
bool read_only,
fil_node_t* m1,
void* m2);

/** Request a read or write.
@param type I/O request
@param buf buffer
@param offset file offset
@param n number of bytes
@retval DB_SUCCESS if request was queued successfully
@retval DB_IO_ERROR on I/O error */
dberr_t os_aio(const IORequest &type, void *buf, os_offset_t offset, size_t n);

/** Waits until there are no pending writes in os_aio_write_array. There can
be other, synchronous, pending writes. */
@@ -1,7 +1,7 @@
/*****************************************************************************

Copyright (c) 2010, 2017, Oracle and/or its affiliates. All Rights Reserved.
Copyright (c) 2013, 2019, MariaDB Corporation.
Copyright (c) 2013, 2020, MariaDB Corporation.

This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -201,59 +201,6 @@ pfs_os_file_close_func(
return(result);
}

/** NOTE! Please use the corresponding macro os_aio(), not directly this
function!
Performance schema wrapper function of os_aio() which requests
an asynchronous i/o operation.
@param[in,type] type IO request context
@param[in] name Name of the file or path as NUL terminated
string
@param[in] file Open file handle
@param[out] buf buffer where to read
@param[in] offset file offset where to read
@param[in] n number of bytes to read
@param[in] read_only if true read only mode checks are enforced
@param[in,out] m1 Message for the AIO handler, (can be used to
identify a completed AIO operation); ignored
if mode is OS_AIO_SYNC
@param[in,out] m2 message for the AIO handler (can be used to
identify a completed AIO operation); ignored
if mode is OS_AIO_SYNC
@param[in] src_file file name where func invoked
@param[in] src_line line where the func invoked
@return DB_SUCCESS if request was queued successfully, FALSE if fail */
UNIV_INLINE
dberr_t
pfs_os_aio_func(
const IORequest&type,
const char* name,
pfs_os_file_t file,
void* buf,
os_offset_t offset,
ulint n,
bool read_only,
fil_node_t* m1,
void* m2,
const char* src_file,
uint src_line)
{
PSI_file_locker_state state;
struct PSI_file_locker* locker = NULL;

/* Register the read or write I/O depending on "type" */
register_pfs_file_io_begin(
&state, locker, file, n,
type.is_write() ? PSI_FILE_WRITE : PSI_FILE_READ,
src_file, src_line);

dberr_t result = os_aio_func(
type, name, file, buf, offset, n, read_only, m1, m2);

register_pfs_file_io_end(locker, n);

return(result);
}

/** NOTE! Please use the corresponding macro os_file_read(), not directly
this function!
This is the performance schema instrumented wrapper function for

0 comments on commit 8cb01c5

Please sign in to comment.