diff --git a/frameworks/C/h2o/src/database.c b/frameworks/C/h2o/src/database.c index f86d1080270..3cdc45913b6 100644 --- a/frameworks/C/h2o/src/database.c +++ b/frameworks/C/h2o/src/database.c @@ -672,7 +672,7 @@ int execute_database_query(db_conn_pool_t *pool, db_query_param_t *param) msg->arg = pool; msg->super.type = TASK; msg->task = on_process_queries; - send_message(&msg->super, pool->receiver); + send_local_message(&msg->super, pool->local_messages); pool->process_queries = true; } @@ -727,16 +727,16 @@ void initialize_database_connection_pool(const char *conninfo, const struct config_t *config, const list_t *prepared_statements, h2o_loop_t *loop, - h2o_multithread_receiver_t *receiver, + h2o_linklist_t *local_messages, db_conn_pool_t *pool) { memset(pool, 0, sizeof(*pool)); pool->config = config; pool->conninfo = conninfo ? conninfo : ""; + pool->local_messages = local_messages; pool->loop = loop; pool->prepared_statements = prepared_statements; pool->queries.tail = &pool->queries.head; - pool->receiver = receiver; pool->conn_num = config->max_db_conn_num; pool->query_num = config->max_query_num; } diff --git a/frameworks/C/h2o/src/database.h b/frameworks/C/h2o/src/database.h index 52d37ec213e..b35d3805c3f 100644 --- a/frameworks/C/h2o/src/database.h +++ b/frameworks/C/h2o/src/database.h @@ -62,9 +62,9 @@ typedef struct { const struct config_t *config; list_t *conn; const char *conninfo; + h2o_linklist_t *local_messages; h2o_loop_t *loop; const list_t *prepared_statements; - h2o_multithread_receiver_t *receiver; // We use a FIFO queue instead of a simpler stack, otherwise the earlier queries may wait // an unbounded amount of time to be executed. queue_t queries; @@ -80,7 +80,7 @@ void initialize_database_connection_pool(const char *conninfo, const struct config_t *config, const list_t *prepared_statements, h2o_loop_t *loop, - h2o_multithread_receiver_t *receiver, + h2o_linklist_t *local_messages, db_conn_pool_t *pool); void remove_prepared_statements(list_t *prepared_statements); diff --git a/frameworks/C/h2o/src/event_loop.c b/frameworks/C/h2o/src/event_loop.c index 1bb2284b7dd..d5e138348de 100644 --- a/frameworks/C/h2o/src/event_loop.c +++ b/frameworks/C/h2o/src/event_loop.c @@ -277,8 +277,11 @@ static void start_accept_polling(const config_t *config, void event_loop(struct thread_context_t *ctx) { - while (!ctx->shutdown || ctx->event_loop.conn_num) + while (!ctx->shutdown || ctx->event_loop.conn_num) { h2o_evloop_run(ctx->event_loop.h2o_ctx.loop, INT32_MAX); + process_messages(&ctx->global_thread_data->h2o_receiver, + &ctx->event_loop.local_messages); + } } void free_event_loop(event_loop_t *event_loop, h2o_multithread_receiver_t *h2o_receiver) @@ -309,6 +312,7 @@ void initialize_event_loop(bool is_main_thread, h2o_context_init(&loop->h2o_ctx, h2o_evloop_create(), &global_data->h2o_config); loop->h2o_accept_ctx.ctx = &loop->h2o_ctx; loop->h2o_accept_ctx.hosts = global_data->h2o_config.hosts; + h2o_linklist_init_anchor(&loop->local_messages); if (global_data->ssl_ctx) { loop->h2o_accept_ctx.ssl_ctx = global_data->ssl_ctx; @@ -332,6 +336,11 @@ void initialize_event_loop(bool is_main_thread, } } +void send_local_message(message_t *msg, h2o_linklist_t *local_messages) +{ + h2o_linklist_insert(local_messages, &msg->super.link); +} + void send_message(message_t *msg, h2o_multithread_receiver_t *h2o_receiver) { h2o_multithread_send_message(h2o_receiver, &msg->super); diff --git a/frameworks/C/h2o/src/event_loop.h b/frameworks/C/h2o/src/event_loop.h index fe8dcabc375..cff091154c7 100644 --- a/frameworks/C/h2o/src/event_loop.h +++ b/frameworks/C/h2o/src/event_loop.h @@ -40,6 +40,7 @@ typedef struct { size_t conn_num; h2o_accept_ctx_t h2o_accept_ctx; h2o_context_t h2o_ctx; + h2o_linklist_t local_messages; } event_loop_t; typedef struct { @@ -59,6 +60,7 @@ void initialize_event_loop(bool is_main_thread, global_data_t *global_data, h2o_multithread_receiver_t *h2o_receiver, event_loop_t *loop); +void send_local_message(message_t *msg, h2o_linklist_t *local_messages); void send_message(message_t *msg, h2o_multithread_receiver_t *h2o_receiver); #endif // EVENT_LOOP_H_ diff --git a/frameworks/C/h2o/src/handlers/world.c b/frameworks/C/h2o/src/handlers/world.c index d1e561aa111..da4188ca3eb 100644 --- a/frameworks/C/h2o/src/handlers/world.c +++ b/frameworks/C/h2o/src/handlers/world.c @@ -857,7 +857,7 @@ void initialize_world_handler_thread_data(thread_context_t *ctx, ctx->global_thread_data->config, data->prepared_statements, ctx->event_loop.h2o_ctx.loop, - &ctx->global_thread_data->h2o_receiver, + &ctx->event_loop.local_messages, &thread_data->hello_world_db); } diff --git a/frameworks/C/h2o/src/thread.c b/frameworks/C/h2o/src/thread.c index d2bc6e86ba3..58d120f2c9c 100644 --- a/frameworks/C/h2o/src/thread.c +++ b/frameworks/C/h2o/src/thread.c @@ -170,11 +170,10 @@ void start_threads(global_thread_data_t *global_thread_data) CHECK_ERROR(pthread_attr_setaffinity_np, &attr, cpusetsize, cpuset); } - CHECK_ERROR(pthread_create, - &global_thread_data[i].thread, - &attr, - run_thread, - global_thread_data + i); + h2o_multithread_create_thread(&global_thread_data[i].thread, + &attr, + run_thread, + global_thread_data + i); } pthread_attr_destroy(&attr);