From 293a013949562360640687423992058bf0777259 Mon Sep 17 00:00:00 2001 From: NicEggert Date: Thu, 23 Mar 2017 15:23:03 -0500 Subject: [PATCH 1/7] Set checkpointStatus to NONE in master when restarting from checkpoint. Workers already do this, so the job hangs when restarting from checkpoint while the master waits for workers to create checkpoints they're never going to create. --- .../src/main/java/org/apache/giraph/master/BspServiceMaster.java | 1 + 1 file changed, 1 insertion(+) diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 7c546785a..639f988c9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -1211,6 +1211,7 @@ public void restartFromCheckpoint(long checkpoint) { setApplicationAttempt(getApplicationAttempt() + 1); setCachedSuperstep(checkpoint); setRestartedSuperstep(checkpoint); + checkpointStatus = CheckpointStatus.NONE; setJobState(ApplicationState.START_SUPERSTEP, getApplicationAttempt(), checkpoint); From b329eddf88325ed4777ef9018e7f0b6685cbabfe Mon Sep 17 00:00:00 2001 From: NicEggert Date: Thu, 23 Mar 2017 15:26:02 -0500 Subject: [PATCH 2/7] Checkpoint on superstep 0 if checkpointing is enabled --- .../main/java/org/apache/giraph/master/BspServiceMaster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 639f988c9..affb2b318 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -1741,7 +1741,7 @@ private CheckpointStatus getCheckpointStatus(long superstep) { if (checkpointFrequency == 0) { return CheckpointStatus.NONE; } - long firstCheckpoint = INPUT_SUPERSTEP + 1 + checkpointFrequency; + long firstCheckpoint = INPUT_SUPERSTEP + 1; if (getRestartedSuperstep() != UNSET_SUPERSTEP) { firstCheckpoint = getRestartedSuperstep() + checkpointFrequency; } From 50807bb4771b512b1655e4c3781c7ff6bdcc1760 Mon Sep 17 00:00:00 2001 From: NicEggert Date: Thu, 23 Mar 2017 15:26:47 -0500 Subject: [PATCH 3/7] Set unique task id for each worker attempt Previously, a worker would reuse the task id from the prior attempt. This gets propagated to the Netty client id, which makes the master think it has already processed any requests that come from that client, causing it to discard them. This obviously causes problems. --- .../java/org/apache/giraph/worker/BspServiceWorker.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index b6b9c121c..ba4d72ff2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -216,7 +216,7 @@ public BspServiceWorker( graphTaskManager.createUncaughtExceptionHandler()); workerInfo.setInetSocketAddress(workerServer.getMyAddress(), workerServer.getLocalHostOrIp()); - workerInfo.setTaskId(getTaskPartition()); + workerInfo.setTaskId((int)getApplicationAttempt() * conf.getMaxWorkers() + getTaskPartition()); workerClient = new NettyWorkerClient(context, conf, this, graphTaskManager.createUncaughtExceptionHandler()); workerServer.setFlowControl(workerClient.getFlowControl()); @@ -921,7 +921,7 @@ private void writeFinshedSuperstepInfoToZK( String finishedWorkerPath = getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) + - "/" + getHostnamePartitionId(); + "/" + workerInfo.getHostnameId(); try { getZkExt().createExt(finishedWorkerPath, workerFinishedInfoObj.toString().getBytes(Charset.defaultCharset()), @@ -1303,7 +1303,7 @@ public void storeCheckpoint() throws IOException { // Notify master that checkpoint is stored String workerWroteCheckpoint = getWorkerWroteCheckpointPath(getApplicationAttempt(), - getSuperstep()) + "/" + getHostnamePartitionId(); + getSuperstep()) + "/" + workerInfo.getHostnameId(); try { getZkExt().createExt(workerWroteCheckpoint, new byte[0], From 1b7be4cf36604cd1d02f02d19e7fc100679ee57c Mon Sep 17 00:00:00 2001 From: NicEggert Date: Sun, 23 Apr 2017 12:06:07 -0500 Subject: [PATCH 4/7] Set unique task id on master as well --- .../main/java/org/apache/giraph/master/BspServiceMaster.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index affb2b318..6d223315b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -860,7 +860,7 @@ public boolean becomeMaster() { getGraphTaskManager().createUncaughtExceptionHandler()); masterInfo.setInetSocketAddress(masterServer.getMyAddress(), masterServer.getLocalHostOrIp()); - masterInfo.setTaskId(getTaskPartition()); + masterInfo.setTaskId((int)getApplicationAttempt() * getConfiguration().getMaxWorkers() + getTaskPartition()); masterClient = new NettyMasterClient(getContext(), getConfiguration(), this, getGraphTaskManager().createUncaughtExceptionHandler()); From cde1a7bdcb5526d20e08c5c0d51a66f10bcc3fe0 Mon Sep 17 00:00:00 2001 From: NicEggert Date: Sun, 23 Apr 2017 14:52:39 -0500 Subject: [PATCH 5/7] Identify jobs by task id instead of partition id. Task id ensures uniqueness across application attempts. --- .../org/apache/giraph/bsp/BspService.java | 26 +++++++++++-------- .../giraph/master/BspServiceMaster.java | 14 +++++----- .../giraph/worker/BspServiceWorker.java | 6 ++--- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index 976997fa8..ec7a8bc02 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -188,12 +188,12 @@ public abstract class BspService graphPartitionerFactory; /** Mapper that will do the graph computation */ @@ -231,8 +231,8 @@ public BspService( this.context = context; this.graphTaskManager = graphTaskManager; this.conf = graphTaskManager.getConf(); + this.jobId = conf.getJobId(); - this.taskPartition = conf.getTaskPartition(); this.restartedSuperstep = conf.getLong( GiraphConstants.RESTART_SUPERSTEP, UNSET_SUPERSTEP); try { @@ -240,7 +240,6 @@ public BspService( } catch (UnknownHostException e) { throw new RuntimeException(e); } - this.hostnamePartitionId = hostname + "_" + getTaskPartition(); this.graphPartitionerFactory = conf.createGraphPartitioner(); basePath = ZooKeeperManager.getBasePath(conf) + BASE_DIR + "/" + jobId; @@ -252,6 +251,8 @@ public BspService( applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR; cleanedUpPath = basePath + CLEANED_UP_DIR; + + String restartJobId = RESTART_JOB_ID.get(conf); savedCheckpointBasePath = @@ -272,7 +273,7 @@ public BspService( } if (LOG.isInfoEnabled()) { LOG.info("BspService: Connecting to ZooKeeper with job " + jobId + - ", " + getTaskPartition() + " on " + serverPortList); + ", " + getTaskId() + " on " + serverPortList); } try { this.zk = new ZooKeeperExt(serverPortList, @@ -288,6 +289,9 @@ public BspService( throw new RuntimeException(e); } + this.taskId = (int)getApplicationAttempt() * conf.getMaxWorkers() + conf.getTaskPartition(); + this.hostnameTaskId = hostname + "_" + getTaskId(); + //Trying to restart from the latest superstep if (restartJobId != null && restartedSuperstep == UNSET_SUPERSTEP) { @@ -529,12 +533,12 @@ public final String getHostname() { return hostname; } - public final String getHostnamePartitionId() { - return hostnamePartitionId; + public final String getHostnameTaskId() { + return hostnameTaskId; } - public final int getTaskPartition() { - return taskPartition; + public final int getTaskId() { + return taskId; } public final GraphTaskManager getGraphTaskManager() { diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java index 6d223315b..a9e98a8ce 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java @@ -579,15 +579,15 @@ private void logMissingWorkersOnSuperstep( if (LOG.isInfoEnabled()) { Set partitionSet = new TreeSet(); for (WorkerInfo workerInfo : healthyWorkerInfoList) { - partitionSet.add(workerInfo.getTaskId()); + partitionSet.add(workerInfo.getTaskId() % maxWorkers); } for (WorkerInfo workerInfo : unhealthyWorkerInfoList) { - partitionSet.add(workerInfo.getTaskId()); + partitionSet.add(workerInfo.getTaskId() % maxWorkers); } for (int i = 1; i <= maxWorkers; ++i) { if (partitionSet.contains(Integer.valueOf(i))) { continue; - } else if (i == getTaskPartition()) { + } else if (i == getTaskId() % maxWorkers) { continue; } else { LOG.info("logMissingWorkersOnSuperstep: No response from " + @@ -802,7 +802,7 @@ public boolean becomeMaster() { try { myBid = getZkExt().createExt(masterElectionPath + - "/" + getHostnamePartitionId(), + "/" + getHostnameTaskId(), null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, @@ -841,7 +841,7 @@ public boolean becomeMaster() { } if (masterChildArr.get(0).equals(myBid)) { GiraphStats.getInstance().getCurrentMasterTaskPartition(). - setValue(getTaskPartition()); + setValue(getTaskId()); globalCommHandler = new MasterGlobalCommHandler( new MasterAggregatorHandler(getConfiguration(), getContext()), @@ -860,7 +860,7 @@ public boolean becomeMaster() { getGraphTaskManager().createUncaughtExceptionHandler()); masterInfo.setInetSocketAddress(masterServer.getMyAddress(), masterServer.getLocalHostOrIp()); - masterInfo.setTaskId((int)getApplicationAttempt() * getConfiguration().getMaxWorkers() + getTaskPartition()); + masterInfo.setTaskId(getTaskId()); masterClient = new NettyMasterClient(getContext(), getConfiguration(), this, getGraphTaskManager().createUncaughtExceptionHandler()); @@ -1913,7 +1913,7 @@ public void cleanup(SuperstepState superstepState) throws IOException { // for workers and masters, the master will clean up the ZooKeeper // znodes associated with this job. String masterCleanedUpPath = cleanedUpPath + "/" + - getTaskPartition() + MASTER_SUFFIX; + getTaskId() + MASTER_SUFFIX; try { String finalFinishedPath = getZkExt().createExt(masterCleanedUpPath, diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java index ba4d72ff2..6f0274994 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java @@ -216,7 +216,7 @@ public BspServiceWorker( graphTaskManager.createUncaughtExceptionHandler()); workerInfo.setInetSocketAddress(workerServer.getMyAddress(), workerServer.getLocalHostOrIp()); - workerInfo.setTaskId((int)getApplicationAttempt() * conf.getMaxWorkers() + getTaskPartition()); + workerInfo.setTaskId(getTaskId()); workerClient = new NettyWorkerClient(context, conf, this, graphTaskManager.createUncaughtExceptionHandler()); workerServer.setFlowControl(workerClient.getFlowControl()); @@ -243,7 +243,7 @@ public BspServiceWorker( } observers = conf.createWorkerObservers(context); - WorkerProgress.get().setTaskId(getTaskPartition()); + WorkerProgress.get().setTaskId(getTaskId()); workerProgressWriter = conf.trackJobProgressOnClient() ? new WorkerProgressWriter(graphTaskManager.getJobProgressTracker()) : null; @@ -1202,7 +1202,7 @@ public void cleanup(FinishedSuperstepStats finishedSuperstepStats) // for workers and masters, the master will clean up the ZooKeeper // znodes associated with this job. String workerCleanedUpPath = cleanedUpPath + "/" + - getTaskPartition() + WORKER_SUFFIX; + getTaskId() + WORKER_SUFFIX; try { String finalFinishedPath = getZkExt().createExt(workerCleanedUpPath, From 9ecbd27e75a980efa091036cc99c4b5cc5f4ccb4 Mon Sep 17 00:00:00 2001 From: NicEggert Date: Wed, 14 Jun 2017 12:03:25 -0500 Subject: [PATCH 6/7] Log partition before taskid is set --- giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index ec7a8bc02..f34d71196 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -273,7 +273,7 @@ public BspService( } if (LOG.isInfoEnabled()) { LOG.info("BspService: Connecting to ZooKeeper with job " + jobId + - ", " + getTaskId() + " on " + serverPortList); + ", partition " + conf.getTaskPartition() + " on " + serverPortList); } try { this.zk = new ZooKeeperExt(serverPortList, From eb89568c673fef6a66c6586a1080f34c5a3a73c7 Mon Sep 17 00:00:00 2001 From: NicEggert Date: Wed, 14 Jun 2017 12:03:30 -0500 Subject: [PATCH 7/7] Style --- .../src/main/java/org/apache/giraph/bsp/BspService.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index f34d71196..c3fd141fe 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -188,7 +188,7 @@ public abstract class BspService