Skip to content

Commit

Permalink
JobStatus was not properly registered, causing an exception.
Browse files Browse the repository at this point in the history
  • Loading branch information
Caleb Perkins committed Apr 27, 2012
1 parent d9314bd commit 933fc53
Showing 1 changed file with 17 additions and 16 deletions.
33 changes: 17 additions & 16 deletions src/c2c/stages/MappingStage.java
Expand Up @@ -28,16 +28,16 @@ public final class MappingStage extends MapReduceStage {

// Here KeyPayload corresponds to a mapper key
private final Map<KeyPayload, Integer> remaining = new HashMap<KeyPayload, Integer>();

private final Map<String, Job> jobs = new HashMap<String, MappingStage.Job>();
private boolean working; // Job active?

private final ExecutorService pool = Executors.newCachedThreadPool();

private class Job {
public final BigInteger master;
public final Mapper mapper;

public Job(String domain, BigInteger master) throws Exception {
mapper = (Mapper) classLoader.loadClass(domain).newInstance();
this.master = master;
Expand All @@ -49,6 +49,7 @@ public Job(String domain, BigInteger master) throws Exception {

public MappingStage() throws Exception {
super(KeyValue.class, Dht.PutResp.class);
ostore.util.TypeTable.register_type(JobStatus.class);
}

@Override
Expand Down Expand Up @@ -76,37 +77,37 @@ private Job getJob(String domain, BigInteger master) {
private void handleMapRequest(final BambooRouteDeliver event) {
final KeyValue kv = (KeyValue) event.payload;
final Job job = getJob(kv.key.domain, event.src);

// Notify the master that we're now working
working = true;
acore.registerTimer(10, new Runnable() {
public void run() {
if (working) {
dispatchTo(event.src, MasterStage.app_id,
dispatchTo(event.src, MasterStage.app_id,
new JobStatus(kv.key.domain, false, true));
acore.registerTimer(1000, this);
}
}
});

logger.info("Mapping " + kv.key);

// The user's map function may be blocking so start a new thread.
pool.execute(new Runnable() {

@Override
public void run() {
final Collector c = new Collector(kv.key);
job.mapper.map(kv.key.data, kv.value, c);

// Get back to main thread
acore.registerTimer(0, new Runnable() {

@Override
public void run() {
c.flush();
working = false;

// Tell the master that we're done
dispatchTo(event.src, MasterStage.app_id,
new JobStatus(kv.key.domain, true, true));
Expand All @@ -128,7 +129,7 @@ private void handlePutResp(Dht.PutResp response) {
doPut(kv);
}
}

private void doPut(IntermediateKeyValue kv) {
Dht.PutReq req = new Dht.PutReq(kv.key.toNode(), kv.value.toByteBuffer(),
kv.value.hash(), true, my_sink, kv, 600,
Expand All @@ -143,15 +144,15 @@ public long getAppID() {

private class Collector implements OutputCollector {
private KeyPayload mapping_key;

private Set<String> keys = new HashSet<String>();
private Collection<KeyValue> keyvalues = new LinkedList<KeyValue>();

public Collector(KeyPayload mapping_key) {
this.mapping_key = mapping_key;
assert mapping_key != null;
}

public void flush() {
remaining.put(mapping_key, keys.size() + keyvalues.size());
KeyPayload inter = KeyPayload.intermediateKeys(mapping_key.domain);
Expand All @@ -168,7 +169,7 @@ public void collect(String key, String value) {
keys.add(key);
keyvalues.add(new KeyValue(new KeyPayload(mapping_key.domain, key), value));
}

private void makePut(KeyPayload key, String value, boolean allow_duplicates) {
Value val = new Value(value, allow_duplicates);
IntermediateKeyValue ud = new IntermediateKeyValue(mapping_key, key, val);
Expand Down

0 comments on commit 933fc53

Please sign in to comment.