Skip to content
Browse files

client stage now outputs a formatted json file. Json input is also st…

…reaming.
  • Loading branch information...
1 parent aa05739 commit 26d32c0e3e56fc61ff4192faabd4af925364e695 @calebperkins committed
View
7 build.xml
@@ -63,9 +63,10 @@
<target name="singletest" description="Start a test job on one node." depends="compile">
<java fork="true" failonerror="true" maxmemory="128m" classname="c2c.stages.ClientStage">
<sysproperty key="java.library.path" value="${syslib}" />
- <arg value="configs/master.cfg"/>
- <arg value="demos.WordCount"/>
- <arg value="words.json"/>
+ <arg value="configs/master.cfg" />
+ <arg value="demos.WordCount" />
+ <arg value="words.json" />
+ <arg value="/tmp/results.json" />
<classpath>
<fileset dir="${lib}">
<include name="**/*.jar"/>
View
11 src/c2c/events/JobDone.java
@@ -0,0 +1,11 @@
+package c2c.events;
+
+import seda.sandStorm.api.QueueElementIF;
+
+public class JobDone implements QueueElementIF {
+ public final String domain;
+
+ public JobDone(String d) {
+ domain = d;
+ }
+}
View
13 src/c2c/events/ReducingUnderway.java
@@ -0,0 +1,13 @@
+package c2c.events;
+
+import seda.sandStorm.api.QueueElementIF;
+
+public class ReducingUnderway implements QueueElementIF {
+ public final String domain;
+ public final int reducers;
+
+ public ReducingUnderway(String domain, int reducers) {
+ this.domain = domain;
+ this.reducers = reducers;
+ }
+}
View
3 src/c2c/payloads/KeyValue.java
@@ -4,6 +4,7 @@
import ostore.util.OutputBuffer;
import ostore.util.QSException;
import ostore.util.QuickSerializable;
+import seda.sandStorm.api.QueueElementIF;
/**
* A key-value pair for mapping.
@@ -11,7 +12,7 @@
* @author Caleb Perkins
*
*/
-public class KeyValue implements QuickSerializable, Comparable<KeyValue> {
+public class KeyValue implements QuickSerializable, Comparable<KeyValue>, QueueElementIF {
public final KeyPayload key;
public final String value;
View
87 src/c2c/stages/ClientStage.java
@@ -1,42 +1,59 @@
package c2c.stages;
+import java.io.FileOutputStream;
import java.io.FileReader;
-import java.util.Map.Entry;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonSyntaxException;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import c2c.events.JobDone;
import c2c.events.JobRequest;
+import c2c.payloads.KeyValue;
import seda.sandStorm.api.ConfigDataIF;
import seda.sandStorm.api.QueueElementIF;
+/**
+ * Handles a client (someone who wants to perform a job). Reads from an input
+ * file and writes to an output file.
+ *
+ * @author Caleb Perkins
+ *
+ */
public class ClientStage extends MapReduceStage {
public static final long app_id = bamboo.router.Router
.app_id(ClientStage.class);
private static String class_name;
private static String input_file;
+ private static String output_file;
+
+ private JsonWriter writer;
public static void main(String[] args) throws Exception {
- if (args.length == 2 || args.length > 3) {
- System.err.println("Arguments: config_file class_name input_file");
+ if (args.length == 2 || args.length > 4) {
+ System.err
+ .println("Arguments: config_file class_name input_file output_file");
System.exit(1);
}
String[] config = { args[0] };
- if (args.length == 3) {
+ if (args.length == 4) {
class_name = args[1];
input_file = args[2];
+ output_file = args[3];
}
bamboo.lss.DustDevil.main(config);
}
public ClientStage() throws Exception {
- super(null);
-
+ super(null, KeyValue.class, JobDone.class);
+
// Configure built-in stages to be less noisy
Logger.getLogger(bamboo.lss.ASyncCoreImpl.class).setLevel(Level.WARN);
Logger.getLogger(bamboo.db.StorageManager.class).setLevel(Level.WARN);
@@ -49,20 +66,27 @@ public void init(ConfigDataIF config) throws Exception {
super.init(config);
if (class_name != null) {
JobRequest req = new JobRequest(class_name);
- final JsonParser parser = new JsonParser();
- final JsonElement jsonElement = parser.parse(new FileReader(
- input_file));
- final JsonObject jsonObject = jsonElement.getAsJsonObject();
-
- for (final Entry<String, JsonElement> entry : jsonObject.entrySet()) {
- final String key = entry.getKey();
- final String value = entry.getValue().getAsString();
- req.add(key, value);
- }
+ parseInputFile(req);
+
+ writer = new JsonWriter(new OutputStreamWriter(
+ new FileOutputStream(output_file), "UTF-8"));
+ writer.beginObject();
+
classifier.dispatch_later(req, 1000);
}
}
+ private void parseInputFile(JobRequest req) throws JsonIOException,
+ JsonSyntaxException, IOException {
+ JsonReader reader = new JsonReader(new FileReader(input_file));
+ reader.beginObject();
+ while (reader.hasNext()) {
+ req.add(reader.nextName(), reader.nextString());
+ }
+ reader.endObject();
+ reader.close();
+ }
+
@Override
protected long getAppID() {
return app_id;
@@ -70,6 +94,31 @@ protected long getAppID() {
@Override
protected void handleOperationalEvent(QueueElementIF event) {
+ if (event instanceof KeyValue) {
+ writeResult((KeyValue) event);
+ } else if (event instanceof JobDone) {
+ finishJob((JobDone) event);
+ } else {
+ BUG("Unknown event: " + event);
+ }
+ }
+
+ private void finishJob(JobDone job) {
+ logger.info("Job done! Results written to " + output_file);
+ try {
+ writer.endObject();
+ writer.close();
+ } catch (IOException e) {
+ logger.fatal("Could not close result file. File may be corrupted.");
+ }
+ }
+
+ private void writeResult(KeyValue payload) {
+ try {
+ writer.name(payload.key.data).value(payload.value);
+ } catch (IOException e) {
+ logger.fatal("Could not write to file.");
+ }
}
}
View
41 src/c2c/stages/MasterStage.java
@@ -1,5 +1,10 @@
package c2c.stages;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
import seda.sandStorm.api.QueueElementIF;
import c2c.events.*;
@@ -16,17 +21,47 @@
public final class MasterStage extends MapReduceStage {
public static final long app_id = bamboo.router.Router
.app_id(MasterStage.class);
+
+ private final Map<String, Integer> expected = new HashMap<String, Integer>();
+ private final Map<String, Set<String>> completed = new HashMap<String, Set<String>>();
public MasterStage() throws Exception {
- super(KeyValue.class, JobRequest.class);
+ super(KeyValue.class, JobRequest.class, ReducingUnderway.class);
+ ostore.util.TypeTable.register_type(KeyPayload.class);
+ }
+
+ private void handleReducerDone(KeyPayload k) {
+ logger.debug("Reducer done for " + k);
+ Set<String> comp = completed.get(k.domain);
+ comp.add(k.data);
+ if (comp.size() == expected.get(k.domain)) {
+ dispatch(new JobDone(k.domain));
+ }
+ }
+
+ private void handleResultBack(KeyValue kv) {
+ logger.debug("Result back: " + kv);
+ dispatch(kv);
+ }
+
+ private void handleReducingUnderway(ReducingUnderway event) {
+ expected.put(event.domain, event.reducers);
+ completed.put(event.domain, new HashSet<String>());
}
@Override
protected void handleOperationalEvent(QueueElementIF event) {
if (event instanceof BambooRouteDeliver) { // get back the results
BambooRouteDeliver deliver = (BambooRouteDeliver) event;
- logger.info("Results back: " + deliver.payload);
+ if (deliver.payload instanceof KeyValue) {
+ handleResultBack((KeyValue) deliver.payload);
+ } else if (deliver.payload instanceof KeyPayload) {
+ handleReducerDone((KeyPayload) deliver.payload);
+ } else {
+ BUG("Unknown payload.");
+ }
} else if (event instanceof JobRequest) { // Distribute jobs to mappers.
+ logger.fatal("fag");
JobRequest req = (JobRequest) event;
dispatch(new MappingUnderway(req.domain, req.pairs.size()));
for (KeyValue pair : req.pairs) {
@@ -34,6 +69,8 @@ protected void handleOperationalEvent(QueueElementIF event) {
dispatchTo(pair.key.toNode(), MappingStage.app_id,
pair);
}
+ } else if (event instanceof ReducingUnderway) {
+ handleReducingUnderway((ReducingUnderway) event);
} else {
BUG("Event " + event + " unknown.");
}
View
1 src/c2c/stages/PartitioningStage.java
@@ -51,6 +51,7 @@ protected void handleOperationalEvent(QueueElementIF event) {
Dht.GetResp resp = (Dht.GetResp) event;
KeyPayload kp = (KeyPayload) resp.user_data;
logger.info(kp + " has " + resp.values.size() + " values.");
+ dispatch(new ReducingUnderway(kp.domain, resp.values.size()));
for (String key : new DhtValues(resp)) {
KeyPayload redKey = new KeyPayload(kp.domain, key);
dispatchTo(redKey.toNode(), ReducingStage.app_id, redKey);
View
1 src/c2c/stages/ReducingStage.java
@@ -53,6 +53,7 @@ protected void handleOperationalEvent(QueueElementIF event) {
dispatchGet(k, resp.placemark);
} else {
reducers.get(k.domain).reduce(responses.get(k).getKey(), responses.get(k), new Collector(k.domain));
+ dispatchTo(BigInteger.ZERO, MasterStage.app_id, k); // FIXME reducer done
}
} else {
BUG("Unexpected event:" + event);

0 comments on commit 26d32c0

Please sign in to comment.
Something went wrong with that request. Please try again.