Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Mappers now communicate 'still alive' messages to Master, which responds

appropriately
  • Loading branch information...
commit c0a2b86a76cb8575f210db3e035b463f4c3b18f5 1 parent fc132ef
@alex-slover alex-slover authored
View
1  .classpath
@@ -20,5 +20,6 @@
<classpathentry exported="true" kind="lib" path="lib/ostore-seda-emu.jar"/>
<classpathentry exported="true" kind="lib" path="lib/xmlrpc-1.2-b1.jar"/>
<classpathentry exported="true" kind="lib" path="lib/gson-2.1.jar"/>
+ <classpathentry kind="lib" path="lib/joda-time-2.1.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
View
BIN  lib/joda-time-2.1.jar
Binary file not shown
View
25 src/c2c/stages/MappingStage.java
@@ -9,24 +9,29 @@
import java.util.Set;
import c2c.api.*;
+import c2c.events.JobDone;
+import c2c.events.MappingUnderway;
import c2c.payloads.IntermediateKeyValue;
import c2c.payloads.KeyPayload;
import c2c.payloads.KeyValue;
import c2c.payloads.Value;
+import c2c.utilities.WorkerTable;
import seda.sandStorm.api.*;
import bamboo.api.*;
import bamboo.dht.Dht;
import bamboo.dht.Dht.PutResp;
import bamboo.dht.bamboo_stat;
+import bamboo.router.PingMsg;
public final class MappingStage extends MapReduceStage {
private final ClassLoader classLoader = MappingStage.class.getClassLoader();
-
+
// Here KeyPayload corresponds to a mapper key
private final Map<KeyPayload, Integer> remaining = new HashMap<KeyPayload, Integer>();
private final Map<String, Job> jobs = new HashMap<String, MappingStage.Job>();
+ private boolean working; // Job active?
private class Job {
public final BigInteger master;
@@ -68,12 +73,28 @@ private Job getJob(String domain, BigInteger master) {
}
private void handleMapRequest(BambooRouteDeliver event) {
- KeyValue kv = (KeyValue) event.payload;
+ final KeyValue kv = (KeyValue) event.payload;
Job job = getJob(kv.key.domain, event.src);
+
+ // Notify the master that we're now working
+ working = true;
+ acore.register_timer(10, new Runnable() {
+ public void run() {
+ if (working) {
+ dispatch(new MappingUnderway(kv.key.domain, -1));
+ acore.register_timer(1000, this);
+ }
+ }
+ });
+
logger.info("Mapping " + kv.key);
Collector c = new Collector(kv.key);
job.mapper.map(kv.key.data, kv.value, c);
c.flush();
+ working = false;
+
+ // Tell the master - we're done
+ dispatch(new JobDone(kv.key.domain));
}
private void handlePutResp(Dht.PutResp response) {
View
38 src/c2c/stages/MasterStage.java
@@ -9,6 +9,7 @@
import c2c.events.*;
import c2c.payloads.*;
+import c2c.utilities.WorkerTable;
import bamboo.api.*;
@@ -24,6 +25,11 @@
private final Map<String, Integer> expected = new HashMap<String, Integer>();
private final Map<String, Set<String>> completed = new HashMap<String, Set<String>>();
+
+ // KV's have to be stored in case we need to reissue a job
+ private Map<String, KeyValue> jobs = new HashMap<String, KeyValue>();
+ // When was the last time we heard from a worker?
+ private WorkerTable workers = new WorkerTable();
public MasterStage() throws Exception {
super(KeyValue.class, JobRequest.class, ReducingUnderway.class);
@@ -64,16 +70,48 @@ protected void handleOperationalEvent(QueueElementIF event) {
JobRequest req = (JobRequest) event;
dispatch(new MappingUnderway(req.domain, req.pairs.size()));
for (KeyValue pair : req.pairs) {
+ workers.addJob(pair.key.domain);
+ jobs.put(pair.key.domain, pair);
+
// Distribute to different nodes.
dispatchTo(pair.key.toNode(), MappingStage.app_id,
pair);
}
+
+ // Schedule rescan of worker table
+ acore.register_timer(4500, rescanTable);
+ } else if (event instanceof MappingUnderway) {
+ // Update job in table - it's not dead!
+ workers.addJob(((MappingUnderway)event).domain);
+ } else if (event instanceof JobDone) {
+ // A mapper is finished - remove it from table and stop checking
+ String done = ((JobDone)event).domain;
+ workers.removeJob(done);
+ jobs.remove(done);
} else if (event instanceof ReducingUnderway) {
handleReducingUnderway((ReducingUnderway) event);
} else {
BUG("Event " + event + " unknown.");
}
}
+
+ private Runnable rescanTable = new Runnable() {
+ public void run() {
+ // Redispatch all failed jobs
+ for (String failed : workers.scan()) {
+ KeyValue pair = jobs.get(failed);
+ assert(failed.equals(pair.key.domain));
+
+ // Readd as current
+ workers.addJob(failed);
+ dispatchTo(pair.key.toNode(), MappingStage.app_id,
+ pair);
+ }
+
+ // Schedule next rescan
+ acore.registerTimer(4500, rescanTable);
+ }
+ };
@Override
public long getAppID() {
View
31 src/c2c/utilities/WorkerTable.java
@@ -0,0 +1,31 @@
+package c2c.utilities;
+
+import java.util.*;
+import org.joda.time.*;
+
+public class WorkerTable {
+ // Number of seconds to wait before a missing job is killed
+ protected static final Duration TIMEOUT = new Duration(10 * 1000);
+ protected Map<String, DateTime> jobs = new HashMap<String, DateTime>();
+
+ public synchronized void addJob(String job) {
+ jobs.put(job, new DateTime());
+ }
+
+ public synchronized void removeJob(String job) {
+ jobs.remove(job);
+ }
+
+ public synchronized Iterable<String> scan() {
+ DateTime now = new DateTime();
+ LinkedList<String> result = new LinkedList<String>();
+
+ for (String job : jobs.keySet()) {
+ if (jobs.get(job).plus(TIMEOUT).compareTo(now) < 0) {
+ result.add(job);
+ }
+ }
+
+ return result;
+ }
+}

0 comments on commit c0a2b86

Please sign in to comment.
Something went wrong with that request. Please try again.