Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions dbus/set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package dbus

type set struct {
data map[string]bool
}

func (s *set) Add(value string) {
s.data[value] = true
}

func (s *set) Contains(value string) (exists bool) {
_, exists = s.data[value]
return
}

func (s *set) Length() (int) {
return len(s.data)
}

func newSet() (*set) {
return &set{make(map[string] bool)}
}
16 changes: 11 additions & 5 deletions dbus/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,15 @@ func (c *Conn) initDispatch() {
}

// Returns two unbuffered channels which will receive all changed units every
// @interval@ seconds. Deleted units are sent as nil.
// interval. Deleted units are sent as nil.
func (c *Conn) SubscribeUnits(interval time.Duration) (<-chan map[string]*UnitStatus, <-chan error) {
return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 })
return c.SubscribeUnitsCustom(interval, 0, func(u1, u2 *UnitStatus) bool { return *u1 != *u2 }, nil)
}

// SubscribeUnitsCustom is like SubscribeUnits but lets you specify the buffer
// size of the channels and the comparison function for detecting changes.
func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool) (<-chan map[string]*UnitStatus, <-chan error) {
// size of the channels, the comparison function for detecting changes and a filter
// function for cutting down on the noise that your channel receives.
func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChanged func(*UnitStatus, *UnitStatus) bool, filterUnit func (string) bool) (<-chan map[string]*UnitStatus, <-chan error) {
old := make(map[string]*UnitStatus)
statusChan := make(chan map[string]*UnitStatus, buffer)
errChan := make(chan error, buffer)
Expand All @@ -113,6 +114,9 @@ func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChange
if err == nil {
cur := make(map[string]*UnitStatus)
for i := range units {
if filterUnit != nil && filterUnit(units[i].Name) {
continue
}
cur[units[i].Name] = &units[i]
}

Expand All @@ -132,7 +136,9 @@ func (c *Conn) SubscribeUnitsCustom(interval time.Duration, buffer int, isChange

old = cur

statusChan <- changed
if len(changed) != 0 {
statusChan <- changed
}
} else {
errChan <- err
}
Expand Down
32 changes: 32 additions & 0 deletions dbus/subscription_set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package dbus

import (
"time"
)

// SubscriptionSet returns a subscription set which is like conn.Subscribe but
// can filter to only return events for a set of units.
type SubscriptionSet struct {
*set
conn *Conn
}


func (s *SubscriptionSet) filter(unit string) bool {
return !s.Contains(unit)
}

// Subscribe starts listening for dbus events for all of the units in the set.
// Returns channels identical to conn.SubscribeUnits.
func (s *SubscriptionSet) Subscribe() (<-chan map[string]*UnitStatus, <-chan error) {
// TODO: Make fully evented by using systemd 209 with properties changed values
return s.conn.SubscribeUnitsCustom(time.Second, 0,
func(u1, u2 *UnitStatus) bool { return *u1 != *u2 },
func(unit string) bool { return s.filter(unit) },
)
}

// NewSubscriptionSet returns a new subscription set.
func (conn *Conn) NewSubscriptionSet() (*SubscriptionSet) {
return &SubscriptionSet{newSet(), conn}
}
67 changes: 67 additions & 0 deletions dbus/subscription_set_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package dbus

import (
"testing"
"time"
)

// TestSubscribeUnit exercises the basics of subscription of a particular unit.
func TestSubscriptionSetUnit(t *testing.T) {
target := "subscribe-events-set.service"

conn, err := New()

if err != nil {
t.Fatal(err)
}

err = conn.Subscribe()
if err != nil {
t.Fatal(err)
}

subSet := conn.NewSubscriptionSet()
evChan, errChan := subSet.Subscribe()

subSet.Add(target)
setupUnit(target, conn, t)

job, err := conn.StartUnit(target, "replace")
if err != nil {
t.Fatal(err)
}

if job != "done" {
t.Fatal("Couldn't start", target)
}

timeout := make(chan bool, 1)
go func() {
time.Sleep(3 * time.Second)
close(timeout)
}()

for {
select {
case changes := <-evChan:
tCh, ok := changes[target]

if !ok {
t.Fatal("Unexpected event %v", changes)
}

if tCh.ActiveState == "active" && tCh.Name == target {
goto success
}
case err = <-errChan:
t.Fatal(err)
case <-timeout:
t.Fatal("Reached timeout")
}
}

success:
return
}


5 changes: 5 additions & 0 deletions fixtures/subscribe-events-set.service
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[Unit]
Description=start stop test

[Service]
ExecStart=/bin/sleep 400