Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions frameworks/C/h2o/src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ int execute_database_query(db_conn_pool_t *pool, db_query_param_t *param)
msg->arg = pool;
msg->super.type = TASK;
msg->task = on_process_queries;
send_message(&msg->super, pool->receiver);
send_local_message(&msg->super, pool->local_messages);
pool->process_queries = true;
}

Expand Down Expand Up @@ -727,16 +727,16 @@ void initialize_database_connection_pool(const char *conninfo,
const struct config_t *config,
const list_t *prepared_statements,
h2o_loop_t *loop,
h2o_multithread_receiver_t *receiver,
h2o_linklist_t *local_messages,
db_conn_pool_t *pool)
{
memset(pool, 0, sizeof(*pool));
pool->config = config;
pool->conninfo = conninfo ? conninfo : "";
pool->local_messages = local_messages;
pool->loop = loop;
pool->prepared_statements = prepared_statements;
pool->queries.tail = &pool->queries.head;
pool->receiver = receiver;
pool->conn_num = config->max_db_conn_num;
pool->query_num = config->max_query_num;
}
Expand Down
4 changes: 2 additions & 2 deletions frameworks/C/h2o/src/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ typedef struct {
const struct config_t *config;
list_t *conn;
const char *conninfo;
h2o_linklist_t *local_messages;
h2o_loop_t *loop;
const list_t *prepared_statements;
h2o_multithread_receiver_t *receiver;
// We use a FIFO queue instead of a simpler stack, otherwise the earlier queries may wait
// an unbounded amount of time to be executed.
queue_t queries;
Expand All @@ -80,7 +80,7 @@ void initialize_database_connection_pool(const char *conninfo,
const struct config_t *config,
const list_t *prepared_statements,
h2o_loop_t *loop,
h2o_multithread_receiver_t *receiver,
h2o_linklist_t *local_messages,
db_conn_pool_t *pool);
void remove_prepared_statements(list_t *prepared_statements);

Expand Down
11 changes: 10 additions & 1 deletion frameworks/C/h2o/src/event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,11 @@ static void start_accept_polling(const config_t *config,

void event_loop(struct thread_context_t *ctx)
{
while (!ctx->shutdown || ctx->event_loop.conn_num)
while (!ctx->shutdown || ctx->event_loop.conn_num) {
h2o_evloop_run(ctx->event_loop.h2o_ctx.loop, INT32_MAX);
process_messages(&ctx->global_thread_data->h2o_receiver,
&ctx->event_loop.local_messages);
}
}

void free_event_loop(event_loop_t *event_loop, h2o_multithread_receiver_t *h2o_receiver)
Expand Down Expand Up @@ -309,6 +312,7 @@ void initialize_event_loop(bool is_main_thread,
h2o_context_init(&loop->h2o_ctx, h2o_evloop_create(), &global_data->h2o_config);
loop->h2o_accept_ctx.ctx = &loop->h2o_ctx;
loop->h2o_accept_ctx.hosts = global_data->h2o_config.hosts;
h2o_linklist_init_anchor(&loop->local_messages);

if (global_data->ssl_ctx) {
loop->h2o_accept_ctx.ssl_ctx = global_data->ssl_ctx;
Expand All @@ -332,6 +336,11 @@ void initialize_event_loop(bool is_main_thread,
}
}

void send_local_message(message_t *msg, h2o_linklist_t *local_messages)
{
h2o_linklist_insert(local_messages, &msg->super.link);
}

void send_message(message_t *msg, h2o_multithread_receiver_t *h2o_receiver)
{
h2o_multithread_send_message(h2o_receiver, &msg->super);
Expand Down
2 changes: 2 additions & 0 deletions frameworks/C/h2o/src/event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ typedef struct {
size_t conn_num;
h2o_accept_ctx_t h2o_accept_ctx;
h2o_context_t h2o_ctx;
h2o_linklist_t local_messages;
} event_loop_t;

typedef struct {
Expand All @@ -59,6 +60,7 @@ void initialize_event_loop(bool is_main_thread,
global_data_t *global_data,
h2o_multithread_receiver_t *h2o_receiver,
event_loop_t *loop);
void send_local_message(message_t *msg, h2o_linklist_t *local_messages);
void send_message(message_t *msg, h2o_multithread_receiver_t *h2o_receiver);

#endif // EVENT_LOOP_H_
2 changes: 1 addition & 1 deletion frameworks/C/h2o/src/handlers/world.c
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ void initialize_world_handler_thread_data(thread_context_t *ctx,
ctx->global_thread_data->config,
data->prepared_statements,
ctx->event_loop.h2o_ctx.loop,
&ctx->global_thread_data->h2o_receiver,
&ctx->event_loop.local_messages,
&thread_data->hello_world_db);
}

Expand Down
9 changes: 4 additions & 5 deletions frameworks/C/h2o/src/thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,10 @@ void start_threads(global_thread_data_t *global_thread_data)
CHECK_ERROR(pthread_attr_setaffinity_np, &attr, cpusetsize, cpuset);
}

CHECK_ERROR(pthread_create,
&global_thread_data[i].thread,
&attr,
run_thread,
global_thread_data + i);
h2o_multithread_create_thread(&global_thread_data[i].thread,
&attr,
run_thread,
global_thread_data + i);
}

pthread_attr_destroy(&attr);
Expand Down