-
Notifications
You must be signed in to change notification settings - Fork 1
/
suisnail.go
83 lines (70 loc) · 1.79 KB
/
suisnail.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
//
// Suicidal Snail
//
package main
import (
zmq "github.com/pebbe/zmq3"
"fmt"
"log"
"math/rand"
"strconv"
"time"
)
// This is our subscriber. It connects to the publisher and subscribes to
// everything. It sleeps for a short time between messages to simulate doing
// too much work. If a message is more than 1 second late, it croaks:
const (
MAX_ALLOWED_DELAY = 1000 * time.Millisecond
)
func subscriber(pipe chan<- string) {
// Subscribe to everything
subscriber, _ := zmq.NewSocket(zmq.SUB)
subscriber.SetSubscribe("")
subscriber.Connect("tcp://localhost:5556")
defer subscriber.Close()
// Get and process messages
for {
msg, _ := subscriber.RecvMessage(0)
i, _ := strconv.Atoi(msg[0])
clock := time.Unix(int64(i), 0)
fmt.Println(clock)
// Suicide snail logic
if time.Now().After(clock.Add(MAX_ALLOWED_DELAY)) {
log.Println("E: subscriber cannot keep up, aborting")
break
}
// Work for 1 msec plus some random additional time
time.Sleep(time.Duration(1 + rand.Intn(2)))
}
pipe <- "gone and died"
}
// This is our publisher task. It publishes a time-stamped message to its
// PUB socket every 1 msec:
func publisher(pipe <-chan string) {
// Prepare publisher
publisher, _ := zmq.NewSocket(zmq.PUB)
publisher.Bind("tcp://*:5556")
defer publisher.Close()
LOOP:
for {
// Send current clock (msecs) to subscribers
publisher.SendMessage(time.Now().Unix())
select {
case <-pipe:
break LOOP
default:
}
time.Sleep(time.Millisecond)
}
}
// The main task simply starts a client, and a server, and then
// waits for the client to signal that it has died:
func main() {
pubpipe := make(chan string)
subpipe := make(chan string)
go publisher(pubpipe)
go subscriber(subpipe)
<-subpipe
pubpipe <- "break"
time.Sleep(100 * time.Millisecond)
}