/
SortedGrouping.java
265 lines (231 loc) · 11.2 KB
/
SortedGrouping.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.operators;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.FirstReducer;
import java.util.Arrays;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import com.google.common.base.Preconditions;
/**
 * SortedGrouping is an intermediate step for a transformation on a grouped and sorted DataSet.<br/>
 * The following transformations can be applied on sorted groups:
 * <ul>
 * <li>{@link SortedGrouping#reduceGroup(org.apache.flink.api.common.functions.GroupReduceFunction)},</li>
 * <li>{@link SortedGrouping#combineGroup(org.apache.flink.api.common.functions.GroupCombineFunction)},</li>
 * <li>{@link SortedGrouping#first(int)}.</li>
 * </ul>
 *
 * <p>Additional sort keys can be appended by chaining {@link #sortGroup(int, Order)} or
 * {@link #sortGroup(String, Order)} calls, unless the groups are sorted by a KeySelector.
 *
 * @param <T> The type of the elements of the sorted and grouped DataSet.
 */
public class SortedGrouping<T> extends Grouping<T> {

	/** Logical key positions of the group-sort keys, in the order in which they were added. */
	private int[] groupSortKeyPositions;

	/** The sort order for each entry of {@link #groupSortKeyPositions}; both arrays always have equal length. */
	private Order[] groupSortOrders;

	/** Non-null only if the groups are sorted by a KeySelector; such a sorting cannot be chained. */
	private Keys.SelectorFunctionKeys<T, ?> groupSortSelectorFunctionKey = null;

