-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
MDEV-32216 Connection pool with asynchronous query execution.
Parallelism is achieved by using mysql_send_query on multiple connections without waiting for results, and using IO multiplexing (poll/IOCP) to wait for completions. Refresh libmariadb to pick up CONC-676 (fixes for IOCP use with named pipe)
- Loading branch information
Showing
3 changed files
with
390 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,264 @@ | ||
/* | ||
Copyright (c) 2023, MariaDB. | ||
This program is free software; you can redistribute it and/or modify | ||
it under the terms of the GNU General Public License as published by | ||
the Free Software Foundation; version 2 of the License. | ||
This program is distributed in the hope that it will be useful, | ||
but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
GNU General Public License for more details. | ||
You should have received a copy of the GNU General Public License | ||
along with this program; if not, write to the Free Software | ||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 | ||
USA | ||
*/ | ||
|
||
/* | ||
Connection pool, with parallel query execution | ||
Does not use threads, but IO multiplexing via mysql_send_query and | ||
poll/iocp to wait for completions | ||
*/ | ||
#include <my_global.h> | ||
#ifdef _WIN32 | ||
#include <winsock2.h> | ||
#else | ||
#include <poll.h> | ||
#endif | ||
#include "connection_pool.h" | ||
#include <my_compiler.h> | ||
|
||
namespace async_pool | ||
{ | ||
static ATTRIBUTE_NORETURN void die(const char *format, ...) | ||
{ | ||
va_list args; | ||
va_start(args, format); | ||
vfprintf(stderr, format, args); | ||
va_end(args); | ||
abort(); | ||
} | ||
|
||
pooled_connection *connection_pool::get_connection() | ||
{ | ||
while (free_connections.empty()) | ||
wait_for_completions(); | ||
auto c= free_connections.front(); | ||
free_connections.pop(); | ||
return c; | ||
} | ||
|
||
#ifdef _WIN32 | ||
void connection_pool::add_to_pollset(pooled_connection *c) | ||
{ | ||
DWORD err= ERROR_SUCCESS; | ||
static char ch; | ||
WSABUF buf; | ||
buf.len= 0; | ||
buf.buf= &ch; | ||
if (!c->is_pipe) | ||
{ | ||
/* Do async io (sockets). */ | ||
DWORD flags= 0; | ||
if (WSARecv((SOCKET) c->handle, &buf, 1, 0, &flags, c, NULL)) | ||
err= WSAGetLastError(); | ||
} | ||
else | ||
{ | ||
/* Do async read (named pipe) */ | ||
if (!ReadFile(c->handle, buf.buf, buf.len, 0, c)) | ||
err= GetLastError(); | ||
} | ||
|
||
if (err && err != ERROR_IO_PENDING) | ||
die("%s failed: %d\n", c->is_pipe ? "ReadFile" : "WSARecv", err); | ||
} | ||
|
||
/* | ||
Wait for completions of queries.Uses IOCP on windows to wait for completions. | ||
(ReadFile/WSARecv with 0 bytes serves as readiness notification) | ||
*/ | ||
void connection_pool::wait_for_completions() | ||
{ | ||
ULONG n; | ||
OVERLAPPED_ENTRY events[32]; | ||
if (!GetQueuedCompletionStatusEx(iocp, events, array_elements(events), &n, INFINITE, | ||
FALSE)) | ||
{ | ||
die("GetQueuedCompletionStatusEx failed: %d\n", GetLastError()); | ||
} | ||
|
||
for (ULONG i= 0; i < n; i++) | ||
{ | ||
auto c= (pooled_connection *) events[i].lpOverlapped; | ||
if (!c) | ||
die("GetQueuedCompletionStatusEx unexpected return"); | ||
DBUG_ASSERT(c->mysql); | ||
DBUG_ASSERT(!events[i].lpCompletionKey); | ||
DBUG_ASSERT(!events[i].dwNumberOfBytesTransferred); | ||
complete_query(c); | ||
} | ||
} | ||
#else /* !_WIN32 */ | ||
void connection_pool::add_to_pollset(pooled_connection *c) | ||
{ | ||
size_t idx= c - &all_connections[0]; | ||
pollfd *pfd= &pollset[idx]; | ||
pfd->fd= c->fd; | ||
pfd->events= POLLIN; | ||
pfd->revents= 0; | ||
} | ||
|
||
/* something Linux-ish, can be returned for POLLIN event*/ | ||
#ifndef POLLRDHUP | ||
#define POLLRDHUP 0 | ||
#endif | ||
|
||
void connection_pool::wait_for_completions() | ||
{ | ||
int n; | ||
while ((n= poll(pollset.data(), pollset.size(), -1)) <= 0) | ||
{ | ||
if (errno == EINTR) | ||
continue; | ||
die("poll failed: %d\n", errno); | ||
} | ||
|
||
for (uint i= 0; n > 0 && i < pollset.size(); i++) | ||
{ | ||
pollfd* pfd= &pollset[i]; | ||
if (pfd->revents & | ||
(POLLIN | POLLPRI | POLLHUP | POLLRDHUP| POLLERR | POLLNVAL)) | ||
{ | ||
pfd->events= 0; | ||
pfd->revents= 0; | ||
pfd->fd= -1; | ||
complete_query(&all_connections[i]); | ||
n--; | ||
} | ||
} | ||
if (n) | ||
die("poll() failed to find free connection: %d\n"); | ||
} | ||
#endif | ||
|
||
void connection_pool::complete_query(pooled_connection *c) | ||
{ | ||
int err= mysql_read_query_result(c->mysql); | ||
if (c->on_completion) | ||
c->on_completion(c->mysql, c->query.c_str(), !err, c->context); | ||
if (c->release_connection) | ||
{ | ||
c->in_use= false; | ||
free_connections.push(c); | ||
} | ||
} | ||
|
||
connection_pool::~connection_pool() | ||
{ | ||
close(); | ||
} | ||
|
||
void connection_pool::init(MYSQL *con[], size_t n) | ||
{ | ||
#ifdef _WIN32 | ||
iocp= CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); | ||
if (!iocp) | ||
die("CreateIoCompletionPort failed: %d\n", GetLastError()); | ||
#else | ||
pollset.resize(n); | ||
for (auto &pfd : pollset) | ||
pfd.fd= -1; | ||
#endif | ||
|
||
|
||
for (size_t i= 0; i < n; i++) | ||
all_connections.emplace_back(con[i]); | ||
|
||
for (auto &con : all_connections) | ||
{ | ||
free_connections.push(&con); | ||
#ifdef _WIN32 | ||
if (!CreateIoCompletionPort(con.handle, iocp, 0, 0)) | ||
die("CreateIoCompletionPort failed: %d\n", GetLastError()); | ||
#endif | ||
} | ||
} | ||
|
||
int connection_pool::execute_async(const char *query, | ||
query_completion_handler on_completion, | ||
void *context, bool release_connecton) | ||
{ | ||
auto c= get_connection(); | ||
c->context= context; | ||
c->on_completion= on_completion; | ||
c->release_connection= release_connecton; | ||
c->query= query; | ||
|
||
int ret= mysql_send_query(c->mysql, query, (unsigned long) c->query.size()); | ||
if (ret) | ||
{ | ||
free_connections.push(c); | ||
return ret; | ||
} | ||
|
||
c->in_use= true; | ||
add_to_pollset(c); | ||
return 0; | ||
} | ||
|
||
/* | ||
Wait until all queries are completed and all | ||
connections are idle. | ||
*/ | ||
void connection_pool::wait_all() | ||
{ | ||
while (free_connections.size() != all_connections.size()) | ||
wait_for_completions(); | ||
} | ||
|
||
void connection_pool::for_each_connection(void(*f)(MYSQL *mysql)) | ||
{ | ||
for (auto &c : all_connections) | ||
f(c.mysql); | ||
} | ||
|
||
int connection_pool::close() | ||
{ | ||
for (auto &c : all_connections) | ||
mysql_close(c.mysql); | ||
|
||
all_connections.clear(); | ||
while (!free_connections.empty()) | ||
free_connections.pop(); | ||
#ifdef _WIN32 | ||
if (iocp) | ||
{ | ||
CloseHandle(iocp); | ||
iocp= nullptr; | ||
} | ||
#endif | ||
return 0; | ||
} | ||
|
||
pooled_connection::pooled_connection(MYSQL *c) | ||
{ | ||
mysql= c; | ||
#ifdef _WIN32 | ||
OVERLAPPED *ov= static_cast<OVERLAPPED *>(this); | ||
memset(ov, 0, sizeof(OVERLAPPED)); | ||
mysql_protocol_type protocol; | ||
if (c->host && !strcmp(c->host, ".")) | ||
protocol= MYSQL_PROTOCOL_PIPE; | ||
else | ||
protocol= (mysql_protocol_type) c->options.protocol; | ||
is_pipe= protocol == MYSQL_PROTOCOL_PIPE; | ||
handle= (HANDLE) mysql_get_socket(c); | ||
#else | ||
fd= mysql_get_socket(c); | ||
#endif | ||
} | ||
|
||
} // namespace async_pool |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
/* | ||
Copyright (c) 2023, MariaDB. | ||
This program is free software; you can redistribute it and/or modify | ||
it under the terms of the GNU General Public License as published by | ||
the Free Software Foundation; version 2 of the License. | ||
This program is distributed in the hope that it will be useful, | ||
but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
GNU General Public License for more details. | ||
You should have received a copy of the GNU General Public License | ||
along with this program; if not, write to the Free Software | ||
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 | ||
USA | ||
*/ | ||
|
||
#pragma once | ||
|
||
#include <mysql.h> | ||
#include <vector> | ||
#include <queue> | ||
#include <string> | ||
#ifdef _WIN32 | ||
#include <windows.h> | ||
#else | ||
#include <poll.h> | ||
#endif | ||
|
||
/* | ||
Implementation of asynchronous mariadb connection pool. | ||
This pool consists of set of MYSQL* connections, created by C API | ||
function. The intention is that all connections have the same state | ||
same server, by the same user etc. | ||
The "asynchronous" means the queries are executed on the server | ||
without waiting for the server reply. The queries are submitted | ||
with mysql_send_query(), and completions are picked by poll/IOCP. | ||
*/ | ||
|
||
namespace async_pool | ||
{ | ||
typedef void (*query_completion_handler)(MYSQL *mysql, const char *query, bool success, void *context); | ||
|
||
struct pooled_connection | ||
#ifdef _WIN32 | ||
: OVERLAPPED | ||
#endif | ||
{ | ||
MYSQL *mysql; | ||
query_completion_handler on_completion=NULL; | ||
void *context=NULL; | ||
std::string query; | ||
bool in_use=false; | ||
bool release_connection=false; | ||
#ifdef _WIN32 | ||
bool is_pipe; | ||
HANDLE handle; | ||
#else | ||
int fd; | ||
#endif | ||
pooled_connection(MYSQL *mysql); | ||
}; | ||
|
||
|
||
struct connection_pool | ||
{ | ||
private: | ||
std::vector<pooled_connection> all_connections; | ||
std::queue<pooled_connection *> free_connections; | ||
pooled_connection *get_connection(); | ||
void wait_for_completions(); | ||
void complete_query(pooled_connection *c); | ||
void add_to_pollset(pooled_connection *c); | ||
|
||
#ifdef _WIN32 | ||
HANDLE iocp=nullptr; | ||
#else | ||
std::vector<pollfd> pollset; | ||
#endif | ||
public: | ||
~connection_pool(); | ||
|
||
/** | ||
Add connections to the connection pool | ||
@param con - connections | ||
@param n_connections - number of connections | ||
*/ | ||
void init(MYSQL *con[], size_t n_connections); | ||
|
||
/** | ||
Send query to the connection pool | ||
Executes query on a connection in the pool, using mysql_send_query | ||
@param query - query string | ||
@param on_completion - callback function to be called on completion | ||
@param context - user context that will be passed to the callback function | ||
@param release_connecton - if true, the connection should be released to the | ||
pool after the query is executed. If you execute another | ||
mysql_send_query() on the same connection, set this to false. | ||
Note: the function will block if there are no free connections in the pool. | ||
@return return code of mysql_send_query | ||
*/ | ||
int execute_async(const char *query, query_completion_handler on_completion, void *context, bool release_connecton=true); | ||
|
||
/** Waits for all outstanding queries to complete.*/ | ||
void wait_all(); | ||
|
||
/** Execute callback for each connection in the pool. */ | ||
void for_each_connection(void (*f)(MYSQL *mysql)); | ||
|
||
/** | ||
Closes all connections in pool and frees all resources. | ||
Does not wait for pending queries to complete | ||
(use wait_all() for that) | ||
*/ | ||
int close(); | ||
}; | ||
|
||
} // namespace async_pool |
Submodule libmariadb
updated
from 458a43 to 75ab6f