Skip to content

Commit

Permalink
cleaned up masterstage
Browse files Browse the repository at this point in the history
  • Loading branch information
Caleb Perkins committed Apr 29, 2012
1 parent 17b7439 commit b319c46
Showing 1 changed file with 30 additions and 24 deletions.
54 changes: 30 additions & 24 deletions src/c2c/stages/MasterStage.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,34 @@ private void handleReducingUnderway(ReducingUnderway event) {
expected.put(event.domain, event.reducers);
completed.put(event.domain, new HashSet<String>());
}

private void handleJobRequest(JobRequest req) {
dispatch(new MappingUnderway(req.domain, req.pairs.size()));
for (KeyValue pair : req.pairs) {
workers.addJob(pair.key.domain);
jobs.put(pair.key.domain, pair);

// Distribute to different nodes.
dispatchTo(pair.key.toNode(), MappingStage.app_id,
pair);
}

// Schedule rescan of worker table
acore.register_timer(4500, rescanTable);
}

private void handleJobStatus(JobStatus status) {
if (status.mapper) {
if (status.done) {
// Mapper is done - remove from tables
workers.removeJob(status.domain);
jobs.remove(status.domain);
} else {
// Mapper still working - refresh in table
workers.addJob(status.domain);
}
}
}

@Override
protected void handleOperationalEvent(QueueElementIF event) {
Expand All @@ -66,34 +94,12 @@ protected void handleOperationalEvent(QueueElementIF event) {
} else if (deliver.payload instanceof KeyPayload) {
handleReducerDone((KeyPayload) deliver.payload);
} else if (deliver.payload instanceof JobStatus) {
JobStatus status = (JobStatus) deliver.payload;
if (status.mapper) {
if (status.done) {
// Mapper is done - remove from tables
workers.removeJob(status.domain);
jobs.remove(status.domain);
} else {
// Mapper still working - refresh in table
workers.addJob(status.domain);
}
}
handleJobStatus((JobStatus) deliver.payload);
} else {
BUG("Unknown payload.");
}
} else if (event instanceof JobRequest) { // Distribute jobs to mappers.
JobRequest req = (JobRequest) event;
dispatch(new MappingUnderway(req.domain, req.pairs.size()));
for (KeyValue pair : req.pairs) {
workers.addJob(pair.key.domain);
jobs.put(pair.key.domain, pair);

// Distribute to different nodes.
dispatchTo(pair.key.toNode(), MappingStage.app_id,
pair);
}

// Schedule rescan of worker table
acore.register_timer(4500, rescanTable);
handleJobRequest((JobRequest) event);
} else if (event instanceof ReducingUnderway) {
handleReducingUnderway((ReducingUnderway) event);
} else {
Expand Down

0 comments on commit b319c46

Please sign in to comment.