Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Update Go examples to build like the others.

Fixes #305.

The gozmq package API was updated in a way that was necessarily
backwards incompatible: Poll must receive a time.Duration instead of an
int64.  This commit updates the examples to use the new API.

Other example directories have build scripts and require no spelunking
through code and/or build errors to get things working.  This commit
also updates the Go examples to build so simply.
  • Loading branch information...
commit dce99394642d398f5e6c68a330a447b3b2abd271 1 parent 97fd74a
@jtacoma jtacoma authored
View
13 examples/Go/README
@@ -1,11 +1,6 @@
-Examples in Go
-See LICENSE in examples directory
-
-
-To run the examples:
+Examples in Go
- go run hwserver.go # etc
+Use 'build all' to build all examples
+Use 'build hwserver' to build just hwserver (or any example)
-Or multiple files in some cases:
-
- go run mdp.go zhelpers.go mdbroker.go [-v] # etc
+See LICENSE in examples directory
View
22 examples/Go/build
@@ -0,0 +1,22 @@
+#! /bin/sh
+#
+# Examples build helper
+# Syntax: build all | clean
+#
+if [ /$1/ = /all/ ]; then
+ echo "Building Go examples..."
+ for MAIN in `egrep -l "^func main\(" *.go`; do
+ echo "$MAIN"
+ go build $MAIN `egrep -L "^func main\(" *.go`
+ done
+elif [ /$1/ = /clean/ ]; then
+ echo "Cleaning Go examples directory..."
+ for MAIN in `egrep -l "^func main\(" *.go`; do
+ rm -f `basename $MAIN .go`
+ done
+elif [ -f $1.go ]; then
+ echo "$1"
+ go build $1.go `egrep -L "^func main\(" *.go`
+else
+ echo "syntax: build all | clean"
+fi
View
23 examples/Go/mdbroker.go
@@ -20,7 +20,6 @@ import (
const (
INTERNAL_SERVICE_PREFIX = "mmi."
- HEARTBEAT_LIVENESS = 3
HEARTBEAT_INTERVAL = 2500 * time.Millisecond
HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
)
@@ -30,7 +29,7 @@ type Broker interface {
Run()
}
-type mdWorker struct {
+type mdbWorker struct {
identity string // Hex Identity of worker
address []byte // Address to route to
expiry time.Time // Expires at this point, unless heartbeat
@@ -50,7 +49,7 @@ type mdBroker struct {
services map[string]*mdService // Known services
socket zmq.Socket // Socket for clients & workers
waiting *ZList // Idle workers
- workers map[string]*mdWorker // Known workers
+ workers map[string]*mdbWorker // Known workers
verbose bool // Print activity to stdout
}
@@ -66,13 +65,13 @@ func NewBroker(endpoint string, verbose bool) Broker {
services: make(map[string]*mdService),
socket: socket,
waiting: NewList(),
- workers: make(map[string]*mdWorker),
+ workers: make(map[string]*mdbWorker),
verbose: verbose,
}
}
// Deletes worker from all data structures, and deletes worker.
-func (self *mdBroker) deleteWorker(worker *mdWorker, disconnect bool) {
+func (self *mdBroker) deleteWorker(worker *mdbWorker, disconnect bool) {
if worker == nil {
panic("Nil worker")
}
@@ -102,7 +101,7 @@ func (self *mdBroker) dispatch(service *mdService, msg [][]byte) {
msg, service.requests = service.requests[0], service.requests[1:]
elem := service.waiting.Pop()
self.waiting.Remove(elem)
- worker, _ := elem.Value.(*mdWorker)
+ worker, _ := elem.Value.(*mdbWorker)
self.sendToWorker(worker, MDPW_REQUEST, nil, msg)
}
}
@@ -134,7 +133,7 @@ func (self *mdBroker) processWorker(sender []byte, msg [][]byte) {
identity := hex.EncodeToString(sender)
worker, workerReady := self.workers[identity]
if !workerReady {
- worker = &mdWorker{
+ worker = &mdbWorker{
identity: identity,
address: sender,
expiry: time.Now().Add(HEARTBEAT_EXPIRY),
@@ -190,7 +189,7 @@ func (self *mdBroker) processWorker(sender []byte, msg [][]byte) {
func (self *mdBroker) purgeWorkers() {
now := time.Now()
for elem := self.waiting.Front(); elem != nil; elem = self.waiting.Front() {
- worker, _ := elem.Value.(*mdWorker)
+ worker, _ := elem.Value.(*mdbWorker)
if worker.expiry.After(now) {
break
}
@@ -216,7 +215,7 @@ func (self *mdBroker) requireService(name string) *mdService {
// Send message to worker.
// If message is provided, sends that message.
-func (self *mdBroker) sendToWorker(worker *mdWorker, command string, option []byte, msg [][]byte) {
+func (self *mdBroker) sendToWorker(worker *mdbWorker, command string, option []byte, msg [][]byte) {
// Stack routing and protocol envelopes to start of message and routing envelope
if len(option) > 0 {
msg = append([][]byte{option}, msg...)
@@ -248,7 +247,7 @@ func (self *mdBroker) serviceInternal(service []byte, msg [][]byte) {
}
// This worker is now waiting for work.
-func (self *mdBroker) workerWaiting(worker *mdWorker) {
+func (self *mdBroker) workerWaiting(worker *mdbWorker) {
// Queue to broker and service waiting lists
self.waiting.PushBack(worker)
worker.service.waiting.PushBack(worker)
@@ -270,7 +269,7 @@ func (self *mdBroker) Run() {
zmq.PollItem{Socket: self.socket, Events: zmq.POLLIN},
}
- _, err := zmq.Poll(items, HEARTBEAT_INTERVAL.Nanoseconds()/1e3)
+ _, err := zmq.Poll(items, HEARTBEAT_INTERVAL)
if err != nil {
panic(err) // Interrupted
}
@@ -299,7 +298,7 @@ func (self *mdBroker) Run() {
if self.heartbeatAt.Before(time.Now()) {
self.purgeWorkers()
for elem := self.waiting.Front(); elem != nil; elem = elem.Next() {
- worker, _ := elem.Value.(*mdWorker)
+ worker, _ := elem.Value.(*mdbWorker)
self.sendToWorker(worker, MDPW_HEARTBEAT, nil, nil)
}
self.heartbeatAt = time.Now().Add(HEARTBEAT_INTERVAL)
View
2  examples/Go/mdcliapi.go
@@ -75,7 +75,7 @@ func (self *mdClient) Send(service []byte, request [][]byte) (reply [][]byte) {
zmq.PollItem{Socket: self.client, Events: zmq.POLLIN},
}
- _, err := zmq.Poll(items, self.timeout.Nanoseconds()/1e3)
+ _, err := zmq.Poll(items, self.timeout)
if err != nil {
panic(err) // Interrupted
}
View
6 examples/Go/mdwrkapi.go
@@ -12,10 +12,6 @@ import (
"time"
)
-const (
- HEARTBEAT_LIVENESS = 3 // 3-5 is reasonable
-)
-
type Worker interface {
Close()
Recv([][]byte) [][]byte
@@ -109,7 +105,7 @@ func (self *mdWorker) Recv(reply [][]byte) (msg [][]byte) {
zmq.PollItem{Socket: self.worker, Events: zmq.POLLIN},
}
- _, err := zmq.Poll(items, self.heartbeat.Nanoseconds()/1e3)
+ _, err := zmq.Poll(items, self.heartbeat)
if err != nil {
panic(err) // Interrupted
}
View
27 examples/Go/ppqueue.go
@@ -13,7 +13,6 @@ import (
)
const (
- HEARTBEAT_LIVENESS = 3 // 3-5 is reasonable
HEARTBEAT_INTERVAL = time.Second // time.Duration
// Paranoid Pirate Protocol constants
@@ -21,13 +20,13 @@ const (
PPP_HEARTBEAT = "\002" // Signals worker heartbeat
)
-type Worker struct {
+type PPWorker struct {
address []byte // Address of worker
expiry time.Time // Expires at this time
}
-func NewWorker(address []byte) *Worker {
- return &Worker{
+func NewPPWorker(address []byte) *PPWorker {
+ return &PPWorker{
address: address,
expiry: time.Now().Add(HEARTBEAT_LIVENESS * HEARTBEAT_INTERVAL),
}
@@ -49,14 +48,14 @@ func (workers *WorkerQueue) Len() int {
func (workers *WorkerQueue) Next() []byte {
elem := workers.queue.Back()
- worker, _ := elem.Value.(*Worker)
+ worker, _ := elem.Value.(*PPWorker)
workers.queue.Remove(elem)
return worker.address
}
-func (workers *WorkerQueue) Ready(worker *Worker) {
+func (workers *WorkerQueue) Ready(worker *PPWorker) {
for elem := workers.queue.Front(); elem != nil; elem = elem.Next() {
- if w, _ := elem.Value.(*Worker); string(w.address) == string(worker.address) {
+ if w, _ := elem.Value.(*PPWorker); string(w.address) == string(worker.address) {
workers.queue.Remove(elem)
break
}
@@ -67,7 +66,7 @@ func (workers *WorkerQueue) Ready(worker *Worker) {
func (workers *WorkerQueue) Purge() {
now := time.Now()
for elem := workers.queue.Front(); elem != nil; elem = workers.queue.Front() {
- if w, _ := elem.Value.(*Worker); w.expiry.After(now) {
+ if w, _ := elem.Value.(*PPWorker); w.expiry.After(now) {
break
}
workers.queue.Remove(elem)
@@ -97,9 +96,9 @@ func main() {
// Poll frontend only if we have available workers
if workers.Len() > 0 {
- zmq.Poll(items, HEARTBEAT_INTERVAL.Nanoseconds()/1e3)
+ zmq.Poll(items, HEARTBEAT_INTERVAL)
} else {
- zmq.Poll(items[:1], HEARTBEAT_INTERVAL.Nanoseconds()/1e3)
+ zmq.Poll(items[:1], HEARTBEAT_INTERVAL)
}
// Handle worker activity on backend
@@ -109,15 +108,15 @@ func main() {
panic(err) // Interrupted
}
address := frames[0]
- workers.Ready(NewWorker(address))
+ workers.Ready(NewPPWorker(address))
// Validate control message, or return reply to client
if msg := frames[1:]; len(msg) == 1 {
switch status := string(msg[0]); status {
case PPP_READY:
- fmt.Println("I: Worker ready")
+ fmt.Println("I: PPWorker ready")
case PPP_HEARTBEAT:
- fmt.Println("I: Worker heartbeat")
+ fmt.Println("I: PPWorker heartbeat")
default:
fmt.Println("E: Invalid message from worker: ", msg)
}
@@ -142,7 +141,7 @@ func main() {
// dead workers:
if heartbeatAt.Before(time.Now()) {
for elem := workers.queue.Front(); elem != nil; elem = elem.Next() {
- w, _ := elem.Value.(*Worker)
+ w, _ := elem.Value.(*PPWorker)
msg := [][]byte{w.address, []byte(PPP_HEARTBEAT)}
backend.SendMultipart(msg, 0)
}
View
3  examples/Go/ppworker.go
@@ -13,7 +13,6 @@ import (
)
const (
- HEARTBEAT_LIVENESS = 3 // 3-5 is reasonable
HEARTBEAT_INTERVAL = time.Second // time.Duration
INTERVAL_INIT = time.Second // Initial reconnect
@@ -61,7 +60,7 @@ func main() {
zmq.PollItem{Socket: worker, Events: zmq.POLLIN},
}
- zmq.Poll(items, HEARTBEAT_INTERVAL.Nanoseconds()/1e3)
+ zmq.Poll(items, HEARTBEAT_INTERVAL)
if items[0].REvents&zmq.POLLIN != 0 {
frames, err := worker.RecvMultipart(0)
View
4 examples/Go/zhelpers.go
@@ -10,6 +10,10 @@ import (
"reflect"
)
+const (
+ HEARTBEAT_LIVENESS = 3 // 3-5 is reasonable
+)
+
// Golang implementation of zlist in C
type ZList struct {
list.List
Please sign in to comment.
Something went wrong with that request. Please try again.