-
Notifications
You must be signed in to change notification settings - Fork 0
/
MasterStage.java
127 lines (108 loc) · 3.59 KB
/
MasterStage.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package c2c.stages;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import seda.sandStorm.api.QueueElementIF;
import c2c.events.*;
import c2c.payloads.*;
import c2c.utilities.MapReduceStage;
import c2c.utilities.WorkerTable;
import bamboo.api.*;
/**
* Takes job requests from a Client and disperses them to mappers.
*
* @author Caleb Perkins
*
*/
public final class MasterStage extends MapReduceStage {
public static final long app_id = bamboo.router.Router
.app_id(MasterStage.class);
private final Map<String, Integer> expected = new HashMap<String, Integer>();
private final Map<String, Set<String>> completed = new HashMap<String, Set<String>>();
// KV's have to be stored in case we need to reissue a job
private Map<String, KeyValue> jobs = new HashMap<String, KeyValue>();
// When was the last time we heard from a worker?
private WorkerTable workers = new WorkerTable();
public MasterStage() throws Exception {
super(JobRequest.class, ReducingUnderway.class);
ostore.util.TypeTable.register_type(KeyValue.class);
ostore.util.TypeTable.register_type(KeyPayload.class);
}
private void handleReducerDone(KeyPayload k) {
logger.debug("Reducer done for " + k);
Set<String> comp = completed.get(k.domain);
comp.add(k.data);
if (comp.size() == expected.get(k.domain)) {
dispatch(new JobDone(k.domain));
}
}
private void handleResultBack(KeyValue kv) {
logger.debug("Result back: " + kv);
dispatch(kv);
}
private void handleReducingUnderway(ReducingUnderway event) {
expected.put(event.domain, event.reducers);
completed.put(event.domain, new HashSet<String>());
}
@Override
protected void handleOperationalEvent(QueueElementIF event) {
if (event instanceof BambooRouteDeliver) { // get back the results
BambooRouteDeliver deliver = (BambooRouteDeliver) event;
if (deliver.payload instanceof KeyValue) {
handleResultBack((KeyValue) deliver.payload);
} else if (deliver.payload instanceof KeyPayload) {
handleReducerDone((KeyPayload) deliver.payload);
} else if (deliver.payload instanceof JobStatus) {
JobStatus status = (JobStatus) deliver.payload;
if (status.mapper) {
if (status.done) {
// Mapper is done - remove from tables
workers.removeJob(status.domain);
jobs.remove(status.domain);
} else {
// Mapper still working - refresh in table
workers.addJob(status.domain);
}
}
} else {
BUG("Unknown payload.");
}
} else if (event instanceof JobRequest) { // Distribute jobs to mappers.
JobRequest req = (JobRequest) event;
dispatch(new MappingUnderway(req.domain, req.pairs.size()));
for (KeyValue pair : req.pairs) {
workers.addJob(pair.key.domain);
jobs.put(pair.key.domain, pair);
// Distribute to different nodes.
dispatchTo(pair.key.toNode(), MappingStage.app_id,
pair);
}
// Schedule rescan of worker table
acore.register_timer(4500, rescanTable);
} else if (event instanceof ReducingUnderway) {
handleReducingUnderway((ReducingUnderway) event);
} else {
BUG("Event " + event + " unknown.");
}
}
private Runnable rescanTable = new Runnable() {
public void run() {
// Redispatch all failed jobs
for (String failed : workers.scan()) {
KeyValue pair = jobs.get(failed);
assert(failed.equals(pair.key.domain));
// Readd as current
workers.addJob(failed);
dispatchTo(pair.key.toNode(), MappingStage.app_id,
pair);
}
// Schedule next rescan
acore.registerTimer(4500, rescanTable);
}
};
@Override
public long getAppID() {
return app_id;
}
}