Skip to content

Commit

Permalink
Fix timing on queues, this could clearly lead to starvation.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Lindström committed Feb 14, 2014
1 parent 9c61466 commit cae21c5
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 10 deletions.
11 changes: 6 additions & 5 deletions storage/innobase/buf/buf0mtflu.cc
Expand Up @@ -51,6 +51,8 @@ Modified 06/02/2014 Jan Lindström jan.lindstrom@skysql.com
#include "fil0pagecompress.h"

#define MT_COMP_WATER_MARK 50
/** Time to wait for a message. */
#define MT_WAIT_IN_USECS 5000000

/* Work item status */
typedef enum wrk_status {
Expand Down Expand Up @@ -272,10 +274,9 @@ mtflush_service_io(
{
wrk_t *work_item = NULL;
ulint n_flushed=0;
ib_time_t max_wait_usecs = 50000;

mtflush_io->wt_status = WTHR_SIG_WAITING;
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, max_wait_usecs);
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, MT_WAIT_IN_USECS);

if (work_item) {
mtflush_io->wt_status = WTHR_RUNNING;
Expand Down Expand Up @@ -411,7 +412,7 @@ buf_mtflu_io_thread_exit(void)
/* Wait until all work items on a work queue are processed */
while(!ib_wqueue_is_empty(mtflush_io->wq)) {
/* Wait */
os_thread_sleep(500000);
os_thread_sleep(MT_WAIT_IN_USECS * 2);
}

ut_a(ib_wqueue_is_empty(mtflush_io->wq));
Expand All @@ -420,7 +421,7 @@ buf_mtflu_io_thread_exit(void)
for (i=0; i < srv_mtflush_threads;) {
wrk_t* work_item;

work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wr_cq, 50000);
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wr_cq, MT_WAIT_IN_USECS);

/* If we receive reply to work item and it's status is exit,
thead has processed this message and existed */
Expand Down Expand Up @@ -550,7 +551,7 @@ buf_mtflu_flush_work_items(

/* wait on the completion to arrive */
for(i=0; i< buf_pool_inst;) {
done_wi = (wrk_t *)ib_wqueue_timedwait(mtflush_ctx->wr_cq, 50000);
done_wi = (wrk_t *)ib_wqueue_timedwait(mtflush_ctx->wr_cq, MT_WAIT_IN_USECS);

if (done_wi != NULL) {
if(done_wi->n_flushed == 0) {
Expand Down
11 changes: 6 additions & 5 deletions storage/xtradb/buf/buf0mtflu.cc
Expand Up @@ -51,6 +51,8 @@ Modified 06/02/2014 Jan Lindström jan.lindstrom@skysql.com
#include "fil0pagecompress.h"

#define MT_COMP_WATER_MARK 50
/** Time to wait for a message. */
#define MT_WAIT_IN_USECS 5000000

/* Work item status */
typedef enum wrk_status {
Expand Down Expand Up @@ -278,10 +280,9 @@ mtflush_service_io(
{
wrk_t *work_item = NULL;
ulint n_flushed=0;
ib_time_t max_wait_usecs = 50000;

mtflush_io->wt_status = WTHR_SIG_WAITING;
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, max_wait_usecs);
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, MT_WAIT_IN_USECS);

if (work_item) {
mtflush_io->wt_status = WTHR_RUNNING;
Expand Down Expand Up @@ -417,7 +418,7 @@ buf_mtflu_io_thread_exit(void)
/* Wait until all work items on a work queue are processed */
while(!ib_wqueue_is_empty(mtflush_io->wq)) {
/* Wait */
os_thread_sleep(500000);
os_thread_sleep(MT_WAIT_IN_USECS * 2);
}

ut_a(ib_wqueue_is_empty(mtflush_io->wq));
Expand All @@ -426,7 +427,7 @@ buf_mtflu_io_thread_exit(void)
for (i=0; i < srv_mtflush_threads;) {
wrk_t* work_item;

work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wr_cq, 50000);
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wr_cq, MT_WAIT_IN_USECS);

/* If we receive reply to work item and it's status is exit,
thead has processed this message and existed */
Expand Down Expand Up @@ -556,7 +557,7 @@ buf_mtflu_flush_work_items(

/* wait on the completion to arrive */
for(i=0; i< buf_pool_inst;) {
done_wi = (wrk_t *)ib_wqueue_timedwait(mtflush_ctx->wr_cq, 50000);
done_wi = (wrk_t *)ib_wqueue_timedwait(mtflush_ctx->wr_cq, MT_WAIT_IN_USECS);

if (done_wi != NULL) {
if(done_wi->n_flushed == 0) {
Expand Down

0 comments on commit cae21c5

Please sign in to comment.