Permalink
Browse files

Implementation continued...

  • Loading branch information...
1 parent 883e6f6 commit 999dd9bf0352b2a677228aa9a52e377a87ec27fc @dmatveev committed Jul 15, 2011
Showing with 80 additions and 7 deletions.
  1. +51 −5 controller.c
  2. +16 −1 worker-thread.c
  3. +7 −1 worker-thread.h
  4. +2 −0 worker.c
  5. +4 −0 worker.h
View
@@ -1,13 +1,31 @@
#include <stddef.h> /* NULL */
+#include <string.h>
+#include <stdlib.h>
#include "worker.h"
#include "inotify.h"
+
+#define WORKER_SZ 100
+static worker* workers[WORKER_SZ];
+
int
inotify_init (void) __THROW
{
// TODO: errno is set when an original inotify_init fails
- worker *wrk = worker_create ();
- return (wrk == NULL) ? -1 : wrk->io[INOTIFY_FD];
+
+ // TODO: a dynamic structure here
+ // TODO: lock workers when adding
+ int i;
+ for (i = 0; i < WORKER_SZ; i++) {
+ if (workers[i] == NULL) {
+ worker *wrk = worker_create ();
+ if (wrk) {
+ workers[i] = wrk;
+ return wrk->io[INOTIFY_FD];
+ }
+ }
+ }
+ return -1;
}
int
@@ -22,9 +40,37 @@ inotify_add_watch (int fd,
const char *name,
uint32_t mask) __THROW
{
- /* here we will allocate a placeholder to receive a watch id
- from the worker thread. */
- return 0;
+ /* look up for an appropriate thread */
+ // TODO: lock workers when looking up
+ int i;
+ for (i = 0; i < WORKER_SZ; i++) {
+ if (workers[i]->io[INOTIFY_FD] == fd) {
+ worker *wrk = workers[i];
+ worker_cmd *cmd = calloc (1, sizeof (worker_cmd));
+ // TODO: check allocation
+ int retval = -1; // TODO: no magic numbers here
+ int error = 0; // TODO: and here
+
+ // TODO: hide these details
+ cmd->type = WCMD_ADD;
+ cmd->feedback.retval = &retval;
+ cmd->feedback.error = &error;
+ cmd->add.filename = strdup (name);
+ cmd->add.mask = mask;
+ pthread_barrier_init (&cmd->sync, NULL, 2);
+
+ // TODO: lock the operations queue
+ SIMPLEQ_INSERT_TAIL (&wrk->queue, cmd, entries);
+
+ // TODO: wake up the kqueue thread here
+ pthread_barrier_wait (&cmd->sync);
+
+ // TODO: check error here
+ // TODO: return value here
+ }
+ }
+
+ return -1;
}
int
View
@@ -1,9 +1,24 @@
+#include <sys/event.h>
#include <stddef.h> /* NULL */
+#include <assert.h>
+
+#include "worker.h"
#include "worker-thread.h"
void*
worker_thread (void *arg)
{
- // TODO: implementation
+ // TODO: obtain a pointer to a worker structure
+ worker* wrk = NULL;
+ assert (wrk != NULL);
+
+ // TODO: initialize a first element with a control FD
+
+ for (;;) {
+ struct kevent received;
+
+ int ret = kevent (wrk->kq, wrk->sets.events, wrk->sets.length, &received, 1, NULL);
+ (void) ret;
+ }
return NULL;
}
View
@@ -1,9 +1,13 @@
#ifndef __WORKER_THREAD_H__
#define __WORKER_THREAD_H__
+#include <sys/queue.h>
#include <stdint.h>
+#include <pthread.h>
+
+typedef struct worker_cmd {
+ SIMPLEQ_ENTRY(worker_cmd) entries;
-typedef struct {
enum {
WCMD_ADD,
WCMD_REMOVE
@@ -14,6 +18,8 @@ typedef struct {
int *error;
} feedback;
+ pthread_barrier_t sync;
+
union {
struct {
const char *filename;
View
@@ -32,6 +32,8 @@ worker_create ()
worker_sets_init (&wrk->sets, wrk->io[1]);
+ SIMPLEQ_INIT(&wrk->queue);
+
/* create a run a worker thread */
if (pthread_create (&wrk->thread, NULL, worker_thread, wrk) != 0) {
perror ("Failed to start a new worker thread");
View
@@ -3,6 +3,8 @@
#include <pthread.h>
#include <stdint.h>
+#include <sys/queue.h>
+#include "worker-thread.h"
#include "worker-sets.h"
@@ -15,6 +17,8 @@ typedef struct {
int io[2]; /* a socket pair */
pthread_t thread; /* worker thread */
worker_sets sets; /* kqueue events, filenames, etc */
+
+ SIMPLEQ_HEAD(operations_queue, worker_cmd) queue;
} worker;

0 comments on commit 999dd9b

Please sign in to comment.