Skip to content

Commit

Permalink
[FLINK-1490][fix][java-api] Fix incorrect local output sorting of nested types with field position keys.
Browse files Browse the repository at this point in the history
  • Loading branch information
fhueske committed Feb 6, 2015
1 parent a0e71b8 commit 6187292
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 14 deletions.
Expand Up @@ -115,18 +115,33 @@ public DataSink<T> sortLocalOutput(int field, Order order) {
throw new InvalidProgramException("Order key out of tuple bounds.");
}

// get flat keys
Keys.ExpressionKeys<T> ek;
try {
ek = new Keys.ExpressionKeys<T>(new int[]{field}, this.type);
} catch(IllegalArgumentException iae) {
throw new InvalidProgramException("Invalid specification of field expression.", iae);
}
int[] flatKeys = ek.computeLogicalKeyPositions();

if(this.sortKeyPositions == null) {
// set sorting info
this.sortKeyPositions = new int[] {field};
this.sortOrders = new Order[] {order};
this.sortKeyPositions = flatKeys;
this.sortOrders = new Order[flatKeys.length];
Arrays.fill(this.sortOrders, order);
} else {
// append sorting info to existing info
int newLength = this.sortKeyPositions.length + 1;
int oldLength = this.sortKeyPositions.length;
int newLength = oldLength + flatKeys.length;
this.sortKeyPositions = Arrays.copyOf(this.sortKeyPositions, newLength);
this.sortOrders = Arrays.copyOf(this.sortOrders, newLength);
this.sortKeyPositions[newLength-1] = field;
this.sortOrders[newLength-1] = order;

for(int i=0; i<flatKeys.length; i++) {
this.sortKeyPositions[oldLength+i] = flatKeys[i];
this.sortOrders[oldLength+i] = order;
}
}

return this;
}

Expand Down
Expand Up @@ -208,8 +208,8 @@ public void testTupleSortingNestedDOP1() throws Exception {

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Tuple2<Tuple2<Integer, Integer>, String>> ds =
CollectionDataSets.getGroupSortedNestedTupleDataSet(env);
DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> ds =
CollectionDataSets.getGroupSortedNestedTupleDataSet2(env);
ds.writeAsText(resultPath)
.sortLocalOutput("f0.f1", Order.ASCENDING)
.sortLocalOutput("f1", Order.DESCENDING)
Expand All @@ -218,13 +218,39 @@ public void testTupleSortingNestedDOP1() throws Exception {
env.execute();

expected =
"((2,1),a)\n" +
"((2,2),b)\n" +
"((1,2),a)\n" +
"((3,3),c)\n" +
"((1,3),a)\n" +
"((3,6),c)\n" +
"((4,9),c)\n";
"((2,1),a,3)\n" +
"((2,2),b,4)\n" +
"((1,2),a,1)\n" +
"((3,3),c,5)\n" +
"((1,3),a,2)\n" +
"((3,6),c,6)\n" +
"((4,9),c,7)\n";

compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
}

/**
 * Checks local output sorting on the nested tuple data set when the sort keys are
 * specified as field positions: position 1 ascending, then position 2 descending,
 * with the sink parallelism set to 1. The written result is compared line by line,
 * in strict order, against the expected text.
 */
@Test
public void testTupleSortingNestedDOP1_2() throws Exception {

	final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

	// Position 1 is the String field and position 2 the trailing Integer field of the Tuple3.
	CollectionDataSets.getGroupSortedNestedTupleDataSet2(environment)
		.writeAsText(resultPath)
		.sortLocalOutput(1, Order.ASCENDING)
		.sortLocalOutput(2, Order.DESCENDING)
		.setParallelism(1);

	environment.execute();

	// Rows grouped by ascending String value; within each group the Integer field descends.
	expected =
		"((2,1),a,3)\n" +
		"((1,3),a,2)\n" +
		"((1,2),a,1)\n" +
		"((2,2),b,4)\n" +
		"((4,9),c,7)\n" +
		"((3,6),c,6)\n" +
		"((3,3),c,5)\n";

	compareResultsByLinesInMemoryWithStrictOrder(expected, resultPath);
}
Expand Down
Expand Up @@ -184,6 +184,27 @@ public static DataSet<Tuple2<Tuple2<Integer, Integer>, String>> getGroupSortedNe
return env.fromCollection(data, type);
}

/**
 * Creates a seven-element data set of {@code Tuple3<Tuple2<Integer, Integer>, String, Integer>}
 * records from an in-memory collection, using explicitly constructed tuple type information.
 *
 * @param env the execution environment used to create the data set
 * @return the data set built from the fixed collection of nested tuples
 */
public static DataSet<Tuple3<Tuple2<Integer, Integer>, String, Integer>> getGroupSortedNestedTupleDataSet2(ExecutionEnvironment env) {

	// Each row holds {f0.f0, f0.f1, f2}; the matching f1 value sits at the same index in 'labels'.
	final int[][] rows = {
		{1, 3, 2},
		{1, 2, 1},
		{2, 1, 3},
		{2, 2, 4},
		{3, 3, 5},
		{3, 6, 6},
		{4, 9, 7}
	};
	final String[] labels = {"a", "a", "a", "b", "c", "c", "c"};

	List<Tuple3<Tuple2<Integer, Integer>, String, Integer>> records =
		new ArrayList<Tuple3<Tuple2<Integer, Integer>, String, Integer>>();
	for (int i = 0; i < rows.length; i++) {
		Tuple2<Integer, Integer> nested = new Tuple2<Integer, Integer>(rows[i][0], rows[i][1]);
		records.add(new Tuple3<Tuple2<Integer, Integer>, String, Integer>(nested, labels[i], rows[i][2]));
	}

	// Type information is assembled by hand: a nested (int, int) tuple, a string, and an int.
	TupleTypeInfo<Tuple2<Integer, Integer>> nestedType =
		new TupleTypeInfo<Tuple2<Integer, Integer>>(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
	TupleTypeInfo<Tuple3<Tuple2<Integer, Integer>, String, Integer>> recordType =
		new TupleTypeInfo<Tuple3<Tuple2<Integer, Integer>, String, Integer>>(
			nestedType,
			BasicTypeInfo.STRING_TYPE_INFO,
			BasicTypeInfo.INT_TYPE_INFO
		);

	return env.fromCollection(records, recordType);
}

public static DataSet<String> getStringDataSet(ExecutionEnvironment env) {

List<String> data = new ArrayList<String>();
Expand Down

0 comments on commit 6187292

Please sign in to comment.