	/**
	 * Creates a grouping whose groups are sorted on a field position of a tuple type.
	 *
	 * @param set The input DataSet.
	 * @param keys The grouping keys.
	 * @param field The tuple field position on which the groups are sorted.
	 * @param order The order in which the field is sorted. Must not be null.
	 * @throws InvalidProgramException If the DataSet's type is not a tuple type.
	 * @throws IllegalArgumentException If the field is outside the tuple's bounds or the order is null.
	 */
	public SortedGrouping(DataSet<T> set, Keys<T> keys, int field, Order order) {
		super(set, keys);
		checkTupleSortField(field);
		// use int-based expression key to properly resolve nested tuples for grouping
		ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, dataSet.getType());
		initGroupSort(ek.computeLogicalKeyPositions(), order);
	}

	/**
	 * Creates a grouping whose groups are sorted on a field expression of a composite type
	 * (POJO, tuple or case class).
	 *
	 * @param set The input DataSet.
	 * @param keys The grouping keys.
	 * @param field The field expression on which the groups are sorted.
	 * @param order The order in which the field is sorted. Must not be null.
	 * @throws InvalidProgramException If the DataSet's type is not a composite type.
	 * @throws IllegalArgumentException If the order is null.
	 */
	public SortedGrouping(DataSet<T> set, Keys<T> keys, String field, Order order) {
		super(set, keys);
		checkCompositeSortType();
		// resolve String-field to int using the expression keys
		ExpressionKeys<T> ek = new ExpressionKeys<T>(new String[]{field}, dataSet.getType());
		// one expression (e.g. "*") may resolve to several positions; all of them share the same order
		initGroupSort(ek.computeLogicalKeyPositions(), order);
	}

	/**
	 * Creates a grouping whose groups are sorted by the key extracted by a KeySelector.
	 * Only valid if the grouping itself was defined by a KeySelector.
	 *
	 * @param set The input DataSet.
	 * @param keys The grouping keys. Must be KeySelector keys.
	 * @param keySelector The KeySelector keys on which the groups are sorted.
	 * @param order The order in which the selected key is sorted. Must not be null.
	 * @throws InvalidProgramException If the grouping keys are not KeySelector keys.
	 * @throws IllegalArgumentException If the order is null.
	 */
	public <K> SortedGrouping(DataSet<T> set, Keys<T> keys, Keys.SelectorFunctionKeys<T, K> keySelector, Order order) {
		super(set, keys);
		if (!(this.keys instanceof Keys.SelectorFunctionKeys)) {
			throw new InvalidProgramException("Sorting on KeySelector keys only works with KeySelector grouping.");
		}
		// Work on a copy: the offset applied below must not leak into the selector's own key
		// positions, in case computeLogicalKeyPositions() hands out its internal array.
		int[] sortKeyPositions = keySelector.computeLogicalKeyPositions().clone();
		// shift every sort position by the number of grouping key fields
		int numGroupingKeyFields = this.keys.getNumberOfKeyFields();
		for (int i = 0; i < sortKeyPositions.length; i++) {
			sortKeyPositions[i] += numGroupingKeyFields;
		}
		this.groupSortSelectorFunctionKey = keySelector;
		initGroupSort(sortKeyPositions, order);
	}

	// --------------------------------------------------------------------------------------------

	/**
	 * Returns the logical positions of the group-sort keys.
	 *
	 * @return The group-sort key positions (the internal array, not a copy).
	 */
	protected int[] getGroupSortKeyPositions() {
		return this.groupSortKeyPositions;
	}

	/**
	 * Returns the sort orders, one per entry of {@link #getGroupSortKeyPositions()}.
	 *
	 * @return The group-sort orders (the internal array, not a copy).
	 */
	protected Order[] getGroupSortOrders() {
		return this.groupSortOrders;
	}

	/**
	 * Uses a custom partitioner for the grouping.
	 *
	 * @param partitioner The custom partitioner.
	 * @return The grouping object itself, to allow for method chaining.
	 */
	public SortedGrouping<T> withPartitioner(Partitioner<?> partitioner) {
		Preconditions.checkNotNull(partitioner);

		getKeys().validateCustomPartitioner(partitioner, null);

		this.customPartitioner = partitioner;
		return this;
	}

	/**
	 * Returns the KeySelector keys used for sorting the groups.
	 *
	 * @return The KeySelector sort keys, or null if the groups are sorted by field positions or expressions.
	 */
	protected Keys.SelectorFunctionKeys<T, ?> getSortSelectionFunctionKey() {
		return this.groupSortSelectorFunctionKey;
	}

	/**
	 * Applies a GroupReduce transformation on a grouped and sorted {@link DataSet}.<br/>
	 * The transformation calls a {@link org.apache.flink.api.common.functions.GroupReduceFunction}
	 * for each group of the DataSet. A GroupReduceFunction can iterate over all elements of a
	 * group and emit any number of output elements, including none.
	 *
	 * @param reducer The GroupReduceFunction that is applied on each group of the DataSet.
	 * @return A GroupReduceOperator that represents the reduced DataSet.
	 * @throws NullPointerException If the reducer is null.
	 *
	 * @see org.apache.flink.api.common.functions.RichGroupReduceFunction
	 * @see GroupReduceOperator
	 * @see DataSet
	 */
	public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reducer) {
		if (reducer == null) {
			throw new NullPointerException("GroupReduce function must not be null.");
		}
		TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer,
				this.getDataSet().getType());
		return new GroupReduceOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName());
	}

	/**
	 * Applies a CombineFunction on a grouped {@link DataSet}.
	 * A CombineFunction is similar to a GroupReduceFunction but does not perform a full data exchange. Instead, the
	 * CombineFunction calls the combine method once per partition for combining a group of results. This
	 * operator is suitable for combining values into an intermediate format before doing a proper groupReduce where
	 * the data is shuffled across the nodes for further reduction. The GroupReduce operator can also be supplied with
	 * a combiner by implementing the RichGroupReduce function. The combine method of the RichGroupReduce function
	 * demands input and output type to be the same. The CombineFunction, on the other hand, can have an arbitrary
	 * output type.
	 *
	 * @param combiner The CombineFunction that is applied on the DataSet.
	 * @return A GroupCombineOperator which represents the combined DataSet.
	 * @throws NullPointerException If the combiner is null.
	 */
	public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> combiner) {
		if (combiner == null) {
			throw new NullPointerException("GroupCombine function must not be null.");
		}
		TypeInformation<R> resultType = TypeExtractor.getGroupCombineReturnTypes(combiner, this.getDataSet().getType());
		return new GroupCombineOperator<T, R>(this, resultType, dataSet.clean(combiner), Utils.getCallLocationName());
	}

	/**
	 * Returns a new set containing the first n elements in this grouped and sorted {@link DataSet}.<br/>
	 *
	 * @param n The desired number of elements for each group.
	 * @return A GroupReduceOperator that represents the DataSet containing the elements.
	 * @throws InvalidProgramException If n is smaller than 1.
	 */
	public GroupReduceOperator<T, T> first(int n) {
		if (n < 1) {
			throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
		}
		return reduceGroup(new FirstReducer<T>(n));
	}

	// --------------------------------------------------------------------------------------------
	//  Group Operations
	// --------------------------------------------------------------------------------------------

	/**
	 * Sorts {@link org.apache.flink.api.java.tuple.Tuple} elements within a group on the specified field in the specified {@link Order}.<br/>
	 * <b>Note: Only groups of Tuple or Pojo elements can be sorted.</b><br/>
	 * Groups can be sorted by multiple fields by chaining {@link #sortGroup(int, Order)} calls.
	 *
	 * @param field The Tuple field on which the group is sorted.
	 * @param order The Order in which the specified Tuple field is sorted.
	 * @return A SortedGrouping with specified order of group element.
	 * @throws InvalidProgramException If the groups are sorted by a KeySelector or the type is not a tuple type.
	 * @throws IllegalArgumentException If the field is outside the tuple's bounds or the order is null.
	 *
	 * @see org.apache.flink.api.java.tuple.Tuple
	 * @see Order
	 */
	public SortedGrouping<T> sortGroup(int field, Order order) {
		if (groupSortSelectorFunctionKey != null) {
			throw new InvalidProgramException("Chaining sortGroup with KeySelector sorting is not supported");
		}
		checkTupleSortField(field);

		ExpressionKeys<T> ek = new ExpressionKeys<T>(new int[]{field}, dataSet.getType());
		addSortGroupInternal(ek, order);
		return this;
	}

	/**
	 * Appends the logical positions of the given expression keys to the existing sort keys,
	 * all with the given order.
	 *
	 * @throws IllegalArgumentException If the order is null.
	 */
	private void addSortGroupInternal(ExpressionKeys<T> ek, Order order) {
		Preconditions.checkArgument(order != null, "Order can not be null");
		int[] additionalKeyPositions = ek.computeLogicalKeyPositions();

		int oldLength = this.groupSortKeyPositions.length;
		int newLength = oldLength + additionalKeyPositions.length;

		this.groupSortKeyPositions = Arrays.copyOf(this.groupSortKeyPositions, newLength);
		System.arraycopy(additionalKeyPositions, 0, this.groupSortKeyPositions, oldLength, additionalKeyPositions.length);

		// every appended position uses the same order
		this.groupSortOrders = Arrays.copyOf(this.groupSortOrders, newLength);
		Arrays.fill(this.groupSortOrders, oldLength, newLength, order);
	}

	/**
	 * Sorts {@link org.apache.flink.api.java.tuple.Tuple} or POJO elements within a group on the specified field in the specified {@link Order}.<br/>
	 * <b>Note: Only groups of Tuple or Pojo elements can be sorted.</b><br/>
	 * Groups can be sorted by multiple fields by chaining {@link #sortGroup(String, Order)} calls.
	 *
	 * @param field The Tuple or Pojo field on which the group is sorted.
	 * @param order The Order in which the specified field is sorted.
	 * @return A SortedGrouping with specified order of group element.
	 * @throws InvalidProgramException If the groups are sorted by a KeySelector or the type is not a composite type.
	 * @throws IllegalArgumentException If the order is null.
	 *
	 * @see org.apache.flink.api.java.tuple.Tuple
	 * @see Order
	 */
	public SortedGrouping<T> sortGroup(String field, Order order) {
		if (groupSortSelectorFunctionKey != null) {
			throw new InvalidProgramException("Chaining sortGroup with KeySelector sorting is not supported");
		}
		checkCompositeSortType();

		ExpressionKeys<T> ek = new ExpressionKeys<T>(new String[]{field}, dataSet.getType());
		addSortGroupInternal(ek, order);
		return this;
	}

	// --------------------------------------------------------------------------------------------
	//  Validation helpers
	// --------------------------------------------------------------------------------------------

	/** Ensures that the input is a tuple type and that {@code field} is a valid position within it. */
	private void checkTupleSortField(int field) {
		if (!dataSet.getType().isTupleType()) {
			throw new InvalidProgramException("Specifying order keys via field positions is only valid for tuple data types");
		}
		if (field < 0 || field >= dataSet.getType().getArity()) {
			throw new IllegalArgumentException("Order key out of tuple bounds.");
		}
	}

	/** Ensures that the input type is a composite type, as required by String sort keys. */
	private void checkCompositeSortType() {
		if (!(dataSet.getType() instanceof CompositeType)) {
			throw new InvalidProgramException("Specifying order keys via field positions is only valid for composite data types (pojo / tuple / case class)");
		}
	}

	/**
	 * Sets the initial sort key positions and assigns the same order to every position.
	 * Rejects a null order, just as chained {@code sortGroup} calls do.
	 */
	private void initGroupSort(int[] keyPositions, Order order) {
		Preconditions.checkArgument(order != null, "Order can not be null");
		this.groupSortKeyPositions = keyPositions;
		this.groupSortOrders = new Order[keyPositions.length];
		Arrays.fill(this.groupSortOrders, order);
	}
}