Permalink
Browse files

bugfix for slave status and process event notify

  • Loading branch information...
zhihuij committed Apr 24, 2012
1 parent 1752554 commit 9ce7f0fb9a9d83fdc63e15dc0a74775ff703a7fa
@@ -107,13 +107,14 @@ public void startMaster(boolean consoleFlag) {
List<String> slaves = zk.getChildren(slavesPath, slaveStatusWatcher);
for (String addr : slaves) {
SlaveTarget slaveTarget = slaveTargetMap.get(addr);
if (slaveTarget == null) {
String slaveStatusPath = Utils.getSlaveStatusPath(addr);
if (slaveTarget == null && Utils.checkNode(zk, slaveStatusPath)) {
slaveTarget = new SlaveTarget(addr);
slaveTarget.setStatus(SlaveStatus.ONLINE);
slaveTargetMap.put(addr, slaveTarget);
zk.getData(Utils.getSlaveStatusPath(addr), slaveStatusWatcher, null);
zk.getData(slaveStatusPath, slaveStatusWatcher, null);
LogUtils.logInfoLine(Utils.constructString("slave -> ", addr, " online"));
}
@@ -121,8 +121,9 @@ public void start() {
String trimLine = line.trim();
if (trimLine.length() != 0) {
try {
if (!commandMap.containsKey(trimLine) || trimLine.equals(Global.CMD_HELP)
|| !commandOptions.parseCommand(trimLine)) {
if (!commandOptions.parseCommand(trimLine)
|| !commandMap.containsKey(commandOptions.getAction())
|| commandOptions.getAction().equals(Global.CMD_HELP)) {
usage();
continue;
}
@@ -12,6 +12,7 @@
import org.apache.zookeeper.Watcher.Event.EventType;
import com.netease.automate.exception.ZooKeeperException;
import com.netease.automate.meta.Global;
import com.netease.automate.meta.SlaveStatus;
import com.netease.automate.meta.SlaveTarget;
import com.netease.automate.utils.LogUtils;
@@ -47,24 +48,44 @@ public void process(WatchedEvent event) {
if (slaveAddr != null) {
slaveTargetMap.remove(slaveAddr);
LogUtils.logInfoLine(Utils.constructString("slave -> ", slaveAddr, " offline"));
try {
// add watcher for this slave, check if it is restarted
zk.getChildren(Utils.getSlaveRootPath(slaveAddr), this);
} catch (KeeperException e) {
throw new ZooKeeperException(e);
} catch (InterruptedException e) {
throw new ZooKeeperException(e);
}
}
} else if (event.getType() == EventType.NodeChildrenChanged) {
// node added
try {
List<String> slaveList = zk.getChildren(nodePath, this);
for (String slave : slaveList) {
SlaveTarget slaveTarget = slaveTargetMap.get(slave);
if (slaveTarget == null) {
slaveTarget = new SlaveTarget(slave);
slaveTarget.setStatus(SlaveStatus.ONLINE);
slaveTargetMap.put(slave, slaveTarget);
zk.getData(Utils.getSlaveStatusPath(slave), this, null);
if (!nodePath.equals(Utils.constructString(Global.ARC_ROOT, Global.ARC_SLAVE))) {
// node restart
List<String> slaveSubNode = zk.getChildren(nodePath, false);
for (String subNode : slaveSubNode) {
if (Global.SLAVE_STATUS.indexOf(subNode) > 0) {
// status node
String statusPath = Utils.constructString(nodePath, Global.PATH_SEPARATOR, subNode);
Matcher matcher = pattern.matcher(statusPath);
String slaveAddr = null;
if (matcher.matches()) {
slaveAddr = matcher.group(1);
}
checkSlave(slaveAddr);
}
}
} else {
// node added
List<String> slaveList = zk.getChildren(nodePath, this);
LogUtils.logInfoLine(Utils.constructString("slave -> ", slave, " online"));
for (String slave : slaveList) {
checkSlave(slave);
}
}
} catch (KeeperException e) {
@@ -74,4 +95,18 @@ public void process(WatchedEvent event) {
}
}
}
private void checkSlave(String slave) throws KeeperException, InterruptedException {
SlaveTarget slaveTarget = slaveTargetMap.get(slave);
if (slaveTarget == null) {
slaveTarget = new SlaveTarget(slave);
slaveTarget.setStatus(SlaveStatus.ONLINE);
slaveTargetMap.put(slave, slaveTarget);
zk.getData(Utils.getSlaveStatusPath(slave), this, null);
LogUtils.logInfoLine(Utils.constructString("slave -> ", slave, " online"));
}
}
}
@@ -58,5 +58,6 @@ protected void beforeDoAction(AbstractProcess process, String pkgTargetPath) thr
protected void afterActionDone(AbstractProcess process, String pkgTargetPath) throws KeeperException,
InterruptedException {
process.setStatus(ProcessStatus.RESTARTED);
zk.getData(pkgTargetPath, true, null);
}
}
@@ -199,8 +199,8 @@ public void processMessage(byte[] message) {
}
public void processMasterCmd(String action, String projectName, byte[] data) {
logger.info("recv master cmd: " + projectName + " -> " + action + "@" + data);
PackageMeta pkgMeta = JsonUtils.getObject(data, PackageMeta.class);
logger.info("recv master cmd: " + projectName + " -> " + action + "@" + pkgMeta.getPkgName());
try {
if (Global.CMD_DEPLOY.equals(action)) {

0 comments on commit 9ce7f0f

Please sign in to comment.