Skip to content

Commit

Permalink
[FLINK-4804] [py] Fix first() failing when applied to groupings
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Oct 21, 2016
1 parent 41d5167 commit 5c83e78
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
Expand Up @@ -459,9 +459,22 @@ private void createDistinctOperation(PythonOperationInfo info) throws IOExceptio
.map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("DistinctPostStep")); .map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("DistinctPostStep"));
} }


@SuppressWarnings("unchecked")
private void createFirstOperation(PythonOperationInfo info) throws IOException { private void createFirstOperation(PythonOperationInfo info) throws IOException {
DataSet op = (DataSet) sets.get(info.parentID); Object op = sets.get(info.parentID);
sets.put(info.setID, op.first(info.count).setParallelism(getParallelism(info)).name("First")); if (op instanceof DataSet) {
sets.put(info.setID, ((DataSet) op).first(info.count).setParallelism(getParallelism(info)).name("First"));
return;
}
if (op instanceof UnsortedGrouping) {
sets.put(info.setID, ((UnsortedGrouping) op).first(info.count).setParallelism(getParallelism(info)).name("First")
.map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("FirstPostStep"));
return;
}
if (op instanceof SortedGrouping) {
sets.put(info.setID, ((SortedGrouping) op).first(info.count).setParallelism(getParallelism(info)).name("First")
.map(new KeyDiscarder()).setParallelism(getParallelism(info)).name("FirstPostStep"));
}
} }


private void createGroupOperation(PythonOperationInfo info) throws IOException { private void createGroupOperation(PythonOperationInfo info) throws IOException {
Expand Down
Expand Up @@ -126,6 +126,10 @@ def map(self, value):
d1 \ d1 \
.first(1) \ .first(1) \
.map_partition(Verify([1], "First")).output() .map_partition(Verify([1], "First")).output()
d4 \
.group_by(0) \
.first(1) \
.map_partition(Verify([(1, 0.5, "hello", True), (2, 0.4, "world", False)], "Grouped First")).output()
d1 \ d1 \
.rebalance() .rebalance()
d6 \ d6 \
Expand Down

0 comments on commit 5c83e78

Please sign in to comment.