Skip to content

Commit

Permalink
MDEV-22990 Threadpool : Optimize network/named pipe IO for Windows
Browse files Browse the repository at this point in the history
This patch reduces the overhead of system calls prior to a query, for
threadpool.  Previously, 3 system calls were done

1. WSARecv() to get notification of input data from client, asynchronous
equivalent of select() in one-thread-per-connection

2. recv(4 bytes) - reading packet header length
3. recv(packet payload)

Now there will be usually, just WSARecv(), which pre-reads user data into
a buffer, so we spared 2 syscalls

Profiler shows the most expensive call WSARecv(16%CPU) becomes 4% CPU,
after the patch, benchmark results (network heavy ones like point-select)
improve by ~20%

The buffer management was rather carefully done to keep
buffers together, as Windows would keeps the pages pinned
in memory for the duration of async calls.
At most 1MB memory is used for the buffers, and overhead per-connection is
only 256 bytes, which should cover most of the uses.

SSL does not yet use the optmization, so far it does not properly use
VIO for reads and writes. Neither one-thread-per-connection would get any
benefit, but that should be fine, it is not even default on Windows.
  • Loading branch information
vaintroub committed Jun 26, 2020
1 parent b3acad4 commit d15c839
Show file tree
Hide file tree
Showing 9 changed files with 449 additions and 169 deletions.
1 change: 1 addition & 0 deletions include/violite.h
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ struct st_vio
#ifdef _WIN32
HANDLE hPipe;
OVERLAPPED overlapped;
void *tp_ctx; /* threadpool context */
#endif
};
#endif /* vio_violite_h_ */
2 changes: 1 addition & 1 deletion sql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ IF ((CMAKE_SYSTEM_NAME MATCHES "Linux" OR
AND (NOT DISABLE_THREADPOOL))
ADD_DEFINITIONS(-DHAVE_POOL_OF_THREADS)
IF(WIN32)
SET(SQL_SOURCE ${SQL_SOURCE} threadpool_win.cc)
SET(SQL_SOURCE ${SQL_SOURCE} threadpool_win.cc threadpool_winsockets.cc threadpool_winsockets.h)
ENDIF()
SET(SQL_SOURCE ${SQL_SOURCE} threadpool_generic.cc)
SET(SQL_SOURCE ${SQL_SOURCE} threadpool_common.cc)
Expand Down
5 changes: 4 additions & 1 deletion sql/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ extern uint threadpool_mode; /* Thread pool implementation , windows or generic
#define DEFAULT_THREADPOOL_STALL_LIMIT 500U

struct TP_connection;
struct st_vio;

extern void tp_callback(TP_connection *c);
extern void tp_timeout_handler(TP_connection *c);

Expand Down Expand Up @@ -115,7 +117,7 @@ struct TP_connection

virtual void wait_begin(int type)= 0;
virtual void wait_end() = 0;

IF_WIN(virtual,) void init_vio(st_vio *){};
};


Expand All @@ -136,6 +138,7 @@ struct TP_pool
};

#ifdef _WIN32

struct TP_pool_win:TP_pool
{
TP_pool_win();
Expand Down
19 changes: 16 additions & 3 deletions sql/threadpool_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
#include "wsrep_trans_observer.h"
#endif /* WITH_WSREP */

#ifdef _WIN32
#include "threadpool_winsockets.h"
#endif

/* Threadpool parameters */

uint threadpool_min_threads;
Expand All @@ -48,7 +52,7 @@ TP_STATISTICS tp_stats;

static void threadpool_remove_connection(THD *thd);
static int threadpool_process_request(THD *thd);
static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data);
static THD* threadpool_add_connection(CONNECT *connect, TP_connection *c);

extern bool do_command(THD*);

Expand Down Expand Up @@ -221,7 +225,7 @@ void tp_callback(TP_connection *c)
}


static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
static THD *threadpool_add_connection(CONNECT *connect, TP_connection *c)
{
THD *thd= NULL;

Expand All @@ -243,9 +247,10 @@ static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
return NULL;
}
delete connect;

server_threads.insert(thd);
thd->set_mysys_var(mysys_var);
thd->event_scheduler.data= scheduler_data;
thd->event_scheduler.data = c;

/* Login. */
thread_attach(thd);
Expand All @@ -260,6 +265,8 @@ static THD* threadpool_add_connection(CONNECT *connect, void *scheduler_data)
if (thd_prepare_connection(thd))
goto end;

c->init_vio(thd->net.vio);

