Skip to content
Browse files

fragile, but working on some test code

  • Loading branch information...
1 parent d4bcd34 commit 37ddb69fcdc56d7f10b20501a9d555c8b5d37480 @dforsyth committed
Showing with 74 additions and 101 deletions.
  1. +44 −35 coordination.go
  2. +24 −62 graph.go
  3. +1 −1 listener.go
  4. +5 −3 waffle.go
View
79 coordination.go
@@ -9,6 +9,7 @@ import (
"net/http"
"net/rpc"
"path"
+ "sort"
"strconv"
"time"
)
@@ -26,7 +27,7 @@ type Config struct {
Partitions int
}
-type coordinator struct {
+type Coordinator struct {
// workers
workers *donut.SafeMap
@@ -50,9 +51,11 @@ type coordinator struct {
clients map[string]*rpc.Client
host, port string
+
+ done chan byte
}
-func (c *coordinator) setup() {
+func (c *Coordinator) setup() {
c.basePath = path.Join("/", c.config.JobId)
c.lockPath = path.Join(c.basePath, "lock")
c.workersPath = path.Join(c.basePath, "workers")
@@ -74,7 +77,7 @@ func (c *coordinator) setup() {
})
}
-func (c *coordinator) startServer() {
+func (c *Coordinator) startServer() {
rpc.Register(c)
rpc.HandleHTTP()
c.host = "localhost"
@@ -86,46 +89,50 @@ func (c *coordinator) startServer() {
go http.Serve(l, nil)
}
-func (c *coordinator) SubmitVertex(v Vertex, r *int) error {
+func (c *Coordinator) SubmitVertex(v Vertex, r *int) error {
+ // log.Printf("got %s", v.Id())
c.graph.addVertex(v)
*r = 0
return nil
}
-func (c *coordinator) sendVertex(v Vertex, pid int) error {
+func (c *Coordinator) sendVertex(v Vertex, pid int) error {
w := c.partitions[pid]
cl := c.clients[w]
var r int
- return cl.Call("coordinator.SubmitVertex", v, &r)
+ // log.Printf("sending %s", v.Id())
+ return cl.Call("Coordinator.SubmitVertex", &v, &r)
}
-func (c *coordinator) SubmitEdge(e Edge, r *int) error {
+func (c *Coordinator) SubmitEdge(e Edge, r *int) error {
+ // log.Printf("got edge %s-%s", e.Source(), e.Destination())
c.graph.addEdge(e)
*r = 0
return nil
}
-func (c *coordinator) sendEdge(e Edge, pid int) error {
+func (c *Coordinator) sendEdge(e Edge, pid int) error {
w := c.partitions[pid]
cl := c.clients[w]
var r int
- return cl.Call("coordinator.SubmitEdge", e, &r)
+ // log.Printf("sending edge %s-%s", e.Source(), e.Destination())
+ return cl.Call("Coordinator.SubmitEdge", &e, &r)
}
-func (c *coordinator) SubmitMessage(m Message, r *int) error {
+func (c *Coordinator) SubmitMessage(m Message, r *int) error {
c.graph.addMessage(m)
*r = 0
return nil
}
-func (c *coordinator) sendMessage(m Message, pid int) error {
+func (c *Coordinator) sendMessage(m Message, pid int) error {
w := c.partitions[pid]
cl := c.clients[w]
var r int
- return cl.Call("coordinator.SubmitMessage", m, &r)
+ return cl.Call("Coordinator.SubmitMessage", &m, &r)
}
-func (c *coordinator) register() {
+func (c *Coordinator) register() {
log.Println("register")
for {
log.Printf("len: %d", c.workers.Len())
@@ -150,7 +157,7 @@ func (c *coordinator) register() {
}
}
-func (c *coordinator) createBarrier(name string, onChange func(*donut.SafeMap)) {
+func (c *Coordinator) createBarrier(name string, onChange func(*donut.SafeMap)) {
bPath := path.Join(c.barriersPath, name)
if _, ok := c.watchers[bPath]; !ok {
if _, err := c.zk.Create(bPath, "", 0, gozk.WorldACL(gozk.PERM_ALL)); err == nil {
@@ -166,21 +173,21 @@ func (c *coordinator) createBarrier(name string, onChange func(*donut.SafeMap))
}
}
-func (c *coordinator) enterBarrier(name, entry, data string) {
+func (c *Coordinator) enterBarrier(name, entry, data string) {
log.Printf("entering barrier %s", name)
if _, err := c.zk.Create(path.Join(c.barriersPath, name, entry), data, gozk.EPHEMERAL, gozk.WorldACL(gozk.PERM_ALL)); err != nil {
- log.Printf("error on barrier entry (%s): %v", name, err)
+ log.Printf("error on barrier entry (%s entering %s): %v", entry, name, err)
panic(err)
}
}
-func (c *coordinator) start(zk *gozk.ZooKeeper) {
+func (c *Coordinator) start(zk *gozk.ZooKeeper) {
c.zk = zk
c.setup()
c.register()
}
-func (c *coordinator) startWork(workId string, data map[string]interface{}) {
+func (c *Coordinator) startWork(workId string, data map[string]interface{}) {
if t := data["work"].(string); t == "load" {
p := data["path"].(string)
c.graph.Load(p)
@@ -217,7 +224,7 @@ func (c *coordinator) startWork(workId string, data map[string]interface{}) {
}
}
-func (c *coordinator) onStepBarrierChange(step int, m *donut.SafeMap) {
+func (c *Coordinator) onStepBarrierChange(step int, m *donut.SafeMap) {
if m.Len() == c.workers.Len() {
defer m.Clear()
barrierName := "superstep-" + strconv.Itoa(step)
@@ -247,12 +254,12 @@ func (c *coordinator) onStepBarrierChange(step int, m *donut.SafeMap) {
go c.createStepWork(step + 1)
}
} else {
- log.Printf("step barrier change: %d != %d", m.Len(), c.workers.Len())
+ log.Printf("step barrier change: %d entries out of %d", m.Len(), c.workers.Len())
}
}
-func (c *coordinator) onWorkersChange(m *donut.SafeMap) {
- log.Println("updated")
+func (c *Coordinator) onWorkersChange(m *donut.SafeMap) {
+ log.Println("workers updated")
if c.state > SETUP {
// invalidate current step
// update partition mapping
@@ -267,8 +274,12 @@ func (c *coordinator) onWorkersChange(m *donut.SafeMap) {
workers = append(workers, k)
}
m.RangeUnlock()
+ sort.Strings(workers)
for i := 0; i < len(workers); i++ {
c.partitions[i] = workers[i]
+ if workers[i] == c.config.NodeId {
+ c.graph.partitionId = i
+ }
}
// set up connections to all the other nodes
@@ -284,7 +295,7 @@ func (c *coordinator) onWorkersChange(m *donut.SafeMap) {
}
}
-func (c *coordinator) info() string {
+func (c *Coordinator) info() string {
m := make(map[string]interface{})
m["host"] = c.host
m["port"] = c.port
@@ -293,7 +304,7 @@ func (c *coordinator) info() string {
return string(info)
}
-func (c *coordinator) workerInfo(id string) (info map[string]interface{}) {
+func (c *Coordinator) workerInfo(id string) (info map[string]interface{}) {
raw, _, err := c.zk.Get(path.Join(c.workersPath, id))
if err != nil {
panic(err)
@@ -304,7 +315,7 @@ func (c *coordinator) workerInfo(id string) (info map[string]interface{}) {
return info
}
-func (c *coordinator) onLoadBarrierChange(m *donut.SafeMap) {
+func (c *Coordinator) onLoadBarrierChange(m *donut.SafeMap) {
if m.Len() == len(c.graph.job.LoadPaths()) {
log.Printf("load complete")
c.watchers["load"] <- 1
@@ -315,14 +326,14 @@ func (c *coordinator) onLoadBarrierChange(m *donut.SafeMap) {
}
}
-func (c *coordinator) onWriteBarrierChange(m *donut.SafeMap) {
+func (c *Coordinator) onWriteBarrierChange(m *donut.SafeMap) {
if m.Len() == c.workers.Len() {
- log.Printf("done write, killing job")
- panic("done")
+ log.Println("Writes are done, killing job")
+ c.done <- 1
}
}
-func (c *coordinator) createWriteWork() {
+func (c *Coordinator) createWriteWork() {
log.Printf("creating work for write %s", c.config.NodeId)
data := make(map[string]interface{})
data[c.clusterName] = c.config.NodeId
@@ -330,10 +341,9 @@ func (c *coordinator) createWriteWork() {
donut.CreateWork(c.clusterName, c.zk, c.donutConfig, "write-"+c.config.NodeId, data)
}
-func (c *coordinator) createLoadWork() {
+func (c *Coordinator) createLoadWork() {
log.Println("creating load work")
data := make(map[string]interface{})
- // data[c.clusterName] = c.config.NodeId
data["work"] = "load"
paths := c.graph.job.LoadPaths()
// create the load barrier here since a node might not end up with load work
@@ -342,13 +352,12 @@ func (c *coordinator) createLoadWork() {
})
for _, p := range paths {
data["path"] = p
- if err := donut.CreateWork(c.clusterName, c.zk, c.donutConfig, "load-"+p, data); err != nil {
- log.Printf("create work failed: %s", err)
- }
+ workName := "load-" + p
+ donut.CreateWork(c.clusterName, c.zk, c.donutConfig, workName, data)
}
}
-func (c *coordinator) createStepWork(step int) {
+func (c *Coordinator) createStepWork(step int) {
log.Printf("creating work for superstep %d", step)
data := make(map[string]interface{})
data[c.clusterName] = c.config.NodeId
View
86 graph.go
@@ -40,7 +40,7 @@ type Graph struct {
partitionId int
// need to point back to the coordinator so we can send things
- coord *coordinator
+ coord *Coordinator
v map[string]Vertex
e map[string][]Edge
@@ -51,7 +51,7 @@ type Graph struct {
globalStat *stepStat
}
-func newGraph(j Job, c *coordinator) *Graph {
+func newGraph(j Job, c *Coordinator) *Graph {
return &Graph{
v: make(map[string]Vertex),
e: make(map[string][]Edge),
@@ -69,54 +69,6 @@ func (g *Graph) setStepStats(active, msgs int, aggr map[string]interface{}) {
g.globalStat.aggr = aggr
}
-/*
-func (g *Graph) createServers() {
- ctx, _ := gozmq.NewContext()
-
- var sock *gozmq.Socket
- sock, _ = ctx.NewSocket(gozmq.REP)
- sock.Bind("tcp://localhost:" + strconv.Itoa(vPort))
- g.kills["vert"] = startServer(sock, func(msg []byte) {
- var v Vertex
- json.Unmarshal(msg, &v)
- })
- port++
- sock, _ = ctx.NewSocket(gozmq.REP)
- sock.Bind("tcp://localhost:" + strconv.Itoa(ePort))
- g.kills["edge"] = startServer(sock, func(msg []byte) {
- var e Edge
- json.Unmarshal(msg, &e)
- })
- port++
- sock, _ = ctx.NewSocket(gozmq.REP)
- sock.Bind("tcp://localhost:" + strconv.Itoa(mPort))
- g.kills["msg"] = startServer(sock, func(msg []byte) {
- var m Message
- json.Unmarshal(msg, &m)
- })
-}
-
-func (g *Graph) startServer(sock *gozmq.Socket, onRecv func([]byte)) chan byte {
- kill := make(chan byte)
- go func() {
- select {
- case <-kill:
- return
- default:
- }
-
- msg, err := gozmq.Recv(0)
- if err != nil {
- log.Println(err)
- continue
- }
-
- onRecv(msg)
- }()
- return kill
-}
-*/
-
func (g *Graph) Load(path string) {
verticies, edges, err := g.job.Load(path)
if err != nil {
@@ -127,21 +79,23 @@ func (g *Graph) Load(path string) {
for _, v := range verticies {
g.addVertex(v)
}
+ log.Printf("adding edges from %s", path)
for _, e := range edges {
g.addEdge(e)
}
- log.Printf("done adding verts from %s", path)
+ log.Printf("done adding verts and edges from %s", path)
}
func (g *Graph) sendVertex(v Vertex, p int) error {
- g.coord.sendVertex(v, p)
- return nil
+ return g.coord.sendVertex(v, p)
}
func (g *Graph) addVertex(v Vertex) {
if p := g.determinePartition(v.Id()); p != g.partitionId {
- log.Printf("sending")
- g.sendVertex(v, p)
+ if e := g.sendVertex(v, p); e != nil {
+ panic(e)
+ }
+ return
}
g.v[v.Id()] = v
}
@@ -155,32 +109,40 @@ func (g *Graph) Edges(id string) []Edge {
}
func (g *Graph) sendEdge(e Edge, p int) error {
- g.coord.sendEdge(e, p)
- return nil
+ return g.coord.sendEdge(e, p)
}
func (g *Graph) addEdge(e Edge) {
if p := g.determinePartition(e.Source()); p != g.partitionId {
- g.sendEdge(e, p)
+ if e := g.sendEdge(e, p); e != nil {
+ panic(e)
+ }
+ return
}
g.e[e.Source()] = append(g.e[e.Source()], e)
}
func (g *Graph) sendMessage(m Message, p int) error {
- g.coord.sendMessage(m, p)
- return nil
+ return g.coord.sendMessage(m, p)
}
func (g *Graph) addMessage(m Message) {
if p := g.determinePartition(m.Destination()); p != g.partitionId {
- g.sendMessage(m, p)
+ if e := g.sendMessage(m, p); e != nil {
+ panic(e)
+ }
+ return
}
g.m[m.Destination()] = append(g.m[m.Destination()], m)
}
// TODO: implement
func (g *Graph) determinePartition(id string) int {
- return 0
+ sum := 0
+ for _, c := range id {
+ sum += int(c)
+ }
+ return sum % g.coord.workers.Len()
}
// this can only happen during compute()
View
2 listener.go
@@ -7,7 +7,7 @@ import (
)
type waffleListener struct {
- coordinator *coordinator
+ coordinator *Coordinator
clusterName string
View
8 waffle.go
@@ -2,7 +2,8 @@ package waffle
import (
"donut"
- "time"
+ // "time"
+ // "strconv"
)
const (
@@ -20,10 +21,10 @@ type Writer interface {
}
func Run(c *Config, j Job) {
- clusterName := j.Id() + "-" + time.Now().String()
+ clusterName := j.Id() // + "-" + strconv.Itoa(int(time.Now().Unix())) // XXX why do numbers break everything?
listener := &waffleListener{
clusterName: clusterName,
- coordinator: &coordinator{
+ coordinator: &Coordinator{
clusterName: clusterName,
config: c,
},
@@ -39,6 +40,7 @@ func Run(c *Config, j Job) {
listener.done = make(chan byte)
listener.config = config
+ listener.coordinator.done = listener.done
cluster.Join()
<-listener.done
}

0 comments on commit 37ddb69

Please sign in to comment.
Something went wrong with that request. Please try again.