diff --git a/CHANGES.txt b/CHANGES.txt
index d297c4f01..56600ee63 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,8 @@ Hama Change Log
Release 0.7.2 (unreleased changes)
NEW FEATURES
+
+ HAMA-988: Allow to add additional no-input tasks (edwardyoon)
BUG FIXES
diff --git a/core/src/main/java/org/apache/hama/Constants.java b/core/src/main/java/org/apache/hama/Constants.java
index c3563f954..351fb058b 100644
--- a/core/src/main/java/org/apache/hama/Constants.java
+++ b/core/src/main/java/org/apache/hama/Constants.java
@@ -132,6 +132,8 @@ public interface Constants {
// If true, framework launches the number of tasks by user settings.
public static final String FORCE_SET_BSP_TASKS = "hama.force.set.bsp.tasks";
+ // framework launches additional tasks to the number of input splits
+ public static final String ADDITIONAL_BSP_TASKS = "hama.additional.bsp.tasks";
// /////////////////////////////////////
// Constants for ZooKeeper
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPJob.java b/core/src/main/java/org/apache/hama/bsp/BSPJob.java
index 293e6a6dc..8489f83f4 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPJob.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPJob.java
@@ -189,6 +189,11 @@ public long progress() throws IOException {
return info.progress();
}
+ public Counters getCounters() throws IOException {
+ ensureState(JobState.RUNNING);
+ return info.getCounters();
+ }
+
public boolean isComplete() throws IOException {
ensureState(JobState.RUNNING);
return info.isComplete();
diff --git a/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java b/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
index 4ca7b61b4..c1ce0f403 100644
--- a/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
+++ b/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
@@ -219,6 +219,11 @@ public void killTask(TaskAttemptID taskId, boolean shouldFail)
public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) {
return jobSubmitClient.getTaskCompletionEvents(getID(), startFrom, 10);
}
+
+ @Override
+ public Counters getCounters() {
+ return status.getCounter();
+ }
}
public BSPJobClient(Configuration conf) throws IOException {
@@ -363,17 +368,22 @@ public RunningJob submitJobInternal(BSPJob pJob, BSPJobID jobId)
splits = job.getInputFormat().getSplits(job, maxTasks);
}
- if (maxTasks < splits.length) {
+ // the number of additional tasks to the number of input splits
+ int additionalTasks = job.getConfiguration().getInt(
+ Constants.ADDITIONAL_BSP_TASKS, 0);
+ if (maxTasks < splits.length + additionalTasks) {
throw new IOException(
"Job failed! The number of splits has exceeded the number of max tasks. The number of splits: "
- + splits.length + ", The number of max tasks: " + maxTasks);
+ + splits.length
+ + ", The number of additional tasks: "
+ + +additionalTasks + ", The number of max tasks: " + maxTasks);
}
int numOfSplits = writeSplits(job, splits, submitSplitFile, maxTasks);
if (numOfSplits > configured
|| !job.getConfiguration().getBoolean(Constants.FORCE_SET_BSP_TASKS,
false)) {
- job.setNumBspTask(numOfSplits);
+ job.setNumBspTask(numOfSplits + additionalTasks);
}
job.set("bsp.job.split.file", submitSplitFile.toString());
@@ -583,8 +593,9 @@ private static int writeSplits(BSPJob job, InputSplit[] splits,
// set partitionID to rawSplit
if (split.getClass().getName().equals(FileSplit.class.getName())
&& job.getBoolean("input.has.partitioned", false)) {
- String[] extractPartitionID = ((FileSplit) split).getPath().getName().split("[-]");
- if(extractPartitionID.length > 1)
+ String[] extractPartitionID = ((FileSplit) split).getPath().getName()
+ .split("[-]");
+ if (extractPartitionID.length > 1)
rawSplit.setPartitionID(Integer.parseInt(extractPartitionID[1]));
}
diff --git a/core/src/main/java/org/apache/hama/bsp/RunningJob.java b/core/src/main/java/org/apache/hama/bsp/RunningJob.java
index 95faba569..dcbabddd3 100644
--- a/core/src/main/java/org/apache/hama/bsp/RunningJob.java
+++ b/core/src/main/java/org/apache/hama/bsp/RunningJob.java
@@ -53,6 +53,8 @@ public interface RunningJob {
*/
public String getJobFile();
+ public Counters getCounters();
+
/**
* Get the progress of the job's tasks, as a float between 0.0 and 1.0.
* When all bsp tasks have completed, the function returns 1.0.
diff --git a/core/src/test/java/org/apache/hama/bsp/TestAdditionalTasks.java b/core/src/test/java/org/apache/hama/bsp/TestAdditionalTasks.java
new file mode 100644
index 000000000..e50e4f989
--- /dev/null
+++ b/core/src/test/java/org/apache/hama/bsp/TestAdditionalTasks.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.Constants;
+import org.apache.hama.HamaCluster;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.commons.util.KeyValuePair;
+
+public class TestAdditionalTasks extends HamaCluster {
+
+ public static final Log LOG = LogFactory.getLog(TestAdditionalTasks.class);
+
+ protected HamaConfiguration configuration;
+
+ // these variables are preventing from rebooting the whole stuff again since
+ // setup and teardown are called per method.
+
+ public TestAdditionalTasks() {
+ configuration = new HamaConfiguration();
+ configuration.set("bsp.master.address", "localhost");
+ configuration.set("hama.child.redirect.log.console", "true");
+ assertEquals("Make sure master addr is set to localhost:", "localhost",
+ configuration.get("bsp.master.address"));
+ configuration.set("bsp.local.dir", "/tmp/hama-test");
+ configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
+ configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
+ configuration.set("hama.sync.peer.class",
+ org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
+ .getCanonicalName());
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ super.tearDown();
+ }
+
+ public void testAdditionalTasks() throws Exception {
+
+ Configuration conf = new Configuration();
+ BSPJob bsp = new BSPJob(new HamaConfiguration(conf));
+ bsp.setBspClass(TestBSP.class);
+ conf.setInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 600);
+ bsp.setInputFormat(TextInputFormat.class);
+ bsp.setOutputFormat(NullOutputFormat.class);
+ bsp.setInputPath(new Path("../CHANGES.txt"));
+
+ bsp.getConfiguration().setInt(Constants.ADDITIONAL_BSP_TASKS, 1);
+ bsp.setNumBspTask(2);
+
+ assertTrue(bsp.waitForCompletion(true));
+ Counters counter = bsp.getCounters();
+ assertTrue(2 == counter.getCounter(JobInProgress.JobCounter.LAUNCHED_TASKS));
+ }
+
+ public static class TestBSP extends
+ BSP {
+
+ @Override
+ public void bsp(
+ BSPPeer peer)
+ throws IOException, SyncException, InterruptedException {
+ long numOfPairs = 0;
+ KeyValuePair readNext = null;
+ while ((readNext = peer.readNext()) != null) {
+ LOG.debug(readNext.getKey().get() + " / "
+ + readNext.getValue().toString());
+ numOfPairs++;
+ }
+
+ assertTrue(numOfPairs > 2 || numOfPairs == 0);
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java b/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
index 4d2338416..2c2cb2357 100644
--- a/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
+++ b/yarn/src/main/java/org/apache/hama/bsp/YARNBSPJobClient.java
@@ -123,6 +123,12 @@ public TaskCompletionEvent[] getTaskCompletionEvents(int eventCounter) {
public String getJobFile() {
return null;
}
+
+ @Override
+ public Counters getCounters() {
+ // TODO Auto-generated method stub
+ return null;
+ }
}
public YARNBSPJobClient(HamaConfiguration conf) {