Permalink
Browse files

Reading tuples is fast in the reducer

  • Loading branch information...
1 parent a24ac4c commit d41a2adfc6cad69be1bd4979b153666faf9a7890 Greg Fodor committed Mar 12, 2012
View
2 jobs/examples/wordcount.coffee
@@ -1,6 +1,6 @@
job ($, _) ->
$.flow 'word count', ->
- $.source 'input', $.tap("tinywordlist.txt", $.text_line_scheme("offset", "word"))
+ $.source 'input', $.tap("words3.txt", $.text_line_scheme("offset", "word"))
assembly = $.assembly 'input', ->
#$.map { add: ["word", "word2"], remove: ["line", "offset"] }, (tuple, writer) ->
View
2 jobs/examples/wordcount.js
@@ -3,7 +3,7 @@
job(function($, _) {
return $.flow('word count', function() {
var assembly;
- $.source('input', $.tap("tinywordlist.txt", $.text_line_scheme("offset", "word")));
+ $.source('input', $.tap("words3.txt", $.text_line_scheme("offset", "word")));
assembly = $.assembly('input', function() {
var count, last_key;
last_key = null;
View
51 lib/js/cascading/components.js
@@ -183,7 +183,7 @@
};
Pipe.set_pipe_out_buffers = function(out_buffers, pipe_id) {
- var current_offsets, field, i_field, idx, num_types, offset, out_field_data_offsets, out_field_types, out_num_fields_per_type, pipe, type, type_idx, writable_fields, _ref, _results;
+ var current_offsets, field, i_field, idx, num_types, offset, out_field_data_offsets, out_field_types, out_num_fields_per_type, pipe, type, type_idx, writable_fields, _fn, _ref, _results;
pipe = Pipe.pipes[pipe_id];
pipe.out_buffers = out_buffers;
num_types = _.keys(Pipe.type_idx_map).length;
@@ -202,6 +202,11 @@
if (pipe.group_fields) {
writable_fields = _.difference(writable_fields, pipe.group_fields);
}
+ _fn = function(type_idx, offset) {
+ return pipe.out_obj[field] = function(val) {
+ return pipe.out_buffers[type_idx][offset] = val;
+ };
+ };
_results = [];
for (i_field in writable_fields) {
field = writable_fields[i_field];
@@ -210,32 +215,34 @@
offset = current_offsets[type_idx];
out_field_data_offsets[i_field] = offset;
out_field_types[i_field] = type_idx;
- pipe.out_obj[field] = (function(type_idx, offset) {
- return function(val) {
- return pipe.out_buffers[type_idx][offset] = val;
- };
- })(type_idx, offset);
+ _fn(type_idx, offset);
_results.push(current_offsets[type_idx] += 1);
}
return _results;
};
- Pipe.get_group_start_processor = function(group_tuple, argument_tuple, emit, pipe_id) {
- var f, out;
- out = Pipe.pipes[pipe_id].out_obj;
- f = Pipe.pipes[pipe_id].initializer;
- return function() {};
- };
-
- Pipe.get_argument_processor = function(group_tuple, argument_tuple, emit, pipe_id) {
- return function() {};
- };
-
- Pipe.get_group_end_processor = function(group_tuple, argument_tuple, emit, pipe_id) {
- var f, out;
- out = Pipe.pipes[pipe_id].out_obj;
- f = Pipe.pipes[pipe_id].finalizer;
- return function() {};
+ Pipe.get_flush_routine = function(in_buffer, emit, pipe_id) {
+ var finalizer, initializer, processor;
+ initializer = Pipe.pipes[pipe_id].initializer;
+ processor = Pipe.pipes[pipe_id].processor;
+ finalizer = Pipe.pipes[pipe_id].finalizer;
+ return function() {
+ var count, _results;
+ _results = [];
+ while (true) {
+ count = 0;
+ while (true) {
+ count += 1;
+ if (!in_buffer.next_arg()) break;
+ }
+ if (!in_buffer.next_group()) {
+ break;
+ } else {
+ _results.push(void 0);
+ }
+ }
+ return _results;
+ };
};
Pipe.prototype.is_pipe = true;
View
10 src/java/org/cascading/js/operation/ScriptOperation.java
@@ -11,6 +11,8 @@
import javax.script.ScriptException;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
public abstract class ScriptOperation extends BaseOperation<V8OperationContext> {
protected Environment.EnvironmentArgs environmentArgs;
@@ -61,8 +63,14 @@ public void prepare( FlowProcess flowProcess, OperationCall<V8OperationContext>
V8Object v8PipeClass = (V8Object)env.extractObject("__v8PipeClass");
env.evaluateScript("delete __v8PipeClass");
+ Map<String, V8TupleBuffer.JSType> typeMap = new HashMap<String, V8TupleBuffer.JSType>();
+ typeMap.put("word", V8TupleBuffer.JSType.STRING);
+ typeMap.put("line", V8TupleBuffer.JSType.STRING);
+ typeMap.put("count", V8TupleBuffer.JSType.INT);
+ typeMap.put("offset", V8TupleBuffer.JSType.STRING);
+
V8OperationContext ctx = new V8OperationContext(env, v8PipeClass, pipeId, getGroupingFields(),
- operationCall.getArgumentFields().subtract(getGroupingFields()), resultFields);
+ operationCall.getArgumentFields().subtract(getGroupingFields()), resultFields, typeMap);
operationCall.setContext(ctx);
View
31 src/java/org/cascading/js/operation/V8OperationContext.java
@@ -3,33 +3,38 @@
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
+import lu.flier.script.V8Function;
import lu.flier.script.V8Object;
import lu.flier.script.V8ScriptEngine;
import org.cascading.js.util.Environment;
+import javax.script.ScriptException;
+import java.util.Map;
+
public class V8OperationContext {
private V8TupleBuffer tupleBuffer;
private Environment environment;
private TupleEntryCollector outputEntryCollector;
+ private V8Function flushToV8;
public Environment getEnvironment() {
return environment;
}
- public V8OperationContext(Environment environment, V8Object v8PipeClass, int pipeId, Fields groupingFields, Fields argumentFields, Fields resultFields) {
+ public V8OperationContext(Environment environment, V8Object v8PipeClass, int pipeId, Fields groupingFields, Fields argumentFields, Fields resultFields, Map<String, V8TupleBuffer.JSType> typeMap) {
this.environment = environment;
V8ScriptEngine eng = environment.getEngine();
- tupleBuffer = new V8TupleBuffer(eng, groupingFields, argumentFields, null);
-
- /*V8Function emitCallback = eng.createFunction(this, "emit");
-
- argumentProcessor = (V8Function)environment.invokeMethod(v8PipeClass, "get_argument_processor",
- this.tupleTransfer.getGroupTuple(), this.tupleTransfer.getArgumentTuple(), emitCallback, pipeId);
+ tupleBuffer = new V8TupleBuffer(eng, groupingFields, argumentFields, typeMap);
- eng.compile("var foo = function() { }").eval();
- Bindings scope = eng.getBindings(ScriptContext.ENGINE_SCOPE);
- argumentProcessor = (V8Function)scope.get("foo");*/
+ try {
+ flushToV8 = (V8Function)environment.invokeMethod(v8PipeClass, "get_flush_routine",
+ tupleBuffer.getBuffer(), eng.createFunction(this, "flushFromV8"), pipeId);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException(e);
+ } catch (ScriptException e) {
+ throw new RuntimeException(e);
+ }
}
public void setOutputEntryCollector(TupleEntryCollector out) {
@@ -52,10 +57,14 @@ public boolean isFull() {
public void flush() {
this.tupleBuffer.fillV8Arrays();
- // TODO fire routine
+ flushToV8.invokeVoid();
this.tupleBuffer.clear();
}
+ public void flushFromV8() {
+ System.out.println("Flushing from V8");
+ }
+
public void closeGroup() {
this.tupleBuffer.closeGroup();
}
View
44 src/java/org/cascading/js/operation/V8TupleBuffer.java
@@ -367,26 +367,30 @@ private void addData(final Set set, final TupleEntry entry) {
int dataOffset = fieldDataOffsets[set.idx][iField];
- // Hacky, can't use .idx in switch
- switch (jsType) {
- case 0: // INT
- intData[dataOffset][entryIndex] = ((Number)val).intValue();
- break;
- case 1: // LONG
- longData[dataOffset][entryIndex] = ((Number)val).longValue();
- break;
- case 2: // BOOL
- boolData[dataOffset][entryIndex] = (Boolean)val;
- break;
- case 3: // DOUBLE
- doubleData[dataOffset][entryIndex] = ((Number)val).doubleValue();
- break;
- case 4: // DATE
- dateData[dataOffset][entryIndex] = (Date)val;
- break;
- case 5: // STRING
- stringData[dataOffset][entryIndex] = (String)val;
- break;
+ try {
+ // Hacky, can't use .idx in switch
+ switch (jsType) {
+ case 0: // INT
+ intData[dataOffset][entryIndex] = ((Number)val).intValue();
+ break;
+ case 1: // LONG
+ longData[dataOffset][entryIndex] = ((Number)val).longValue();
+ break;
+ case 2: // BOOL
+ boolData[dataOffset][entryIndex] = (Boolean)val;
+ break;
+ case 3: // DOUBLE
+ doubleData[dataOffset][entryIndex] = ((Number)val).doubleValue();
+ break;
+ case 4: // DATE
+ dateData[dataOffset][entryIndex] = (Date)val;
+ break;
+ case 5: // STRING
+ stringData[dataOffset][entryIndex] = (String)val;
+ break;
+ }
+ } catch (ClassCastException e) {
+ throw new RuntimeException("Error casting field " + iField + " value " + val + " to " + jsType);
}
}
}
View
29 src/js/cascading/components.coffee
@@ -158,27 +158,26 @@ define ["underscore"], (_) ->
out_field_data_offsets[i_field] = offset
out_field_types[i_field] = type_idx
- pipe.out_obj[field] = ((type_idx, offset) ->
- (val) ->
+ do (type_idx, offset) ->
+ pipe.out_obj[field] = (val) ->
pipe.out_buffers[type_idx][offset] = val
- )(type_idx, offset)
current_offsets[type_idx] += 1
- @get_group_start_processor: (group_tuple, argument_tuple, emit, pipe_id) =>
- out = @pipes[pipe_id].out_obj
- f = @pipes[pipe_id].initializer
- -> #f(group_tuple, argument_tuple, out, emit)
+ @get_flush_routine: (in_buffer, emit, pipe_id) =>
+ initializer = @pipes[pipe_id].initializer
+ processor = @pipes[pipe_id].processor
+ finalizer = @pipes[pipe_id].finalizer
+ ->
- @get_argument_processor: (group_tuple, argument_tuple, emit, pipe_id) ->
- -> #f(group_tuple, argument_tuple, out, emit)
- #out = @pipes[pipe_id].out_obj
- #f = @pipes[pipe_id].processor
+ while true
+ count = 0
- @get_group_end_processor: (group_tuple, argument_tuple, emit, pipe_id) =>
- out = @pipes[pipe_id].out_obj
- f = @pipes[pipe_id].finalizer
- -> #f(group_tuple, argument_tuple, out, emit)
+ while true
+ count += 1
+ break unless in_buffer.next_arg()
+
+ break unless in_buffer.next_group()
is_pipe: true

0 comments on commit d41a2ad

Please sign in to comment.