Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

psnotify - process event notifications

Change-Id: I79a1265b726f09fe2d404e1e19ae14c930305892
  • Loading branch information...
commit 845272a4ce1ee51f9f3757590aec56766497924a 1 parent 35d6c1b
@dougm dougm authored
View
9 NOTICE
@@ -0,0 +1,9 @@
+Copyright (c) [2009-2011] VMware, Inc. All Rights Reserved.
+
+This product is licensed to you under the Apache License, Version 2.0 (the "License").
+You may not use this product except in compliance with the License.
+
+This product includes a number of subcomponents with
+separate copyright notices and license terms. Your use of these
+subcomponents is subject to the terms and conditions of the
+subcomponent's license, as noted in the LICENSE file.
View
50 psnotify/README.md
@@ -0,0 +1,50 @@
+# Process notifications for Go
+
+## Overview
+
+The psnotify package captures process events from the kernel via
+kqueue on Darwin/BSD and the netlink connector on Linux.
+
+The psnotify API is similar to the
+[fsnotify](https://github.com/howeyc/fsnotify) package.
+
+Example:
+```go
+ watcher, err := psnotify.NewWatcher()
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ // Process events
+ go func() {
+ for {
+ select {
+ case ev := <-watcher.Fork:
+ log.Println("fork event:", ev)
+ case ev := <-watcher.Exec:
+ log.Println("exec event:", ev)
+ case ev := <-watcher.Exit:
+ log.Println("exit event:", ev)
+ case err := <-watcher.Error:
+ log.Println("error:", err)
+ }
+ }
+ }()
+
+ err = watcher.Watch(os.Getpid(), psnotify.PROC_EVENT_ALL)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ /* ... do stuff ... */
+ watcher.Close()
+```
+
+## Supported platforms
+
+Currently targeting modern flavors of Darwin and Linux.
+Should work on BSD, but untested.
+
+## License
+
+Apache 2.0
View
136 psnotify/psnotify.go
@@ -0,0 +1,136 @@
+// Copyright (c) 2012 VMware, Inc.
+
+package psnotify
+
+import (
+ "errors"
+ "fmt"
+)
+
+type ProcEventFork struct {
+ ParentPid int // Pid of the process that called fork()
+ ChildPid int // Child process pid created by fork()
+}
+
+type ProcEventExec struct {
+ Pid int // Pid of the process that called exec()
+}
+
+type ProcEventExit struct {
+ Pid int // Pid of the process that called exit()
+}
+
+type watch struct {
+ flags uint32 // Saved value of Watch() flags param
+}
+
+type eventListener interface {
+ close() error // Watch.Close() closes the OS specific listener
+}
+
+type Watcher struct {
+ listener eventListener // OS specifics (kqueue or netlink)
+ watches map[int]*watch // Map of watched process ids
+ Error chan error // Errors are sent on this channel
+ Fork chan *ProcEventFork // Fork events are sent on this channel
+ Exec chan *ProcEventExec // Exec events are sent on this channel
+ Exit chan *ProcEventExit // Exit events are sent on this channel
+ done chan bool // Used to stop the readEvents() goroutine
+ isClosed bool // Set to true when Close() is first called
+}
+
+// Initialize event listener and channels
+func NewWatcher() (*Watcher, error) {
+ listener, err := createListener()
+
+ if err != nil {
+ return nil, err
+ }
+
+ w := &Watcher{
+ listener: listener,
+ watches: make(map[int]*watch),
+ Fork: make(chan *ProcEventFork),
+ Exec: make(chan *ProcEventExec),
+ Exit: make(chan *ProcEventExit),
+ Error: make(chan error),
+ done: make(chan bool, 1),
+ }
+
+ go w.readEvents()
+ return w, nil
+}
+
+// Close event channels when done message is received
+func (w *Watcher) finish() {
+ close(w.Fork)
+ close(w.Exec)
+ close(w.Exit)
+ close(w.Error)
+}
+
+// Closes the OS specific event listener,
+// removes all watches and closes all event channels.
+func (w *Watcher) Close() error {
+ if w.isClosed {
+ return nil
+ }
+ w.isClosed = true
+
+ for pid := range w.watches {
+ w.RemoveWatch(pid)
+ }
+
+ w.done <- true
+
+ w.listener.close()
+
+ return nil
+}
+
+// Add pid to the watched process set.
+// The flags param is a bitmask of process events to capture,
+// must be one or more of: PROC_EVENT_FORK, PROC_EVENT_EXEC, PROC_EVENT_EXIT
+func (w *Watcher) Watch(pid int, flags uint32) error {
+ if w.isClosed {
+ return errors.New("psnotify watcher is closed")
+ }
+
+ watchEntry, found := w.watches[pid]
+
+ if found {
+ watchEntry.flags |= flags
+ } else {
+ if err := w.register(pid, flags); err != nil {
+ return err
+ }
+ w.watches[pid] = &watch{flags: flags}
+ }
+
+ return nil
+}
+
+// Remove pid from the watched process set.
+func (w *Watcher) RemoveWatch(pid int) error {
+ _, ok := w.watches[pid]
+ if !ok {
+ msg := fmt.Sprintf("watch for pid=%d does not exist", pid)
+ return errors.New(msg)
+ }
+ delete(w.watches, pid)
+ return w.unregister(pid)
+}
+
+// Internal helper to check if there is a message on the "done" channel.
+// The "done" message is sent by the Close() method; when received here,
+// the Watcher.finish method is called to close all channels and return
+// true - in which case the caller should break from the readEvents loop.
+func (w *Watcher) isDone() bool {
+ var done bool
+ select {
+ case done = <-w.done:
+ w.finish()
+ default:
+ }
+ return done
+}
View
93 psnotify/psnotify_bsd.go
@@ -0,0 +1,93 @@
+// Copyright (c) 2012 VMware, Inc.
+
+// +build darwin freebsd netbsd openbsd
+
+// Go interface to BSD kqueue process events.
+package psnotify
+
+import (
+ "syscall"
+)
+
+const (
+ // Flags (from <sys/event.h>)
+ PROC_EVENT_FORK = syscall.NOTE_FORK // fork() events
+ PROC_EVENT_EXEC = syscall.NOTE_EXEC // exec() events
+ PROC_EVENT_EXIT = syscall.NOTE_EXIT // exit() events
+
+ // Watch for all process events
+ PROC_EVENT_ALL = PROC_EVENT_FORK | PROC_EVENT_EXEC | PROC_EVENT_EXIT
+)
+
+type kqueueListener struct {
+ kq int // The syscall.Kqueue() file descriptor
+ buf [1]syscall.Kevent_t // An event buffer for Add/Remove watch
+}
+
+// Initialize bsd implementation of the eventListener interface
+func createListener() (eventListener, error) {
+ listener := &kqueueListener{}
+ kq, err := syscall.Kqueue()
+ listener.kq = kq
+ return listener, err
+}
+
+// Initialize Kevent_t fields and propagate changelist for the given pid
+func (w *Watcher) kevent(pid int, fflags uint32, flags int) error {
+ listener, _ := w.listener.(*kqueueListener)
+ event := &listener.buf[0]
+
+ syscall.SetKevent(event, pid, syscall.EVFILT_PROC, flags)
+ event.Fflags = fflags
+
+ _, err := syscall.Kevent(listener.kq, listener.buf[:], nil, nil)
+
+ return err
+}
+
+// Delete filter for given pid from the queue
+func (w *Watcher) unregister(pid int) error {
+ return w.kevent(pid, 0, syscall.EV_DELETE)
+}
+
+// Add and enable filter for given pid in the queue
+func (w *Watcher) register(pid int, flags uint32) error {
+ return w.kevent(pid, flags, syscall.EV_ADD|syscall.EV_ENABLE)
+}
+
+// Poll the kqueue file descriptor and dispatch to the Event channels
+func (w *Watcher) readEvents() {
+ listener, _ := w.listener.(*kqueueListener)
+ events := make([]syscall.Kevent_t, 10)
+
+ for {
+ if w.isDone() {
+ return
+ }
+
+ n, err := syscall.Kevent(listener.kq, nil, events, nil)
+ if err != nil {
+ w.Error <- err
+ continue
+ }
+
+ for _, ev := range events[:n] {
+ pid := int(ev.Ident)
+
+ switch ev.Fflags {
+ case syscall.NOTE_FORK:
+ w.Fork <- &ProcEventFork{ParentPid: pid}
+ case syscall.NOTE_EXEC:
+ w.Exec <- &ProcEventExec{Pid: pid}
+ case syscall.NOTE_EXIT:
+ w.RemoveWatch(pid)
+ w.Exit <- &ProcEventExit{Pid: pid}
+ }
+ }
+ }
+}
+
+// Close our kqueue file descriptor; deletes any remaining filters
+func (listener *kqueueListener) close() error {
+ return syscall.Close(listener.kq)
+}
View
254 psnotify/psnotify_linux.go
@@ -0,0 +1,254 @@
+// Copyright (c) 2012 VMware, Inc.
+
+// Go interface to the Linux netlink process connector.
+// See Documentation/connector/connector.txt in the linux kernel source tree.
+package psnotify
+
+import (
+ "bytes"
+ "encoding/binary"
+ "os"
+ "syscall"
+)
+
+const (
+ // internal flags (from <linux/connector.h>)
+ _CN_IDX_PROC = 0x1
+ _CN_VAL_PROC = 0x1
+
+ // internal flags (from <linux/cn_proc.h>)
+ _PROC_CN_MCAST_LISTEN = 1
+ _PROC_CN_MCAST_IGNORE = 2
+
+ // Flags (from <linux/cn_proc.h>)
+ PROC_EVENT_FORK = 0x00000001 // fork() events
+ PROC_EVENT_EXEC = 0x00000002 // exec() events
+ PROC_EVENT_EXIT = 0x80000000 // exit() events
+
+ // Watch for all process events
+ PROC_EVENT_ALL = PROC_EVENT_FORK | PROC_EVENT_EXEC | PROC_EVENT_EXIT
+)
+
+var (
+ byteOrder = binary.LittleEndian
+)
+
+// linux/connector.h: struct cb_id
+type cbId struct {
+ Idx uint32
+ Val uint32
+}
+
+// linux/connector.h: struct cb_msg
+type cnMsg struct {
+ Id cbId
+ Seq uint32
+ Ack uint32
+ Len uint16
+ Flags uint16
+}
+
+// linux/cn_proc.h: struct proc_event.{what,cpu,timestamp_ns}
+type procEventHeader struct {
+ What uint32
+ Cpu uint32
+ Timestamp uint64
+}
+
+// linux/cn_proc.h: struct proc_event.fork
+type forkProcEvent struct {
+ ParentPid uint32
+ ParentTgid uint32
+ ChildPid uint32
+ ChildTgid uint32
+}
+
+// linux/cn_proc.h: struct proc_event.exec
+type execProcEvent struct {
+ ProcessPid uint32
+ ProcessTgid uint32
+}
+
+// linux/cn_proc.h: struct proc_event.exit
+type exitProcEvent struct {
+ ProcessPid uint32
+ ProcessTgid uint32
+ ExitCode uint32
+ ExitSignal uint32
+}
+
+// standard netlink header + connector header
+type netlinkProcMessage struct {
+ Header syscall.NlMsghdr
+ Data cnMsg
+}
+
+type netlinkListener struct {
+ addr *syscall.SockaddrNetlink // Netlink socket address
+ sock int // The syscall.Socket() file descriptor
+ seq uint32 // struct cn_msg.seq
+}
+
+// Initialize linux implementation of the eventListener interface
+func createListener() (eventListener, error) {
+ listener := &netlinkListener{}
+ err := listener.bind()
+ return listener, err
+}
+
+// noop on linux
+func (w *Watcher) unregister(pid int) error {
+ return nil
+}
+
+// noop on linux
+func (w *Watcher) register(pid int, flags uint32) error {
+ return nil
+}
+
+// Read events from the netlink socket
+func (w *Watcher) readEvents() {
+ buf := make([]byte, syscall.Getpagesize())
+
+ listener, _ := w.listener.(*netlinkListener)
+
+ for {
+ if w.isDone() {
+ return
+ }
+
+ nr, _, err := syscall.Recvfrom(listener.sock, buf, 0)
+
+ if err != nil {
+ w.Error <- err
+ continue
+ }
+ if nr < syscall.NLMSG_HDRLEN {
+ w.Error <- syscall.EINVAL
+ continue
+ }
+
+ msgs, _ := syscall.ParseNetlinkMessage(buf[:nr])
+
+ for _, m := range msgs {
+ if m.Header.Type == syscall.NLMSG_DONE {
+ w.handleEvent(m.Data)
+ }
+ }
+ }
+}
+
+// Internal helper to check if pid && event is being watched
+func (w *Watcher) isWatching(pid int, event uint32) bool {
+ if watch, ok := w.watches[pid]; ok {
+ return (watch.flags & event) == event
+ }
+ return false
+}
+
+// Dispatch events from the netlink socket to the Event channels.
+// Unlike bsd kqueue, netlink receives events for all pids,
+// so we apply filtering based on the watch table via isWatching()
+func (w *Watcher) handleEvent(data []byte) {
+ buf := bytes.NewBuffer(data)
+ msg := &cnMsg{}
+ hdr := &procEventHeader{}
+
+ binary.Read(buf, byteOrder, msg)
+ binary.Read(buf, byteOrder, hdr)
+
+ switch hdr.What {
+ case PROC_EVENT_FORK:
+ event := &forkProcEvent{}
+ binary.Read(buf, byteOrder, event)
+ ppid := int(event.ParentTgid)
+ pid := int(event.ChildTgid)
+
+ if w.isWatching(ppid, PROC_EVENT_EXEC) {
+ // follow forks
+ watch, _ := w.watches[ppid]
+ w.Watch(pid, watch.flags)
+ }
+
+ if w.isWatching(ppid, PROC_EVENT_FORK) {
+ w.Fork <- &ProcEventFork{ParentPid: ppid, ChildPid: pid}
+ }
+ case PROC_EVENT_EXEC:
+ event := &execProcEvent{}
+ binary.Read(buf, byteOrder, event)
+ pid := int(event.ProcessTgid)
+
+ if w.isWatching(pid, PROC_EVENT_EXEC) {
+ w.Exec <- &ProcEventExec{Pid: pid}
+ }
+ case PROC_EVENT_EXIT:
+ event := &exitProcEvent{}
+ binary.Read(buf, byteOrder, event)
+ pid := int(event.ProcessTgid)
+
+ if w.isWatching(pid, PROC_EVENT_EXIT) {
+ w.RemoveWatch(pid)
+ w.Exit <- &ProcEventExit{Pid: pid}
+ }
+ }
+}
+
+// Bind our netlink socket and
+// send a listen control message to the connector driver.
+func (listener *netlinkListener) bind() error {
+ sock, err := syscall.Socket(
+ syscall.AF_NETLINK,
+ syscall.SOCK_DGRAM,
+ syscall.NETLINK_CONNECTOR)
+
+ if err != nil {
+ return err
+ }
+
+ listener.sock = sock
+ listener.addr = &syscall.SockaddrNetlink{
+ Family: syscall.AF_NETLINK,
+ Groups: _CN_IDX_PROC,
+ Pid: uint32(os.Getpid()),
+ }
+
+ err = syscall.Bind(listener.sock, listener.addr)
+
+ if err != nil {
+ return err
+ }
+
+ return listener.send(_PROC_CN_MCAST_LISTEN)
+}
+
+// Send an ignore control message to the connector driver
+// and close our netlink socket.
+func (listener *netlinkListener) close() error {
+ err := listener.send(_PROC_CN_MCAST_IGNORE)
+ syscall.Close(listener.sock)
+ return err
+}
+
+// Generic method for sending control messages to the connector
+// driver; where op is one of PROC_CN_MCAST_{LISTEN,IGNORE}
+func (listener *netlinkListener) send(op uint32) error {
+ listener.seq++
+ pr := &netlinkProcMessage{}
+ plen := binary.Size(pr.Data) + binary.Size(op)
+ pr.Header.Len = syscall.NLMSG_HDRLEN + uint32(plen)
+ pr.Header.Type = uint16(syscall.NLMSG_DONE)
+ pr.Header.Flags = 0
+ pr.Header.Seq = listener.seq
+ pr.Header.Pid = uint32(os.Getpid())
+
+ pr.Data.Id.Idx = _CN_IDX_PROC
+ pr.Data.Id.Val = _CN_VAL_PROC
+
+ pr.Data.Len = uint16(binary.Size(op))
+
+ buf := bytes.NewBuffer(make([]byte, pr.Header.Len))
+ binary.Write(buf, byteOrder, pr)
+ binary.Write(buf, byteOrder, op)
+
+ return syscall.Sendto(listener.sock, buf.Bytes(), 0, listener.addr)
+}
View
283 psnotify/psnotify_test.go
@@ -0,0 +1,283 @@
+// Copyright (c) 2012 VMware, Inc.
+
+package psnotify
+
+import (
+ "fmt"
+ "os"
+ "os/exec"
+ "runtime"
+ "syscall"
+ "testing"
+ "time"
+)
+
+type anyEvent struct {
+ exits []int
+ forks []int
+ execs []int
+ errors []error
+ done chan bool
+}
+
+type testWatcher struct {
+ t *testing.T
+ watcher *Watcher
+ events *anyEvent
+}
+
+// General purpose Watcher wrapper for all tests
+func newTestWatcher(t *testing.T) *testWatcher {
+ watcher, err := NewWatcher()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ events := &anyEvent{
+ done: make(chan bool, 1),
+ }
+
+ tw := &testWatcher{
+ t: t,
+ watcher: watcher,
+ events: events,
+ }
+
+ go func() {
+ for {
+ select {
+ case <-events.done:
+ return
+ case ev := <-watcher.Fork:
+ events.forks = append(events.forks, ev.ParentPid)
+ case ev := <-watcher.Exec:
+ events.execs = append(events.execs, ev.Pid)
+ case ev := <-watcher.Exit:
+ events.exits = append(events.exits, ev.Pid)
+ case err := <-watcher.Error:
+ events.errors = append(events.errors, err)
+ }
+ }
+ }()
+
+ return tw
+}
+
+func (tw *testWatcher) close() {
+ pause := 100 * time.Millisecond
+ time.Sleep(pause)
+
+ tw.events.done <- true
+
+ tw.watcher.Close()
+
+ time.Sleep(pause)
+}
+
+func skipTest(t *testing.T) bool {
+ if runtime.GOOS == "linux" && os.Getuid() != 0 {
+ fmt.Println("SKIP: test must be run as root on linux")
+ return true
+ }
+ return false
+}
+
+func startSleepCommand(t *testing.T) *exec.Cmd {
+ cmd := exec.Command("sh", "-c", "sleep 100")
+ if err := cmd.Start(); err != nil {
+ t.Error(err)
+ }
+ return cmd
+}
+
+func runCommand(t *testing.T, name string) *exec.Cmd {
+ cmd := exec.Command(name)
+ if err := cmd.Run(); err != nil {
+ t.Error(err)
+ }
+ return cmd
+}
+
+func expectEvents(t *testing.T, num int, name string, pids []int) bool {
+ if len(pids) != num {
+ t.Errorf("Expected %d %s events, got=%v", num, name, pids)
+ return false
+ }
+ return true
+}
+
+func expectEventPid(t *testing.T, name string, expect int, pid int) bool {
+ if expect != pid {
+ t.Errorf("Expected %s pid=%d, received=%d", name, expect, pid)
+ return false
+ }
+ return true
+}
+
+func TestWatchFork(t *testing.T) {
+ if skipTest(t) {
+ return
+ }
+
+ pid := os.Getpid()
+
+ tw := newTestWatcher(t)
+
+ // no watches added yet, so this fork event will no be captured
+ runCommand(t, "date")
+
+ // watch fork events for this process
+ if err := tw.watcher.Watch(pid, PROC_EVENT_FORK); err != nil {
+ t.Error(err)
+ }
+
+ // this fork event will be captured,
+ // the exec and exit events will not be captured
+ runCommand(t, "cal")
+
+ tw.close()
+
+ if expectEvents(t, 1, "forks", tw.events.forks) {
+ expectEventPid(t, "fork", pid, tw.events.forks[0])
+ }
+
+ expectEvents(t, 0, "execs", tw.events.execs)
+ expectEvents(t, 0, "exits", tw.events.exits)
+}
+
+func TestWatchExit(t *testing.T) {
+ if skipTest(t) {
+ return
+ }
+
+ tw := newTestWatcher(t)
+
+ cmd := startSleepCommand(t)
+
+ childPid := cmd.Process.Pid
+
+ // watch for exit event of our child process
+ if err := tw.watcher.Watch(childPid, PROC_EVENT_EXIT); err != nil {
+ t.Error(err)
+ }
+
+ // kill our child process, triggers exit event
+ syscall.Kill(childPid, syscall.SIGTERM)
+
+ cmd.Wait()
+
+ tw.close()
+
+ expectEvents(t, 0, "forks", tw.events.forks)
+
+ expectEvents(t, 0, "execs", tw.events.execs)
+
+ if expectEvents(t, 1, "exits", tw.events.exits) {
+ expectEventPid(t, "exit", childPid, tw.events.exits[0])
+ }
+}
+
+// combined version of TestWatchFork() and TestWatchExit()
+func TestWatchForkAndExit(t *testing.T) {
+ if skipTest(t) {
+ return
+ }
+
+ pid := os.Getpid()
+
+ tw := newTestWatcher(t)
+
+ if err := tw.watcher.Watch(pid, PROC_EVENT_FORK); err != nil {
+ t.Error(err)
+ }
+
+ cmd := startSleepCommand(t)
+
+ childPid := cmd.Process.Pid
+
+ if err := tw.watcher.Watch(childPid, PROC_EVENT_EXIT); err != nil {
+ t.Error(err)
+ }
+
+ syscall.Kill(childPid, syscall.SIGTERM)
+
+ cmd.Wait()
+
+ tw.close()
+
+ if expectEvents(t, 1, "forks", tw.events.forks) {
+ expectEventPid(t, "fork", pid, tw.events.forks[0])
+ }
+
+ expectEvents(t, 0, "execs", tw.events.execs)
+
+ if expectEvents(t, 1, "exits", tw.events.exits) {
+ expectEventPid(t, "exit", childPid, tw.events.exits[0])
+ }
+}
+
+func TestWatchFollowFork(t *testing.T) {
+ if skipTest(t) {
+ return
+ }
+
+ // Darwin is not able to follow forks, as the kqueue fork event
+ // does not provide the child pid.
+ if runtime.GOOS != "linux" {
+ fmt.Println("SKIP: test follow forks is linux only")
+ return
+ }
+
+ pid := os.Getpid()
+
+ tw := newTestWatcher(t)
+
+ // watch for all process events related to this process
+ if err := tw.watcher.Watch(pid, PROC_EVENT_ALL); err != nil {
+ t.Error(err)
+ }
+
+ commands := []string{"date", "cal"}
+ childPids := make([]int, len(commands))
+
+ // triggers fork/exec/exit events for each command
+ for i, name := range commands {
+ cmd := runCommand(t, name)
+ childPids[i] = cmd.Process.Pid
+ }
+
+ // remove watch for this process
+ tw.watcher.RemoveWatch(pid)
+
+ // run commands again to make sure we don't receive any unwanted events
+ for _, name := range commands {
+ runCommand(t, name)
+ }
+
+ tw.close()
+
+ // run commands again to make sure nothing panics after
+ // closing the watcher
+ for _, name := range commands {
+ runCommand(t, name)
+ }
+
+ num := len(commands)
+ if expectEvents(t, num, "forks", tw.events.forks) {
+ for _, epid := range tw.events.forks {
+ expectEventPid(t, "fork", pid, epid)
+ }
+ }
+
+ if expectEvents(t, num, "execs", tw.events.execs) {
+ for i, epid := range tw.events.execs {
+ expectEventPid(t, "exec", childPids[i], epid)
+ }
+ }
+
+ if expectEvents(t, num, "exits", tw.events.exits) {
+ for i, epid := range tw.events.exits {
+ expectEventPid(t, "exit", childPids[i], epid)
+ }
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.