add SELF transaction type #34

Merged
merged 2 commits into from Mar 7, 2013
View
119 bin/doozer_init
@@ -0,0 +1,119 @@
+#!/bin/bash
+set -o pipefail
+
+function log() {
+ echo "[`date +'%Y%m%d %H:%M:%S'`] $@"
+}
+
+# doozer cluster initialization script
+
+DOOZERCLI="/usr/local/bin/doozer"
+DOOZERD_NODES=""
+HOST="127.0.0.1"
+PORT="9200"
+
+while [ "$1" != "" ]; do
+ PARAM=`echo $1 | awk -F= '{print $1}'`
+ VALUE=`echo $1 | awk -F= '{print $2}'`
+ case $PARAM in
+ --doozer-cli)
+ DOOZERCLI="$VALUE"
+ ;;
+ --doozerd-nodes)
+ DOOZERD_NODES="$VALUE"
+ ;;
+ --port)
+ PORT="$VALUE"
+ ;;
+ --host)
+ HOST="$VALUE"
+ ;;
+ esac
+ shift
+done
+
+if [ -z $PORT ]; then
+ log "ERROR: --port cannot be empty"
+ exit 1
+fi
+
+if [ -z $HOST ]; then
+ log "ERROR: --host cannot be empty"
+ exit 1
+fi
+
+if [ -z $DOOZERD_NODES ]; then
+ log "ERROR: --doozerd-nodes cannot be empty"
+ exit 1
+fi
+
+log "DOOZERD_NODES: $DOOZERD_NODES"
+
+# first identify which node to bind to (ie. the root node)
+bind_node=""
+for doozerd_node in $DOOZERD_NODES; do
+ doozerd_node_hostname=`echo $doozerd_node | awk -F: '{print $1}'`
+ doozerd_node_port=`echo $doozerd_node | awk -F: '{print $2}'`
+
+ # resolve the ip (doozerd needs the exact ip to bind to, does not work with 0.0.0.0)
+ doozerd_node_ip=`host $doozerd_node_hostname 2>/dev/null | tail -1 | awk '{print $NF}'`
+ if $DOOZERCLI -a="doozer:?ca=$doozerd_node_ip:$doozerd_node_port" nop; then
+ bind_node="$doozerd_node_ip:$doozerd_node_port"
+ break
+ fi
+done
+
+if [ -z $bind_node ]; then
+ log "ERROR: could not find node to bind to"
+ exit 1
+fi
+
+log "bind_node: $bind_node"
+
+this_node_ip=`host $FQ_HOSTNAME 2>/dev/null | tail -1 | awk '{print $NF}'`
+host_node="$this_node_ip:$PORT"
+
+log "host_node: $host_node"
+
+my_self=`$DOOZERCLI -a="doozer:?ca=$host_node" self`
+if [ -z $my_self ]; then
+ log "ERROR: could not identify self"
+ exit 1
+fi
+
+log "my_self: $my_self"
+
+my_cal=$(printf %d $(expr $(echo $FQ_HOSTNAME | awk -F. '{print $1}' | tail -c -3) - 1))
+if [ -z $my_cal ]; then
+ log "ERROR: could not identify cal"
+ exit 1
+fi
+
+log "my_cal: $my_cal"
+
+log "checking /ctl/cal/$my_cal"
+stat_output=`$DOOZERCLI -a="doozer:?ca=$bind_node" stat /ctl/cal/$my_cal 2>/dev/null`
+if [ "$?" == "0" ]; then
+ log "/ctl/cal/$my_cal exists"
+ cal_self=`$DOOZERCLI -a="doozer:?ca=$bind_node" get /ctl/cal/$my_cal 2>/dev/null`
+ if [ "$cal_self" == "$my_self" ]; then
+ log "NOTICE: nothing to do (we're already active in the cluster)"
+ exit 0
+ fi
+ rev=`echo $stat_output | awk '{print $1}'`
+ log "deleting /ctl/cal/$my_cal @ $rev"
+ $DOOZERCLI -a="doozer:?ca=$bind_node" del /ctl/cal/$my_cal $rev >/dev/null 2>&1
+ if [ "$?" != "0" ]; then
+ log "ERROR: failed to delete /ctl/cal/$my_cal"
+ exit 1
+ fi
+fi
+
+log "adding /ctl/cal/$my_cal"
+echo -n | $DOOZERCLI -a="doozer:?ca=$bind_node" add /ctl/cal/$my_cal
+if [ "$?" != "0" ]; then
+ log "ERROR: failed to add /ctl/cal/$my_cal"
+ exit 1
+fi
+
+log "SUCCESS"
View
2 peer/peer.go
@@ -151,7 +151,7 @@ func Main(clusterName, self, buri, rwsk, rosk string, cl *doozer.Conn, udpConn *
shun := make(chan string, 3) // sufficient for a cluster of 7
go member.Clean(shun, st, pr)
- go server.ListenAndServe(listener, canWrite, st, pr, rwsk, rosk)
+ go server.ListenAndServe(listener, canWrite, st, pr, rwsk, rosk, self)
if rwsk == "" && rosk == "" && webListener != nil {
web.Store = st
View
1 server/conn.go
@@ -21,6 +21,7 @@ type conn struct {
rosk string
waccess bool
raccess bool
+ self string
}
func (c *conn) serve() {
View
3 server/msg.pb.go
@@ -25,6 +25,7 @@ const (
request_WALK request_Verb = 9
request_GETDIR request_Verb = 14
request_STAT request_Verb = 16
+ request_SELF request_Verb = 20
request_ACCESS request_Verb = 99
)
@@ -38,6 +39,7 @@ var request_Verb_name = map[int32]string{
9: "WALK",
14: "GETDIR",
16: "STAT",
+ 20: "SELF",
99: "ACCESS",
}
var request_Verb_value = map[string]int32{
@@ -50,6 +52,7 @@ var request_Verb_value = map[string]int32{
"WALK": 9,
"GETDIR": 14,
"STAT": 16,
+ "SELF": 20,
"ACCESS": 99,
}
View
1 server/msg.proto
@@ -14,6 +14,7 @@ message Request {
WALK = 9;
GETDIR = 14;
STAT = 16;
+ SELF = 20;
ACCESS = 99;
}
optional Verb verb = 2;
View
7 server/server.go
@@ -10,7 +10,7 @@ import (
// ListenAndServe listens on l, accepts network connections, and
// handles requests according to the doozer protocol.
-func ListenAndServe(l net.Listener, canWrite chan bool, st *store.Store, p consensus.Proposer, rwsk, rosk string) {
+func ListenAndServe(l net.Listener, canWrite chan bool, st *store.Store, p consensus.Proposer, rwsk, rosk string, self string) {
var w bool
for {
c, err := l.Accept()
@@ -32,11 +32,11 @@ func ListenAndServe(l net.Listener, canWrite chan bool, st *store.Store, p conse
default:
}
- go serve(c, st, p, w, rwsk, rosk)
+ go serve(c, st, p, w, rwsk, rosk, self)
}
}
-func serve(nc net.Conn, st *store.Store, p consensus.Proposer, w bool, rwsk, rosk string) {
+func serve(nc net.Conn, st *store.Store, p consensus.Proposer, w bool, rwsk, rosk string, self string) {
c := &conn{
c: nc,
addr: nc.RemoteAddr().String(),
@@ -45,6 +45,7 @@ func serve(nc net.Conn, st *store.Store, p consensus.Proposer, w bool, rwsk, ros
canWrite: w,
rwsk: rwsk,
rosk: rosk,
+ self: self,
}
c.grant("") // start as if the client supplied a blank password
View
6 server/txn.go
@@ -24,6 +24,7 @@ var ops = map[int32]func(*txn){
int32(request_REV): (*txn).rev,
int32(request_SET): (*txn).set,
int32(request_STAT): (*txn).stat,
+ int32(request_SELF): (*txn).self,
int32(request_WAIT): (*txn).wait,
int32(request_WALK): (*txn).walk,
int32(request_ACCESS): (*txn).access,
@@ -154,6 +155,11 @@ func (t *txn) rev() {
t.respond()
}
+func (t *txn) self() {
+ t.resp.Value = []byte(t.c.self)
+ t.respond()
+}
+
func (t *txn) stat() {
if !t.c.raccess {
t.respondOsError(syscall.EACCES)