/*
Check if THD is ok, as prepare_new_connection_state()
can fail, for example if init command failed.
Expand Down Expand Up @@ -397,6 +404,9 @@ static bool tp_init()
pool= 0;
return true;
}
#ifdef _WIN32
init_win_aio_buffers(max_connections);
#endif
return false;
}

Expand Down Expand Up @@ -484,6 +494,9 @@ static void tp_wait_end(THD *thd)
static void tp_end()
{
delete pool;
#ifdef _WIN32
destroy_win_aio_buffers();
#endif
}

static void tp_post_kill_notification(THD *thd)
Expand Down
69 changes: 29 additions & 40 deletions sql/threadpool_generic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
#include <sql_plist.h>
#include <threadpool.h>
#include <algorithm>

#ifdef HAVE_IOCP
#ifdef _WIN32
#include "threadpool_winsockets.h"
#define OPTIONAL_IO_POLL_READ_PARAM this
#else
#define OPTIONAL_IO_POLL_READ_PARAM 0
Expand Down Expand Up @@ -348,7 +348,7 @@ static void* native_event_get_userdata(native_event *event)
return event->portev_user;
}

#elif defined(HAVE_IOCP)
#elif defined(_WIN32)


static TP_file_handle io_poll_create()
Expand All @@ -359,29 +359,8 @@ static TP_file_handle io_poll_create()

int io_poll_start_read(TP_file_handle pollfd, TP_file_handle fd, void *, void *opt)
{
static char c;
TP_connection_generic *con= (TP_connection_generic *)opt;
OVERLAPPED *overlapped= &con->overlapped;
if (con->vio_type == VIO_TYPE_NAMEDPIPE)
{
if (ReadFile(fd, &c, 0, NULL, overlapped))
return 0;
}
else
{
WSABUF buf;
buf.buf= &c;
buf.len= 0;
DWORD flags=0;

if (WSARecv((SOCKET)fd, &buf, 1,NULL, &flags,overlapped, NULL) == 0)
return 0;
}

if (GetLastError() == ERROR_IO_PENDING)
return 0;

return 1;
auto c= (TP_connection_generic *) opt;
return (int) c->win_sock.begin_read();
}


Expand All @@ -401,20 +380,33 @@ int io_poll_disassociate_fd(TP_file_handle pollfd, TP_file_handle fd)
}


int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents, int timeout_ms)
static void *native_event_get_userdata(native_event *event)
{
ULONG n;
BOOL ok = GetQueuedCompletionStatusEx(pollfd, events,
maxevents, &n, timeout_ms, FALSE);

return ok ? (int)n : -1;
return (void *) event->lpCompletionKey;
}


static void* native_event_get_userdata(native_event *event)
int io_poll_wait(TP_file_handle pollfd, native_event *events, int maxevents,
int timeout_ms)
{
return (void *)event->lpCompletionKey;
ULONG n;
if (!GetQueuedCompletionStatusEx(pollfd, events, maxevents, &n, timeout_ms, FALSE))
return -1;

/* Update win_sock with number of bytes read.*/
for (ULONG i= 0; i < n; i++)
{
auto ev= &events[i];
auto c= (TP_connection_generic *) native_event_get_userdata(ev);
/* null userdata zero means shutdown (see PostQueuedCompletionStatus() usage*/
if (c)
{
c->win_sock.end_read(ev->dwNumberOfBytesTransferred, 0);
}
}

return (int) n;
}

#endif


Expand Down Expand Up @@ -969,7 +961,7 @@ void thread_group_destroy(thread_group_t *thread_group)
io_poll_close(thread_group->pollfd);
thread_group->pollfd= INVALID_HANDLE_VALUE;
}
#ifndef HAVE_IOCP
#ifndef _WIN32
for(int i=0; i < 2; i++)
{
if(thread_group->shutdown_pipe[i] != -1)
Expand Down Expand Up @@ -1013,7 +1005,7 @@ static int wake_thread(thread_group_t *thread_group,bool due_to_stall)
*/
static int wake_listener(thread_group_t *thread_group)
{
#ifndef HAVE_IOCP
#ifndef _WIN32
if (pipe(thread_group->shutdown_pipe))
{
return -1;
Expand Down Expand Up @@ -1354,9 +1346,6 @@ TP_connection_generic::TP_connection_generic(CONNECT *c):
abs_wait_timeout(ULONGLONG_MAX),
bound_to_poll_descriptor(false),
waiting(false)
#ifdef HAVE_IOCP
, overlapped()
#endif
{
DBUG_ASSERT(c->vio_type != VIO_CLOSED);

Expand Down
20 changes: 11 additions & 9 deletions sql/threadpool_generic.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#ifdef _WIN32
#include <windows.h>
#include "threadpool_winsockets.h"
/* AIX may define this, too ?*/
#define HAVE_IOCP
#endif
Expand Down Expand Up @@ -75,11 +76,11 @@ struct TP_connection_generic :public TP_connection
TP_connection_generic(CONNECT* c);
~TP_connection_generic();

virtual int init() { return 0; };
virtual void set_io_timeout(int sec);
virtual int start_io();
virtual void wait_begin(int type);
virtual void wait_end();
int init() override { return 0; }
void set_io_timeout(int sec) override;
int start_io() override;
void wait_begin(int type) override;
void wait_end() override;

thread_group_t* thread_group;
TP_connection_generic* next_in_queue;
Expand All @@ -89,12 +90,13 @@ struct TP_connection_generic :public TP_connection
TP_file_handle fd;
bool bound_to_poll_descriptor;
int waiting;
#ifdef HAVE_IOCP
OVERLAPPED overlapped;
#endif

#ifdef _WIN32
enum_vio_type vio_type;
win_aiosocket win_sock{};
void init_vio(st_vio *vio) override
{ win_sock.init(vio);}
#endif

};


Expand Down
Loading

0 comments on commit d15c839

Please sign in to comment.