diff --git a/dbus/set.go b/dbus/set.go new file mode 100644 index 00000000..e36aaa03 --- /dev/null +++ b/dbus/set.go @@ -0,0 +1,22 @@ +package dbus + +type set struct { + data map[string]bool +} + +func (s *set) Add(value string) { + s.data[value] = true +} + +func (s *set) Contains(value string) (exists bool) { + _, exists = s.data[value] + return +} + +func (s *set) Length() (int) { + return len(s.data) +} + +func newSet() (*set) { + return &set{make(map[string] bool)} +} diff --git a/dbus/subscription.go b/dbus/subscription.go index b87e462e..f31ce494 100644 --- a/dbus/subscription.go +++ b/dbus/subscription.go @@ -93,14 +93,15 @@ func (c *Conn) initDispatch() { } // Returns two unbuffered channels which will receive all changed units every -// @interval@ seconds. Deleted units are sent as nil. +// interval. Deleted units are sent as nil. func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) { - return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }) + return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil) } // SubscribeUnitsCustom is like SubscribeUnits but lets you specify the buffer -// size of the channels and the comparison function for detecting changes. -func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool) (<-chan map[string]*UnitStatus, <-chan error) { +// size of the channels, the comparison function for detecting changes and a filter +// function for cutting down on the noise that your channel receives. +func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func (string) bool) (<-chan map[string]*UnitStatus, <-chan error) { old := make(map[string]*UnitStatus) statusChan := make(chan map[string]*UnitStatus, buffer) errChan := make(chan error, buffer) @@ -113,6 +114,9 @@ func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChange if err == nil { cur := make(map[string]*UnitStatus) for i := range units { + if filterUnit != nil && filterUnit(units[i].Name) { + continue + } cur[units[i].Name] = &units[i] } @@ -132,7 +136,9 @@ func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChange old = cur - statusChan <- changed + if len(changed) != 0 { + statusChan <- changed + } } else { errChan <- err } diff --git a/dbus/subscription_set.go b/dbus/subscription_set.go new file mode 100644 index 00000000..26257860 --- /dev/null +++ b/dbus/subscription_set.go @@ -0,0 +1,32 @@ +package dbus + +import ( + "time" +) + +// SubscriptionSet returns a subscription set which is like conn.Subscribe but +// can filter to only return events for a set of units. +type SubscriptionSet struct { + *set + conn *Conn +} + + +func (s *SubscriptionSet) filter(unit string) bool { + return !s.Contains(unit) +} + +// Subscribe starts listening for dbus events for all of the units in the set. +// Returns channels identical to conn.SubscribeUnits. +func (s *SubscriptionSet) Subscribe() (<-chan map[string]*UnitStatus, <-chan error) { + // TODO: Make fully evented by using systemd 209 with properties changed values + return s.conn.SubscribeUnitsCustom(time.Second, 0, + func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, + func(unit string) bool { return s.filter(unit) }, + ) +} + +// NewSubscriptionSet returns a new subscription set. +func (conn *Conn) NewSubscriptionSet() (*SubscriptionSet) { + return &SubscriptionSet{newSet(), conn} +} diff --git a/dbus/subscription_set_test.go b/dbus/subscription_set_test.go new file mode 100644 index 00000000..db600850 --- /dev/null +++ b/dbus/subscription_set_test.go @@ -0,0 +1,67 @@ +package dbus + +import ( + "testing" + "time" +) + +// TestSubscribeUnit exercises the basics of subscription of a particular unit. +func TestSubscriptionSetUnit(t *testing.T) { + target := "subscribe-events-set.service" + + conn, err := New() + + if err != nil { + t.Fatal(err) + } + + err = conn.Subscribe() + if err != nil { + t.Fatal(err) + } + + subSet := conn.NewSubscriptionSet() + evChan, errChan := subSet.Subscribe() + + subSet.Add(target) + setupUnit(target, conn, t) + + job, err := conn.StartUnit(target, "replace") + if err != nil { + t.Fatal(err) + } + + if job != "done" { + t.Fatal("Couldn't start", target) + } + + timeout := make(chan bool, 1) + go func() { + time.Sleep(3 * time.Second) + close(timeout) + }() + + for { + select { + case changes := <-evChan: + tCh, ok := changes[target] + + if !ok { + t.Fatal("Unexpected event %v", changes) + } + + if tCh.ActiveState == "active" && tCh.Name == target { + goto success + } + case err = <-errChan: + t.Fatal(err) + case <-timeout: + t.Fatal("Reached timeout") + } + } + +success: + return +} + + diff --git a/fixtures/subscribe-events-set.service b/fixtures/subscribe-events-set.service new file mode 100644 index 00000000..a1f8c367 --- /dev/null +++ b/fixtures/subscribe-events-set.service @@ -0,0 +1,5 @@ +[Unit] +Description=start stop test + +[Service] +ExecStart=/bin/sleep 400