Skip to content

Commit

Permalink
add thread pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Fredrik Widlund committed Jan 7, 2020
1 parent d65d9b7 commit c0929e0
Show file tree
Hide file tree
Showing 8 changed files with 462 additions and 7 deletions.
26 changes: 22 additions & 4 deletions Makefile.am
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
ACLOCAL_AMFLAGS = ${ACLOCAL_FLAGS} -I m4
AM_CFLAGS = -std=gnu11 -g -O3 -flto -fuse-linker-plugin
AM_LDFLAGS = -static
AM_LDFLAGS = -static -pthread

DIST_SUBDIRS = docs benchmark examples

Expand All @@ -17,7 +17,8 @@ src/dynamic/vector.c \
src/dynamic/string.c \
src/dynamic/map.c \
src/dynamic/maps.c \
src/dynamic/mapi.c
src/dynamic/mapi.c \
src/dynamic/pool.c

HEADER_FILES = \
src/dynamic/hash.h \
Expand All @@ -27,7 +28,8 @@ src/dynamic/vector.h \
src/dynamic/string.h \
src/dynamic/map.h \
src/dynamic/maps.h \
src/dynamic/mapi.h
src/dynamic/mapi.h \
src/dynamic/pool.c

AUTOMAKE_OPTIONS = subdir-objects
lib_LTLIBRARIES= libdynamic.la
Expand All @@ -53,7 +55,17 @@ libdynamic_test_a_SOURCES = $(SOURCE_FILES) $(HEADER_FILES)

CHECK_CFLAGS = -std=gnu11 -O0 -g -ftest-coverage -fprofile-arcs
CHECK_LDADD = libdynamic_test.a -lcmocka
CHECK_LDFLAGS_EXTRA = -Wl,--wrap=malloc -Wl,--wrap=calloc -Wl,--wrap=realloc -Wl,--wrap=aligned_alloc -Wl,--wrap=abort
CHECK_LDFLAGS_EXTRA = \
-Wl,--wrap=malloc \
-Wl,--wrap=calloc \
-Wl,--wrap=realloc \
-Wl,--wrap=aligned_alloc \
-Wl,--wrap=abort \
-Wl,--wrap=recv \
-Wl,--wrap=send \
-Wl,--wrap=pthread_create \
-Wl,--wrap=socketpair \
-pthread

check_PROGRAMS = test/hash
test_hash_CFLAGS = $(CHECK_CFLAGS)
Expand Down Expand Up @@ -103,6 +115,12 @@ test_map_LDADD = $(CHECK_LDADD)
test_map_LDFLAGS = $(CHECK_LDFLAGS_EXTRA)
test_map_SOURCES = test/map.c test/mock.c

check_PROGRAMS += test/pool
test_pool_CFLAGS = $(CHECK_CFLAGS)
test_pool_LDADD = $(CHECK_LDADD)
test_pool_LDFLAGS = $(CHECK_LDFLAGS_EXTRA)
test_pool_SOURCES = test/pool.c test/mock.c

dist_noinst_SCRIPTS = test/valgrind.sh test/coverage.sh

TESTS = $(check_PROGRAMS) test/coverage.sh test/valgrind.sh
Expand Down
1 change: 1 addition & 0 deletions src/dynamic.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ extern "C" {
#include <dynamic/map.h>
#include <dynamic/maps.h>
#include <dynamic/mapi.h>
#include <dynamic/pool.h>

#ifdef __cplusplus
}
Expand Down
206 changes: 206 additions & 0 deletions src/dynamic/pool.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdint.h>
#include <pthread.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/param.h>

#include "list.h"
#include "pool.h"

static void *pool_thread(void *arg)
{
pool_worker *worker = arg;
pool_message *message;
ssize_t n;
int active = 1;

while (active)
{
n = recv(worker->socket, &message, sizeof message, 0);
if (n == -1)
break;

if (message->type == POOL_MESSAGE_JOB)
{
message->job.callback(message->job.state);
}
else
{
message->control.worker = arg;
active = 0;
}

n = send(worker->socket, &message, sizeof message, 0);
if (n == -1)
break;
}

return NULL;
}

static void pool_flush(pool *pool)
{
pool_message *message;
ssize_t n;

while (!list_empty(&pool->messages_queued))
{
message = list_front(&pool->messages_queued);
n = send(pool->socket, &message, sizeof message, MSG_DONTWAIT);
if (n == -1)
{
if (errno != EAGAIN)
pool->error ++;
break;
}
list_splice(list_front(&pool->messages_transit), message);
}
}

