/
UnionSketch.java
357 lines (316 loc) · 11 KB
/
UnionSketch.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
/*
* Copyright 2019, Verizon Media.
* Licensed under the terms of the Apache License 2.0. See LICENSE file at the project root for terms.
*/
package com.yahoo.sketches.pig.kll;
import java.io.IOException;
import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import com.yahoo.memory.Memory;
import com.yahoo.sketches.kll.KllFloatsSketch;
/**
 * Pig UDF that merges serialized KLL floats sketches into a single sketch and returns the
 * serialized result.
 *
 * <p>This class implements both the <i>Accumulator</i> and <i>Algebraic</i> interfaces for
 * performance optimization.
 *
 * <p>Null bags, null sketch fields and empty byte arrays are skipped on every code path
 * (<i>exec</i>, <i>accumulate</i> and the <i>Algebraic</i> stages); if nothing was merged, the
 * result is the serialization of an empty sketch created with the configured k.
 */
public class UnionSketch extends EvalFunc<DataByteArray>
    implements Accumulator<DataByteArray>, Algebraic {

  private static final TupleFactory TUPLE_FACTORY_ = TupleFactory.getInstance();

  // With the single exception of the Accumulator interface, UDFs are stateless.
  // All parameters kept at the class level must be final, except for the accumSketch.
  private final int k_;
  private KllFloatsSketch accumSketch_;

  //TOP LEVEL API

  /**
   * Default constructor. Assumes default k.
   */
  public UnionSketch() {
    this(KllFloatsSketch.DEFAULT_K);
  }

  /**
   * Constructor with explicit k as string.
   *
   * @param kStr string representation of k
   */
  public UnionSketch(final String kStr) {
    this(Integer.parseInt(kStr));
  }

  /**
   * Base constructor.
   *
   * @param k parameter that determines the accuracy and size of the sketch.
   */
  private UnionSketch(final int k) {
    super();
    k_ = k;
  }

  //@formatter:off
  /**
   * Top-level exec function.
   * This method accepts an input Tuple containing a Bag of one or more inner <b>Sketch Tuples</b>
   * and returns a single updated <b>Sketch</b> as a DataByteArray.
   *
   * <p>Types are in the form: Java data type: Pig DataType
   *
   * <p><b>Input Tuple</b>
   * <ul>
   *   <li>Tuple: TUPLE (Must contain only one field)
   *     <ul>
   *       <li>index 0: DataBag: BAG (May contain 0 or more Inner Tuples)
   *         <ul>
   *           <li>index 0: Tuple: TUPLE <b>Sketch Tuple</b></li>
   *           <li>...</li>
   *           <li>index n-1: Tuple: TUPLE <b>Sketch Tuple</b></li>
   *         </ul>
   *       </li>
   *     </ul>
   *   </li>
   * </ul>
   *
   * <b>Sketch Tuple</b>
   * <ul>
   *   <li>Tuple: TUPLE (Contains exactly 1 field)
   *     <ul>
   *       <li>index 0: DataByteArray: BYTEARRAY = The serialization of a Sketch object.</li>
   *     </ul>
   *   </li>
   * </ul>
   *
   * <p>A null or empty input tuple, or a null bag in field 0, yields a serialized empty sketch.
   *
   * @param inputTuple A tuple containing a single bag, containing Sketch Tuples
   * @return serialized sketch
   * @see "org.apache.pig.EvalFunc.exec(org.apache.pig.data.Tuple)"
   */
  //@formatter:on
  @Override // TOP LEVEL EXEC
  public DataByteArray exec(final Tuple inputTuple) throws IOException {
    //The exec is a stateless function. It operates on the input and returns a result.
    final KllFloatsSketch sketch = new KllFloatsSketch(k_);
    if ((inputTuple != null) && (inputTuple.size() > 0)) {
      // updateUnion ignores a null bag, matching the behavior of accumulate()
      updateUnion((DataBag) inputTuple.get(0), sketch);
    }
    return new DataByteArray(sketch.toByteArray());
  }

  //ACCUMULATOR INTERFACE

  /**
   * An <i>Accumulator</i> version of the standard <i>exec()</i> method. Like <i>exec()</i>,
   * accumulator is called with a bag of Sketch Tuples. Unlike <i>exec()</i>, it doesn't serialize
   * the sketch at the end. Instead, it can be called multiple times, each time with another bag of
   * Sketch Tuples to be input to the Union.
   *
   * @param inputTuple A tuple containing a single bag, containing Sketch Tuples.
   * @see #exec
   * @see "org.apache.pig.Accumulator.accumulate(org.apache.pig.data.Tuple)"
   * @throws IOException by Pig
   */
  @Override
  public void accumulate(final Tuple inputTuple) throws IOException {
    if ((inputTuple == null) || (inputTuple.size() == 0)) { return; }
    final DataBag bag = (DataBag) inputTuple.get(0);
    if (bag == null) { return; }
    // the accumulating sketch is created lazily on the first usable bag
    if (accumSketch_ == null) {
      accumSketch_ = new KllFloatsSketch(k_);
    }
    updateUnion(bag, accumSketch_);
  }

  /**
   * Returns the result of the Union that has been built up by multiple calls to {@link #accumulate}.
   * If nothing has been accumulated, the serialization of an empty sketch is returned.
   *
   * @return serialized sketch
   * @see "org.apache.pig.Accumulator.getValue()"
   */
  @Override
  public DataByteArray getValue() {
    final KllFloatsSketch result =
        (accumSketch_ != null) ? accumSketch_ : new KllFloatsSketch(k_);
    return new DataByteArray(result.toByteArray());
  }

  /**
   * Cleans up the UDF state after being called using the {@link Accumulator} interface.
   *
   * @see "org.apache.pig.Accumulator.cleanup()"
   */
  @Override
  public void cleanup() {
    accumSketch_ = null;
  }

  //ALGEBRAIC INTERFACE

  /**
   * Returns the class name of the {@link Initial} stage.
   *
   * @return fully qualified name of the Initial class
   */
  @Override
  public String getInitial() {
    return Initial.class.getName();
  }

  /**
   * Returns the class name of the {@link Intermediate} stage.
   *
   * @return fully qualified name of the Intermediate class
   */
  @Override
  public String getIntermed() {
    return Intermediate.class.getName();
  }

  /**
   * Returns the class name of the {@link Final} stage.
   *
   * @return fully qualified name of the Final class
   */
  @Override
  public String getFinal() {
    return Final.class.getName();
  }

  //TOP LEVEL PRIVATE STATIC METHODS

  /**
   * Updates a union from a bag of sketches. A null bag and null fields are ignored.
   *
   * @param bag A bag of sketchTuples. May be null.
   * @param union The union to update
   * @throws ExecException if a field cannot be read from an inner tuple
   * @throws IllegalArgumentException if field 0 of an inner tuple is not a DataByteArray
   */
  private static void updateUnion(final DataBag bag, final KllFloatsSketch union)
      throws ExecException {
    if (bag == null) { return; }
    for (final Tuple innerTuple: bag) {
      final Object f0 = innerTuple.get(0);
      if (f0 == null) { continue; }
      if (f0 instanceof DataByteArray) {
        mergeSerialized((DataByteArray) f0, union);
      } else {
        throw new IllegalArgumentException("Field type was not DataType.BYTEARRAY: "
            + innerTuple.getType(0));
      }
    }
  }

  /**
   * Deserializes one sketch and merges it into the union. An empty byte array is skipped.
   *
   * @param dba serialized sketch, must not be null
   * @param union The union to update
   */
  private static void mergeSerialized(final DataByteArray dba, final KllFloatsSketch union) {
    if (dba.size() > 0) {
      union.merge(KllFloatsSketch.heapify(Memory.wrap(dba.get())));
    }
  }

  //STATIC Initial Class only called by Pig

  /**
   * Class used to calculate the initial pass of an Algebraic sketch operation.
   *
   * <p>
   * The Initial class simply passes through all records unchanged so that they can be
   * processed by the intermediate processor instead.</p>
   */
  public static class Initial extends EvalFunc<Tuple> {
    // The Algebraic worker classes (Initial, Intermediate and Final) are static and stateless.
    // The constructors and parameters must mirror the parent class as there is no linkage
    // between them.

    /**
     * Default constructor.
     */
    public Initial() {}

    /**
     * Constructor for the initial pass of an Algebraic function. Pig will call this and pass the
     * same constructor arguments as the base UDF. In this case the arguments are ignored.
     *
     * @param kStr string representation of k
     */
    public Initial(final String kStr) {}

    /**
     * Returns the input unchanged.
     *
     * @param inputTuple the input tuple
     * @return the same tuple instance that was passed in
     */
    @Override
    public Tuple exec(final Tuple inputTuple) throws IOException {
      return inputTuple;
    }
  }

  // STATIC Intermediate Class only called by Pig

  /**
   * Class used to calculate the intermediate pass of an <i>Algebraic</i> union operation.
   * It will receive a bag of values returned by either the <i>Intermediate</i>
   * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
   * interpret both types.
   */
  public static class Intermediate extends EvalFunc<Tuple> {
    // The Algebraic worker classes (Initial, Intermediate and Final) are static and stateless.
    // The constructors and final parameters must mirror the parent class as there is no linkage
    // between them.
    private final int k_;

    /**
     * Default constructor. Assumes default k.
     */
    public Intermediate() {
      this(KllFloatsSketch.DEFAULT_K);
    }

    /**
     * Constructor with explicit k as string.
     *
     * @param kStr string representation of k
     */
    public Intermediate(final String kStr) {
      this(Integer.parseInt(kStr));
    }

    /**
     * Constructor with primitive k.
     *
     * @param k parameter that determines the accuracy and size of the sketch.
     */
    private Intermediate(final int k) {
      k_ = k;
    }

    /**
     * Merges all sketches found in the input and wraps the serialized result in a new Tuple.
     *
     * @param inputTuple A tuple containing a single bag of data tuples
     * @return a tuple holding the serialized merged sketch
     */
    @Override
    public Tuple exec(final Tuple inputTuple) throws IOException {
      return TUPLE_FACTORY_.newTuple(process(inputTuple, k_));
    }
  }

  // STATIC Final Class only called by Pig

  /**
   * Class used to calculate the final pass of an <i>Algebraic</i> union operation.
   * It will receive a bag of values returned by either the <i>Intermediate</i>
   * stage or the <i>Initial</i> stages, so it needs to be able to differentiate between and
   * interpret both types.
   */
  public static class Final extends EvalFunc<DataByteArray> {
    // The Algebraic worker classes (Initial, Intermediate and Final) are static and stateless.
    // The constructors and final parameters must mirror the parent class as there is no linkage
    // between them.
    private final int k_;

    /**
     * Default constructor. Assumes default k.
     */
    public Final() {
      this(KllFloatsSketch.DEFAULT_K);
    }

    /**
     * Constructor with explicit k as string.
     *
     * @param kStr string representation of k
     */
    public Final(final String kStr) {
      this(Integer.parseInt(kStr));
    }

    /**
     * Constructor with primitive k.
     *
     * @param k parameter that determines the accuracy and size of the sketch.
     */
    private Final(final int k) {
      k_ = k;
    }

    /**
     * Merges all sketches found in the input and returns the serialized result.
     *
     * @param inputTuple A tuple containing a single bag of data tuples
     * @return serialized merged sketch
     */
    @Override
    public DataByteArray exec(final Tuple inputTuple) throws IOException {
      return process(inputTuple, k_);
    }
  }

  /**
   * Shared implementation of the <i>Intermediate</i> and <i>Final</i> stages. Field 0 of each
   * data tuple in the outer bag may be either a bag of Sketch Tuples or a serialized sketch;
   * both are merged into one union. Null bags, null fields, empty inner bags and empty byte
   * arrays are skipped.
   *
   * @param inputTuple A tuple containing a single bag of data tuples. May be null or empty.
   * @param k parameter that determines the accuracy and size of the resulting sketch
   * @return serialized merged sketch
   * @throws IOException by Pig
   * @throws IllegalArgumentException if field 0 of a data tuple is neither a DataBag nor a
   *     DataByteArray
   */
  private static DataByteArray process(final Tuple inputTuple, final int k) throws IOException {
    final KllFloatsSketch union = new KllFloatsSketch(k);
    if ((inputTuple != null) && (inputTuple.size() > 0)) {
      final DataBag outerBag = (DataBag) inputTuple.get(0);
      if (outerBag != null) {
        for (final Tuple dataTuple: outerBag) {
          final Object f0 = dataTuple.get(0);
          if (f0 == null) { continue; }
          if (f0 instanceof DataBag) {
            final DataBag innerBag = (DataBag) f0; //inputTuple.bag0.dataTupleN.f0:bag
            if (innerBag.size() == 0) { continue; }
            // If field 0 of a dataTuple is again a Bag all tuples of this inner bag
            // will be passed into the union.
            // It is due to system bagged outputs from multiple mapper Initial functions.
            // The Intermediate stage was bypassed.
            updateUnion(innerBag, union);
          } else if (f0 instanceof DataByteArray) { //inputTuple.bag0.dataTupleN.f0:DBA
            // If field 0 of a dataTuple is a DataByteArray we assume it is a sketch from a
            // prior call. It is due to system bagged outputs from multiple mapper Intermediate
            // functions. Each dataTuple.DBA:sketch will be merged into the union.
            mergeSerialized((DataByteArray) f0, union);
          } else {
            throw new IllegalArgumentException("dataTuple.Field0: Is not a DataByteArray: "
                + f0.getClass().getName());
          }
        }
      }
    }
    return new DataByteArray(union.toByteArray());
  }
}