Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Basic implementation. Squashed commit of the following:

commit 023205c
Author: Dmitry Matveev <me@dmitrymatveev.co.uk>
Date:   Sun Jul 24 18:59:49 2011 +0000

    More progress on directory notifications. IN_ATTRIB, IN_MODIFY, IN_MOVED_FROM and IN_MOVED_TO are now supported

commit 11c968b
Author: Dmitry Matveev <me@dmitrymatveev.co.uk>
Date:   Sun Jul 24 15:11:53 2011 +0000

    Added routines to monitor file moves inside the watched directory. Very dirty implementation

commit 91a7b63
Author: Dmitry Matveev <me@dmitrymatveev.co.uk>
Date:   Sun Jul 24 02:01:03 2011 +0000

    Added directory diff implementation + some small refactoring

commit 3dae4a3
Author: Dmitry Matveev <me@dmitrymatveev.co.uk>
Date:   Sat Jul 23 14:34:16 2011 +0000

    Introduced dependency watches (they are created when starting monitoring on a directory)

commit a5c1864
Author: Dmitry Matveev <me@dmitrymatveev.co.uk>
Date:   Tue Jul 19 23:04:45 2011 +0000

    Wrote a new test app (actually reused a freely available inotify sample), resolved some TODOs, looks like it works

commit f01a2ba
Author: Dmitry Matveev <me@dmitrymatveev.co.uk>
Date:   Sun Jul 17 21:39:47 2011 +0000

    KISS

commit d003d9d
Author: Dmitry Matveev <me@dmitrymatveev.co.uk>
Date:   Sun Jul 17 14:10:34 2011 +0000

    Decided to make a snaphost of the project before a great simplifying and
    refactory

commit 4361d41
Author: Dmitry Matveev <me@dmitrymatveev.co.uk>
Date:   Sat Jul 16 15:36:38 2011 +0000

    Implenentation continued: resolved some TODOs, added some new ones,
    implemented add/edit command processing in the kqueue thread

commit 999dd9b
Author: Dmitry Matveev <me@dmitrymatveev.co.uk>
Date:   Fri Jul 15 00:26:36 2011 +0000

    Implementation continued...

commit 883e6f6
Author: Dmitry Matveev <me@dmitrymatveev.co.uk>
Date:   Mon Jul 11 22:57:31 2011 +0000

    Implementation continued

commit d71922b
Author: Dmitry Matveev <me@dmitrymatveev.co.uk>
Date:   Sun Jul 10 23:33:48 2011 +0000

    Started working on a core implementation. Just an interim commit (one of the few)
  • Loading branch information...
