Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

better logging, atomic state changes

  • Loading branch information...
commit 10b8265dbdc45f14feffd4d31689c2990f4485a6 1 parent 73040e1
@dforsyth authored
View
79 coordination.go
@@ -3,6 +3,7 @@ package waffle
import (
"donut"
"encoding/json"
+ "errors"
"gozk"
"log"
"net"
@@ -18,6 +19,7 @@ import (
const (
NewState = iota
SetupState
+ PrepareState
LoadState
RunState
WriteState
@@ -34,19 +36,17 @@ type Coordinator struct {
graph *Graph
zk *gozk.ZooKeeper
- barrierCount int
- barrierMap *donut.SafeMap
watchers map[string]chan byte
basePath, lockPath, barriersPath, workersPath string
state int32
clusterName string
- donutConfig *donut.Config
- partitions map[int]string
- workerInfos map[string]map[string]interface{}
+ // needed for CreateWork
+ donutConfig *donut.Config
+ partitions map[int]string
+ cachedWorkerInfo map[string]map[string]interface{}
rpcClients map[string]*rpc.Client
- host, port string
done chan byte
}
@@ -64,13 +64,10 @@ func newCoordinator(clusterName string, c *Config) *Coordinator {
}
func (c *Coordinator) setup() {
- if !atomic.CompareAndSwapInt32(&c.state, NewState, SetupState) {
- log.Fatal("Could not swap to setup state to begin setup")
- }
c.basePath = path.Join("/", c.config.JobId)
- c.lockPath = path.Join(c.basePath, "lock")
- c.workersPath = path.Join(c.basePath, "workers")
- c.barriersPath = path.Join(c.basePath, "barriers")
+ c.lockPath = path.Join(c.basePath, LockPath)
+ c.workersPath = path.Join(c.basePath, WorkersPath)
+ c.barriersPath = path.Join(c.basePath, BarriersPath)
c.zk.Create(c.basePath, "", 0, gozk.WorldACL(gozk.PERM_ALL))
c.zk.Create(c.workersPath, "", 0, gozk.WorldACL(gozk.PERM_ALL))
@@ -87,9 +84,7 @@ func (c *Coordinator) setup() {
func (c *Coordinator) startServer() {
rpc.Register(c)
rpc.HandleHTTP()
- c.host = "localhost"
- c.port = "500" + c.config.NodeId
- l, e := net.Listen("tcp", net.JoinHostPort(c.host, c.port))
+ l, e := net.Listen("tcp", net.JoinHostPort(c.config.RPCHost, c.config.RPCPort))
if e != nil {
log.Fatal("listen error:", e)
}
@@ -107,12 +102,10 @@ func (c *Coordinator) sendVertex(v Vertex, pid int) error {
w := c.partitions[pid]
cl := c.rpcClients[w]
var r int
- // log.Printf("sending %s", v.Id())
return cl.Call("Coordinator.SubmitVertex", &v, &r)
}
func (c *Coordinator) SubmitEdge(e Edge, r *int) error {
- // log.Printf("got edge %s-%s", e.Source(), e.Destination())
c.graph.addEdge(e)
*r = 0
return nil
@@ -122,7 +115,6 @@ func (c *Coordinator) sendEdge(e Edge, pid int) error {
w := c.partitions[pid]
cl := c.rpcClients[w]
var r int
- // log.Printf("sending edge %s-%s", e.Source(), e.Destination())
return cl.Call("Coordinator.SubmitEdge", &e, &r)
}
@@ -140,7 +132,6 @@ func (c *Coordinator) sendMessage(m Message, pid int) error {
}
func (c *Coordinator) register() {
- log.Println("register")
for {
log.Printf("len: %d", c.workers.Len())
m := c.workers.RangeLock()
@@ -168,9 +159,9 @@ func (c *Coordinator) createBarrier(name string, onChange func(*donut.SafeMap))
bPath := path.Join(c.barriersPath, name)
if _, ok := c.watchers[bPath]; !ok {
if _, err := c.zk.Create(bPath, "", 0, gozk.WorldACL(gozk.PERM_ALL)); err == nil {
- log.Printf("created barrier %s", bPath)
+ log.Printf("Created barrier %s", bPath)
} else {
- log.Printf("failed to create barrier %s: %v", bPath, err)
+ log.Printf("Failed to create barrier %s: %v", bPath, err)
}
kill, err := watchZKChildren(c.zk, bPath, donut.NewSafeMap(make(map[string]interface{})), onChange)
if err != nil {
@@ -181,17 +172,21 @@ func (c *Coordinator) createBarrier(name string, onChange func(*donut.SafeMap))
}
func (c *Coordinator) enterBarrier(name, entry, data string) {
- log.Printf("entering barrier %s", name)
+ log.Printf("Entering barrier %s as %s", name, entry)
if _, err := c.zk.Create(path.Join(c.barriersPath, name, entry), data, gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)); err != nil {
- log.Printf("error on barrier entry (%s entering %s): %v", entry, name, err)
+ log.Printf("Error on barrier entry (%s entering %s): %v", entry, name, err)
panic(err)
}
}
-func (c *Coordinator) start(zk *gozk.ZooKeeper) {
+func (c *Coordinator) start(zk *gozk.ZooKeeper) error {
+ if !atomic.CompareAndSwapInt32(&c.state, NewState, SetupState) {
+ return errors.New("Error moving from NewState to SetupState")
+ }
c.zk = zk
c.setup()
c.register()
+ return nil
}
func (c *Coordinator) startWork(workId string, data map[string]interface{}) {
@@ -206,19 +201,18 @@ func (c *Coordinator) startWork(workId string, data map[string]interface{}) {
c.onStepBarrierChange(step, m)
})
- log.Printf("ready to run superstep %d", step)
+ log.Printf("Running superstep %d", step)
active, msgs, aggr := c.graph.runSuperstep(step)
+ log.Printf("Step %d stats: %d active verts, %d sent messages", step, active, msgs)
stepData := make(map[string]interface{})
stepData["active"] = active
stepData["msgs"] = msgs
stepData["aggr"] = aggr
-
var data []byte
var err error
if data, err = json.Marshal(stepData); err != nil {
panic(err)
}
- log.Printf("preparing to enter barrier for superstep %d", step)
c.enterBarrier("superstep-"+strconv.Itoa(step), c.config.NodeId, string(data))
} else if t == "write" {
c.createBarrier("write", func(m *donut.SafeMap) {
@@ -274,7 +268,12 @@ func (c *Coordinator) onWorkersChange(m *donut.SafeMap) {
// roll back to last checkpoint
} else {
if m.Len() == c.config.InitialWorkers {
- log.Printf("len == partitions")
+ // go into prepare state
+ if !atomic.CompareAndSwapInt32(&c.state, SetupState, PrepareState) {
+ log.Println("Could not properly move from SetupState to PrepareState")
+ return
+ }
+ log.Printf("InitialWorkers met, preparing node for work")
// everyone is here, create the partition mapping
lm := m.RangeLock()
var workers []string
@@ -291,17 +290,18 @@ func (c *Coordinator) onWorkersChange(m *donut.SafeMap) {
}
// set up connections to all the other nodes
- c.workerInfos = make(map[string]map[string]interface{})
+ c.cachedWorkerInfo = make(map[string]map[string]interface{})
c.rpcClients = make(map[string]*rpc.Client)
for _, w := range workers {
// pull down worker info for all of the existing workers
- c.workerInfos[w] = c.workerInfo(w)
- c.rpcClients[w], _ = rpc.DialHTTP("tcp", net.JoinHostPort(c.workerInfos[w]["host"].(string), c.workerInfos[w]["port"].(string)))
+ c.cachedWorkerInfo[w] = c.workerInfo(w)
+ c.rpcClients[w], _ = rpc.DialHTTP("tcp", net.JoinHostPort(c.cachedWorkerInfo[w]["host"].(string), c.cachedWorkerInfo[w]["port"].(string)))
}
// go into loadstate
- if !atomic.CompareAndSwapInt32(&c.state, SetupState, LoadState) {
- panic("state swap from setup to loadstate failed")
+ if !atomic.CompareAndSwapInt32(&c.state, PrepareState, LoadState) {
+ log.Println("Could not properly move from PrepareState to LoadState")
+ return
}
go c.createLoadWork()
}
@@ -310,8 +310,8 @@ func (c *Coordinator) onWorkersChange(m *donut.SafeMap) {
func (c *Coordinator) info() string {
m := make(map[string]interface{})
- m["host"] = c.host
- m["port"] = c.port
+ m["host"] = c.config.RPCHost
+ m["port"] = c.config.RPCPort
info, _ := json.Marshal(m)
return string(info)
@@ -333,16 +333,19 @@ func (c *Coordinator) onLoadBarrierChange(m *donut.SafeMap) {
log.Printf("load complete")
c.watchers["load"] <- 1
delete(c.watchers, "load")
- atomic.StoreInt32(&c.state, RunState)
+ if !atomic.CompareAndSwapInt32(&c.state, LoadState, RunState) {
+ log.Println("Could not properly move from LoadState to RunState")
+ return
+ }
go c.createStepWork(1)
} else {
- log.Printf("%d != %d", m.Len(), len(c.graph.job.LoadPaths()))
+ log.Printf("Load barrier has %d/%d entries", m.Len(), len(c.graph.job.LoadPaths()))
}
}
func (c *Coordinator) onWriteBarrierChange(m *donut.SafeMap) {
if m.Len() == c.workers.Len() {
- log.Println("Writes are done, killing job")
+ log.Println("Write barrier full, ending job")
c.done <- 1
}
}
View
9 examples/maxval/maxval.go
@@ -11,14 +11,16 @@ import (
"path"
"strconv"
"strings"
+ "time"
"waffle"
)
type MVJob struct {
+ JobId string
}
func (j *MVJob) Id() string {
- return "MVJob"
+ return j.JobId
}
func (j *MVJob) LoadPaths() (paths []string) {
@@ -176,6 +178,9 @@ func main() {
config := &waffle.Config{
InitialWorkers: 2,
NodeId: os.Args[1],
+ ZKServers: "localhost:50000",
+ RPCHost: "localhost",
+ RPCPort: "500" + os.Args[1],
}
- waffle.Run(config, &MVJob{})
+ waffle.Run(config, &MVJob{JobId: "MVJob-" + time.Now().String()})
}
View
1  job.go
@@ -7,5 +7,4 @@ type Job interface {
Checkpoint(int) bool
Write(*Graph) error
Persist(*Graph) error
- // Compute(g, v, m)
}
View
15 listener.go
@@ -11,11 +11,12 @@ type waffleListener struct {
clusterName string
- graph *Graph
- done chan byte
- zk *gozk.ZooKeeper
- job Job
- config *donut.Config
+ graph *Graph
+ done chan byte
+ zk *gozk.ZooKeeper
+ job Job
+ config *donut.Config
+ cluster *donut.Cluster
}
func (l *waffleListener) OnJoin(zk *gozk.ZooKeeper) {
@@ -24,7 +25,9 @@ func (l *waffleListener) OnJoin(zk *gozk.ZooKeeper) {
l.graph = newGraph(l.job, l.coordinator)
l.coordinator.graph = l.graph
l.coordinator.donutConfig = l.config
- l.coordinator.start(zk)
+ if err := l.coordinator.start(zk); err != nil {
+ l.cluster.Shutdown()
+ }
}
func (l *waffleListener) StartWork(workId string, data map[string]interface{}) {
View
11 waffle.go
@@ -9,6 +9,7 @@ type Config struct {
JobId string
InitialWorkers int
RPCHost, RPCPort string
+ ZKServers string
}
func Run(c *Config, j Job) {
@@ -20,12 +21,13 @@ func Run(c *Config, j Job) {
}
balancer := &waffleBalancer{}
config := donut.NewConfig()
- config.Servers = "localhost:50000"
+ config.Servers = c.ZKServers
config.NodeId = c.NodeId
config.Timeout = 1 * 1e9
cluster := donut.NewCluster(clusterName, config, balancer, listener)
+ listener.cluster = cluster
listener.done = make(chan byte)
listener.config = config
listener.coordinator.done = listener.done
@@ -35,9 +37,6 @@ func Run(c *Config, j Job) {
const (
BarriersPath = "barriers"
- // JoinablePath = "joinable"
- // ReadyPath = "ready"
- LockPath = "lock"
- WorkersPath = "workers"
- // coordinator = "coordinator"
+ LockPath = "lock"
+ WorkersPath = "workers"
)

0 comments on commit 10b8265

Please sign in to comment.
Something went wrong with that request. Please try again.