/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.builtin;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcNewOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.shims.Hadoop23Shims;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.pig.Expression;
import org.apache.pig.Expression.BetweenExpression;
import org.apache.pig.Expression.BinaryExpression;
import org.apache.pig.Expression.Column;
import org.apache.pig.Expression.Const;
import org.apache.pig.Expression.InExpression;
import org.apache.pig.Expression.OpType;
import org.apache.pig.Expression.UnaryExpression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPredicatePushdown;
import org.apache.pig.LoadPushDown;
import org.apache.pig.PigException;
import org.apache.pig.PigWarning;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.StoreResources;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.hive.HiveShims;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.impl.util.hive.HiveUtils;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.annotations.VisibleForTesting;
import org.joda.time.DateTime;
/**
* A load function and store function for ORC files.
* An optional constructor argument allows advanced behavior to be customized;
* the available options are listed below:
* <ul>
* <li><code>-s, --stripeSize</code> Set the stripe size for the file
* <li><code>-r, --rowIndexStride</code> Set the distance between entries in the row index
* <li><code>-b, --bufferSize</code> The size of the memory buffers used for compressing and storing the
* stripe in memory
* <li><code>-p, --blockPadding</code> Sets whether the HDFS blocks are padded to prevent stripes
* from straddling blocks
* <li><code>-c, --compress</code> Sets the generic compression that is used to compress the data.
* Valid codecs are: NONE, ZLIB, SNAPPY, LZO
* <li><code>-k, --keepSingleFieldTuple</code> Sets whether to keep a Tuple(struct) schema
* inside a Bag(array) even if the tuple only contains a single field
* <li><code>-v, --version</code> Sets the version of the file that will be written
* </ul>
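* <p>Example usage in a Pig script (illustrative; the paths and option values
* are hypothetical):
* <pre>
* A = LOAD 'input.orc' USING OrcStorage();
* STORE A INTO 'output' USING OrcStorage('-c SNAPPY -s 67108864');
* </pre>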
**/
public class OrcStorage extends LoadFunc implements StoreFuncInterface, LoadMetadata,
LoadPushDown, LoadPredicatePushdown, StoreResources {
//TODO Make OrcInputFormat.SARG_PUSHDOWN visible
private static final String SARG_PUSHDOWN = "sarg.pushdown";
protected RecordReader in = null;
protected RecordWriter writer = null;
private TypeInfo typeInfo = null;
private ObjectInspector oi = null;
private OrcSerde serde = new OrcSerde();
private String signature;
private Long stripeSize;
private Integer rowIndexStride;
private Integer bufferSize;
private Boolean blockPadding;
private Boolean keepSingleFieldTuple = false;
private CompressionKind compress;
private String versionName;
private static final Options validOptions;
private final CommandLineParser parser = new GnuParser();
protected final static Log log = LogFactory.getLog(OrcStorage.class);
protected boolean[] mRequiredColumns = null;
private static final String SchemaSignatureSuffix = "_schema";
private static final String RequiredColumnsSuffix = "_columns";
private static final String SearchArgsSuffix = "_sarg";
static {
validOptions = new Options();
validOptions.addOption("s", "stripeSize", true,
"Set the stripe size for the file");
validOptions.addOption("r", "rowIndexStride", true,
"Set the distance between entries in the row index");
validOptions.addOption("b", "bufferSize", true,
"The size of the memory buffers used for compressing and storing the " +
"stripe in memory");
validOptions.addOption("p", "blockPadding", false, "Sets whether the HDFS blocks " +
"are padded to prevent stripes from straddling blocks");
validOptions.addOption("c", "compress", true,
"Sets the generic compression that is used to compress the data");
validOptions.addOption("k", "keepSingleFieldTuple", false,
"Sets whether to keep Tuple(struct) schema inside a Bag(array) even if " +
"the tuple only contains a single field");
validOptions.addOption("v", "version", true,
"Sets the version of the file that will be written");
}
public OrcStorage() {
}
public OrcStorage(String options) {
String[] optsArr = options.split(" ");
try {
CommandLine configuredOptions = parser.parse(validOptions, optsArr);
if (configuredOptions.hasOption('s')) {
stripeSize = Long.parseLong(configuredOptions.getOptionValue('s'));
}
if (configuredOptions.hasOption('r')) {
rowIndexStride = Integer.parseInt(configuredOptions.getOptionValue('r'));
}
if (configuredOptions.hasOption('b')) {
bufferSize = Integer.parseInt(configuredOptions.getOptionValue('b'));
}
blockPadding = configuredOptions.hasOption('p');
if (configuredOptions.hasOption('c')) {
compress = CompressionKind.valueOf(configuredOptions.getOptionValue('c'));
}
if (configuredOptions.hasOption('v')) {
versionName = HiveShims.normalizeOrcVersionName(configuredOptions.getOptionValue('v'));
}
keepSingleFieldTuple = configuredOptions.hasOption('k');
} catch (ParseException e) {
log.error("Exception in OrcStorage", e);
log.error("OrcStorage called with arguments " + options);
warn("ParseException in OrcStorage", PigWarning.UDF_WARNING_1);
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("OrcStorage(',', '[options]')", validOptions);
throw new RuntimeException(e);
}
}
@Override
public String relToAbsPathForStoreLocation(String location, Path curDir)
throws IOException {
return LoadFunc.getAbsolutePath(location, curDir);
}
@Override
public OutputFormat getOutputFormat() throws IOException {
return new OrcNewOutputFormat();
}
@Override
public void setStoreLocation(String location, Job job) throws IOException {
if (!UDFContext.getUDFContext().isFrontend()) {
HiveShims.setOrcConfigOnJob(job, stripeSize, rowIndexStride, bufferSize, blockPadding, compress,
versionName);
}
FileOutputFormat.setOutputPath(job, new Path(location));
if (typeInfo == null) {
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
typeInfo = (TypeInfo) ObjectSerializer.deserialize(p.getProperty(signature + SchemaSignatureSuffix));
}
if (oi == null) {
oi = HiveUtils.createObjectInspector(typeInfo, keepSingleFieldTuple);
}
}
@Override
public void checkSchema(ResourceSchema rs) throws IOException {
ResourceFieldSchema fs = new ResourceFieldSchema();
fs.setType(DataType.TUPLE);
fs.setSchema(rs);
typeInfo = HiveUtils.getTypeInfo(fs, keepSingleFieldTuple);
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
p.setProperty(signature + SchemaSignatureSuffix, ObjectSerializer.serialize(typeInfo));
}
@Override
public void prepareToWrite(RecordWriter writer) throws IOException {
this.writer = writer;
}
@Override
public void putNext(Tuple t) throws IOException {
try {
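// OrcSerde turns the Pig Tuple into an ORC row using the ObjectInspector
// built from the schema in setStoreLocation(); OrcNewOutputFormat ignores
// the record key, hence null.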
writer.write(null, serde.serialize(t, oi));
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@Override
public void setStoreFuncUDFContextSignature(String signature) {
this.signature = signature;
}
@Override
public void setUDFContextSignature(String signature) {
this.signature = signature;
}
@Override
public void cleanupOnFailure(String location, Job job) throws IOException {
StoreFunc.cleanupOnFailureImpl(location, job);
}
@Override
public void cleanupOnSuccess(String location, Job job) throws IOException {
}
@Override
public void setLocation(String location, Job job) throws IOException {
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
if (!UDFContext.getUDFContext().isFrontend()) {
typeInfo = (TypeInfo)ObjectSerializer.deserialize(p.getProperty(signature + SchemaSignatureSuffix));
} else if (typeInfo == null) {
typeInfo = getTypeInfo(location, job);
}
if (typeInfo != null && oi == null) {
oi = OrcStruct.createObjectInspector(typeInfo);
}
if (!UDFContext.getUDFContext().isFrontend()) {
if (p.getProperty(signature + RequiredColumnsSuffix) != null) {
mRequiredColumns = (boolean[]) ObjectSerializer.deserialize(p
.getProperty(signature + RequiredColumnsSuffix));
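// Restrict the ORC reader to the projected columns. Illustrative example:
// with schema (a, b, c) and required columns {a, c}, the id string below
// is "0,2" and the corresponding name string is "a,c".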
job.getConfiguration().setBoolean(ColumnProjectionUtils.READ_ALL_COLUMNS, false);
job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR,
getRequiredColumnIdString(mRequiredColumns));
if (p.getProperty(signature + SearchArgsSuffix) != null) {
// Bug in setSearchArgument which always expects READ_COLUMN_NAMES_CONF_STR to be set
job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
getRequiredColumnNamesString(getSchema(location, job), mRequiredColumns));
}
} else if (p.getProperty(signature + SearchArgsSuffix) != null) {
// Bug in setSearchArgument which always expects READ_COLUMN_NAMES_CONF_STR to be set
job.getConfiguration().set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
getRequiredColumnNamesString(getSchema(location, job)));
}
if (p.getProperty(signature + SearchArgsSuffix) != null) {
job.getConfiguration().set(SARG_PUSHDOWN, p.getProperty(signature + SearchArgsSuffix));
}
}
Set<Path> paths = getGlobPaths(location, job.getConfiguration(), true);
if (!paths.isEmpty()) {
FileInputFormat.setInputPaths(job, paths.toArray(new Path[paths.size()]));
} else {
throw new IOException("Input path '" + location + "' was not found");
}
}
private String getRequiredColumnIdString(boolean[] requiredColumns) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < requiredColumns.length; i++) {
if (requiredColumns[i]) {
sb.append(i).append(",");
}
}
if (sb.length() > 0 && sb.charAt(sb.length() - 1) == ',') {
sb.deleteCharAt(sb.length() - 1);
}
return sb.toString();
}
private String getRequiredColumnNamesString(ResourceSchema schema) {
StringBuilder sb = new StringBuilder();
for (ResourceFieldSchema field : schema.getFields()) {
sb.append(field.getName()).append(",");
}
if (sb.length() > 0 && sb.charAt(sb.length() - 1) == ',') {
sb.deleteCharAt(sb.length() - 1);
}
return sb.toString();
}
private String getRequiredColumnNamesString(ResourceSchema schema, boolean[] requiredColumns) {
StringBuilder sb = new StringBuilder();
ResourceFieldSchema[] fields = schema.getFields();
for (int i = 0; i < requiredColumns.length; i++) {
if (requiredColumns[i]) {
sb.append(fields[i].getName()).append(",");
}
}
if (sb.length() > 0 && sb.charAt(sb.length() - 1) == ',') {
sb.deleteCharAt(sb.length() - 1);
}
return sb.toString();
}
@Override
public InputFormat getInputFormat() throws IOException {
return new OrcNewInputFormat();
}
@Override
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
in = reader;
}
@Override
public Tuple getNext() throws IOException {
try {
boolean notDone = in.nextKeyValue();
if (!notDone) {
return null;
}
Object value = in.getCurrentValue();
return (Tuple) HiveUtils.convertHiveToPig(value, oi, mRequiredColumns);
} catch (InterruptedException e) {
int errCode = 6018;
String errMsg = "Error while reading input";
throw new ExecException(errMsg, errCode,
PigException.REMOTE_ENVIRONMENT, e);
}
}
@Override
public List<String> getShipFiles() {
Class[] classList = HiveShims.getOrcDependentClasses(Hadoop23Shims.class);
return FuncUtils.getShipFiles(classList);
}
private static Path getFirstFile(String location, FileSystem fs, PathFilter filter) throws IOException {
String[] locations = getPathStrings(location);
Path[] paths = new Path[locations.length];
for (int i = 0; i < paths.length; ++i) {
paths[i] = new Path(locations[i]);
}
List<FileStatus> statusList = new ArrayList<FileStatus>();
for (int i = 0; i < paths.length; ++i) {
FileStatus[] files = fs.globStatus(paths[i]);
if (files != null) {
for (FileStatus tempf : files) {
statusList.add(tempf);
}
}
}
FileStatus[] statusArray = statusList.toArray(new FileStatus[statusList.size()]);
return Utils.depthFirstSearchForFile(statusArray, fs, filter);
}
@Override
public ResourceSchema getSchema(String location, Job job)
throws IOException {
if (typeInfo == null) {
typeInfo = getTypeInfo(location, job);
// still null: likely a script with multiple load/store statements
if (typeInfo == null) {
return null;
}
}
ResourceFieldSchema fs = HiveUtils.getResourceFieldSchema(typeInfo);
return fs.getSchema();
}
private TypeInfo getTypeInfo(String location, Job job) throws IOException {
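// Resolve the schema: first from the UDFContext (populated by an earlier
// checkSchema()/getSchema() call), otherwise from the footer of the first
// non-empty ORC file at the location; cache whatever was found.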
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
TypeInfo typeInfo = (TypeInfo) ObjectSerializer.deserialize(p.getProperty(signature + SchemaSignatureSuffix));
if (typeInfo == null) {
typeInfo = getTypeInfoFromLocation(location, job);
}
if (typeInfo != null) {
p.setProperty(signature + SchemaSignatureSuffix, ObjectSerializer.serialize(typeInfo));
}
return typeInfo;
}
private TypeInfo getTypeInfoFromLocation(String location, Job job) throws IOException {
FileSystem fs = FileSystem.get(new Path(location).toUri(), job.getConfiguration());
Path path = getFirstFile(location, fs, new NonEmptyOrcFileFilter(fs));
if (path == null) {
log.info("Cannot find any ORC files from " + location +
". Probably multiple load store in script.");
return null;
}
Reader reader = OrcFile.createReader(fs, path);
ObjectInspector oip = (ObjectInspector)reader.getObjectInspector();
return TypeInfoUtils.getTypeInfoFromObjectInspector(oip);
}
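// Path filter that accepts only ORC files whose struct schema has at least
// one field, so schema discovery skips empty or degenerate files.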
public static class NonEmptyOrcFileFilter implements PathFilter {
private FileSystem fs;
public NonEmptyOrcFileFilter(FileSystem fs) {
this.fs = fs;
}
@Override
public boolean accept(Path path) {
try {
Reader reader = OrcFile.createReader(fs, path);
ObjectInspector oip = (ObjectInspector) reader.getObjectInspector();
ResourceFieldSchema rs = HiveUtils.getResourceFieldSchema(TypeInfoUtils.getTypeInfoFromObjectInspector(oip));
return rs.getSchema().getFields().length != 0;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@Override
public ResourceStatistics getStatistics(String location, Job job)
throws IOException {
return null;
}
@Override
public String[] getPartitionKeys(String location, Job job)
throws IOException {
return null;
}
@Override
public void setPartitionFilter(Expression partitionFilter)
throws IOException {
}
@Override
public List<OperatorSet> getFeatures() {
return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
}
@Override
public RequiredFieldResponse pushProjection(
RequiredFieldList requiredFieldList) throws FrontendException {
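// Pig passes the indexes of the columns the script actually reads (e.g.,
// for "B = FOREACH A GENERATE x, z;" over schema (x, y, z) the indexes are
// 0 and 2 -- an illustrative example); record them as a boolean mask and
// stash it in the UDFContext for setLocation() on the backend.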
if (requiredFieldList == null) {
return null;
}
if (requiredFieldList.getFields() != null) {
int schemaSize = ((StructTypeInfo) typeInfo).getAllStructFieldTypeInfos().size();
mRequiredColumns = new boolean[schemaSize];
for (RequiredField rf : requiredFieldList.getFields()) {
if (rf.getIndex() != -1) {
mRequiredColumns[rf.getIndex()] = true;
}
}
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
try {
p.setProperty(signature + RequiredColumnsSuffix, ObjectSerializer.serialize(mRequiredColumns));
} catch (Exception e) {
throw new RuntimeException("Cannot serialize mRequiredColumns", e);
}
}
return new RequiredFieldResponse(true);
}
@Override
public List<String> getPredicateFields(String location, Job job) throws IOException {
ResourceSchema schema = getSchema(location, job);
List<String> predicateFields = new ArrayList<String>();
for (ResourceFieldSchema field : schema.getFields()) {
switch(field.getType()) {
case DataType.BOOLEAN:
case DataType.INTEGER:
case DataType.LONG:
case DataType.FLOAT:
case DataType.DOUBLE:
case DataType.DATETIME:
case DataType.CHARARRAY:
case DataType.BIGINTEGER:
case DataType.BIGDECIMAL:
predicateFields.add(field.getName());
break;
default:
// Skip DataType.BYTEARRAY, DataType.TUPLE, DataType.MAP and DataType.BAG
break;
}
}
return predicateFields;
}
@Override
public List<OpType> getSupportedExpressionTypes() {
List<OpType> types = new ArrayList<OpType>();
types.add(OpType.OP_EQ);
types.add(OpType.OP_NE);
types.add(OpType.OP_GT);
types.add(OpType.OP_GE);
types.add(OpType.OP_LT);
types.add(OpType.OP_LE);
types.add(OpType.OP_IN);
types.add(OpType.OP_BETWEEN);
types.add(OpType.OP_NULL);
types.add(OpType.OP_NOT);
types.add(OpType.OP_AND);
types.add(OpType.OP_OR);
return types;
}
@Override
public void setPushdownPredicate(Expression expr) throws IOException {
SearchArgument sArg = getSearchArgument(expr);
if (sArg != null) {
log.info("Pushdown predicate expression is " + expr);
log.info("Pushdown predicate SearchArgument is:\n" + sArg);
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
try {
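// SearchArgument is not Java-serializable, so serialize it with Kryo and
// Base64-encode the bytes; this matches the encoding the ORC reader
// expects for the sarg.pushdown property.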
Kryo kryo = new Kryo();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Output output = new Output(baos);
kryo.writeObject(output, sArg);
p.setProperty(signature + SearchArgsSuffix, new String(Base64.encodeBase64(output.toBytes())));
} catch (Exception e) {
throw new IOException("Cannot serialize SearchArgument: " + sArg, e);
}
}
}
@VisibleForTesting
SearchArgument getSearchArgument(Expression expr) {
if (expr == null) {
return null;
}
Builder builder = SearchArgumentFactory.newBuilder();
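// The builder requires the expression tree to be rooted at AND/OR/NOT, so
// wrap a bare comparison leaf (e.g. "a == 1") in a single-child AND.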
boolean beginWithAnd = !(expr.getOpType().equals(OpType.OP_AND)
|| expr.getOpType().equals(OpType.OP_OR)
|| expr.getOpType().equals(OpType.OP_NOT));
if (beginWithAnd) {
builder.startAnd();
}
buildSearchArgument(expr, builder);
if (beginWithAnd) {
builder.end();
}
SearchArgument sArg = builder.build();
return sArg;
}
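// Recursively translates a Pig Expression tree into builder calls. For
// example (illustrative), the filter "a == 1 AND b < 5" becomes
// and(equals(a, 1), lessThan(b, 5)), which lets ORC skip stripes and row
// groups whose column statistics cannot match. OP_GT and OP_GE are encoded
// as NOT(lessThanEquals) and NOT(lessThan) since the builder only exposes
// "less than" style leaves.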
private void buildSearchArgument(Expression expr, Builder builder) {
if (expr instanceof BinaryExpression) {
Expression lhs = ((BinaryExpression) expr).getLhs();
Expression rhs = ((BinaryExpression) expr).getRhs();
switch (expr.getOpType()) {
case OP_AND:
builder.startAnd();
buildSearchArgument(lhs, builder);
buildSearchArgument(rhs, builder);
builder.end();
break;
case OP_OR:
builder.startOr();
buildSearchArgument(lhs, builder);
buildSearchArgument(rhs, builder);
builder.end();
break;
case OP_EQ:
HiveShims.addEqualsOpToBuilder(builder, getColumnName(lhs),
getColumnType(lhs), getExpressionValue(rhs));
break;
case OP_NE:
builder.startNot();
HiveShims.addEqualsOpToBuilder(builder, getColumnName(lhs),
getColumnType(lhs), getExpressionValue(rhs));
builder.end();
break;
case OP_LT:
HiveShims.addLessThanOpToBuilder(builder, getColumnName(lhs),
getColumnType(lhs), getExpressionValue(rhs));
break;
case OP_LE:
HiveShims.addLessThanEqualsOpToBuilder(builder, getColumnName(lhs),
getColumnType(lhs), getExpressionValue(rhs));
break;
case OP_GT:
builder.startNot();
HiveShims.addLessThanEqualsOpToBuilder(builder, getColumnName(lhs),
getColumnType(lhs), getExpressionValue(rhs));
builder.end();
break;
case OP_GE:
builder.startNot();
HiveShims.addLessThanOpToBuilder(builder, getColumnName(lhs),
getColumnType(lhs), getExpressionValue(rhs));
builder.end();
break;
case OP_BETWEEN:
BetweenExpression between = (BetweenExpression) rhs;
HiveShims.addBetweenOpToBuilder(builder, getColumnName(lhs),
getColumnType(lhs), HiveShims.getSearchArgObjValue(between.getLower()),
HiveShims.getSearchArgObjValue(between.getUpper()));
break;
case OP_IN:
InExpression in = (InExpression) rhs;
builder.in(getColumnName(lhs), getColumnType(lhs), getSearchArgObjValues(in.getValues()).toArray());
break;
default:
throw new RuntimeException("Unsupported binary expression type: " + expr.getOpType() + " in " + expr);
}
} else if (expr instanceof UnaryExpression) {
Expression unaryExpr = ((UnaryExpression) expr).getExpression();
switch (expr.getOpType()) {
case OP_NULL:
HiveShims.addIsNullOpToBuilder(builder, getColumnName(unaryExpr),
getColumnType(unaryExpr));
break;
case OP_NOT:
builder.startNot();
buildSearchArgument(unaryExpr, builder);
builder.end();
break;
default:
throw new RuntimeException("Unsupported unary expression type: " +
expr.getOpType() + " in " + expr);
}
} else {
throw new RuntimeException("Unsupported expression type: " + expr.getOpType() + " in " + expr);
}
}
private String getColumnName(Expression expr) {
try {
return ((Column) expr).getName();
} catch (ClassCastException e) {
throw new RuntimeException("Expected a Column but found " + expr.getClass().getName() +
" in expression " + expr, e);
}
}
private PredicateLeaf.Type getColumnType(Expression expr) {
try {
return HiveUtils.getDataTypeForSearchArgs(expr.getDataType());
} catch (ClassCastException e) {
throw new RuntimeException("Expected a Column but found " + expr.getClass().getName() +
" in expression " + expr, e);
}
}
private Object getExpressionValue(Expression expr) {
switch(expr.getOpType()) {
case TERM_COL:
return ((Column) expr).getName();
case TERM_CONST:
return HiveShims.getSearchArgObjValue(((Const) expr).getValue());
default:
throw new RuntimeException("Unsupported expression type: " + expr.getOpType() + " in " + expr);
}
}
private List<Object> getSearchArgObjValues(List<Object> values) {
if (!(values.get(0) instanceof BigInteger || values.get(0) instanceof BigDecimal || values.get(0) instanceof DateTime)) {
return values;
}
List<Object> newValues = new ArrayList<Object>(values.size());
for (Object value : values) {
newValues.add(HiveShims.getSearchArgObjValue(value));
}
return newValues;
}
@Override
public Boolean supportsParallelWriteToStoreLocation() {
return true;
}
}