Skip to content

Commit

Permalink
fixed thread pool example
Browse files Browse the repository at this point in the history
  • Loading branch information
bodokaiser committed Jun 4, 2013
1 parent 5d18143 commit a2e49ae
Showing 1 changed file with 67 additions and 56 deletions.
123 changes: 67 additions & 56 deletions threads/pool.c
@@ -1,118 +1,128 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include "../queue/queue.h"

#define MAX_THREADS 1

pthread_t threads[MAX_THREADS];

pthread_cond_t cond;

pthread_mutex_t mutex;
#define THREADS 3

/**
* Task queue.
*/
QUEUE queue;

struct work_s {
/**
* Type of a calc work task.
*/
typedef struct {
int a;
int b;
int type;
QUEUE node;
};
} work_t;

void * worker();
/**
* Our threads.
*/
pthread_t threads[THREADS];

/**
* Our thread condition variable.
*/
pthread_cond_t cond;

/**
* Our thread mutex lock.
*/
pthread_mutex_t mutex;

/* function headers */
void * worker();
void submit_work(int a, int b, int type);

int main() {
/**
* Should execute the submited work tasks through thread pool.
*/
int main(void) {
QUEUE_INIT(&queue);

pthread_cond_init(&cond, NULL);
pthread_mutex_init(&mutex, NULL);

struct work_s work[2];

work[0].a = 5;
work[0].b = 7;
work[0].type = 1;

work[1].a = 3;
work[1].b = 3;
work[1].type = 3;

QUEUE_INIT(&work[0].node);
QUEUE_INIT(&work[1].node);

QUEUE_INSERT_TAIL(&queue, &work[0].node);
QUEUE_INSERT_TAIL(&queue, &work[1].node);

/* this does actually the same as above but causes a segmentation fault. */
submit_work(5, 6, 3);

for (int i = 0; i < MAX_THREADS; i++)
/* 3 x 3 = 9 */
submit_work(3, 3, 1);
/* 4 - 3 = 1 */
submit_work(4, 3, 2);
/* 7 * 8 = 56 */
submit_work(7, 8, 3);
/* 30 / 6 = 5 */
submit_work(30, 6, 4);

/* start all threads */
for (int i = 0; i < THREADS; i++)
pthread_create(&threads[i], NULL, worker, NULL);

for (int i = 0; i < MAX_THREADS; i++)
/* wait all threads to finish */
for (int i = 0; i < THREADS; i++)
pthread_join(threads[i], NULL);

/*
for (int i = 0; i < MAX_THREADS; i++)
pthread_detach(threads[i]);
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
*/

return 0;
return EXIT_SUCCESS;
}

/**
* Adds a calculation task to queue.
*/
void submit_work(int a, int b, int type) {
struct work_s work;
/* dynamically allocate a work task */
work_t * work = malloc(sizeof(work_t));

work.a = a;
work.b = b;
work.type = type;
work->a = a;
work->b = b;
work->type = type;

/* lock the queue to avoid thread access */
pthread_mutex_lock(&mutex);

QUEUE_INIT(&work.node);
QUEUE_INSERT_TAIL(&queue, &work.node);
/* add work task to work queue */
QUEUE_INIT(&work->node);
QUEUE_INSERT_TAIL(&queue, &work->node);

/* free the lock */
pthread_mutex_unlock(&mutex);

/* signal a thread that it should check for new work */
pthread_cond_signal(&cond);
}

/**
* Worker thread.
*
* Looks for new tasks to execute.
*/
void * worker() {
QUEUE * q;

int result;

struct work_s * work;
work_t * work;

for (;;) {
pthread_mutex_lock(&mutex);

while (QUEUE_EMPTY(&queue)) {
printf("waiting for insertion\n");
pthread_cond_wait(&cond, &mutex);
}

q = QUEUE_HEAD(&queue);

/* here the segmentation fault occurs */
QUEUE_REMOVE(q);

pthread_mutex_unlock(&mutex);

work = QUEUE_DATA(q, struct work_s, node);

/* this will print some incorrect data for work */
printf("received work type %d with a %d and b %d \n", work->a, work->b, work->type);

if (work->type == 0) {
break;
}
work = QUEUE_DATA(q, work_t, node);

switch (work->type) {
case 1:
Expand All @@ -132,8 +142,9 @@ void * worker() {
printf("%d / %d = %d\n", work->a, work->b, result);
break;
}
}

free(work);
}

pthread_exit(NULL);
}

0 comments on commit a2e49ae

Please sign in to comment.