Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Refactoring towards input buffer

  • Loading branch information...
commit 5375d567480b030cf97521b7183e776728461367 1 parent d41a2ad
Greg Fodor authored
View
13 src/java/org/cascading/js/FieldSet.java
@@ -0,0 +1,13 @@
+package org.cascading.js;
+
+public enum FieldSet {
+ ARGS(0),
+ GROUP(1),
+ RESULT(2);
+
+ public final int idx;
+
+ FieldSet(final int idx) {
+ this.idx = idx;
+ }
+}
View
16 src/java/org/cascading/js/JSType.java
@@ -0,0 +1,16 @@
+package org.cascading.js;
+
+public enum JSType {
+ INT(0),
+ LONG(1),
+ BOOL(2),
+ DOUBLE(3),
+ DATE(4),
+ STRING(5);
+
+ public final int idx;
+
+ JSType(final int idx) {
+ this.idx = idx;
+ }
+}
View
4 src/java/org/cascading/js/operation/ScriptBuffer.java
@@ -47,7 +47,7 @@ public void operate(FlowProcess flowProcess, BufferCall<V8OperationContext> call
ctx.addArgument(next);
if (ctx.isFull()) {
- ctx.flush();
+ ctx.flushToV8();
}
timeSetArg += (System.currentTimeMillis() - t0);
@@ -65,7 +65,7 @@ public void prepare( FlowProcess flowProcess, OperationCall<V8OperationContext>
public void cleanup(cascading.flow.FlowProcess flowProcess, cascading.operation.OperationCall<V8OperationContext> call) {
super.cleanup(flowProcess, call);
call.getContext().closeGroup();
- call.getContext().flush();
+ call.getContext().flushToV8();
System.out.println("Buffer time: " + (System.currentTimeMillis() - t0) + " " + timeInInvoke + " " + timeSetArg + " " + timeSetGroup);
call.getContext().getEnvironment().shutdown();
}
View
11 src/java/org/cascading/js/operation/ScriptOperation.java
@@ -7,6 +7,7 @@
import cascading.tuple.TupleEntryCollector;
import lu.flier.script.V8Array;
import lu.flier.script.V8Object;
+import org.cascading.js.JSType;
import org.cascading.js.util.Environment;
import javax.script.ScriptException;
@@ -63,11 +64,11 @@ public void prepare( FlowProcess flowProcess, OperationCall<V8OperationContext>
V8Object v8PipeClass = (V8Object)env.extractObject("__v8PipeClass");
env.evaluateScript("delete __v8PipeClass");
- Map<String, V8TupleBuffer.JSType> typeMap = new HashMap<String, V8TupleBuffer.JSType>();
- typeMap.put("word", V8TupleBuffer.JSType.STRING);
- typeMap.put("line", V8TupleBuffer.JSType.STRING);
- typeMap.put("count", V8TupleBuffer.JSType.INT);
- typeMap.put("offset", V8TupleBuffer.JSType.STRING);
+ Map<String, JSType> typeMap = new HashMap<String, JSType>();
+ typeMap.put("word", JSType.STRING);
+ typeMap.put("line", JSType.STRING);
+ typeMap.put("count", JSType.INT);
+ typeMap.put("offset", JSType.STRING);
V8OperationContext ctx = new V8OperationContext(env, v8PipeClass, pipeId, getGroupingFields(),
operationCall.getArgumentFields().subtract(getGroupingFields()), resultFields, typeMap);
View
26 src/java/org/cascading/js/operation/V8OperationContext.java
@@ -6,13 +6,16 @@
import lu.flier.script.V8Function;
import lu.flier.script.V8Object;
import lu.flier.script.V8ScriptEngine;
+import org.cascading.js.JSType;
import org.cascading.js.util.Environment;
import javax.script.ScriptException;
import java.util.Map;
public class V8OperationContext {
- private V8TupleBuffer tupleBuffer;
+ private V8TupleBuffer outTupleBuffer;
+ private V8TupleBuffer inTupleBuffer;
+
private Environment environment;
private TupleEntryCollector outputEntryCollector;
private V8Function flushToV8;
@@ -21,15 +24,16 @@ public Environment getEnvironment() {
return environment;
}
- public V8OperationContext(Environment environment, V8Object v8PipeClass, int pipeId, Fields groupingFields, Fields argumentFields, Fields resultFields, Map<String, V8TupleBuffer.JSType> typeMap) {
+ public V8OperationContext(Environment environment, V8Object v8PipeClass, int pipeId, Fields groupingFields, Fields argumentFields, Fields resultFields, Map<String, JSType> typeMap) {
this.environment = environment;
V8ScriptEngine eng = environment.getEngine();
- tupleBuffer = new V8TupleBuffer(eng, groupingFields, argumentFields, typeMap);
+ outTupleBuffer = V8TupleBuffer.newOutput(eng, groupingFields, argumentFields, typeMap);
+ inTupleBuffer = V8TupleBuffer.newInput(eng, resultFields, typeMap);
try {
flushToV8 = (V8Function)environment.invokeMethod(v8PipeClass, "get_flush_routine",
- tupleBuffer.getBuffer(), eng.createFunction(this, "flushFromV8"), pipeId);
+ outTupleBuffer.getBuffer(), eng.createFunction(this, "flushFromV8"), pipeId);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
} catch (ScriptException e) {
@@ -44,21 +48,21 @@ public void setOutputEntryCollector(TupleEntryCollector out) {
}
public void addArgument(TupleEntry argument) {
- this.tupleBuffer.addArgument(argument);
+ this.outTupleBuffer.addArgument(argument);
}
public void addGroup(TupleEntry group) {
- this.tupleBuffer.addGroup(group);
+ this.outTupleBuffer.addGroup(group);
}
public boolean isFull() {
- return this.tupleBuffer.isFull();
+ return this.outTupleBuffer.isFullForArguments();
}
- public void flush() {
- this.tupleBuffer.fillV8Arrays();
+ public void flushToV8() {
+ this.outTupleBuffer.fillV8Arrays();
flushToV8.invokeVoid();
- this.tupleBuffer.clear();
+ this.outTupleBuffer.clear();
}
public void flushFromV8() {
@@ -66,7 +70,7 @@ public void flushFromV8() {
}
public void closeGroup() {
- this.tupleBuffer.closeGroup();
+ this.outTupleBuffer.closeGroup();
}
}
View
219 src/java/org/cascading/js/operation/V8TupleBuffer.java
@@ -5,10 +5,13 @@
import lu.flier.script.V8Array;
import lu.flier.script.V8Function;
import lu.flier.script.V8ScriptEngine;
+import org.cascading.js.FieldSet;
+import org.cascading.js.JSType;
import javax.script.Bindings;
import javax.script.ScriptContext;
import javax.script.ScriptException;
+import java.util.Arrays;
import java.util.Date;
import java.util.Map;
@@ -20,37 +23,11 @@
private V8ScriptEngine eng;
- public enum JSType {
- INT(0),
- LONG(1),
- BOOL(2),
- DOUBLE(3),
- DATE(4),
- STRING(5);
-
- public final int idx;
-
- JSType(final int idx) {
- this.idx = idx;
- }
- }
-
- public enum Set {
- ARGS(0),
- GROUP(1);
-
- public final int idx;
-
- Set(final int idx) {
- this.idx = idx;
- }
- }
-
// Map of [Set][Field Offset] => Type
- final int[][] fieldTypes = new int[Set.values().length][];
- final int[][] fieldTypeCounts = new int[Set.values().length][];
- final int[][] fieldDataOffsets = new int[Set.values().length][];
- final int[][] cascadingFieldOffsets = new int[Set.values().length][];
+ final int[][] fieldTypes = new int[FieldSet.values().length][];
+ final int[][] fieldTypeCounts = new int[FieldSet.values().length][];
+ final int[][] fieldDataOffsets = new int[FieldSet.values().length][];
+ final int[][] cascadingFieldOffsets = new int[FieldSet.values().length][];
// Encoded group sizes.
// If a group is of size one (very common) then it is run length encoded as [MAX_INT, # of groups of size one]
@@ -60,22 +37,22 @@
int currentGroupSize = 0;
// Boolean null bitmasks
- final int[][] jNullMasks = new int[Set.values().length][];
+ final int[][] jNullMasks = new int[FieldSet.values().length][];
// Sparse matrices of [Set][Field Offset][Tuple Offset]
// Deepest entries only filled in for field offsets that are known for this type.
- final int[][][] jIntData = new int[Set.values().length][][];
- final long[][][] jLongData = new long[Set.values().length][][];
- final boolean[][][] jBoolData = new boolean[Set.values().length][][];
- final double[][][] jDoubleData = new double[Set.values().length][][];
- final Date[][][] jDateData = new Date[Set.values().length][][];
- final String[][][] jStringData = new String[Set.values().length][][];
+ final int[][][] jIntData = new int[FieldSet.values().length][][];
+ final long[][][] jLongData = new long[FieldSet.values().length][][];
+ final boolean[][][] jBoolData = new boolean[FieldSet.values().length][][];
+ final double[][][] jDoubleData = new double[FieldSet.values().length][][];
+ final Date[][][] jDateData = new Date[FieldSet.values().length][][];
+ final String[][][] jStringData = new String[FieldSet.values().length][][];
- int currentTupleOffset = 0;
- int currentGroupOffset = 0;
+ int[] tupleOffsets = new int[FieldSet.values().length];
final private V8Array v8GroupSizesRLE;
- final private V8Array[] v8NullMasks = new V8Array[Set.values().length];
+ final private V8Array[] v8NullMasks = new V8Array[FieldSet.values().length];
+ final private int groupSizeRLEJsIndex = FieldSet.values().length;
// The buffer is a 4-nested array indexed by [w][x][y][z]
// w - 0 is group data, 1 is arg data, 2 is group rle data
@@ -84,20 +61,35 @@
// z - tuple offset (invalid index for null map)
private V8Array v8TupleBuffer;
- private V8Function resetFunction;
+
+ // Function to clear the buffer's javascript-side state
+ private V8Function clearFunction;
// Cached V8Arrays Set, iField
- private final V8Array[][] v8DataArrays = new V8Array[Set.values().length][];
+ private final V8Array[][] v8DataArrays = new V8Array[FieldSet.values().length][];
- public V8TupleBuffer(V8ScriptEngine eng, Fields groupingFields, Fields argumentFields, Map<String, JSType> typeMap) {
+ public static V8TupleBuffer newInput(V8ScriptEngine eng, Fields resultFields, Map<String, JSType> typeMap) {
+ return new V8TupleBuffer(eng, new Fields(), new Fields(), resultFields, typeMap);
+ }
+
+ public static V8TupleBuffer newOutput(V8ScriptEngine eng, Fields groupingFields, Fields argumentFields, Map<String, JSType> typeMap) {
+ return new V8TupleBuffer(eng, groupingFields, argumentFields, new Fields(), typeMap);
+ }
+
+ private V8TupleBuffer(V8ScriptEngine eng, Fields groupingFields, Fields argumentFields, Fields resultFields, Map<String, JSType> typeMap) {
this.eng = eng;
v8GroupSizesRLE = eng.createArray(groupSizesRLE);
- V8Array[] outTupleArray = new V8Array[Set.values().length + 1];
+ Fields[] fieldsForSet = new Fields[FieldSet.values().length];
+ fieldsForSet[FieldSet.GROUP.idx] = groupingFields;
+ fieldsForSet[FieldSet.ARGS.idx] = argumentFields;
+ fieldsForSet[FieldSet.RESULT.idx] = resultFields;
+
+ V8Array[] outTupleArray = new V8Array[FieldSet.values().length + 1];
outTupleArray[outTupleArray.length - 1] = v8GroupSizesRLE;
- for (Set set : Set.values()) {
- Fields fields = set == Set.GROUP ? groupingFields : argumentFields;
+ for (FieldSet set : FieldSet.values()) {
+ Fields fields = fieldsForSet[set.idx];
int numFields = fields.size();
int setIdx = set.idx;
@@ -185,55 +177,20 @@ public V8TupleBuffer(V8ScriptEngine eng, Fields groupingFields, Fields argumentF
}
v8TupleBuffer = eng.createArray(outTupleArray);
-
- // Setup accessors
- for (Set set : Set.values()) {
- Fields fields = set == Set.GROUP ? groupingFields : argumentFields;
-
- for (int iField = 0; iField < fields.size(); iField++) {
- setTupleDataAccessor(set.idx, fieldTypes[set.idx][iField],
- fieldDataOffsets[set.idx][iField], iField, (String)fields.get(iField));
- }
- }
+ setupAccessors(groupingFields, argumentFields, resultFields, fieldsForSet);
try {
- eng.compile("var __v8Temp = function(b) { b.i_group = 0; b.i_arg = 0; b.i_arg_for_group = 0; " +
- "b.i_group_rle = 0; b.i_single_group_count = 0; }").eval();
+ eng.compile("var __v8Temp = function(b) { b.i_group = 0; b.i_arg = 0; b.i_result = 0; b.i_arg_for_group = 0; " +
+ "b.i_group_rle = 0; b.i_single_group_count = 0; }").eval();
Bindings scope = eng.getBindings(ScriptContext.ENGINE_SCOPE);
- resetFunction = (V8Function)scope.get("__v8Temp");
+ clearFunction = (V8Function)scope.get("__v8Temp");
eng.eval("delete(__v8Temp);");
} catch (ScriptException e) {
throw new RuntimeException(e);
}
- setTupleAccessor("next_group",
- "var group_sizes = this[2]; var i_group_rle = this.i_group_rle; " +
- "var primary = group_sizes[i_group_rle]; " +
- "if (primary === 2147483647) { " +
- " if (this.i_single_group_count >= group_sizes[i_group_rle + 1] - 1) { " +
- " if (group_sizes[i_group_rle + 2] !== -1) { " +
- "this.i_group += 1; this.i_arg += 1; this.i_group_rle += 2; this.i_single_group_count = 0; this.i_arg_for_group = 0; return true; " +
- "} else { return false; }" +
- " } else { " +
- " this.i_group += 1; this.i_arg += 1; this.i_arg_for_group = 0; this.i_single_group_count += 1; return true; " +
- " }" +
- "} else {" +
- " if (group_sizes[i_group_rle + 1] !== -1) {" +
- " this.i_group += 1; this.i_arg += 1; this.i_group_rle += 1; this.i_arg_for_group = 0; return true; " +
- " } else { return false; }" +
- "} ");
-
- setTupleAccessor("next_arg",
- "var i_group_rle = this.i_group_rle; var primary = this[2][i_group_rle]; " +
- "if (primary === 2147483647) { return false; }" +
- "else { " +
- " if (primary > this.i_arg_for_group + 1) { " +
- " this.i_arg += 1; this.i_arg_for_group += 1; return true; " +
- " } else { return false; }" +
- "} ");
-
clear();
}
@@ -241,13 +198,59 @@ public V8Array getBuffer() {
return v8TupleBuffer;
}
+ private void setupAccessors(Fields groupingFields, Fields argumentFields, Fields resultFields, Fields[] fieldsForSet) {
+ // Setup accessors
+ for (FieldSet set : FieldSet.values()) {
+ Fields fields = fieldsForSet[set.idx];
+
+ for (int iField = 0; iField < fields.size(); iField++) {
+ setTupleDataAccessor(set.idx, fieldTypes[set.idx][iField],
+ fieldDataOffsets[set.idx][iField], iField, (String)fields.get(iField));
+ }
+ }
+
+ if (groupingFields.size() > 0) {
+ setTupleAccessor("next_group",
+ "var group_sizes = this[" + groupSizeRLEJsIndex + "]; var i_group_rle = this.i_group_rle; " +
+ "var primary = group_sizes[i_group_rle]; " +
+ "if (primary === 2147483647) { " +
+ " if (this.i_single_group_count >= group_sizes[i_group_rle + 1] - 1) { " +
+ " if (group_sizes[i_group_rle + 2] !== -1) { " +
+ "this.i_group += 1; this.i_arg += 1; this.i_group_rle += 2; this.i_single_group_count = 0; this.i_arg_for_group = 0; return true; " +
+ "} else { return false; }" +
+ " } else { " +
+ " this.i_group += 1; this.i_arg += 1; this.i_arg_for_group = 0; this.i_single_group_count += 1; return true; " +
+ " }" +
+ "} else {" +
+ " if (group_sizes[i_group_rle + 1] !== -1) {" +
+ " this.i_group += 1; this.i_arg += 1; this.i_group_rle += 1; this.i_arg_for_group = 0; return true; " +
+ " } else { return false; }" +
+ "} ");
+
+ setTupleAccessor("next_arg",
+ "var i_group_rle = this.i_group_rle; var primary = this[" + groupSizeRLEJsIndex + "][i_group_rle]; " +
+ "if (primary === 2147483647) { return false; }" +
+ "else { " +
+ " if (primary > this.i_arg_for_group + 1) { " +
+ " this.i_arg += 1; this.i_arg_for_group += 1; return true; " +
+ " } else { return false; }" +
+ "} ");
+ } else if (argumentFields.size() > 0) {
+ // todo next_arg for non-grouping mode
+ } else if (resultFields.size() > 0) {
+ // todo next_result, should call flusher when full
+ }
+ }
+
public void fillV8Arrays() {
groupSizesRLE[groupSizesRLELength] = -1;
v8GroupSizesRLE.setElements(groupSizesRLE, groupSizesRLELength + 1);
- for (Set set : Set.values()) {
+ for (FieldSet set : FieldSet.values()) {
final int setIdx = set.idx;
+ if (fieldTypes[setIdx].length == 0) continue;
+
v8NullMasks[setIdx].setElements(jNullMasks[setIdx]);
final int[] fieldTypes = this.fieldTypes[setIdx];
@@ -312,32 +315,35 @@ public void closeGroup() {
public void addGroup(final TupleEntry group) {
closeGroup();
- addData(Set.GROUP, group);
- currentGroupOffset += 1;
+ addData(FieldSet.GROUP, group);
+ tupleOffsets[FieldSet.GROUP.idx] += 1;
}
public void addArgument(final TupleEntry args) {
- addData(Set.ARGS, args);
+ addData(FieldSet.ARGS, args);
- currentTupleOffset += 1;
+ tupleOffsets[FieldSet.ARGS.idx] += 1;
currentGroupSize += 1;
}
- public boolean isFull() {
- return currentTupleOffset == BUFFER_SIZE;
+ public boolean isFullForArguments() {
+ return tupleOffsets[FieldSet.ARGS.idx] >= BUFFER_SIZE;
}
public void clear() {
- java.util.Arrays.fill(jNullMasks[Set.ARGS.idx], 0);
- java.util.Arrays.fill(jNullMasks[Set.GROUP.idx], 0);
- resetFunction.invoke(v8TupleBuffer);
- currentTupleOffset = 0;
- currentGroupOffset = 0;
+ for (FieldSet fs : FieldSet.values()) {
+ if (fieldTypes[fs.idx].length > 0) {
+ java.util.Arrays.fill(jNullMasks[fs.idx], 0);
+ }
+ }
+
+ clearFunction.invoke(v8TupleBuffer);
+ Arrays.fill(tupleOffsets, 0);
groupSizesRLELength = 0;
currentGroupSize = 0;
}
- private void addData(final Set set, final TupleEntry entry) {
+ private void addData(final FieldSet set, final TupleEntry entry) {
final int[] fieldOffsets = this.cascadingFieldOffsets[set.idx];
final int[] fieldTypes = this.fieldTypes[set.idx];
final int[][] intData = this.jIntData[set.idx];
@@ -347,7 +353,7 @@ private void addData(final Set set, final TupleEntry entry) {
final Date[][] dateData = this.jDateData[set.idx];
final String[][] stringData = this.jStringData[set.idx];
final int[] nullMask = this.jNullMasks[set.idx];
- final int entryIndex = set == Set.GROUP ? currentGroupOffset : currentTupleOffset;
+ final int entryIndex = tupleOffsets[set.idx];
final int numFields = fieldOffsets.length;
@@ -395,10 +401,23 @@ private void addData(final Set set, final TupleEntry entry) {
}
}
+ private String getJsSideIndexName(int setIdx) {
+ switch (setIdx) {
+ case 0:
+ return "i_arg";
+ case 1:
+ return "i_group";
+ case 2:
+ return "i_result";
+ }
+
+ throw new RuntimeException("Invalid setIdx");
+ }
+
private V8Function setTupleDataAccessor(int setIdx, int typeIdx, int dataOffset, int fieldOffset, String name) {
int nullMaskArrayIndex = JSType.values().length;
int numFields = fieldTypes[setIdx].length;
- String offsetField = setIdx == Set.GROUP.idx ? "i_group" : "i_arg";
+ String offsetField = getJsSideIndexName(setIdx);
return setTupleAccessor(name,
"var arr = this[" + setIdx + "]; var i_tuple = this." + offsetField + ";" +
View
27 test/java/org/cascading/js/V8TupleBufferTest.java
@@ -23,7 +23,7 @@
private V8ScriptEngine eng;
@Test
- public void baselineTest() throws ScriptException, IOException {
+ public void testBufferAsOutput() throws ScriptException, IOException {
Fields groupFields = new Fields("gk_s", "gk_i");
Fields argFields = new Fields("i", "d", "b", "s", "i2");
@@ -31,28 +31,21 @@ public void baselineTest() throws ScriptException, IOException {
env.start(new Environment.EnvironmentArgs(null, Environment.EnvironmentArgs.Mode.SPECS));
this.eng = env.getEngine();
- Map<String, V8TupleBuffer.JSType> typeMap = new HashMap<String, V8TupleBuffer.JSType>();
- typeMap.put("gk_s", V8TupleBuffer.JSType.STRING);
- typeMap.put("gk_i", V8TupleBuffer.JSType.INT);
- typeMap.put("i", V8TupleBuffer.JSType.INT);
- typeMap.put("d", V8TupleBuffer.JSType.DOUBLE);
- typeMap.put("b", V8TupleBuffer.JSType.BOOL);
- typeMap.put("s", V8TupleBuffer.JSType.STRING);
- typeMap.put("i2", V8TupleBuffer.JSType.INT);
- V8TupleBuffer b = new V8TupleBuffer(env.getEngine(), groupFields, argFields, typeMap);
+ Map<String, JSType> typeMap = new HashMap<String, JSType>();
+ typeMap.put("gk_s", JSType.STRING);
+ typeMap.put("gk_i", JSType.INT);
+ typeMap.put("i", JSType.INT);
+ typeMap.put("d", JSType.DOUBLE);
+ typeMap.put("b", JSType.BOOL);
+ typeMap.put("s", JSType.STRING);
+ typeMap.put("i2", JSType.INT);
+ V8TupleBuffer b = V8TupleBuffer.newOutput(env.getEngine(), groupFields, argFields, typeMap);
eng.compile("var buf; ").eval();
Bindings scope = eng.getBindings(ScriptContext.ENGINE_SCOPE);
scope.put("buf", b.getBuffer());
- /*assertNull(eng.eval("group.gk_s()"));
- assertNull(eng.eval("group.gk_i()"));
- assertNull(eng.eval("arg.i()"));
- assertNull(eng.eval("arg.d()"));
- assertNull(eng.eval("arg.b()"));
- assertNull(eng.eval("arg.s()"));*/
-
addGroup(b, "hello", null);
addArgument(b, 123, null, false, null, 444);
addArgument(b, 456, null, false, "something", 555);
Please sign in to comment.
Something went wrong with that request. Please try again.