
Switch to REQ-ROUTER pattern

The previous version drifted out of sync when the client ran
for multiple hours (caused by the SUBSCRIBE pattern). Now the
client requests an update, and the server asynchronously fetches
the data and responds with the most recent information.

This fixes #1.
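
For reference, the round trip behind this pattern can be sketched with pebbe/zmq4 as below (a minimal, self-contained sketch with hypothetical addresses and payloads, not the project code). A ROUTER socket prepends an identity frame to every incoming request; sending that frame back makes ZeroMQ route the reply to exactly the client that asked, so each client always receives a freshly built answer instead of whatever a slow SUB socket has queued:

package main

import (
	"log"

	"github.com/pebbe/zmq4"
)

func main() {
	// server side: ROUTER binds and answers each request individually
	router, err := zmq4.NewSocket(zmq4.ROUTER)
	if err != nil {
		log.Fatal(err)
	}
	defer router.Close()
	router.Bind("tcp://*:5556")

	// client side: REQ connects and asks whenever it wants fresh data
	req, err := zmq4.NewSocket(zmq4.REQ)
	if err != nil {
		log.Fatal(err)
	}
	defer req.Close()
	req.Connect("tcp://localhost:5556")

	req.SendBytes([]byte("update?"), 0)

	// ROUTER receives [identity, empty delimiter, body]
	frames, err := router.RecvMessageBytes(0)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("server got %q", frames[len(frames)-1])

	// echoing identity + delimiter routes the reply back to this client
	router.SendMessage(frames[0], "", "fresh cluster state")

	reply, _ := req.RecvBytes(0)
	log.Printf("client got %q", reply)
}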
PatWie committed Jan 12, 2018
1 parent 3dc5478 commit 1c7446a1ca0068c71f6e340a9d71ae8a27cd14f0
Showing with 265 additions and 145 deletions.
  1. +3 −3 Makefile
  2. +7 −4 cluster-smi-node.go
  3. +66 −42 cluster-smi-server.go
  4. +46 −13 cluster-smi.go
  5. +7 −51 cluster.go
  6. +85 −0 cluster/data.go
  7. +0 −32 data.go
  8. +51 −0 messaging/multipart.go
Makefile
@@ -5,9 +5,9 @@ include cluster-smi.env
 LDFLAGS="-X ${PACKAGER}.ServerIp=${cluster_smi_server_ip} -X ${PACKAGER}.PortGather=${cluster_smi_server_port_gather} -X ${PACKAGER}.PortDistribute=${cluster_smi_server_port_distribute} -X ${PACKAGER}.Tick=${cluster_smi_tick_ms}"
 all:
-	go build -ldflags ${LDFLAGS} cluster-smi.go cluster.go config.go data.go
-	go build -ldflags ${LDFLAGS} cluster-smi-server.go config.go data.go
-	go build -ldflags ${LDFLAGS} cluster-smi-node.go cluster.go config.go data.go
+	go build -ldflags ${LDFLAGS} cluster-smi.go config.go cluster.go
+	go build -ldflags ${LDFLAGS} cluster-smi-server.go config.go
+	go build -ldflags ${LDFLAGS} cluster-smi-node.go config.go cluster.go
 # PKG_CONFIG_PATH=/graphics/opt/opt_Ubuntu16.04/libzmq/dist/lib/pkgconfig \
 # go build -v --ldflags '-extldflags "-static"' -a cluster-smi-node.go
cluster-smi-node.go
@@ -1,24 +1,27 @@
 package main

 import (
+	"github.com/patwie/cluster-smi/cluster"
 	"github.com/patwie/cluster-smi/nvml"
 	"github.com/pebbe/zmq4"
 	"github.com/vmihailenco/msgpack"
 	"log"
 	"time"
 )

-var node Node
+var node cluster.Node

 func main() {
 	// load ports and ip-address
 	cfg := CreateConfig()

 	if err := nvml.InitNVML(); err != nil {
 		log.Fatalf("Failed initializing NVML: %s\n", err.Error())
 	}
 	defer nvml.ShutdownNVML()

 	// sending messages (PUSH-PULL)
 	SocketAddr := "tcp://" + cfg.ServerIp + ":" + cfg.ServerPortGather
 	log.Println("Now pushing to", SocketAddr)

 	socket, err := zmq4.NewSocket(zmq4.PUSH)
@@ -28,12 +31,12 @@ func main() {
 	defer socket.Close()
 	socket.Connect(SocketAddr)

-	node := &Node{}
-	node.Init()
+	node := &cluster.Node{}
+	InitNode(node)

 	log.Println("Cluster-SMI-Node is active. Press CTRL+C to shut down.")

 	for _ = range time.Tick(cfg.Tick) {
-		node.Fetch()
+		FetchNode(node)

 		// encode data
 		msg, err := msgpack.Marshal(&node)
cluster-smi-server.go
@@ -1,71 +1,95 @@
 package main

 import (
+	"github.com/patwie/cluster-smi/cluster"
+	"github.com/patwie/cluster-smi/messaging"
 	"github.com/pebbe/zmq4"
 	"github.com/vmihailenco/msgpack"
 	"log"
-	"sort"
+	"sync"
 )

-var cluster Cluster
-var allNodes map[string]Node
+// nice cluster struct
+var clus cluster.Cluster
+
+// intermediate struct (under mutex lock)
+var allNodes map[string]cluster.Node

 func main() {
-	allNodes = make(map[string]Node)

 	// load ports and ip-address
 	cfg := CreateConfig()

-	// incoming messages (Push-Pull)
-	SocketAddr := "tcp://" + "*" + ":" + cfg.ServerPortGather
-	log.Println("Now listening on", SocketAddr)
-	node_socket, err := zmq4.NewSocket(zmq4.PULL)
-	if err != nil {
-		panic(err)
-	}
-	defer node_socket.Close()
-	node_socket.Bind(SocketAddr)
+	allNodes = make(map[string]cluster.Node)
+	var mutex = &sync.Mutex{}

-	// message loop
-	log.Println("Cluster-SMI-Server is active. Press CTRL+C to shut down.")
+	// receiving messages in extra thread
+	go func() {
+		// incoming messages (PUSH-PULL)
+		SocketAddr := "tcp://" + "*" + ":" + cfg.ServerPortGather
+		log.Println("Now listening on", SocketAddr)
+		node_socket, err := zmq4.NewSocket(zmq4.PULL)
+		if err != nil {
+			panic(err)
+		}
+		defer node_socket.Close()
+		node_socket.Bind(SocketAddr)

-	// outgoing messages (Pub-Sub)
-	SocketAddr = "tcp://" + "*" + ":" + cfg.ServerPortDistribute
-	log.Println("Now publishing to", SocketAddr)
-	publisher, err := zmq4.NewSocket(zmq4.PUB)
+		for {
+			// read node information
+			s, err := node_socket.RecvBytes(0)
+			if err != nil {
+				log.Println(err)
+				continue
+			}
+			var node cluster.Node
+			err = msgpack.Unmarshal(s, &node)
+			if err != nil {
+				log.Println(err)
+				continue
+			}
+			mutex.Lock()
+			allNodes[node.Name] = node
+			mutex.Unlock()
+		}
+	}()
+
+	// outgoing messages (REQ-ROUTER)
+	SocketAddr := "tcp://" + "*" + ":" + cfg.ServerPortDistribute
+	log.Println("Router binds to", SocketAddr)
+	router_socket, err := zmq4.NewSocket(zmq4.ROUTER)
 	if err != nil {
 		panic(err)
 	}
-	defer publisher.Close()
-	publisher.Bind(SocketAddr)
+	defer router_socket.Close()
+	router_socket.Bind(SocketAddr)

+	// message loop
+	log.Println("Cluster-SMI-Server is active. Press CTRL+C to shut down.")
 	for {
-		// read node information
-		s, err := node_socket.RecvBytes(0)
-		if err != nil {
-			log.Println(err)
-			continue
-		}
-		var node Node
-		err = msgpack.Unmarshal(s, &node)
+		// read request of client
+		msg, err := messaging.ReceiveMultipartMessage(router_socket)
 		if err != nil {
 			log.Println(err)
-			// panic(err)
-			continue
+			panic(err)
 		}

-		// update information
-		allNodes[node.Name] = node
-
-		// rebuild cluster struct
-		cluster := Cluster{}
+		mutex.Lock()
+		// rebuild cluster struct from map
+		clus := cluster.Cluster{}
 		for _, n := range allNodes {
-			cluster.Nodes = append(cluster.Nodes, n)
+			clus.Nodes = append(clus.Nodes, n)
 		}
-		sort.Sort(ByName(cluster.Nodes))
+		mutex.Unlock()

-		// send cluster information
-		msg, err := msgpack.Marshal(&cluster)
-		publisher.SendBytes(msg, 0)
+		// send cluster information to client
+		body, err := msgpack.Marshal(&clus)
+		msg.Body = body
+		messaging.SendMultipartMessage(router_socket, &msg)
 	}
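
The server now depends on ReceiveMultipartMessage and SendMultipartMessage from the new messaging/multipart.go, whose 51-line diff is not shown on this page. Judging only from the call sites above, it plausibly looks like the following sketch (the struct name and every field other than Body are assumptions):

package messaging

import (
	"errors"

	"github.com/pebbe/zmq4"
)

// Message keeps the ROUTER envelope together with the payload so the
// reply can be routed back to the client that sent the request.
type Message struct {
	Identity []byte // routing frame, added by the ROUTER socket (assumed name)
	Body     []byte // application payload (msgpack in cluster-smi)
}

// ReceiveMultipartMessage reads [identity, delimiter, body] from a ROUTER socket.
func ReceiveMultipartMessage(sock *zmq4.Socket) (Message, error) {
	var msg Message
	frames, err := sock.RecvMessageBytes(0)
	if err != nil {
		return msg, err
	}
	if len(frames) < 2 {
		return msg, errors.New("unexpected frame count")
	}
	msg.Identity = frames[0]
	msg.Body = frames[len(frames)-1]
	return msg, nil
}

// SendMultipartMessage writes the envelope back, followed by the body.
func SendMultipartMessage(sock *zmq4.Socket, msg *Message) error {
	_, err := sock.SendMessage(msg.Identity, "", msg.Body)
	return err
}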
cluster-smi.go
@@ -1,41 +1,74 @@
 package main

 import (
+	"fmt"
+	"github.com/patwie/cluster-smi/cluster"
 	"github.com/pebbe/zmq4"
 	"github.com/vmihailenco/msgpack"
 	"log"
-	"sort"
+	"os"
 	"time"
 )

+// dummy request for REQ-ROUTER pattern
+type Request struct {
+	Identity string
+}
+
+func RequestUpdateMessage() (buf []byte, err error) {
+	id := fmt.Sprintf("REQ %v", os.Getpid())
+	req := Request{id}
+	return msgpack.Marshal(&req)
+}
+
 func main() {
+	request_attempts := 0
+
 	cfg := CreateConfig()

-	subscriber, err := zmq4.NewSocket(zmq4.SUB)
+	// ask for updates messages (REQ-ROUTER)
+	request_socket, err := zmq4.NewSocket(zmq4.REQ)
 	if err != nil {
-		log.Fatalf("Failed open Socket ZMQ: %s\n", err.Error())
+		panic(err)
 	}
-	defer subscriber.Close()
+	defer request_socket.Close()

 	SocketAddr := "tcp://" + cfg.ServerIp + ":" + cfg.ServerPortDistribute
-	subscriber.Connect(SocketAddr)
-	subscriber.SetLinger(0)
-	subscriber.SetSubscribe("")
-	// subscriber.SetRcvhwm(1)
+	request_socket.Connect(SocketAddr)

 	for {
-		s, err := subscriber.RecvBytes(0)
+		// request new update
+		msg, err := RequestUpdateMessage()
+		if err != nil {
+			log.Fatal("request messsage error:", err)
+			panic(err)
+		}
+
+		_, err = request_socket.SendBytes(msg, 0)
+		if err != nil {
+			log.Fatal("sending request messsage error:", err)
+			panic(err)
+		}
+
+		// response from cluster-smi-server
+		s, err := request_socket.RecvBytes(0)
 		if err != nil {
 			log.Println(err)
+			time.Sleep(10 * time.Second)
+			request_attempts += 1
+			if request_attempts == 0 {
+				panic("too many request attempts yielding an error")
+			}
 			continue
 		}

-		var cluster Cluster
-		err = msgpack.Unmarshal(s, &cluster)
-		sort.Sort(ByName(cluster.Nodes))
-		cluster.Print()
+		var clus cluster.Cluster
+		err = msgpack.Unmarshal(s, &clus)
+		clus.Sort()
+		clus.Print()

 		time.Sleep(cfg.Tick)
 	}
cluster.go
@@ -1,57 +1,20 @@
 package main

 import (
-	"fmt"
-	"github.com/apcera/termtables"
+	"github.com/patwie/cluster-smi/cluster"
 	"github.com/patwie/cluster-smi/nvml"
 	"os"
-	"strconv"
 )

 // Cluster
-func (c *Cluster) Fetch() {
+func FetchCluster(c *cluster.Cluster) {
 	for i, _ := range c.Nodes {
-		c.Nodes[i].Fetch()
+		FetchNode(&c.Nodes[i])
 	}
 }

-func (c *Cluster) Print() {
-	table := termtables.CreateTable()
-	table.AddHeaders("Node", "Gpu", "Memory-Usage", "Mem-Util", "GPU-Util")
-	for n_id, n := range c.Nodes {
-		for d_id, d := range n.Devices {
-			memPercent := int(d.MemoryUtilization.Used * 100 / d.MemoryUtilization.Total)
-			name := ""
-			if d_id == 0 {
-				name = n.Name
-			}
-			table.AddRow(
-				name,
-				strconv.Itoa(d.Id)+": "+d.Name,
-				strconv.FormatInt(d.MemoryUtilization.Used/1024/1024, 10)+
-					"MiB / "+
-					strconv.FormatInt(d.MemoryUtilization.Total/1024/1024, 10)+"MiB",
-				strconv.Itoa(memPercent)+"%",
-				strconv.Itoa(d.Utilization)+"%",
-			)
-			table.SetAlign(termtables.AlignRight, 3)
-		}
-		if n_id < len(c.Nodes)-1 {
-			table.AddSeparator()
-		}
-	}
-	fmt.Printf("\033[2J")
-	fmt.Println(table.Render())
-}
-
 // Node
-func (n *Node) Init() {
+func InitNode(n *cluster.Node) {
 	name, err := os.Hostname()
 	if err != nil {
 		panic(err)
@@ -60,18 +23,11 @@ func (n *Node) Init() {
 	devices, _ := nvml.GetDevices()
 	for i := 0; i < len(devices); i++ {
-		n.Devices = append(n.Devices, Device{0, "", 0, Memory{0, 0, 0, 0}})
-	}
-}
-
-func (n *Node) Print() {
-	fmt.Println(n.Name)
-	for _, device := range n.Devices {
-		fmt.Println(device.Name)
+		n.Devices = append(n.Devices, cluster.Device{0, "", 0, cluster.Memory{0, 0, 0, 0}})
 	}
 }

-func (n *Node) Fetch() {
+func FetchNode(n *cluster.Node) {
 	devices, _ := nvml.GetDevices()
@@ -82,7 +38,7 @@ func (n *Node) Fetch() {
 	n.Devices[idx].Id = idx
 	n.Devices[idx].Name = device.DeviceName
 	n.Devices[idx].Utilization = gpuPercent
-	n.Devices[idx].MemoryUtilization = Memory{meminfo.Used, meminfo.Free, meminfo.Total, memPercent}
+	n.Devices[idx].MemoryUtilization = cluster.Memory{meminfo.Used, meminfo.Free, meminfo.Total, memPercent}
 	}
 }
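
The 85-line cluster/data.go is also not shown. Its types can be partly inferred from the call sites above (cluster.Node, cluster.Device{0, "", 0, cluster.Memory{0, 0, 0, 0}}, cluster.Memory{Used, Free, Total, Percent}, clus.Sort(), clus.Print()); a plausible sketch follows, in which the field names, the sort key, and the use of sort.Slice are assumptions, and Print presumably carries over the termtables rendering deleted from cluster.go:

package cluster

import "sort"

// Field order matches the composite literals in cluster.go above;
// the names themselves are guesses.
type Memory struct {
	Used    int64
	Free    int64
	Total   int64
	Percent int
}

type Device struct {
	Id                int
	Name              string
	Utilization       int
	MemoryUtilization Memory
}

type Node struct {
	Name    string
	Devices []Device
}

type Cluster struct {
	Nodes []Node
}

// Sort orders nodes by name, standing in for the old sort.Sort(ByName(...))
// that the server no longer performs.
func (c *Cluster) Sort() {
	sort.Slice(c.Nodes, func(i, j int) bool {
		return c.Nodes[i].Name < c.Nodes[j].Name
	})
}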