Permalink
Browse files

Moved kqueue thread and its stuff to a separate file

  • Loading branch information...
1 parent bde25c4 commit f5dc91ffcca1b64c1e6bdd2a6f1c28a8fe4d07da @dmatveev committed Jun 4, 2011
Showing with 155 additions and 106 deletions.
  1. +2 −0 gio/kqueue/Makefile.am
  2. +4 −4 gio/kqueue/gkqueuedirectorymonitor.h
  3. +6 −102 gio/kqueue/kqueue-helper.c
  4. +128 −0 gio/kqueue/kqueue-thread.c
  5. +15 −0 gio/kqueue/kqueue-thread.h
View
@@ -11,6 +11,8 @@ libkqueue_la_SOURCES = \
gkqueuedirectorymonitor.h \
kqueue-helper.c \
kqueue-helper.h \
+ kqueue-thread.c \
+ kqueue-thread.h \
kqueue-sub.c \
kqueue-sub.h \
$(NULL)
@@ -7,10 +7,10 @@
G_BEGIN_DECLS
-#define G_TYPE_KQUEUE_DIRECTORY_MONITOR (_g_kqueue_directory_monitor_get_type ())
-#define G_KQUEUE_DIRECTORY_MONITOR(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), G_TYPE_KQUEUE_DIRECTORY_MONITOR, GKqueueDirectoryMonitor))
-#define G_KQUEUE_DIRECTORY_MONITOR_CLASS(k) (G_TYPE_CHECK_CLASS_CAST ((k), G_TYPE_KQUEUE_DIRECTORY_MONITOR, GKqueueDirectoryMonitorClass))
-#define G_IS_KQUEUE_DIRECTORY_MONITOR(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), G_TYPE_KQUEUE_DIRECTORY_MONITOR))
+#define G_TYPE_KQUEUE_DIRECTORY_MONITOR (_g_kqueue_directory_monitor_get_type ())
+#define G_KQUEUE_DIRECTORY_MONITOR(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), G_TYPE_KQUEUE_DIRECTORY_MONITOR, GKqueueDirectoryMonitor))
+#define G_KQUEUE_DIRECTORY_MONITOR_CLASS(k) (G_TYPE_CHECK_CLASS_CAST ((k), G_TYPE_KQUEUE_DIRECTORY_MONITOR, GKqueueDirectoryMonitorClass))
+#define G_IS_KQUEUE_DIRECTORY_MONITOR(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), G_TYPE_KQUEUE_DIRECTORY_MONITOR))
#define G_IS_KQUEUE_DIRECTORY_MONITOR_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), G_TYPE_KQUEUE_DIRECTORY_MONITOR))
typedef struct _GKqueueDirectoryMonitor GKqueueDirectoryMonitor;
View
@@ -8,116 +8,24 @@
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
-#include <stdlib.h> /* TODO: {c,re}alloc. Probably should use smt from Glib instead? */
#include "kqueue-helper.h"
+#include "kqueue-thread.h"
static gboolean kh_debug_enabled = TRUE;
#define KH_W if (kh_debug_enabled) g_warning
G_GNUC_INTERNAL G_LOCK_DEFINE (kqueue_lock);
-static GSList *g_pick_up_fds = NULL;
-G_GNUC_INTERNAL G_LOCK_DEFINE (pick_up_lock);
-
static GHashTable *g_sub_hash = NULL;
G_GNUC_INTERNAL G_LOCK_DEFINE (hash_lock);
-const uint32_t KQUEUE_VNODE_FLAGS =
- NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND | NOTE_ATTRIB | NOTE_LINK |
- NOTE_RENAME | NOTE_REVOKE;
-
/* TODO: Too many locks. Isn't it? */
-static int g_kqueue = -1;
+int g_kqueue = -1;
static int g_sockpair[] = {-1, -1};
static pthread_t g_kqueue_thread;
-struct kqueue_notification {
- int fd;
- uint32_t flags;
-};
-
-
-void*
-g_kqueue_thread_func (void *arg)
-{
- int fd;
- struct kevent *waiting;
- size_t kq_size = 1;
-
- /* TODO: A better memory allocation strategy */
- waiting = calloc (1, sizeof (struct kevent));
-
- fd = *(int *) arg;
-
- if (g_kqueue == -1)
- {
- KH_W ("fatal: kqueue is not initialized!\n");
- return NULL;
- }
-
- EV_SET (&waiting[0],
- fd,
- EVFILT_READ,
- EV_ADD | EV_ENABLE | EV_ONESHOT,
- NOTE_LOWAT,
- 1,
- 0);
-
- for (;;) {
- struct kevent received;
- int ret = kevent (g_kqueue, waiting, kq_size, &received, 1, NULL);
-
- if (ret == -1) {
- KH_W ("kevent failed\n");
- continue;
- }
-
- if (received.ident == fd)
- {
- char c;
- read (fd, &c, 1);
- if (c == 'A')
- {
- G_LOCK (pick_up_lock);
- if (g_pick_up_fds)
- {
- GSList *head = g_pick_up_fds;
- guint count = g_slist_length (g_pick_up_fds);
- waiting = realloc (waiting, count * sizeof (struct kevent));
- while (head)
- {
- struct kevent *pevent = &waiting[kq_size++];
- EV_SET (pevent,
- GPOINTER_TO_INT (head->data),
- EVFILT_VNODE,
- EV_ADD | EV_ENABLE | EV_ONESHOT,
- KQUEUE_VNODE_FLAGS,
- 0,
- 0);
- head = head->next;
- }
- g_slist_free (g_pick_up_fds);
- g_pick_up_fds = NULL;
- }
- G_UNLOCK (pick_up_lock);
- }
- }
- else if (!(received.fflags & EV_ERROR))
- {
- struct kqueue_notification kn;
- kn.fd = received.ident;
- kn.flags = received.fflags;
-
- write (fd, &kn, sizeof (struct kqueue_notification));
- }
- }
-
- return NULL;
-}
-
-
static GFileMonitorEvent
convert_kqueue_events_to_gio (uint32_t flags)
{
@@ -132,7 +40,6 @@ convert_kqueue_events_to_gio (uint32_t flags)
{NOTE_RENAME, G_FILE_MONITOR_EVENT_MOVED}
};
/* TODO: The following notifications should be emulated:
- * G_FILE_MONITOR_EVENT_CHANGES_DONE_HINT
* G_FILE_MONITOR_EVENT_CREATED
* G_FILE_MONITOR_EVENT_PRE_UNMOUNT
* G_FILE_MONITOR_EVENT_UNMOUNTED */
@@ -179,8 +86,7 @@ process_kqueue_notifications (GIOChannel *gioc,
child = g_file_new_for_path (sub->filename);
other = NULL; /* TODO: Do something. */
-
- mask = convert_kqueue_events_to_gio (n.flags);
+ mask = convert_kqueue_events_to_gio (n.flags);
g_file_monitor_emit_event (monitor, child, other, mask);
return TRUE;
@@ -231,7 +137,7 @@ _kh_startup (void)
result = (0 == pthread_create (&g_kqueue_thread,
NULL,
- g_kqueue_thread_func,
+ _kqueue_thread_func,
&g_sockpair[1]));
if (!result)
{
@@ -277,10 +183,8 @@ _kh_add_sub (kqueue_sub *sub)
g_hash_table_insert (g_sub_hash, GINT_TO_POINTER(sub->fd), sub);
G_UNLOCK (hash_lock);
- G_LOCK (pick_up_lock);
- g_pick_up_fds = g_slist_prepend (g_pick_up_fds, GINT_TO_POINTER (sub->fd));
- G_UNLOCK (pick_up_lock);
-
+ _kqueue_thread_push_fd (sub->fd);
+
/* Bump the kqueue thread. It will pick up a new sub entry */
write(g_sockpair[0], "A", 1);
return TRUE;
View
@@ -0,0 +1,128 @@
+#include "config.h"
+#include <sys/event.h>
+#include <sys/time.h>
+#include <unistd.h>
+#include <stdlib.h> /* TODO: {c,re}alloc. Probably should use smt from Glib instead? */
+#include <glib.h>
+
+#include "kqueue-thread.h"
+#include "kqueue-sub.h"
+
+
+static gboolean kt_debug_enabled = TRUE;
+#define KT_W if (kt_debug_enabled) g_warning
+
+static GSList *g_pick_up_fds = NULL;
+G_GNUC_INTERNAL G_LOCK_DEFINE (pick_up_lock);
+
+const uint32_t KQUEUE_VNODE_FLAGS =
+ NOTE_DELETE | NOTE_WRITE | NOTE_EXTEND | NOTE_ATTRIB | NOTE_LINK |
+ NOTE_RENAME | NOTE_REVOKE;
+
+/* TODO: Probably it would be better to pass it as a thread param? */
+extern int g_kqueue;
+
+
+static void
+_kqueue_thread_update_fds (struct kevent **events, size_t *kq_size)
+{
+ g_assert (events != NULL);
+ g_assert (*events != NULL);
+ g_assert (kq_size != NULL);
+
+ G_LOCK (pick_up_lock);
+ if (g_pick_up_fds)
+ {
+ GSList *head = g_pick_up_fds;
+ guint count = g_slist_length (g_pick_up_fds);
+ *events = realloc (*events, count * sizeof (struct kevent));
+ while (head)
+ {
+ struct kevent *pevent = &(*events)[(*kq_size)++];
+ EV_SET (pevent,
+ GPOINTER_TO_INT (head->data),
+ EVFILT_VNODE,
+ EV_ADD | EV_ENABLE | EV_ONESHOT,
+ KQUEUE_VNODE_FLAGS,
+ 0,
+ 0);
+ head = head->next;
+ }
+ g_slist_free (g_pick_up_fds);
+ g_pick_up_fds = NULL;
+ }
+ G_UNLOCK (pick_up_lock);
+}
+
+
+void*
+_kqueue_thread_func (void *arg)
+{
+ int fd;
+ struct kevent *waiting;
+ size_t kq_size = 1;
+
+ /* TODO: A better memory allocation strategy */
+ waiting = calloc (1, sizeof (struct kevent));
+
+ fd = *(int *) arg;
+
+ if (g_kqueue == -1)
+ {
+ KT_W ("fatal: kqueue is not initialized!\n");
+ return NULL;
+ }
+
+ EV_SET (&waiting[0],
+ fd,
+ EVFILT_READ,
+ EV_ADD | EV_ENABLE | EV_ONESHOT,
+ NOTE_LOWAT,
+ 1,
+ 0);
+
+ for (;;) {
+ /* TODO: Provide more items in the `eventlist' to kqueue(2).
+ * Currently the backend takes notifications from the kernel one
+ * by one, i.e. there will be a lot of system calls and context
+ * switches when the application will monitor a lot of files with
+ * high filesystem activity on each. */
+
+ struct kevent received;
+ int ret = kevent (g_kqueue, waiting, kq_size, &received, 1, NULL);
+
+ if (ret == -1) {
+ KT_W ("kevent failed\n");
+ continue;
+ }
+
+ if (received.ident == fd)
+ {
+ char c;
+ read (fd, &c, 1);
+ if (c == 'A')
+ {
+ _kqueue_thread_update_fds (&waiting, &kq_size);
+ }
+ }
+ else if (!(received.fflags & EV_ERROR))
+ {
+ struct kqueue_notification kn;
+ kn.fd = received.ident;
+ kn.flags = received.fflags;
+
+ write (fd, &kn, sizeof (struct kqueue_notification));
+ }
+ }
+
+ return NULL;
+}
+
+
+void
+_kqueue_thread_push_fd (int fd)
+{
+ G_LOCK (pick_up_lock);
+ g_pick_up_fds = g_slist_prepend (g_pick_up_fds, GINT_TO_POINTER (fd));
+ G_UNLOCK (pick_up_lock);
+}
View
@@ -0,0 +1,15 @@
+#ifndef __KQUEUE_THREAD_H
+#define __KQUEUE_THREAD_H
+
+
+/* TODO: Field comments */
+struct kqueue_notification {
+ int fd;
+ uint32_t flags;
+};
+
+
+void* _kqueue_thread_func (void *arg);
+void _kqueue_thread_push_fd (int fd);
+
+#endif /* __KQUEUE_SUB_H */

0 comments on commit f5dc91f

Please sign in to comment.