static void pool_maintain(pool *pool)
{
pool_worker *worker;
pool_message *message;
size_t i, n;
int e;

n = MIN(MAX(pool->jobs_count, pool->workers_min), pool->workers_max);

for (i = pool->workers_count; i < n; i ++)
{
worker = list_push_back(&pool->workers, NULL, sizeof *worker);
worker->socket = pool->workers_socket;
e = pthread_create(&worker->thread, NULL, pool_thread, worker);
if (e == -1)
{
list_erase(worker, NULL);
pool->error ++;
return;
}
pool->workers_count ++;
}

for (; n < pool->workers_count; n ++)
{
message = list_push_front(&pool->messages_queued, NULL, sizeof *message);
message->type = POOL_MESSAGE_CONTROL;
pool->workers_count --;
}

pool_flush(pool);
}

void pool_construct(pool *pool)
{
int e, fd[2] = {-1, -1};

*pool = (struct pool) {0};

list_construct(&pool->workers);
list_construct(&pool->messages_queued);
list_construct(&pool->messages_transit);

e = socketpair(AF_UNIX, SOCK_STREAM, PF_UNSPEC, fd);
if (e == -1)
pool->error ++;

pool->socket = fd[0];
pool->workers_socket = fd[1];
pool_limits(pool, POOL_WORKERS_MIN, POOL_WORKERS_MAX);
}

void pool_destruct(pool *pool)
{
pool_worker *worker;
void *state;

do
state = pool_collect(pool, POOL_DONTWAIT);
while (state);

list_foreach(&pool->workers, worker)
{
pthread_cancel(worker->thread);
pthread_join(worker->thread, NULL);
}

list_destruct(&pool->workers, NULL);
list_destruct(&pool->messages_queued, NULL);
list_destruct(&pool->messages_transit, NULL);

if (pool->socket >= 0)
close(pool->socket);
if (pool->workers_socket >= 0)
close(pool->workers_socket);
}

void pool_limits(pool *pool, size_t min, size_t max)
{
pool->workers_min = min;
pool->workers_max = max;
pool_maintain(pool);
}

size_t pool_jobs(pool *pool)
{
return pool->jobs_count;
}

int pool_fd(pool *pool)
{
return pool->socket;
}

int pool_error(pool *pool)
{
return pool->error != 0;
}

void pool_enqueue(pool *pool, pool_callback *callback, void *state)
{
pool_message *message;

message = list_push_back(&pool->messages_queued, NULL, sizeof *message);
message->type = POOL_MESSAGE_JOB;
message->job.callback = callback;
message->job.state = state;
pool->jobs_count ++;
pool_maintain(pool);
}

void *pool_collect(pool *pool, int flags)
{
pool_message *message;
ssize_t n;
void *state;

pool_maintain(pool);
while (1)
{
n = recv(pool->socket, &message, sizeof message, flags & POOL_DONTWAIT ? MSG_DONTWAIT : 0);
if (n == -1)
{
if (errno != EAGAIN)
pool->error ++;
return NULL;
}

if (message->type == POOL_MESSAGE_JOB)
{
pool->jobs_count --;
state = message->job.state;
list_erase(message, NULL);
return state;
}
else
{
pthread_cancel(message->control.worker->thread);
pthread_detach(message->control.worker->thread);
list_erase(message->control.worker, NULL);
list_erase(message, NULL);
}
}
}
78 changes: 78 additions & 0 deletions src/dynamic/pool.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#ifndef POOL_H_INCLUDED
#define POOL_H_INCLUDED

#define POOL_WORKERS_MIN 1
#define POOL_WORKERS_MAX 16

enum pool_message_types
{
POOL_MESSAGE_JOB,
POOL_MESSAGE_CONTROL
};

enum pool_flags
{
POOL_DONTWAIT = 0x01
};

typedef void pool_callback(void *);
typedef struct pool_worker pool_worker;
typedef struct pool_job pool_job;
typedef struct pool_control pool_control;
typedef struct pool_message pool_message;
typedef struct pool pool;

struct pool_worker
{
pthread_t thread;
int socket;
};

struct pool_job
{
pool_callback *callback;
void *state;
};

struct pool_control
{
pool_worker *worker;
};

struct pool_message
{
int type;
union
{
pool_job job;
pool_control control;
};
};

struct pool
{
int socket;
int error;

size_t workers_min;
size_t workers_max;
int workers_socket;
list workers;
size_t workers_count;

list messages_queued;
list messages_transit;

size_t jobs_count;
};

void pool_construct(pool *);
void pool_destruct(pool *);
void pool_limits(pool *, size_t, size_t);
size_t pool_jobs(pool *);
int pool_fd(pool *);
int pool_error(pool *);
void pool_enqueue(pool *, pool_callback *, void *);
void *pool_collect(pool *, int);

#endif /* POOL_H_INCLUDED */
2 changes: 1 addition & 1 deletion test/coverage.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/sh

for file in hash buffer list vector string maps mapi map
for file in hash buffer list vector string maps mapi map pool
do
echo [$file]
test=`gcov -b src/dynamic/libdynamic_test_a-$file | grep -A4 File.*$file`
Expand Down

0 comments on commit c0929e0

Please sign in to comment.