commit 0d54d278256e560d812afa26f387893179324f0c 1 parent fad0d75
@dmatveev authored
View
29 Makefile
@@ -0,0 +1,29 @@
+APP=test
+APP_CFLAGS=-O2 -Wall -ggdb
+APP_LDFLAGS=-lpthread
+
+CC=gcc
+COBJ=$(CC) $(APP_CFLAGS) -c
+
+OBJS=worker-sets.o worker-thread.o worker.o controller.o test.o
+
+test: $(OBJS)
+ $(CC) $(OBJS) $(APP_LDFLAGS) -o $(APP)
+
+worker-sets.o: worker-sets.c
+ $(COBJ) worker-sets.c
+
+worker-thread.o: worker-thread.c
+ $(COBJ) worker-thread.c
+
+worker.o: worker.c
+ $(COBJ) worker.c
+
+controller.o: controller.c
+ $(COBJ) controller.c
+
+test.o: test.c
+ $(COBJ) test.c
+
+clean:
+ rm -f *~ *.o
View
121 controller.c
@@ -0,0 +1,121 @@
+#include <sys/event.h>
+#include <stddef.h> /* NULL */
+#include <string.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <assert.h>
+#include <stdio.h> /* printf */
+
+#include "worker.h"
+#include "inotify.h"
+
+
+#define WORKER_SZ 100
+static worker* workers[WORKER_SZ] = {NULL};
+static pthread_mutex_t workers_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+int
+inotify_init (void) __THROW
+{
+ // TODO: a dynamic structure here
+ pthread_mutex_lock (&workers_mutex);
+ int i;
+ for (i = 0; i < WORKER_SZ; i++) {
+ if (workers[i] == NULL) {
+ worker *wrk = worker_create ();
+ if (wrk != NULL) {
+ workers[i] = wrk;
+ pthread_mutex_unlock (&workers_mutex);
+ return wrk->io[INOTIFY_FD];
+ }
+ }
+ }
+
+ // TODO: errno is set when an original inotify_init fails
+ pthread_mutex_unlock (&workers_mutex);
+ return -1;
+}
+
+int
+inotify_init1 (int flags) __THROW
+{
+ // TODO: implementation
+ return 0;
+}
+
+int
+inotify_add_watch (int fd,
+ const char *name,
+ uint32_t mask) __THROW
+{
+ /* look up for an appropriate thread */
+ pthread_mutex_lock (&workers_mutex);
+
+ int i;
+ for (i = 0; i < WORKER_SZ; i++) {
+ if (workers[i]->io[INOTIFY_FD] == fd) {
+ worker *wrk = workers[i];
+
+ pthread_mutex_lock (&wrk->mutex);
+
+ // TODO: hide these details
+ worker_cmd_reset (&wrk->cmd);
+ wrk->cmd.type = WCMD_ADD;
+ wrk->cmd.add.filename = strdup (name);
+ wrk->cmd.add.mask = mask;
+ pthread_barrier_init (&wrk->cmd.sync, NULL, 2);
+
+ write (wrk->io[INOTIFY_FD], "*", 1); // TODO: EINTR
+ pthread_barrier_wait (&wrk->cmd.sync);
+
+ // TODO: hide these details too
+ pthread_barrier_destroy (&wrk->cmd.sync);
+
+ // TODO: check error here
+ pthread_mutex_unlock (&workers_mutex);
+ return wrk->cmd.retval;
+ }
+ }
+
+ // TODO: unlock workers earlier?
+ pthread_mutex_unlock (&workers_mutex);
+ return -1;
+}
+
+int
+inotify_rm_watch (int fd,
+ int wd) __THROW
+{
+ assert (fd != -1);
+ assert (wd != -1);
+
+ pthread_mutex_lock (&workers_mutex);
+
+ int i;
+ for (i = 0; i < WORKER_SZ; i++) {
+ if (workers[i]->io[INOTIFY_FD] == fd) {
+ worker *wrk = workers[i];
+ pthread_mutex_lock (&wrk->mutex);
+
+ // TODO: hide these details
+ worker_cmd_reset (&wrk->cmd);
+ wrk->cmd.type = WCMD_REMOVE;
+ wrk->cmd.rm_id = fd;
+ pthread_barrier_init (&wrk->cmd.sync, NULL, 2);
+
+ write (wrk->io[INOTIFY_FD], "*", 1); // TODO: EINTR
+ pthread_barrier_wait (&wrk->cmd.sync);
+
+ // TODO: hide these details too
+ pthread_barrier_destroy (&wrk->cmd.sync);
+
+ // TODO: check error here
+ // TODO: unlock workers earlier?
+ pthread_mutex_unlock (&workers_mutex);
+ return -1; // TODO: obtain return value
+ }
+ }
+
+ pthread_mutex_unlock (&workers_mutex);
+ return 0;
+}
View
73 inotify.h
@@ -0,0 +1,73 @@
+#ifndef __BSD_INOTIFY_H__
+#define __BSD_INOTIFY_H__
+
+#include <stdint.h>
+
+#ifndef __THROW
+ #ifdef __cplusplus
+ #define __THROW throw()
+ #else
+ #define __THROW
+ #endif
+#endif
+
+
+/* Flags for the parameter of inotify_init1. */
+enum {
+ IN_CLOEXEC = 02000000,
+ IN_NONBLOCK = 04000
+};
+
+
+/* Structure describing an inotify event. */
+struct inotify_event
+{
+ int wd; /* Watch descriptor. */
+ uint32_t mask; /* Watch mask. */
+ uint32_t cookie; /* Cookie to synchronize two events. */
+ uint32_t len; /* Length (including NULs) of name. */
+ char name[]; /* Name. */
+};
+
+
+/* Supported events suitable for MASK parameter of INOTIFY_ADD_WATCH. */
+#define IN_ACCESS 0x00000001 /* File was accessed. */
+#define IN_MODIFY 0x00000002 /* File was modified. */
+#define IN_ATTRIB 0x00000004 /* Metadata changed. */
+#define IN_CLOSE_WRITE 0x00000008 /* Writtable file was closed. */
+#define IN_CLOSE_NOWRITE 0x00000010 /* Unwrittable file closed. */
+#define IN_CLOSE (IN_CLOSE_WRITE | IN_CLOSE_NOWRITE) /* Close. */
+#define IN_OPEN 0x00000020 /* File was opened. */
+#define IN_MOVED_FROM 0x00000040 /* File was moved from X. */
+#define IN_MOVED_TO 0x00000080 /* File was moved to Y. */
+#define IN_MOVE (IN_MOVED_FROM | IN_MOVED_TO) /* Moves. */
+#define IN_CREATE 0x00000100 /* Subfile was created. */
+#define IN_DELETE 0x00000200 /* Subfile was deleted. */
+#define IN_DELETE_SELF 0x00000400 /* Self was deleted. */
+#define IN_MOVE_SELF 0x00000800 /* Self was moved. */
+
+/*
+ * All of the events - we build the list by hand so that we can add flags in
+ * the future and not break backward compatibility. Apps will get only the
+ * events that they originally wanted. Be sure to add new events here!
+ */
+#define IN_ALL_EVENTS (IN_ACCESS | IN_MODIFY | IN_ATTRIB | IN_CLOSE_WRITE | \
+ IN_CLOSE_NOWRITE | IN_OPEN | IN_MOVED_FROM | \
+ IN_MOVED_TO | IN_DELETE | IN_CREATE | IN_DELETE_SELF)
+
+
+/* Create and initialize inotify-kqueue instance. */
+int inotify_init (void) __THROW;
+
+/* Create and initialize inotify-kqueue instance. */
+int inotify_init1 (int flags) __THROW;
+
+/* Add watch of object NAME to inotify-kqueue instance FD. Notify about
+ events specified by MASK. */
+int inotify_add_watch (int fd, const char *name, uint32_t mask) __THROW;
+
+/* Remove the watch specified by WD from the inotify instance FD. */
+int inotify_rm_watch (int fd, int wd) __THROW;
+
+
+#endif /* __BSD_INOTIFY_H__ */
View
133 test.c
@@ -0,0 +1,133 @@
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+
+#include "inotify.h"
+
+void get_event (int fd, const char * target);
+void handle_error (int error);
+
+/* ----------------------------------------------------------------- */
+
+int main (int argc, char *argv[])
+{
+ char target[FILENAME_MAX];
+ int fd;
+ int wd; /* watch descriptor */
+
+ if (argc < 2) {
+ fprintf (stderr, "Watching the current directory\n");
+ strcpy (target, ".");
+ }
+ else {
+ fprintf (stderr, "Watching %s\n", argv[1]);
+ strcpy (target, argv[1]);
+ }
+
+ fd = inotify_init();
+ if (fd < 0) {
+ printf("inotify_init failed\n");
+ handle_error (errno);
+ return 1;
+ }
+
+ wd = inotify_add_watch (fd, target, IN_ALL_EVENTS);
+ if (wd < 0) {
+ printf("add_watch failed\n");
+ handle_error (errno);
+ return 1;
+ }
+
+ while (1) {
+ get_event(fd, target);
+ }
+
+ return 0;
+}
+
+/* ----------------------------------------------------------------- */
+/* Allow for 1024 simultanious events */
+#define BUFF_SIZE ((sizeof(struct inotify_event)+FILENAME_MAX)*1024)
+
+void get_event (int fd, const char * target)
+{
+ ssize_t len, i = 0;
+ char buff[BUFF_SIZE] = {0};
+
+ len = read (fd, buff, BUFF_SIZE);
+
+ while (i < len) {
+ struct inotify_event *pevent = (struct inotify_event *)&buff[i];
+ char action[81+FILENAME_MAX] = {0};
+
+ if (pevent->len)
+ strcpy (action, pevent->name);
+ else
+ strcpy (action, target);
+
+ if (pevent->mask & IN_ACCESS)
+ strcat(action, " was read");
+ if (pevent->mask & IN_ATTRIB)
+ strcat(action, " Metadata changed");
+ if (pevent->mask & IN_CLOSE_WRITE)
+ strcat(action, " opened for writing was closed");
+ if (pevent->mask & IN_CLOSE_NOWRITE)
+ strcat(action, " not opened for writing was closed");
+ if (pevent->mask & IN_CREATE)
+ strcat(action, " created in watched directory");
+ if (pevent->mask & IN_DELETE)
+ strcat(action, " deleted from watched directory");
+ if (pevent->mask & IN_DELETE_SELF)
+ strcat(action, "Watched file/directory was itself deleted");
+ if (pevent->mask & IN_MODIFY)
+ strcat(action, " was modified");
+ if (pevent->mask & IN_MOVE_SELF)
+ strcat(action, "Watched file/directory was itself moved");
+ if (pevent->mask & IN_MOVED_FROM)
+ strcat(action, " moved out of watched directory");
+ if (pevent->mask & IN_MOVED_TO)
+ strcat(action, " moved into watched directory");
+ if (pevent->mask & IN_OPEN)
+ strcat(action, " was opened");
+
+ /*
+ printf ("wd=%d mask=%d cookie=%d len=%d dir=%s\n",
+ pevent->wd, pevent->mask, pevent->cookie, pevent->len,
+ (pevent->mask & IN_ISDIR)?"yes":"no");
+
+ if (pevent->len) printf ("name=%s\n", pevent->name);
+ */
+
+ printf ("%s [%s]\n", action, pevent->name);
+
+ i += sizeof(struct inotify_event) + pevent->len;
+
+ }
+
+} /* get_event */
+
+/* ----------------------------------------------------------------- */
+
+void handle_error (int error)
+{
+ fprintf (stderr, "Error: %s\n", strerror(error));
+
+} /* handle_error */
+
+/* ----------------------------------------------------------------- */
+
+
+
+
+
+/* #include "inotify.h" */
+
+/* int main (int argc, char *argv[]) { */
+/* int fd = inotify_init(); */
+/* (void) fd; */
+/* return 0; */
+/* } */
View
309 worker-sets.c
@@ -0,0 +1,309 @@
+#include <assert.h>
+#include <stdlib.h> /* realloc */
+#include <string.h> /* memset */
+#include <stddef.h> /* NULL */
+#include <fcntl.h> /* open, fstat */
+#include <stdio.h> /* perror */
+#include <dirent.h> /* opendir, readdir, closedir */
+#include <sys/event.h>
+
+#include "inotify.h"
+#include "worker-sets.h"
+
+
+void
+dl_print (dep_list *dl)
+{
+ while (dl != NULL) {
+ printf ("%lld:%s ", dl->inode, dl->path);
+ dl = dl->next;
+ }
+ printf ("\n");
+}
+
+dep_list*
+dl_shallow_copy (dep_list *dl)
+{
+ assert (dl != NULL);
+
+ dep_list *head = calloc (1, sizeof (dep_list)); // TODO: check allocation
+ dep_list *cp = head;
+ dep_list *it = dl;
+
+ while (it != NULL) {
+ cp->fd = it->fd;
+ cp->path = it->path;
+ cp->inode = it->inode;
+ if (it->next) {
+ cp->next = calloc (1, sizeof (dep_list)); // TODO: check allocation
+ cp = cp->next;
+ }
+ it = it->next;
+ }
+
+ return head;
+}
+
+void
+dl_shallow_free (dep_list *dl)
+{
+ while (dl != NULL) {
+ dep_list *ptr = dl;
+ dl = dl->next;
+ free (ptr);
+ }
+}
+
+void
+dl_free (dep_list *dl)
+{
+ while (dl != NULL) {
+ dep_list *ptr = dl;
+ dl = dl->next;
+
+ free (ptr->path);
+ free (ptr);
+ }
+}
+
+dep_list*
+dl_listing (const char *path)
+{
+ assert (path != NULL);
+
+ dep_list *head = calloc (1, sizeof (dep_list)); // TODO: check allocation
+ dep_list *prev = NULL;
+ DIR *dir = opendir (path);
+ if (dir != NULL) {
+ struct dirent *ent;
+
+ while ((ent = readdir (dir)) != NULL) {
+ if (!strcmp (ent->d_name, ".") || !strcmp (ent->d_name, "..")) {
+ continue;
+ }
+
+ // TODO: check allocation
+ dep_list *iter = (prev == NULL) ? head : calloc (1, sizeof (dep_list));
+ iter->path = strdup (ent->d_name);
+ iter->inode = ent->d_ino;
+ iter->next = NULL;
+ if (prev) {
+ prev->next = iter;
+ }
+ prev = iter;
+ }
+
+ closedir (dir);
+ }
+ return head;
+}
+
+void dl_diff (dep_list **before, dep_list **after)
+{
+ assert (before != NULL);
+ assert (after != NULL);
+
+ dep_list *before_iter = *before;
+ dep_list *before_prev = NULL;
+
+ assert (before_iter != NULL);
+
+ while (before_iter != NULL) {
+ dep_list *after_iter = *after;
+ dep_list *after_prev = NULL;
+
+ int matched = 0;
+ while (after_iter != NULL) {
+ if (strcmp (before_iter->path, after_iter->path) == 0) {
+ matched = 1;
+ /* removing the entry from the both lists */
+ if (before_prev) {
+ before_prev->next = before_iter->next;
+ } else {
+ *before = before_iter->next;
+ }
+
+ if (after_prev) {
+ after_prev->next = after_iter->next;
+ } else {
+ *after = after_iter->next;
+ }
+ free (after_iter); // TODO: dl_free?
+ break;
+ }
+ after_prev = after_iter;
+ after_iter = after_iter->next;
+ }
+
+ dep_list *oldptr = before_iter;
+ before_iter = before_iter->next;
+ if (matched == 0) {
+ before_prev = oldptr;
+ } else {
+ free (oldptr); // TODO: dl_free?
+ }
+ }
+}
+
+
+static uint32_t
+inotify_flags_to_kqueue (uint32_t flags, int is_directory)
+{
+ uint32_t result = 0;
+ static const uint32_t NOTE_MODIFIED = (NOTE_WRITE | NOTE_EXTEND);
+
+ if (flags & IN_ATTRIB)
+ result |= (NOTE_ATTRIB | NOTE_LINK);
+ if (flags & IN_MODIFY)
+ result |= NOTE_MODIFIED;
+ if (flags & IN_MOVED_FROM && is_directory)
+ result |= NOTE_MODIFIED;
+ if (flags & IN_MOVED_TO && is_directory)
+ result |= NOTE_MODIFIED;
+ if (flags & IN_CREATE && is_directory)
+ result |= NOTE_MODIFIED;
+ if (flags & IN_DELETE && is_directory)
+ result |= NOTE_MODIFIED;
+ if (flags & IN_DELETE_SELF)
+ result |= NOTE_DELETE;
+ if (flags & IN_MOVE_SELF)
+ result |= NOTE_RENAME;
+
+ return result;
+}
+
+
+static int
+_is_directory (int fd)
+{
+ assert (fd != -1);
+
+ struct stat st;
+ memset (&st, 0, sizeof (struct stat));
+
+ if (fstat (fd, &st) == -1) {
+ perror ("fstat failed, assuming it is just a file");
+ return 0;
+ }
+
+ return (st.st_mode & S_IFDIR) == S_IFDIR;
+}
+
+int
+watch_init_user (watch *w, struct kevent *kv, const char *path, uint32_t flags, int index)
+{
+ assert (w != NULL);
+ assert (kv != NULL);
+ assert (path != NULL);
+
+ memset (w, 0, sizeof (watch));
+ memset (kv, 0, sizeof (struct kevent));
+
+ int fd = open (path, O_RDONLY);
+ if (fd == -1) {
+ // TODO: error
+ return -1;
+ }
+
+ w->type = WATCH_USER;
+ w->flags = flags;
+ w->is_directory = _is_directory (fd);
+ w->filename = strdup (path);
+ w->fd = fd;
+ EV_SET (kv,
+ fd,
+ EVFILT_VNODE,
+ EV_ADD | EV_ENABLE | EV_ONESHOT,
+ inotify_flags_to_kqueue (flags, w->is_directory),
+ 0,
+ index);
+
+ return 0;
+}
+
+int
+watch_init_dependency (watch *w, struct kevent *kv, const char *path, uint32_t flags, int index)
+{
+ assert (w != NULL);
+ assert (kv != NULL);
+ assert (path != NULL);
+
+ memset (w, 0, sizeof (watch));
+ memset (kv, 0, sizeof (struct kevent));
+
+ int fd = open (path, O_RDONLY);
+ if (fd == -1) {
+ // TODO: error
+ return -1;
+ }
+
+ w->type = WATCH_DEPENDENCY;
+ w->flags = flags;
+ w->filename = strdup (path);
+ w->fd = fd;
+
+ // TODO: drop some flags from flags.
+ // They are actual for a parent watch, but should be modified
+ // for dependant ones
+ EV_SET (kv,
+ fd,
+ EVFILT_VNODE,
+ EV_ADD | EV_ENABLE | EV_ONESHOT,
+ inotify_flags_to_kqueue (flags, 0),
+ 0,
+ index);
+
+ return 0;
+}
+
+
+#define WS_RESERVED 10
+
+void
+worker_sets_init (worker_sets *ws,
+ int fd)
+{
+ assert (ws != NULL);
+
+ memset (ws, 0, sizeof (worker_sets));
+ worker_sets_extend (ws, 1);
+
+ EV_SET (&ws->events[0],
+ fd,
+ EVFILT_READ,
+ EV_ADD | EV_ENABLE | EV_ONESHOT,
+ NOTE_LOWAT,
+ 1,
+ 0);
+ ws->length = 1;
+}
+
+int
+worker_sets_extend (worker_sets *ws,
+ int count)
+{
+ assert (ws != NULL);
+
+ if (ws->length + count > ws->allocated) {
+ ws->allocated =+ (count + WS_RESERVED);
+ ws->events = realloc (ws->events, sizeof (struct kevent) * ws->allocated);
+ ws->watches = realloc (ws->watches, sizeof (struct watch) * ws->allocated);
+ // TODO: check realloc fails
+ }
+ return 0;
+}
+
+void
+worker_sets_free (worker_sets *ws)
+{
+ assert (ws != NULL);
+
+ /* int i; */
+ /* for (i = 0; i < ws->allocated; i++) { */
+ /* free (ws->filenames[i]); */
+ /* } */
+ /* free (ws->is_user); */
+ /* free (ws->is_directory); */
+ /* free (ws->events); */
+ /* free (ws); */
+}
View
56 worker-sets.h
@@ -0,0 +1,56 @@
+#ifndef __WORKER_SETS_H__
+#define __WORKER_SETS_H__
+
+#include <stdint.h>
+#include <sys/types.h> /* size_t */
+
+typedef struct dep_list {
+ struct dep_list *next;
+
+ int fd;
+ char *path;
+ ino_t inode;
+} dep_list;
+
+void dl_print (dep_list *dl);
+dep_list* dl_shallow_copy (dep_list *dl);
+void dl_shallow_free (dep_list *dl);
+void dl_free (dep_list *dl);
+dep_list* dl_listing (const char *path);
+void dl_diff (dep_list **before, dep_list **after);
+
+
+#define WATCH_USER 0
+#define WATCH_DEPENDENCY 1
+
+typedef struct watch {
+ int type:1; /* TODO: enum? */
+ int is_directory:1; /* a flag, a directory or not */
+
+ uint32_t flags; /* flags in the inotify format */
+ char *filename; /* file name of a watched file */
+ int fd; /* file descriptor of a watched entry */
+
+ union {
+ dep_list *deps; /* dependencies for an user-defined watch */
+ struct watch *parent; /* parent watch for an automatic (dependency) watch */
+ };
+} watch;
+
+typedef struct worker_sets {
+ struct kevent *events; /* kevent entries */
+ struct watch *watches; /* appropriate watches with additional info */
+ size_t length; /* size of active entries */
+ size_t allocated; /* size of allocated entries */
+} worker_sets;
+
+
+int watch_init_user (watch *w, struct kevent *kv, const char *path, uint32_t flags, int index);
+int watch_init_dependency (watch *w, struct kevent *kv, const char *path, uint32_t flags, int index);
+
+void worker_sets_init (worker_sets *ws, int fd);
+int worker_sets_extend (worker_sets *ws, int count);
+void worker_sets_free (worker_sets *ws);
+
+
+#endif /* __WORKER_SETS_H__ */
View
262 worker-thread.c
@@ -0,0 +1,262 @@
+#include <sys/event.h>
+#include <stddef.h> /* NULL */
+#include <assert.h>
+#include <unistd.h> /* write */
+#include <stdlib.h> /* calloc */
+#include <stdio.h> /* perror */
+#include <string.h> /* memset */
+
+#include "inotify.h"
+#include "worker.h"
+#include "worker-sets.h"
+#include "worker-thread.h"
+
+static uint32_t
+kqueue_to_inotify (uint32_t flags)
+{
+ uint32_t result = 0;
+
+ if (flags & (NOTE_ATTRIB | NOTE_LINK))
+ result |= IN_ATTRIB;
+
+ if (flags & (NOTE_WRITE | NOTE_EXTEND)) // TODO: NOTE_MODIFY?
+ result |= IN_MODIFY;
+
+ return result;
+}
+
+void
+process_command (worker *wrk)
+{
+ assert (wrk != NULL);
+
+ // read a byte
+ char unused;
+ read (wrk->io[KQUEUE_FD], &unused, 1);
+
+ if (wrk->cmd.type == WCMD_ADD) {
+ wrk->cmd.retval = worker_add_or_modify (wrk,
+ wrk->cmd.add.filename,
+ wrk->cmd.add.mask);
+ } else if (wrk->cmd.type == WCMD_REMOVE) {
+ wrk->cmd.retval = worker_remove (wrk, wrk->cmd.rm_id);
+ } else {
+ // TODO: signal error
+ }
+
+ // TODO: is the situation when nobody else waits on a barrier possible?
+ pthread_barrier_wait (&wrk->cmd.sync);
+}
+
+
+static struct inotify_event*
+create_inotify_event (int wd, uint32_t mask, uint32_t cookie, const char *name, int *event_len)
+{
+ struct inotify_event *event = NULL;
+ int name_len = strlen (name) + 1;
+ *event_len = sizeof (struct inotify_event) + name_len;
+ event = calloc (1, *event_len); // TODO: check allocation
+
+ event->wd = wd;
+ event->mask = mask;
+ event->cookie = cookie;
+ event->len = name_len;
+ strcpy (event->name, name);
+
+ return event;
+}
+
+// TODO: drop unnecessary arguments
+void
+produce_directory_moves (worker *wrk,
+ watch *w,
+ struct kevent *event,
+ dep_list **was, // TODO: removed
+ dep_list **now) // TODO: added
+{
+ assert (wrk != NULL);
+ assert (w != NULL);
+ assert (w->parent != NULL);
+ assert (event != NULL);
+ assert (was != NULL);
+ assert (now != NULL);
+
+ dep_list *was_iter = *was;
+ dep_list *was_prev = NULL;
+
+ while (was_iter != NULL) {
+ dep_list *now_iter = *now;
+ dep_list *now_prev = NULL;
+
+ int matched = 0;
+ while (now_iter != NULL) {
+ if (was_iter->inode == now_iter->inode) {
+ matched = 1;
+ uint32_t cookie = was_iter->inode & 0x00000000FFFFFFFF;
+ int event_len = 0;
+ struct inotify_event *ev;
+
+ ev = create_inotify_event (w->parent->fd,
+ IN_MOVED_FROM,
+ cookie,
+ was_iter->path,
+ &event_len);
+ // TODO: EINTR
+ write (wrk->io[KQUEUE_FD], ev, event_len);
+ free (ev);
+
+ ev = create_inotify_event (w->parent->fd,
+ IN_MOVED_TO,
+ cookie,
+ now_iter->path,
+ &event_len);
+ // TODO: EINTR
+ write (wrk->io[KQUEUE_FD], ev, event_len);
+ free (ev);
+
+ if (was_prev) {
+ was_prev->next = was_iter->next;
+ } else {
+ *was = was_iter->next;
+ }
+
+ if (now_prev) {
+ now_prev->next = now_iter->next;
+ } else {
+ *now = now_iter->next;
+ }
+ // TODO: free smt
+ break;
+ }
+ }
+
+ dep_list *oldptr = was_iter;
+ was_iter = was_iter->next;
+ if (matched == 0) {
+ was_prev = oldptr;
+ } else {
+ free (oldptr); // TODO: dl_free?
+ }
+ }
+}
+
+
+// TODO: drop unnecessary arguments
+void
+produce_directory_changes (worker *wrk,
+ watch *w,
+ struct kevent *event,
+ dep_list *list,
+ uint32_t flag) // TODO: added
+{
+ assert (wrk != NULL);
+ assert (w != NULL);
+ assert (w->parent != NULL);
+ assert (event != NULL);
+ assert (flag != 0);
+
+ while (list != NULL) {
+ struct inotify_event *ie = NULL;
+ int ie_len = 0;
+ // TODO: check allocation
+ ie = create_inotify_event (w->parent->fd,
+ flag,
+ 0,
+ list->path,
+ &ie_len);
+
+ write (wrk->io[KQUEUE_FD], ie, ie_len);
+ free (ie);
+
+ list = list->next;
+ }
+}
+
+
+// TODO: drop unnecessary arguments
+void
+produce_directory_diff (worker *wrk, watch *w, struct kevent *event)
+{
+ assert (wrk != NULL);
+ assert (w != NULL);
+ assert (event != NULL);
+
+ assert (w->type == WATCH_USER);
+ assert (w->is_directory);
+
+ dep_list *was = NULL, *now = NULL;
+ was = dl_shallow_copy (w->deps);
+ dl_shallow_free (w->deps);
+
+ w->deps = dl_listing (w->filename);
+ now = dl_shallow_copy (w->deps);
+
+ dl_diff (&was, &now);
+
+ produce_directory_moves (wrk, w, event, &was, &now);
+ produce_directory_changes (wrk, w, event, was, IN_DELETE);
+ produce_directory_changes (wrk, w, event, now, IN_CREATE);
+
+ dl_shallow_free (now);
+ dl_free (was);
+}
+
+void
+produce_notifications (worker *wrk, struct kevent *event)
+{
+ assert (wrk != NULL);
+ assert (event != NULL);
+
+ if (wrk->sets.watches[event->udata].type == WATCH_USER) {
+ if (wrk->sets.watches[event->udata].is_directory
+ && (event->fflags & (NOTE_WRITE | NOTE_EXTEND))) { // TODO: watch's inotify flags here
+ produce_directory_diff (wrk, &wrk->sets.watches[event->udata], event);
+ }
+ // TODO: other types of events on user entries
+ } else {
+ /* for dependency events, ignore some notifications */
+ if (event->fflags & (NOTE_ATTRIB | NOTE_LINK | NOTE_WRITE | NOTE_EXTEND)) {
+ struct inotify_event *ie = NULL;
+ watch *w = &wrk->sets.watches[event->udata];
+ watch *p = w->parent;
+ int ev_len;
+ ie = create_inotify_event (p->fd,
+ kqueue_to_inotify (event->fflags),
+ 0,
+ // TODO: /foo and /foo/ cases
+ w->filename + 1 + strlen(p->filename),
+ &ev_len);
+
+ // TODO: EINTR
+ write (wrk->io[KQUEUE_FD], ie, ev_len);
+ }
+ }
+}
+
+void*
+worker_thread (void *arg)
+{
+ assert (arg != NULL);
+ worker* wrk = (worker *) arg;
+
+ for (;;) {
+ struct kevent received;
+
+ int ret = kevent (wrk->kq,
+ wrk->sets.events,
+ wrk->sets.length,
+ &received,
+ 1,
+ NULL);
+ if (ret == -1) {
+ perror ("kevent failed");
+ continue;
+ }
+ if (received.ident == wrk->io[KQUEUE_FD]) {
+ process_command (wrk);
+ } else {
+ produce_notifications (wrk, &received);
+ }
+ }
+ return NULL;
+}
View
6 worker-thread.h
@@ -0,0 +1,6 @@
+#ifndef __WORKER_THREAD_H__
+#define __WORKER_THREAD_H__
+
+void* worker_thread (void *arg);
+
+#endif /* __WORKER_THREAD_H__ */
View
209 worker.c
@@ -0,0 +1,209 @@
+#include <sys/event.h>
+#include <sys/socket.h>
+#include <stdlib.h>
+#include <string.h>
+#include <fcntl.h> /* open() */
+#include <assert.h>
+#include <stdio.h>
+#include <dirent.h>
+
+#include "inotify.h"
+#include "worker-thread.h"
+#include "worker.h"
+
+void
+worker_cmd_reset (worker_cmd *cmd)
+{
+ assert (cmd != NULL);
+
+ free (cmd->add.filename);
+ memset (cmd, 0, sizeof (worker_cmd));
+}
+
+worker*
+worker_create ()
+{
+ worker* wrk = calloc (1, sizeof (worker));
+
+ if (wrk == NULL) {
+ perror ("Failed to create a new worker");
+ goto failure;
+ }
+
+ wrk->kq = kqueue ();
+ if (wrk->kq == -1) {
+ perror ("Failed to create a new kqueue");
+ goto failure;
+ }
+
+ if (socketpair (AF_UNIX, SOCK_STREAM, 0, wrk->io) == -1) {
+ perror ("Failed to create a socket pair");
+ goto failure;
+ }
+
+ worker_sets_init (&wrk->sets, wrk->io[KQUEUE_FD]);
+ pthread_mutex_init (&wrk->mutex, NULL);
+
+ /* create a run a worker thread */
+ if (pthread_create (&wrk->thread, NULL, worker_thread, wrk) != 0) {
+ perror ("Failed to start a new worker thread");
+ goto failure;
+ }
+
+ return wrk;
+
+ failure:
+ if (wrk != NULL) {
+ worker_free (wrk);
+ }
+ return NULL;
+}
+
+
+void
+worker_free (worker *wrk)
+{
+ assert (wrk != NULL);
+
+ worker_sets_free (&wrk->sets);
+ free (wrk);
+}
+
+static int
+worker_add_dependencies (worker *wrk,
+ struct kevent *event,
+ watch *parent)
+{
+ assert (wrk != NULL);
+ assert (parent != NULL);
+ assert (parent->type == WATCH_USER);
+ assert (event != NULL);
+
+ // start watching also on its contents
+ DIR *dir = opendir (parent->filename);
+ if (dir != NULL) {
+ struct dirent *ent;
+ dep_list *iter = parent->deps;
+
+ while ((ent = readdir (dir)) != NULL) {
+ if (!strcmp (ent->d_name, ".") || !strcmp (ent->d_name, "..")) {
+ continue;
+ }
+
+ int index = wrk->sets.length;
+ worker_sets_extend (&wrk->sets, 1);
+
+ int parent_len = strlen (parent->filename);
+ int full_len = parent_len + strlen (ent->d_name) + 2;
+ char *full_path = malloc (full_len); // TODO: check alloc
+ // TODO: strip spaces!!!
+ if (full_path[parent_len] == '/') {
+ full_path[parent_len] = '\0';
+ }
+ snprintf (full_path, full_len, "%s/%s", parent->filename, ent->d_name);
+
+ if (watch_init_dependency (&wrk->sets.watches[index],
+ &wrk->sets.events[index],
+ full_path, // do we really need a full path?
+ parent->flags,
+ index)
+ == 0) {
+ ++wrk->sets.length;
+ wrk->sets.watches[index].parent = parent;
+
+ dep_list *entry = calloc (1, sizeof (dep_list));
+ entry->fd = wrk->sets.events[index].ident;
+ entry->path = strdup (ent->d_name);
+ entry->inode = ent->d_ino;
+
+ if (iter) {
+ iter->next = entry;
+ } else {
+ parent->deps = entry;
+ }
+ iter = entry;
+ }
+ printf ("Watching also on %s\n", full_path);
+ free (full_path);
+ }
+ closedir(dir);
+ } else {
+ printf ("Failed to open directory %s\n", parent->filename);
+ }
+ return 0;
+}
+
+int
+worker_add_or_modify (worker *wrk,
+ const char *path,
+ uint32_t flags)
+{
+ assert (path != NULL);
+ assert (wrk != NULL);
+ // TODO: a pointer to sets?
+ assert (wrk->sets.events != NULL);
+ assert (wrk->sets.watches != NULL);
+
+ int i = 0;
+ // look up for an entry with this filename
+ for (i = 1; i < wrk->sets.length; i++) {
+ const char *evpath = wrk->sets.watches[i].filename;
+ assert (evpath != NULL);
+
+ if (wrk->sets.watches[i].type == WATCH_USER &&
+ strcmp (path, evpath) == 0) {
+ // TODO: update flags
+ return i;
+ }
+ }
+
+ // add a new entry if path is not found
+ printf ("Adding a new watch kevent: %s\n", path);
+ worker_sets_extend (&wrk->sets, 1);
+ i = wrk->sets.length;
+ if (watch_init_user (&wrk->sets.watches[i],
+ &wrk->sets.events[i],
+ path,
+ flags,
+ i)
+ == -1) {
+ perror ("Failed to initialize a user watch\n");
+ // TODO: error
+ return -1;
+ }
+ ++wrk->sets.length;
+
+ if (wrk->sets.watches[i].is_directory) {
+ printf ("Watched entry is a directory, adding dependencies\n");
+ worker_add_dependencies (wrk, &wrk->sets.events[i], &wrk->sets.watches[i]);
+ }
+ return wrk->sets.events[i].ident;
+}
+
+
+int
+worker_remove (worker *wrk,
+ int id)
+{
+ /* assert (wrk != NULL); */
+ /* assert (id != -1); */
+
+ /* int i; */
+ /* int last = wrk->sets.length - 1; */
+ /* for (i = 0; i < wrk->sets.length; i++) { */
+ /* if (wrk->sets.events[i].ident == id) { */
+ /* free (wrk->sets.filenames[i]); */
+
+ /* if (i != last) { */
+ /* wrk->sets.events[i] = wrk->sets.events[last]; */
+ /* wrk->sets.filenames[i] = wrk->sets.filenames[last]; */
+ /* } */
+ /* wrk->sets.filenames[last] = NULL; */
+ /* --wrk->sets.length; */
+
+ /* // TODO: reduce the allocated memory size here */
+ /* return 0; */
+ /* } */
+ /* } */
+ return -1;
+}
View
58 worker.h
@@ -0,0 +1,58 @@
+#ifndef __WORKER_H__
+#define __WORKER_H__
+
+#include <pthread.h>
+#include <stdint.h>
+#include <pthread.h>
+#include "worker-thread.h"
+#include "worker-sets.h"
+
+
+#define INOTIFY_FD 0
+#define KQUEUE_FD 1
+
+
+typedef struct worker_cmd {
+ enum {
+ WCMD_NONE = 0, /* uninitialized state */
+ WCMD_ADD,
+ WCMD_REMOVE,
+ } type;
+
+ int retval;
+
+ union {
+ struct {
+ char *filename;
+ uint32_t mask;
+ } add;
+
+ int rm_id;
+ };
+
+ pthread_barrier_t sync;
+} worker_cmd;
+
+
+void worker_cmd_reset (worker_cmd *cmd);
+
+
+typedef struct {
+ int kq; /* kqueue descriptor */
+ int io[2]; /* a socket pair */
+ pthread_t thread; /* worker thread */
+ worker_sets sets; /* kqueue events, filenames, etc */
+
+ pthread_mutex_t mutex; /* worker mutex */
+ worker_cmd cmd; /* operation to perform on a worker */
+} worker;
+
+
+worker* worker_create ();
+void worker_free (worker *wrk);
+
+int worker_add_or_modify (worker *wrk, const char *path, uint32_t flags);
+int worker_remove (worker *wrk, int id);
+
+
+#endif /* __WORKER_H__ */
Please sign in to comment.
Something went wrong with that request. Please try again.