From 8af34f1971628d1eeb0cd1f07fe03397ca887b81 Mon Sep 17 00:00:00 2001 From: Jianyong Dai Date: Tue, 7 Apr 2015 21:24:48 +0000 Subject: [PATCH] PIG-3294: Allow Pig use Hive UDFs git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1671956 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + .../JavaConstantBooleanObjectInspector.java | 43 + .../JavaConstantDoubleObjectInspector.java | 43 + .../JavaConstantFloatObjectInspector.java | 43 + .../JavaConstantIntObjectInspector.java | 43 + .../JavaConstantLongObjectInspector.java | 43 + .../JavaConstantStringObjectInspector.java | 43 + src/org/apache/pig/EvalFunc.java | 7 + .../mapReduceLayer/JobControlCompiler.java | 7 +- .../mapReduceLayer/PigCombiner.java | 2 + .../mapReduceLayer/PigGenericMapBase.java | 2 +- .../mapReduceLayer/PigGenericMapReduce.java | 2 +- .../UDFEndOfAllInputNeededVisitor.java | 46 ++ .../plans/EndOfAllInputSetter.java | 13 + .../expressionOperators/POUserFunc.java | 6 + .../relationalOperators/POForEach.java | 41 +- .../executionengine/tez/TezDagBuilder.java | 1 + .../util/CombinerOptimizerUtil.java | 2 + src/org/apache/pig/builtin/HiveUDAF.java | 344 ++++++++ src/org/apache/pig/builtin/HiveUDF.java | 170 ++++ src/org/apache/pig/builtin/HiveUDFBase.java | 207 +++++ src/org/apache/pig/builtin/HiveUDTF.java | 181 ++++ src/org/apache/pig/builtin/OrcStorage.java | 10 +- .../apache/pig/data/UnlimitedNullTuple.java | 74 ++ src/org/apache/pig/impl/util/Utils.java | 7 + .../apache/pig/impl/util/hive/HiveUtils.java | 780 ++++++++++++++++++ .../apache/pig/impl/util/orc/OrcUtils.java | 697 ---------------- .../ExpToPhyTranslationVisitor.java | 5 + .../expression/UserFuncExpression.java | 13 +- .../LogToPhyTranslationVisitor.java | 2 +- test/e2e/pig/build.xml | 10 +- test/e2e/pig/tests/nightly.conf | 118 +++ .../test/udf/evalfunc/DummyContextUDF.java | 65 ++ 33 files changed, 2362 insertions(+), 710 deletions(-) create mode 100644 src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantBooleanObjectInspector.java create mode 100644 src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantDoubleObjectInspector.java create mode 100644 src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantFloatObjectInspector.java create mode 100644 src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantIntObjectInspector.java create mode 100644 src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantLongObjectInspector.java create mode 100644 src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantStringObjectInspector.java create mode 100644 src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/UDFEndOfAllInputNeededVisitor.java create mode 100644 src/org/apache/pig/builtin/HiveUDAF.java create mode 100644 src/org/apache/pig/builtin/HiveUDF.java create mode 100644 src/org/apache/pig/builtin/HiveUDFBase.java create mode 100644 src/org/apache/pig/builtin/HiveUDTF.java create mode 100644 src/org/apache/pig/data/UnlimitedNullTuple.java create mode 100644 src/org/apache/pig/impl/util/hive/HiveUtils.java create mode 100644 test/e2e/pig/udfs/java/org/apache/pig/test/udf/evalfunc/DummyContextUDF.java diff --git a/CHANGES.txt b/CHANGES.txt index 238717725c..8381a6d231 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES IMPROVEMENTS +PIG-3294: Allow Pig use Hive UDFs (daijy) + PIG-4476: Fix logging in AvroStorage* classes and SchemaTuple class (rdsr via rohini) PIG-4458: Support UDFs in a FOREACH Before a Merge Join 
(wattsinabox via daijy) diff --git a/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantBooleanObjectInspector.java b/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantBooleanObjectInspector.java new file mode 100644 index 0000000000..6da6592469 --- /dev/null +++ b/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantBooleanObjectInspector.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.serde2.objectinspector.primitive; + +import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; +import org.apache.hadoop.io.BooleanWritable; + +/** + * This class will be in Hive code base begin 1.2.0 (HIVE-9766). + * Will remove once we switch to use Hive 1.2.0 + */ +public class JavaConstantBooleanObjectInspector extends + JavaBooleanObjectInspector implements ConstantObjectInspector { + private Boolean value; + + public JavaConstantBooleanObjectInspector(Boolean value) { + super(); + this.value = value; + } + + @Override + public Object getWritableConstantValue() { + if (value==null) { + return null; + } + return new BooleanWritable(value); + } +} diff --git a/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantDoubleObjectInspector.java b/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantDoubleObjectInspector.java new file mode 100644 index 0000000000..8122fced68 --- /dev/null +++ b/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantDoubleObjectInspector.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.serde2.objectinspector.primitive; + +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; + +/** + * This class will be in Hive code base begin 1.2.0 (HIVE-9766). 
+ * Will remove once we switch to use Hive 1.2.0 + */ +public class JavaConstantDoubleObjectInspector extends + JavaDoubleObjectInspector implements ConstantObjectInspector { + private Double value; + + public JavaConstantDoubleObjectInspector(Double value) { + super(); + this.value = value; + } + + @Override + public Object getWritableConstantValue() { + if (value==null) { + return null; + } + return new DoubleWritable(value); + } +} diff --git a/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantFloatObjectInspector.java b/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantFloatObjectInspector.java new file mode 100644 index 0000000000..9c57500c6d --- /dev/null +++ b/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantFloatObjectInspector.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.serde2.objectinspector.primitive; + +import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; +import org.apache.hadoop.io.FloatWritable; + +/** + * This class will be in Hive code base begin 1.2.0 (HIVE-9766). + * Will remove once we switch to use Hive 1.2.0 + */ +public class JavaConstantFloatObjectInspector extends JavaFloatObjectInspector + implements ConstantObjectInspector { + private Float value; + + public JavaConstantFloatObjectInspector(Float value) { + super(); + this.value = value; + } + + @Override + public Object getWritableConstantValue() { + if (value==null) { + return null; + } + return new FloatWritable(value); + } +} diff --git a/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantIntObjectInspector.java b/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantIntObjectInspector.java new file mode 100644 index 0000000000..f955f916ab --- /dev/null +++ b/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantIntObjectInspector.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.serde2.objectinspector.primitive; + +import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; +import org.apache.hadoop.io.IntWritable; + +/** + * This class will be in Hive code base begin 1.2.0 (HIVE-9766). + * Will remove once we switch to use Hive 1.2.0 + */ +public class JavaConstantIntObjectInspector extends JavaIntObjectInspector + implements ConstantObjectInspector { + private Integer value; + + public JavaConstantIntObjectInspector(Integer value) { + super(); + this.value = value; + } + + @Override + public Object getWritableConstantValue() { + if (value==null) { + return null; + } + return new IntWritable(value); + } +} diff --git a/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantLongObjectInspector.java b/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantLongObjectInspector.java new file mode 100644 index 0000000000..b50a69d1c6 --- /dev/null +++ b/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantLongObjectInspector.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.serde2.objectinspector.primitive; + +import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; +import org.apache.hadoop.io.LongWritable; + +/** + * This class will be in Hive code base begin 1.2.0 (HIVE-9766). + * Will remove once we switch to use Hive 1.2.0 + */ +public class JavaConstantLongObjectInspector extends JavaLongObjectInspector + implements ConstantObjectInspector { + private Long value; + + public JavaConstantLongObjectInspector(Long value) { + super(); + this.value = value; + } + + @Override + public Object getWritableConstantValue() { + if (value==null) { + return null; + } + return new LongWritable(value); + } +} diff --git a/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantStringObjectInspector.java b/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantStringObjectInspector.java new file mode 100644 index 0000000000..bcb357f803 --- /dev/null +++ b/src/org/apache/hadoop/hive/serde2/objectinspector/primitive/JavaConstantStringObjectInspector.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.serde2.objectinspector.primitive; + +/** + * This class will be in Hive code base begin 1.2.0 (HIVE-9766). + * Will remove once we switch to use Hive 1.2.0 + */ +import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; +import org.apache.hadoop.io.Text; + +public class JavaConstantStringObjectInspector extends + JavaStringObjectInspector implements ConstantObjectInspector { + private String value; + + public JavaConstantStringObjectInspector(String value) { + super(); + this.value = value; + } + + @Override + public Object getWritableConstantValue() { + if (value==null) { + return null; + } + return new Text(value); + } +} diff --git a/src/org/apache/pig/EvalFunc.java b/src/org/apache/pig/EvalFunc.java index 922b93dcd4..58640b0b48 100644 --- a/src/org/apache/pig/EvalFunc.java +++ b/src/org/apache/pig/EvalFunc.java @@ -362,4 +362,11 @@ public SchemaType getSchemaType() { public boolean allowCompileTimeCalculation() { return false; } + + public boolean needEndOfAllInputProcessing() { + return false; + } + + public void setEndOfAllInput(boolean endOfAllInput) { + } } diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java index df14902507..187951082d 100644 --- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java +++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java @@ -643,7 +643,11 @@ private Job getJob(MROperPlan plan, MapReduceOper mro, Configuration config, Pig } } if (!predeployed) { - putJarOnClassPathThroughDistributedCache(pigContext, conf, jar); + if (jar.getFile().toLowerCase().endsWith(".jar")) { + putJarOnClassPathThroughDistributedCache(pigContext, conf, jar); + } else { + setupDistributedCache(pigContext, conf, new String[] {jar.getPath()}, true); + } } } @@ -804,6 +808,7 @@ else if (mapStores.size() + reduceStores.size() > 0) { // multi store case // set parent plan in all operators in map and reduce plans // currently the parent plan is really used only when POStream is present in the plan new PhyPlanSetter(mro.mapPlan).visit(); + new PhyPlanSetter(mro.combinePlan).visit(); new PhyPlanSetter(mro.reducePlan).visit(); // this call modifies the ReplFiles names of POFRJoin operators diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java index d8baf840b4..0230929144 100644 --- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java +++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java @@ -38,6 +38,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import 
org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.NullableTuple; @@ -98,6 +99,7 @@ protected void setup(Context context) throws IOException, InterruptedException { pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext")); if (pigContext.getLog4jProperties()!=null) PropertyConfigurator.configure(pigContext.getLog4jProperties()); + MapRedUtil.setupUDFContext(context.getConfiguration()); cp = (PhysicalPlan) ObjectSerializer.deserialize(jConf .get("pig.combinePlan")); diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java index 45ef73e6a3..8fa7ab1c34 100644 --- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java +++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapBase.java @@ -112,7 +112,7 @@ public void cleanup(Context context) throws IOException, InterruptedException { return; } - if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")) { + if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true") && !mp.isEmpty()) { // If there is a stream in the pipeline or if this map job belongs to merge-join we could // potentially have more to process - so lets // set the flag stating that all map input has been sent diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java index 2ddf7d3ee8..3f3b3d91a4 100644 --- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java +++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java @@ -609,7 +609,7 @@ protected void cleanup(Context context) throws IOException, InterruptedException return; } - if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true")) { + if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true") && !rp.isEmpty()) { // If there is a stream in the pipeline we could // potentially have more to process - so lets // set the flag stating that all map input has been sent diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/UDFEndOfAllInputNeededVisitor.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/UDFEndOfAllInputNeededVisitor.java new file mode 100644 index 0000000000..ab67751be0 --- /dev/null +++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/UDFEndOfAllInputNeededVisitor.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer; + +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.impl.plan.DependencyOrderWalker; +import org.apache.pig.impl.plan.VisitorException; + +public class UDFEndOfAllInputNeededVisitor extends PhyPlanVisitor { + + private boolean needed = false; + + public UDFEndOfAllInputNeededVisitor(PhysicalPlan plan) { + super(plan, new DependencyOrderWalker(plan)); + } + + @Override + public void visitUserFunc(POUserFunc userFunc) throws VisitorException { + super.visitUserFunc(userFunc); + if (userFunc.needEndOfAllInputProcessing()) { + needed = true; + } + } + + public boolean needEndOfAllInputProcessing() { + return needed; + } +} diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java index 417349b618..9a70b81222 100644 --- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java +++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java @@ -21,6 +21,7 @@ import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg; @@ -110,6 +111,18 @@ public void visitPoissonSample(POPoissonSample poissonSample) throws VisitorExce endOfAllInputFlag = true; } + @Override + public void visitPOForEach(POForEach foreach) throws VisitorException { + try { + if (foreach.needEndOfAllInputProcessing()) { + endOfAllInputFlag = true; + } + } catch (Exception e) { + throw new VisitorException(e); + } + } + + /** * @return if end of all input is present */ diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java index d54f1b0924..d850312dc3 100644 --- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java +++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java @@ -351,6 +351,9 @@ private Result getNext() throws ExecException { } } } else 
{ + if (parentPlan!=null && parentPlan.endOfAllInput && needEndOfAllInputProcessing()) { + func.setEndOfAllInput(true); + } if (executor != null) { result.result = executor.monitorExec((Tuple) result.result); } else { @@ -655,4 +658,7 @@ public void setFuncInputSchema(String signature) { } } + public boolean needEndOfAllInputProcessing() { + return getFunc().needEndOfAllInputProcessing(); + } } diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java index ff65ee6aa3..cb450f9476 100644 --- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java +++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java @@ -25,6 +25,7 @@ import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFEndOfAllInputNeededVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; @@ -41,6 +42,7 @@ import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.data.TupleMaker; +import org.apache.pig.data.UnlimitedNullTuple; import org.apache.pig.impl.logicalLayer.schema.Schema; import org.apache.pig.impl.plan.DependencyOrderWalker; import org.apache.pig.impl.plan.NodeIdGenerator; @@ -92,10 +94,14 @@ public class POForEach extends PhysicalOperator { protected Tuple inpTuple; + protected boolean endOfAllInputProcessed = false; + // Indicate the foreach statement can only in map side // Currently only used in MR cross (See PIG-4175) protected boolean mapSideOnly = false; + protected Boolean endOfAllInputProcessing = false; + private Schema schema; public POForEach(OperatorKey k) { @@ -244,13 +250,21 @@ public Result getNextTuple() throws ExecException { //read while (true) { inp = processInput(); - if (inp.returnStatus == POStatus.STATUS_EOP || - inp.returnStatus == POStatus.STATUS_ERR) { + + if (inp.returnStatus == POStatus.STATUS_ERR) { return inp; } if (inp.returnStatus == POStatus.STATUS_NULL) { continue; } + if (inp.returnStatus == POStatus.STATUS_EOP) { + if (parentPlan!=null && parentPlan.endOfAllInput && !endOfAllInputProcessed && endOfAllInputProcessing) { + // continue pull one more output + inp = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple()); + } else { + return inp; + } + } attachInputToPlans((Tuple) inp.result); inpTuple = (Tuple)inp.result; @@ -357,6 +371,9 @@ protected Result processPlan() throws ExecException{ if(its == null) { + if (endOfAllInputProcessed) { + return new Result(POStatus.STATUS_EOP, null); + } //getNext being called for the first time OR starting with a set of new data from inputs its = new Iterator[noItems]; bags = new Object[noItems]; @@ -424,6 +441,9 @@ protected Result processPlan() throws ExecException{ its[i] = null; } } + if (parentPlan!=null && parentPlan.endOfAllInput && endOfAllInputProcessing) { + endOfAllInputProcessed = true; + } } // if accumulating, we haven't got data yet for some fields, just return @@ -794,4 +814,21 @@ public void setMapSideOnly(boolean mapSideOnly) { public boolean isMapSideOnly() { return mapSideOnly; } + + public boolean needEndOfAllInputProcessing() throws ExecException 
{ + try { + for (PhysicalPlan innerPlan : inputPlans) { + UDFEndOfAllInputNeededVisitor endOfAllInputNeededVisitor + = new UDFEndOfAllInputNeededVisitor(innerPlan); + endOfAllInputNeededVisitor.visit(); + if (endOfAllInputNeededVisitor.needEndOfAllInputProcessing()) { + endOfAllInputProcessing = true; + return true; + } + } + return false; + } catch (Exception e) { + throw new ExecException(e); + } + } } diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java index acfed6a704..9b2261ee8b 100644 --- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java +++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java @@ -354,6 +354,7 @@ private EdgeProperty newEdge(TezOperator from, TezOperator to) OutputDescriptor out = OutputDescriptor.create(edge.outputClassName); Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), false); + UDFContext.getUDFContext().serialize(conf); if (!combinePlan.isEmpty()) { addCombiner(combinePlan, to, conf); } diff --git a/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java b/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java index 6b66ca132e..5b894decc5 100644 --- a/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java +++ b/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java @@ -206,7 +206,9 @@ public static void addCombiner(PhysicalPlan mapPlan, PhysicalPlan reducePlan, // fix projection and function time for algebraic functions in reduce foreach for (Pair op2plan : algebraicOps) { setProjectInput(op2plan.first, op2plan.second, op2newpos.get(op2plan.first)); + byte resultType = op2plan.first.getResultType(); ((POUserFunc)op2plan.first).setAlgebraicFunction(POUserFunc.FINAL); + op2plan.first.setResultType(resultType); } // we have modified the foreach inner plans - so set them again diff --git a/src/org/apache/pig/builtin/HiveUDAF.java b/src/org/apache/pig/builtin/HiveUDAF.java new file mode 100644 index 0000000000..cf53d7c246 --- /dev/null +++ b/src/org/apache/pig/builtin/HiveUDAF.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.builtin; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import org.apache.hadoop.hive.ql.exec.UDAF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFBridge; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2; +import org.apache.hadoop.hive.ql.udf.generic.SimpleGenericUDAFParameterInfo; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.pig.Algebraic; +import org.apache.pig.EvalFunc; +import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceSchema.ResourceFieldSchema; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema; +import org.apache.pig.impl.util.hive.HiveUtils; + +/** + * Use Hive UDAF or GenericUDAF. + * Example: + * define avg HiveUDAF('avg'); + * A = load 'mydata' as (name:chararray, num:double); + * B = group A by name; + * C = foreach B generate group, avg(A.num); + */ +public class HiveUDAF extends HiveUDFBase implements Algebraic { + + private boolean inited = false; + private String funcName; + private String params; + private GenericUDAFResolver udaf; + + static class SchemaAndEvaluatorInfo { + private TypeInfo inputTypeInfo; + private TypeInfo outputTypeInfo; + private TypeInfo intermediateOutputTypeInfo; + private ObjectInspector[] inputObjectInspectorAsArray; + private ObjectInspector[] intermediateInputObjectInspectorAsArray; + private StructObjectInspector inputObjectInspector; + private ObjectInspector intermediateInputObjectInspector; + private ObjectInspector intermediateOutputObjectInspector; + private ObjectInspector outputObjectInspector; + private GenericUDAFEvaluator evaluator; + + private static TypeInfo getInputTypeInfo(Schema inputSchema) throws IOException { + FieldSchema innerFieldSchema = inputSchema.getField(0).schema.getField(0); + ResourceFieldSchema rfs = new ResourceFieldSchema(innerFieldSchema); + + TypeInfo inputTypeInfo = HiveUtils.getTypeInfo(rfs); + return inputTypeInfo; + } + + private static ObjectInspector[] getInputObjectInspectorAsArray(TypeInfo inputTypeInfo, + ConstantObjectInspectInfo constantsInfo) throws IOException { + + StructObjectInspector inputObjectInspector = (StructObjectInspector)HiveUtils.createObjectInspector(inputTypeInfo); + + ObjectInspector[] arguments = new ObjectInspector[inputObjectInspector.getAllStructFieldRefs().size()]; + for (int i=0;i { + public Initial(String funcName) { + } + public Initial(String funcName, String params) { + } + @Override + public Tuple exec(Tuple input) throws IOException { + + DataBag bg = (DataBag) input.get(0); + Tuple tp = null; + if(bg.iterator().hasNext()) { + tp = bg.iterator().next(); + } + + return tp; + } + } + + static public class Intermediate 
extends EvalFunc { + + private boolean inited = false; + private String funcName; + ConstantObjectInspectInfo constantsInfo; + private SchemaAndEvaluatorInfo schemaAndEvaluatorInfo = new SchemaAndEvaluatorInfo(); + private static TupleFactory tf = TupleFactory.getInstance(); + + public Intermediate(String funcName) { + this.funcName = funcName; + } + public Intermediate(String funcName, String params) throws IOException { + this.funcName = funcName; + constantsInfo = ConstantObjectInspectInfo.parse(params); + } + @Override + public Tuple exec(Tuple input) throws IOException { + try { + if (!inited) { + schemaAndEvaluatorInfo.init(getInputSchema(), instantiateUDAF(funcName), Mode.PARTIAL1, constantsInfo); + inited = true; + } + DataBag b = (DataBag)input.get(0); + AggregationBuffer agg = schemaAndEvaluatorInfo.evaluator.getNewAggregationBuffer(); + for (Iterator it = b.iterator(); it.hasNext();) { + Tuple t = it.next(); + List inputs = schemaAndEvaluatorInfo.inputObjectInspector.getStructFieldsDataAsList(t); + schemaAndEvaluatorInfo.evaluator.iterate(agg, inputs.toArray()); + } + Object returnValue = schemaAndEvaluatorInfo.evaluator.terminatePartial(agg); + Tuple result = tf.newTuple(); + result.append(HiveUtils.convertHiveToPig(returnValue, schemaAndEvaluatorInfo.intermediateOutputObjectInspector, null)); + return result; + } catch (Exception e) { + throw new IOException(e); + } + } + } + + static public class Final extends EvalFunc { + + private boolean inited = false; + private String funcName; + ConstantObjectInspectInfo constantsInfo; + private SchemaAndEvaluatorInfo schemaAndEvaluatorInfo = new SchemaAndEvaluatorInfo(); + + public Final(String funcName) { + this.funcName = funcName; + } + public Final(String funcName, String params) throws IOException { + this.funcName = funcName; + constantsInfo = ConstantObjectInspectInfo.parse(params); + } + @Override + public Object exec(Tuple input) throws IOException { + try { + if (!inited) { + schemaAndEvaluatorInfo.init(getInputSchema(), instantiateUDAF(funcName), Mode.FINAL, constantsInfo); + schemaAndEvaluatorInfo.evaluator.configure(instantiateMapredContext()); + inited = true; + } + DataBag b = (DataBag)input.get(0); + AggregationBuffer agg = schemaAndEvaluatorInfo.evaluator.getNewAggregationBuffer(); + for (Iterator it = b.iterator(); it.hasNext();) { + Tuple t = it.next(); + schemaAndEvaluatorInfo.evaluator.merge(agg, t.get(0)); + } + + Object returnValue = schemaAndEvaluatorInfo.evaluator.terminate(agg); + Object result = HiveUtils.convertHiveToPig(returnValue, schemaAndEvaluatorInfo.outputObjectInspector, null); + return result; + } catch (Exception e) { + throw new IOException(e); + } + } + } + + @Override + public Object exec(Tuple input) throws IOException { + try { + if (!inited) { + schemaAndEvaluatorInfo.init(getInputSchema(), instantiateUDAF(funcName), Mode.COMPLETE, constantsInfo); + inited = true; + } + AggregationBuffer agg = schemaAndEvaluatorInfo.evaluator.getNewAggregationBuffer(); + DataBag bg = (DataBag) input.get(0); + Tuple tp = null; + for (Iterator it = bg.iterator(); it.hasNext();) { + tp = it.next(); + List inputs = schemaAndEvaluatorInfo.inputObjectInspector.getStructFieldsDataAsList(tp); + schemaAndEvaluatorInfo.evaluator.iterate(agg, inputs.toArray()); + } + Object returnValue = schemaAndEvaluatorInfo.evaluator.terminate(agg); + Object result = HiveUtils.convertHiveToPig(returnValue, schemaAndEvaluatorInfo.outputObjectInspector, null); + return result; + } catch (Exception e) { + throw new IOException(e); + } + 
} + + @Override + public Schema outputSchema(Schema input) { + try { + if (!inited) { + schemaAndEvaluatorInfo.init(getInputSchema(), instantiateUDAF(funcName), Mode.COMPLETE, constantsInfo); + inited = true; + } + + ResourceFieldSchema rfs = HiveUtils.getResourceFieldSchema(schemaAndEvaluatorInfo.outputTypeInfo); + ResourceSchema outputSchema = new ResourceSchema(); + outputSchema.setFields(new ResourceFieldSchema[] {rfs}); + return Schema.getPigSchema(outputSchema); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/org/apache/pig/builtin/HiveUDF.java b/src/org/apache/pig/builtin/HiveUDF.java new file mode 100644 index 0000000000..e215fc4888 --- /dev/null +++ b/src/org/apache/pig/builtin/HiveUDF.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.builtin; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceSchema.ResourceFieldSchema; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.util.hive.HiveUtils; + +/** + * Use Hive UDF or GenericUDF. 
+ * Example: + * define sin HiveUDF('sin'); + * A = load 'mydata' as (num:double); + * B = foreach A generate sin(num); + * HiveUDF takes an optional second parameter if the Hive UDF require constant parameters + * define in_file HiveUDF('in_file', '(null, "names.txt")'); + */ +public class HiveUDF extends HiveUDFBase { + + private boolean inited = false; + private GenericUDF evalUDF; + + static class SchemaInfo { + private StructObjectInspector inputObjectInspector; + private ObjectInspector outputObjectInspector; + + private void init(Schema inputSchema, GenericUDF evalUDF, ConstantObjectInspectInfo constantsInfo) throws IOException { + ResourceSchema rs = new ResourceSchema(inputSchema); + ResourceFieldSchema wrappedTupleFieldSchema = new ResourceFieldSchema(); + wrappedTupleFieldSchema.setType(DataType.TUPLE); + wrappedTupleFieldSchema.setSchema(rs); + + TypeInfo ti = HiveUtils.getTypeInfo(wrappedTupleFieldSchema); + inputObjectInspector = (StructObjectInspector)HiveUtils.createObjectInspector(ti); + + try { + ObjectInspector[] arguments = new ObjectInspector[inputSchema.size()]; + for (int i=0;i getShipFiles() { + try { + if (!inited) { + schemaInfo.init(getInputSchema(), evalUDF, constantsInfo); + inited = true; + } + } catch(Exception e) { + throw new RuntimeException(e); + } + List files = super.getShipFiles(); + if (evalUDF.getRequiredFiles() != null) { + files.addAll(Arrays.asList(evalUDF.getRequiredFiles())); + } + if (evalUDF.getRequiredJars() != null) { + files.addAll(Arrays.asList(evalUDF.getRequiredJars())); + } + + return files; + } + + @Override + public Schema outputSchema(Schema input) { + try { + if (!inited) { + schemaInfo.init(getInputSchema(), evalUDF, constantsInfo); + inited = true; + } + ResourceFieldSchema rfs = HiveUtils.getResourceFieldSchema( + TypeInfoUtils.getTypeInfoFromObjectInspector(schemaInfo.outputObjectInspector)); + ResourceSchema outputSchema = new ResourceSchema(); + outputSchema.setFields(new ResourceFieldSchema[] {rfs}); + return Schema.getPigSchema(outputSchema); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void finish() { + try { + evalUDF.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/org/apache/pig/builtin/HiveUDFBase.java b/src/org/apache/pig/builtin/HiveUDFBase.java new file mode 100644 index 0000000000..60389dcd8f --- /dev/null +++ b/src/org/apache/pig/builtin/HiveUDFBase.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.builtin; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.exec.FunctionInfo; +import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.exec.MapredContext; +import org.apache.hadoop.hive.ql.exec.UDAF; +import org.apache.hadoop.hive.ql.exec.UDF; +import org.apache.hadoop.hive.ql.udf.generic.Collector; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; +import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.shims.HadoopShimsSecure; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.apache.pig.EvalFunc; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.impl.util.Utils; +import org.apache.pig.impl.util.hive.HiveUtils; +import org.apache.pig.tools.pigstats.PigStatusReporter; + +import com.esotericsoftware.kryo.Serializer; + +abstract class HiveUDFBase extends EvalFunc { + + static protected class ConstantObjectInspectInfo { + ConstantObjectInspector[] constants; + static ConstantObjectInspectInfo parse(String params) throws IOException { + ConstantObjectInspectInfo info = new ConstantObjectInspectInfo(); + params = params.replaceAll("\"", "'"); + Object constant = Utils.parseConstant(params); + if (DataType.findType(constant) == DataType.TUPLE) { + Tuple t = (Tuple)constant; + info.constants = new ConstantObjectInspector[t.size()]; + for (int i=0;i)inputObjectInspector.getAllStructFieldRefs()).set(i, (HiveUtils.Field)newfield); + } + } + } + } + } + + static protected Class resolveFunc(String funcName) throws IOException { + String className = funcName; + Class udfClass; + if (FunctionRegistry.getFunctionNames().contains(funcName)) { + FunctionInfo func = FunctionRegistry.getFunctionInfo(funcName); + udfClass = func.getFunctionClass(); + } else { + udfClass = PigContext.resolveClassName(className); + if (udfClass == null) { + throw new IOException("Cannot find Hive UDF " + funcName); + } + } + return udfClass; + } + + /** + * A constant of Reporter type that does nothing. 
+ */ + static protected class HiveReporter implements Reporter { + PigStatusReporter rep; + HiveReporter(PigStatusReporter rep) { + this.rep = rep; + } + public void setStatus(String s) { + rep.setStatus(s); + } + public void progress() { + rep.progress(); + } + public Counter getCounter(Enum name) { + try { + Counters counters = new Counters(); + counters.incrCounter(name, rep.getCounter(name).getValue()); + return counters.findCounter(name); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + public Counter getCounter(String group, String name) { + try { + Counters counters = new Counters(); + counters.incrCounter(group, name, rep.getCounter(group, name).getValue()); + return counters.findCounter(group, name); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + public void incrCounter(Enum key, long amount) { + rep.incrCounter(key, amount); + } + public void incrCounter(String group, String counter, long amount) { + rep.incrCounter(group, counter, amount); + } + public InputSplit getInputSplit() throws UnsupportedOperationException { + throw new UnsupportedOperationException("NULL reporter has no input"); + } + public float getProgress() { + return 0; + } + }; + + protected static MapredContext instantiateMapredContext() { + Configuration conf = UDFContext.getUDFContext().getJobConf(); + boolean isMap = conf.getBoolean(MRConfiguration.TASK_IS_MAP, false); + if (conf.get("exectype").startsWith("TEZ")) { + isMap = true; + HiveConf.setVar(conf, ConfVars.HIVE_EXECUTION_ENGINE, "tez"); + } + MapredContext context = MapredContext.init(isMap, new JobConf(UDFContext.getUDFContext().getJobConf())); + context.setReporter(new HiveReporter(PigStatusReporter.getInstance())); + return context; + } + + @Override + public List getShipFiles() { + String hadoopVersion = "20S"; + if (Utils.isHadoop23() || Utils.isHadoop2()) { + hadoopVersion = "23"; + } + Class hadoopVersionShimsClass; + try { + hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" + + hadoopVersion + "Shims"); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath"); + } + List files = FuncUtils.getShipFiles(new Class[] {GenericUDF.class, + PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class, + hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class}); + return files; + } + + static protected String getErrorMessage(Class c) { + StringBuffer message = new StringBuffer("Please declare " + c.getName() + " as "); + if (UDF.class.isAssignableFrom(c) || GenericUDF.class.isAssignableFrom(c)) { + message.append(HiveUDF.class.getName()); + } else if (GenericUDTF.class.isAssignableFrom(c)) { + message.append(HiveUDTF.class.getName()); + } else if (UDAF.class.isAssignableFrom(c) || GenericUDAFResolver.class.isAssignableFrom(c)) { + message.append(HiveUDAF.class.getName()); + } else { + message = new StringBuffer(c.getName() + " is not Hive UDF"); + } + return message.toString(); + } +} diff --git a/src/org/apache/pig/builtin/HiveUDTF.java b/src/org/apache/pig/builtin/HiveUDTF.java new file mode 100644 index 0000000000..6e35df9880 --- /dev/null +++ b/src/org/apache/pig/builtin/HiveUDTF.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.builtin; + +import java.io.IOException; + +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.generic.Collector; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceSchema.ResourceFieldSchema; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.logicalLayer.schema.Schema; +import org.apache.pig.impl.util.hive.HiveUtils; + +/** + * Use Hive GenericUDTF. + * Example: + * define explode HiveUDTF('explode'); + * A = load 'mydata' as (a0:{(b0:chararray)}); + * B = foreach A generate flatten(explode(a0)); + */ +public class HiveUDTF extends HiveUDFBase { + + private boolean inited = false; + private GenericUDTF udtf; + private boolean endOfAllInput = false; + + static class SchemaInfo { + StructObjectInspector inputObjectInspector; + ObjectInspector outputObjectInspector; + private void init(Schema inputSchema, GenericUDTF udtf, ConstantObjectInspectInfo constantsInfo) throws IOException { + ResourceSchema rs = new ResourceSchema(inputSchema); + ResourceFieldSchema wrappedTupleFieldSchema = new ResourceFieldSchema(); + wrappedTupleFieldSchema.setType(DataType.TUPLE); + wrappedTupleFieldSchema.setSchema(rs); + + TypeInfo ti = HiveUtils.getTypeInfo(wrappedTupleFieldSchema); + inputObjectInspector = (StructObjectInspector)HiveUtils.createObjectInspector(ti); + if (constantsInfo!=null) { + constantsInfo.injectConstantObjectInspector(inputObjectInspector); + } + + try { + outputObjectInspector = udtf.initialize(inputObjectInspector); + } catch (Exception e) { + throw new IOException(e); + } + } + } + + SchemaInfo schemaInfo = new SchemaInfo(); + + ConstantObjectInspectInfo constantsInfo; + + private static BagFactory bf = BagFactory.getInstance(); + private HiveUDTFCollector collector = null; + + public HiveUDTF(String funcName) throws InstantiationException, IllegalAccessException, IOException { + Class hiveUDTFClass = resolveFunc(funcName); + if (GenericUDTF.class.isAssignableFrom(hiveUDTFClass)) { + udtf = (GenericUDTF)hiveUDTFClass.newInstance(); + } else { + throw new IOException(getErrorMessage(hiveUDTFClass)); + } + } + + public HiveUDTF(String funcName, String params) throws InstantiationException, IllegalAccessException, IOException { + this(funcName); + constantsInfo = ConstantObjectInspectInfo.parse(params); + } + @Override + public Object exec(Tuple input) throws IOException { + if (!inited) { + udtf.configure(instantiateMapredContext()); + schemaInfo.init(getInputSchema(), udtf, 
constantsInfo); + inited = true; + } + + if (collector == null) { + collector = new HiveUDTFCollector(); + udtf.setCollector(collector); + } else { + collector.init(); + } + + try { + if (!endOfAllInput) { + udtf.process(input.getAll().toArray()); + } else { + udtf.close(); + } + } catch (Exception e) { + throw new IOException(e); + } + return collector.getBag(); + } + + @Override + public Schema outputSchema(Schema input) { + try { + if (!inited) { + schemaInfo.init(getInputSchema(), udtf, constantsInfo); + inited = true; + } + ResourceFieldSchema rfs = HiveUtils.getResourceFieldSchema( + TypeInfoUtils.getTypeInfoFromObjectInspector(schemaInfo.outputObjectInspector)); + + ResourceSchema tupleSchema = new ResourceSchema(); + tupleSchema.setFields(new ResourceFieldSchema[] {rfs}); + + ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema(); + bagFieldSchema.setType(DataType.BAG); + bagFieldSchema.setSchema(tupleSchema); + + ResourceSchema bagSchema = new ResourceSchema(); + bagSchema.setFields(new ResourceFieldSchema[] {bagFieldSchema}); + return Schema.getPigSchema(bagSchema); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + class HiveUDTFCollector implements Collector { + DataBag bag = bf.newDefaultBag(); + + public void init() { + bag.clear(); + } + + @Override + public void collect(Object input) throws HiveException { + try { + Tuple outputTuple = (Tuple)HiveUtils.convertHiveToPig(input, schemaInfo.outputObjectInspector, null); + if (outputTuple.size()==1 && outputTuple.get(0) instanceof Tuple) { + bag.add((Tuple)outputTuple.get(0)); + } else { + bag.add(outputTuple); + } + } catch(Exception e) { + throw new HiveException(e); + } + } + + public DataBag getBag() { + return bag; + } + } + + @Override + public boolean needEndOfAllInputProcessing() { + return true; + } + + @Override + public void setEndOfAllInput(boolean endOfAllInput) { + this.endOfAllInput = endOfAllInput; + } +} diff --git a/src/org/apache/pig/builtin/OrcStorage.java b/src/org/apache/pig/builtin/OrcStorage.java index 5bd9c74849..d3ac553e52 100644 --- a/src/org/apache/pig/builtin/OrcStorage.java +++ b/src/org/apache/pig/builtin/OrcStorage.java @@ -91,7 +91,7 @@ import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.impl.util.Utils; -import org.apache.pig.impl.util.orc.OrcUtils; +import org.apache.pig.impl.util.hive.HiveUtils; import org.joda.time.DateTime; import com.esotericsoftware.kryo.io.Input; @@ -235,7 +235,7 @@ public void setStoreLocation(String location, Job job) throws IOException { typeInfo = (TypeInfo)ObjectSerializer.deserialize(p.getProperty(signature + SchemaSignatureSuffix)); } if (oi==null) { - oi = OrcUtils.createObjectInspector(typeInfo); + oi = HiveUtils.createObjectInspector(typeInfo); } } @@ -244,7 +244,7 @@ public void checkSchema(ResourceSchema rs) throws IOException { ResourceFieldSchema fs = new ResourceFieldSchema(); fs.setType(DataType.TUPLE); fs.setSchema(rs); - typeInfo = OrcUtils.getTypeInfo(fs); + typeInfo = HiveUtils.getTypeInfo(fs); Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass()); p.setProperty(signature + SchemaSignatureSuffix, ObjectSerializer.serialize(typeInfo)); } @@ -376,7 +376,7 @@ public Tuple getNext() throws IOException { } Object value = in.getCurrentValue(); - Tuple t = (Tuple)OrcUtils.convertOrcToPig(value, oi, mRequiredColumns); + Tuple t = (Tuple)HiveUtils.convertHiveToPig(value, oi, mRequiredColumns); return t; } catch (InterruptedException e) { int errCode 
= 6018; @@ -438,7 +438,7 @@ public ResourceSchema getSchema(String location, Job job) } } - ResourceFieldSchema fs = OrcUtils.getResourceFieldSchema(typeInfo); + ResourceFieldSchema fs = HiveUtils.getResourceFieldSchema(typeInfo); return fs.getSchema(); } diff --git a/src/org/apache/pig/data/UnlimitedNullTuple.java b/src/org/apache/pig/data/UnlimitedNullTuple.java new file mode 100644 index 0000000000..3dd038e3a4 --- /dev/null +++ b/src/org/apache/pig/data/UnlimitedNullTuple.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.data; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +import org.apache.pig.backend.executionengine.ExecException; + +public class UnlimitedNullTuple extends AbstractTuple { + + @Override + public int size() { + throw new RuntimeException("Unimplemented"); + } + + @Override + public Object get(int fieldNum) throws ExecException { + return null; + } + + @Override + public List getAll() { + throw new RuntimeException("Unimplemented"); + } + + @Override + public void set(int fieldNum, Object val) throws ExecException { + throw new ExecException("Unimplemented"); + } + + @Override + public void append(Object val) { + throw new RuntimeException("Unimplemented"); + } + + @Override + public long getMemorySize() { + throw new RuntimeException("Unimplemented"); + } + + @Override + public void readFields(DataInput arg0) throws IOException { + throw new IOException("Unimplemented"); + } + + @Override + public void write(DataOutput arg0) throws IOException { + throw new IOException("Unimplemented"); + } + + @Override + public int compareTo(Object o) { + throw new RuntimeException("Unimplemented"); + } + +} diff --git a/src/org/apache/pig/impl/util/Utils.java b/src/org/apache/pig/impl/util/Utils.java index bc17eadc24..89cfcf7fc9 100644 --- a/src/org/apache/pig/impl/util/Utils.java +++ b/src/org/apache/pig/impl/util/Utils.java @@ -255,6 +255,13 @@ public static LogicalSchema parseSchema(String schemaString) throws ParserExcept return schema; } + public static Object parseConstant(String constantString) throws ParserException { + QueryParserDriver queryParser = new QueryParserDriver( new PigContext(), + "util", new HashMap() ) ; + Object constant = queryParser.parseConstant(constantString); + return constant; + } + /** * This method adds FieldSchema of 'input source tag/path' as the first * field. 
This will be called only when PigStorage is invoked with diff --git a/src/org/apache/pig/impl/util/hive/HiveUtils.java b/src/org/apache/pig/impl/util/hive/HiveUtils.java new file mode 100644 index 0000000000..f80effdea7 --- /dev/null +++ b/src/org/apache/pig/impl/util/hive/HiveUtils.java @@ -0,0 +1,780 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.impl.util.hive; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantBooleanObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantDoubleObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantFloatObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantIntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantLongObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaConstantStringObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantFloatObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import 
org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.pig.PigWarning; +import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceSchema.ResourceFieldSchema; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.data.BagFactory; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.tools.pigstats.PigStatusReporter; +import org.joda.time.DateTime; + +public class HiveUtils { + + static TupleFactory tf = TupleFactory.getInstance(); + + public static Object convertHiveToPig(Object obj, ObjectInspector oi, boolean[] includedColumns) { + Object result = null; + if (obj == null) { + return result; + } + switch (oi.getCategory()) { + case PRIMITIVE: + PrimitiveObjectInspector poi = (PrimitiveObjectInspector)oi; + result = getPrimaryFromHive(obj, poi); + break; + case STRUCT: + StructObjectInspector soi = (StructObjectInspector)oi; + List elementFields = (List) soi.getAllStructFieldRefs(); + List items = soi.getStructFieldsDataAsList(obj); + Tuple t = tf.newTuple(); + for (int i=0;i m = (Map)obj; + result = new HashMap(); + for (Map.Entry entry : m.entrySet()) { + Object convertedKey = convertHiveToPig(entry.getKey(), keyObjectInspector, null); + Object convertedValue = convertHiveToPig(entry.getValue(), valueObjectInspector, null); + if (convertedKey!=null) { + ((Map)result).put(convertedKey.toString(), convertedValue); + } else { + PigStatusReporter reporter = PigStatusReporter.getInstance(); + if (reporter != null) { + reporter.incrCounter(PigWarning.UDF_WARNING_1, 1); + } + } + } + break; + case LIST: + ListObjectInspector loi = (ListObjectInspector)oi; + result = BagFactory.getInstance().newDefaultBag(); + ObjectInspector itemObjectInspector = loi.getListElementObjectInspector(); + for (Object item : loi.getList(obj)) { + Object convertedItem = convertHiveToPig(item, itemObjectInspector, null); + Tuple innerTuple; + // Hive array contains a single item of any type, if it is not tuple, + // need to wrap it in tuple + if (convertedItem instanceof Tuple) { + innerTuple = (Tuple)convertedItem; + } else { + innerTuple = tf.newTuple(1); + try { + innerTuple.set(0, convertedItem); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + ((DataBag)result).add(innerTuple); + } + break; + default: + throw new IllegalArgumentException("Unknown type " + + oi.getCategory()); + } + return result; + } + + public static Object getPrimaryFromHive(Object obj, PrimitiveObjectInspector poi) { + Object result = null; + if (obj == null) { + return result; + } + switch (poi.getPrimitiveCategory()) { + case FLOAT: + case DOUBLE: + case BOOLEAN: + case INT: + case LONG: + case STRING: + result = poi.getPrimitiveJavaObject(obj); + break; + case CHAR: + result = ((HiveChar)poi.getPrimitiveJavaObject(obj)).getValue(); + break; + case VARCHAR: + result = ((HiveVarchar)poi.getPrimitiveJavaObject(obj)).getValue(); + break; + case BYTE: + result = (int)(Byte)poi.getPrimitiveJavaObject(obj); + break; + case SHORT: + result = (int)(Short)poi.getPrimitiveJavaObject(obj); + break; + case BINARY: + byte[] b = 
(byte[])poi.getPrimitiveJavaObject(obj); + // Make a copy + result = new DataByteArray(b, 0, b.length); + break; + case TIMESTAMP: + java.sql.Timestamp origTimeStamp = (java.sql.Timestamp)poi.getPrimitiveJavaObject(obj); + result = new DateTime(origTimeStamp.getTime()); + break; + case DATE: + java.sql.Date origDate = (java.sql.Date)poi.getPrimitiveJavaObject(obj); + result = new DateTime(origDate.getTime()); + break; + case DECIMAL: + org.apache.hadoop.hive.common.type.HiveDecimal origDecimal = + (org.apache.hadoop.hive.common.type.HiveDecimal)poi.getPrimitiveJavaObject(obj); + result = origDecimal.bigDecimalValue(); + break; + default: + throw new IllegalArgumentException("Unknown primitive type " + + (poi).getPrimitiveCategory()); + } + return result; + } + + public static ResourceFieldSchema getResourceFieldSchema(TypeInfo ti) throws IOException { + ResourceFieldSchema fieldSchema = new ResourceFieldSchema(); + ResourceFieldSchema[] innerFs; + ResourceSchema innerSchema; + switch (ti.getCategory()) { + case STRUCT: + StructTypeInfo sti = (StructTypeInfo)ti; + fieldSchema.setType(DataType.TUPLE); + List typeInfos = sti.getAllStructFieldTypeInfos(); + List names = sti.getAllStructFieldNames(); + innerFs = new ResourceFieldSchema[typeInfos.size()]; + for (int i=0;i names = new ArrayList(); + ArrayList typeInfos = new ArrayList(); + for (ResourceFieldSchema subFs : fs.getSchema().getFields()) { + TypeInfo info = getTypeInfo(subFs); + names.add(subFs.getName()); + typeInfos.add(info); + } + ((StructTypeInfo)ti).setAllStructFieldNames(names); + ((StructTypeInfo)ti).setAllStructFieldTypeInfos(typeInfos); + break; + case DataType.BAG: + ti = new ListTypeInfo(); + if (fs.getSchema()==null || fs.getSchema().getFields().length!=1) { + throw new IOException("Wrong bag inner schema"); + } + ResourceFieldSchema tupleSchema = fs.getSchema().getFields()[0]; + ResourceFieldSchema itemSchema = tupleSchema; + // If single item tuple, remove the tuple, put the inner item into list directly + if (tupleSchema.getSchema().getFields().length == 1) { + itemSchema = tupleSchema.getSchema().getFields()[0]; + } + TypeInfo elementField = getTypeInfo(itemSchema); + ((ListTypeInfo)ti).setListElementTypeInfo(elementField); + break; + case DataType.MAP: + ti = new MapTypeInfo(); + TypeInfo valueField; + if (fs.getSchema() == null || fs.getSchema().getFields().length != 1) { + valueField = TypeInfoFactory.binaryTypeInfo; + } else { + valueField = getTypeInfo(fs.getSchema().getFields()[0]); + } + ((MapTypeInfo)ti).setMapKeyTypeInfo(TypeInfoFactory.stringTypeInfo); + ((MapTypeInfo)ti).setMapValueTypeInfo(valueField); + break; + case DataType.BOOLEAN: + ti = TypeInfoFactory.booleanTypeInfo; + break; + case DataType.INTEGER: + ti = TypeInfoFactory.intTypeInfo; + break; + case DataType.LONG: + ti = TypeInfoFactory.longTypeInfo; + break; + case DataType.FLOAT: + ti = TypeInfoFactory.floatTypeInfo; + break; + case DataType.DOUBLE: + ti = TypeInfoFactory.doubleTypeInfo; + break; + case DataType.CHARARRAY: + ti = TypeInfoFactory.stringTypeInfo; + break; + case DataType.DATETIME: + ti = TypeInfoFactory.timestampTypeInfo; + break; + case DataType.BIGDECIMAL: + ti = TypeInfoFactory.decimalTypeInfo; + break; + case DataType.BIGINTEGER: + ti = TypeInfoFactory.decimalTypeInfo; + break; + case DataType.BYTEARRAY: + ti = TypeInfoFactory.binaryTypeInfo; + break; + default: + throw new IllegalArgumentException("Unknown data type " + + DataType.findTypeName(fs.getType())); + } + return ti; + } + + static public class Field implements 
StructField { + private final String name; + private final ObjectInspector inspector; + private final int offset; + + public Field(String name, ObjectInspector inspector, int offset) { + this.name = name; + this.inspector = inspector; + this.offset = offset; + } + + @Override + public String getFieldName() { + return name; + } + + @Override + public ObjectInspector getFieldObjectInspector() { + return inspector; + } + + @Override + public int getFieldID() { + return offset; + } + + @Override + public String getFieldComment() { + return null; + } + } + + static class PigStructInspector extends StructObjectInspector { + private List fields; + + PigStructInspector(StructTypeInfo info) { + ArrayList fieldNames = info.getAllStructFieldNames(); + ArrayList fieldTypes = info.getAllStructFieldTypeInfos(); + fields = new ArrayList(fieldNames.size()); + for (int i = 0; i < fieldNames.size(); ++i) { + fields.add(new Field(fieldNames.get(i), + createObjectInspector(fieldTypes.get(i)), i)); + } + } + + PigStructInspector(List fields) { + this.fields = fields; + } + + @Override + public List getAllStructFieldRefs() { + return fields; + } + + @Override + public StructField getStructFieldRef(String s) { + for (StructField field : fields) { + if (field.getFieldName().equals(s)) { + return field; + } + } + return null; + } + + @Override + public Object getStructFieldData(Object object, StructField field) { + Object result = null; + try { + result = ((Tuple) object).get(((Field) field).offset); + } catch (ExecException e) { + throw new RuntimeException(e); + } + return result; + } + + @Override + public List getStructFieldsDataAsList(Object object) { + return ((Tuple) object).getAll(); + } + + @Override + public String getTypeName() { + StringBuilder buffer = new StringBuilder(); + buffer.append("struct<"); + for (int i = 0; i < fields.size(); ++i) { + StructField field = fields.get(i); + if (i != 0) { + buffer.append(","); + } + buffer.append(field.getFieldName()); + buffer.append(":"); + buffer.append(field.getFieldObjectInspector().getTypeName()); + } + buffer.append(">"); + return buffer.toString(); + } + + @Override + public Category getCategory() { + return Category.STRUCT; + } + + @Override + public boolean equals(Object o) { + if (o == null || o.getClass() != getClass()) { + return false; + } else if (o == this) { + return true; + } else { + List other = ((PigStructInspector) o).fields; + if (other.size() != fields.size()) { + return false; + } + for (int i = 0; i < fields.size(); ++i) { + StructField left = other.get(i); + StructField right = fields.get(i); + if (!(left.getFieldName().equals(right.getFieldName()) && left + .getFieldObjectInspector().equals( + right.getFieldObjectInspector()))) { + return false; + } + } + return true; + } + } + } + + static class PigMapObjectInspector implements MapObjectInspector { + private ObjectInspector key; + private ObjectInspector value; + + PigMapObjectInspector(MapTypeInfo info) { + key = PrimitiveObjectInspectorFactory.javaStringObjectInspector; + value = createObjectInspector(info.getMapValueTypeInfo()); + } + + @Override + public ObjectInspector getMapKeyObjectInspector() { + return key; + } + + @Override + public ObjectInspector getMapValueObjectInspector() { + return value; + } + + @Override + public Object getMapValueElement(Object map, Object key) { + return ((Map) map).get(key); + } + + @Override + public Map getMap(Object map) { + return (Map) map; + } + + @Override + public int getMapSize(Object map) { + return ((Map) map).size(); + } + + 
@Override + public String getTypeName() { + return "map<" + key.getTypeName() + "," + value.getTypeName() + ">"; + } + + @Override + public Category getCategory() { + return Category.MAP; + } + + @Override + public boolean equals(Object o) { + if (o == null || o.getClass() != getClass()) { + return false; + } else if (o == this) { + return true; + } else { + PigMapObjectInspector other = (PigMapObjectInspector) o; + return other.key.equals(key) && other.value.equals(value); + } + } + } + + static class PigListObjectInspector implements ListObjectInspector { + private ObjectInspector child; + private Object cachedObject; + private int index; + private Iterator iter; + + PigListObjectInspector(ListTypeInfo info) { + child = createObjectInspector(info.getListElementTypeInfo()); + } + + @Override + public ObjectInspector getListElementObjectInspector() { + return child; + } + + @Override + public Object getListElement(Object list, int i) { + if (i==0 || list!=cachedObject) { + cachedObject = list; + index = -1; + DataBag db = (DataBag)list; + iter = db.iterator(); + } + if (i==index+1) { + index++; + try { + Tuple t = iter.next(); + // If single item tuple, take the item directly from list + if (t.size() == 1) { + return t.get(0); + } else { + return t; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + throw new RuntimeException("Only sequential read is supported"); + } + } + + @Override + public int getListLength(Object list) { + return (int)((DataBag)list).size(); + } + + @Override + @SuppressWarnings("unchecked") + public List getList(Object list) { + List result = new ArrayList(); + DataBag bag = (DataBag)list; + for (Tuple t : bag) { + if (t.size() == 1) { + try { + result.add(t.get(0)); + } catch (ExecException e) { + throw new RuntimeException(e); + } + } else { + result.add(t); + } + } + return result; + } + + @Override + public String getTypeName() { + return "array<" + child.getTypeName() + ">"; + } + + @Override + public Category getCategory() { + return Category.LIST; + } + + @Override + public boolean equals(Object o) { + if (o == null || o.getClass() != getClass()) { + return false; + } else if (o == this) { + return true; + } else { + ObjectInspector other = ((PigListObjectInspector) o).child; + return other.equals(child); + } + } + } + + static class PigDataByteArrayObjectInspector extends AbstractPrimitiveJavaObjectInspector + implements BinaryObjectInspector { + + PigDataByteArrayObjectInspector() { + super(TypeInfoFactory.binaryTypeInfo); + } + + @Override + public BytesWritable getPrimitiveWritableObject(Object o) { + return o == null ? null : (o instanceof DataByteArray + ? new BytesWritable(((DataByteArray) o).get()) + : new BytesWritable((byte[]) o)); + } + + @Override + public byte[] getPrimitiveJavaObject(Object o) { + return ((DataByteArray) o).get(); + } + + } + + static class PigJodaTimeStampObjectInspector extends + AbstractPrimitiveJavaObjectInspector implements TimestampObjectInspector { + + protected PigJodaTimeStampObjectInspector() { + super(TypeInfoFactory.timestampTypeInfo); + } + + @Override + public TimestampWritable getPrimitiveWritableObject(Object o) { + return o == null ? null : new TimestampWritable(new Timestamp(((DateTime)o).getMillis())); + } + + @Override + public Timestamp getPrimitiveJavaObject(Object o) { + return o == null ? 
null : new Timestamp(((DateTime)o).getMillis()); + } + } + + static class PigDecimalObjectInspector extends + AbstractPrimitiveJavaObjectInspector implements HiveDecimalObjectInspector { + + protected PigDecimalObjectInspector() { + super(TypeInfoFactory.decimalTypeInfo); + } + + @Override + public HiveDecimalWritable getPrimitiveWritableObject(Object o) { + if (o instanceof BigDecimal) { + return o == null ? null : new HiveDecimalWritable(HiveDecimal.create((BigDecimal)o)); + } else { // BigInteger + return o == null ? null : new HiveDecimalWritable(HiveDecimal.create((BigInteger)o)); + } + } + + @Override + public HiveDecimal getPrimitiveJavaObject(Object o) { + if (o instanceof BigDecimal) { + return o == null ? null : HiveDecimal.create((BigDecimal)o); + } else { // BigInteger + return o == null ? null : HiveDecimal.create((BigInteger)o); + } + } + } + + public static ObjectInspector createObjectInspector(TypeInfo info) { + switch (info.getCategory()) { + case PRIMITIVE: + switch (((PrimitiveTypeInfo) info).getPrimitiveCategory()) { + case FLOAT: + return PrimitiveObjectInspectorFactory.javaFloatObjectInspector; + case DOUBLE: + return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector; + case BOOLEAN: + return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector; + case INT: + return PrimitiveObjectInspectorFactory.javaIntObjectInspector; + case LONG: + return PrimitiveObjectInspectorFactory.javaLongObjectInspector; + case STRING: + return PrimitiveObjectInspectorFactory.javaStringObjectInspector; + case TIMESTAMP: + return new PigJodaTimeStampObjectInspector(); + case DECIMAL: + return new PigDecimalObjectInspector(); + case BINARY: + return new PigDataByteArrayObjectInspector(); + case DATE: + case VARCHAR: + case BYTE: + case SHORT: + throw new IllegalArgumentException("Should never happen, " + + (((PrimitiveTypeInfo) info).getPrimitiveCategory()) + + "is not valid Pig primitive data type"); + default: + throw new IllegalArgumentException("Unknown primitive type " + + ((PrimitiveTypeInfo) info).getPrimitiveCategory()); + } + case STRUCT: + return new PigStructInspector((StructTypeInfo) info); + case MAP: + return new PigMapObjectInspector((MapTypeInfo) info); + case LIST: + return new PigListObjectInspector((ListTypeInfo) info); + default: + throw new IllegalArgumentException("Unknown type " + + info.getCategory()); + } + } + + public static ConstantObjectInspector getConstantObjectInspector(Object obj) { + switch (DataType.findType(obj)) { + case DataType.FLOAT: + return new JavaConstantFloatObjectInspector((Float)obj); + case DataType.DOUBLE: + return new JavaConstantDoubleObjectInspector((Double)obj); + case DataType.BOOLEAN: + return new JavaConstantBooleanObjectInspector((Boolean)obj); + case DataType.INTEGER: + return new JavaConstantIntObjectInspector((Integer)obj); + case DataType.LONG: + return new JavaConstantLongObjectInspector((Long)obj); + case DataType.CHARARRAY: + return new JavaConstantStringObjectInspector((String)obj); + default: + throw new IllegalArgumentException("Not implemented " + obj.getClass().getName()); + } + } +} diff --git a/src/org/apache/pig/impl/util/orc/OrcUtils.java b/src/org/apache/pig/impl/util/orc/OrcUtils.java index 93c3bbe14a..e69de29bb2 100644 --- a/src/org/apache/pig/impl/util/orc/OrcUtils.java +++ b/src/org/apache/pig/impl/util/orc/OrcUtils.java @@ -1,697 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.pig.impl.util.orc; - -import java.io.IOException; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.hive.common.type.HiveChar; -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.common.type.HiveVarchar; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; -import org.apache.hadoop.hive.serde2.io.TimestampWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.AbstractPrimitiveJavaObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.io.BytesWritable; -import org.apache.pig.PigWarning; -import org.apache.pig.ResourceSchema; -import org.apache.pig.ResourceSchema.ResourceFieldSchema; -import org.apache.pig.backend.executionengine.ExecException; -import org.apache.pig.data.BagFactory; -import org.apache.pig.data.DataBag; -import org.apache.pig.data.DataByteArray; -import org.apache.pig.data.DataType; -import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; -import org.apache.pig.tools.pigstats.PigStatusReporter; -import org.joda.time.DateTime; - -public class OrcUtils { - public static Object convertOrcToPig(Object obj, ObjectInspector oi, boolean[] includedColumns) { - Object result = null; - if (obj == null) { - return result; - } - switch (oi.getCategory()) { - case PRIMITIVE: - PrimitiveObjectInspector poi = (PrimitiveObjectInspector)oi; - result = getPrimaryFromOrc(obj, poi); - break; - case STRUCT: - StructObjectInspector soi = (StructObjectInspector)oi; 
- List elementFields = (List) soi.getAllStructFieldRefs(); - List items = soi.getStructFieldsDataAsList(obj); - Tuple t = TupleFactory.getInstance().newTuple(); - for (int i=0;i m = (Map)obj; - result = new HashMap(); - for (Map.Entry entry : m.entrySet()) { - Object convertedKey = convertOrcToPig(entry.getKey(), keyObjectInspector, null); - Object convertedValue = convertOrcToPig(entry.getValue(), valueObjectInspector, null); - if (convertedKey!=null) { - ((Map)result).put(convertedKey.toString(), convertedValue); - } else { - PigStatusReporter reporter = PigStatusReporter.getInstance(); - if (reporter != null) { - reporter.incrCounter(PigWarning.UDF_WARNING_1, 1); - } - } - } - break; - case LIST: - ListObjectInspector loi = (ListObjectInspector)oi; - result = BagFactory.getInstance().newDefaultBag(); - ObjectInspector itemObjectInspector = loi.getListElementObjectInspector(); - for (Object item : loi.getList(obj)) { - Tuple convertedItem = (Tuple)convertOrcToPig(item, itemObjectInspector, null); - ((DataBag)result).add(convertedItem); - } - break; - default: - throw new IllegalArgumentException("Unknown type " + - oi.getCategory()); - } - return result; - } - - public static Object getPrimaryFromOrc(Object obj, PrimitiveObjectInspector poi) { - Object result = null; - if (obj == null) { - return result; - } - switch (poi.getPrimitiveCategory()) { - case FLOAT: - case DOUBLE: - case BOOLEAN: - case INT: - case LONG: - case STRING: - result = poi.getPrimitiveJavaObject(obj); - break; - case CHAR: - result = ((HiveChar)poi.getPrimitiveJavaObject(obj)).getValue(); - break; - case VARCHAR: - result = ((HiveVarchar)poi.getPrimitiveJavaObject(obj)).getValue(); - break; - case BYTE: - result = (int)(Byte)poi.getPrimitiveJavaObject(obj); - break; - case SHORT: - result = (int)(Short)poi.getPrimitiveJavaObject(obj); - break; - case BINARY: - BytesWritable bw = (BytesWritable) obj; - // Make a copy - result = new DataByteArray(bw.getBytes(), 0, bw.getLength()); - break; - case TIMESTAMP: - java.sql.Timestamp origTimeStamp = (java.sql.Timestamp)poi.getPrimitiveJavaObject(obj); - result = new DateTime(origTimeStamp.getTime()); - break; - case DATE: - java.sql.Date origDate = (java.sql.Date)poi.getPrimitiveJavaObject(obj); - result = new DateTime(origDate.getTime()); - break; - case DECIMAL: - org.apache.hadoop.hive.common.type.HiveDecimal origDecimal = - (org.apache.hadoop.hive.common.type.HiveDecimal)poi.getPrimitiveJavaObject(obj); - result = origDecimal.bigDecimalValue(); - break; - default: - throw new IllegalArgumentException("Unknown primitive type " + - (poi).getPrimitiveCategory()); - } - return result; - } - - public static ResourceFieldSchema getResourceFieldSchema(TypeInfo ti) throws IOException { - ResourceFieldSchema fieldSchema = new ResourceFieldSchema(); - ResourceFieldSchema[] innerFs; - ResourceSchema innerSchema; - switch (ti.getCategory()) { - case STRUCT: - StructTypeInfo sti = (StructTypeInfo)ti; - fieldSchema.setType(DataType.TUPLE); - List typeInfos = sti.getAllStructFieldTypeInfos(); - List names = sti.getAllStructFieldNames(); - innerFs = new ResourceFieldSchema[typeInfos.size()]; - for (int i=0;i names = new ArrayList(); - ArrayList typeInfos = new ArrayList(); - for (ResourceFieldSchema subFs : fs.getSchema().getFields()) { - TypeInfo info = getTypeInfo(subFs); - names.add(subFs.getName()); - typeInfos.add(info); - } - ((StructTypeInfo)ti).setAllStructFieldNames(names); - ((StructTypeInfo)ti).setAllStructFieldTypeInfos(typeInfos); - break; - case DataType.BAG: - ti = new 
ListTypeInfo(); - if (fs.getSchema()==null || fs.getSchema().getFields().length!=1) { - throw new IOException("Wrong bag inner schema"); - } - TypeInfo elementField = getTypeInfo(fs.getSchema().getFields()[0]); - ((ListTypeInfo)ti).setListElementTypeInfo(elementField); - break; - case DataType.MAP: - ti = new MapTypeInfo(); - TypeInfo valueField; - if (fs.getSchema() == null || fs.getSchema().getFields().length != 1) { - valueField = TypeInfoFactory.binaryTypeInfo; - } else { - valueField = getTypeInfo(fs.getSchema().getFields()[0]); - } - ((MapTypeInfo)ti).setMapKeyTypeInfo(TypeInfoFactory.stringTypeInfo); - ((MapTypeInfo)ti).setMapValueTypeInfo(valueField); - break; - case DataType.BOOLEAN: - ti = TypeInfoFactory.booleanTypeInfo; - break; - case DataType.INTEGER: - ti = TypeInfoFactory.intTypeInfo; - break; - case DataType.LONG: - ti = TypeInfoFactory.longTypeInfo; - break; - case DataType.FLOAT: - ti = TypeInfoFactory.floatTypeInfo; - break; - case DataType.DOUBLE: - ti = TypeInfoFactory.doubleTypeInfo; - break; - case DataType.CHARARRAY: - ti = TypeInfoFactory.stringTypeInfo; - break; - case DataType.DATETIME: - ti = TypeInfoFactory.timestampTypeInfo; - break; - case DataType.BIGDECIMAL: - ti = TypeInfoFactory.decimalTypeInfo; - break; - case DataType.BIGINTEGER: - ti = TypeInfoFactory.decimalTypeInfo; - break; - case DataType.BYTEARRAY: - ti = TypeInfoFactory.binaryTypeInfo; - break; - default: - throw new IllegalArgumentException("Unknown data type " + - DataType.findTypeName(fs.getType())); - } - return ti; - } - - static class Field implements StructField { - private final String name; - private final ObjectInspector inspector; - private final int offset; - - Field(String name, ObjectInspector inspector, int offset) { - this.name = name; - this.inspector = inspector; - this.offset = offset; - } - - @Override - public String getFieldName() { - return name; - } - - @Override - public ObjectInspector getFieldObjectInspector() { - return inspector; - } - - @Override - public int getFieldID() { - return offset; - } - - @Override - public String getFieldComment() { - return null; - } - } - - static class PigStructInspector extends StructObjectInspector { - private List fields; - - PigStructInspector(StructTypeInfo info) { - ArrayList fieldNames = info.getAllStructFieldNames(); - ArrayList fieldTypes = info.getAllStructFieldTypeInfos(); - fields = new ArrayList(fieldNames.size()); - for (int i = 0; i < fieldNames.size(); ++i) { - fields.add(new Field(fieldNames.get(i), - createObjectInspector(fieldTypes.get(i)), i)); - } - } - - @Override - public List getAllStructFieldRefs() { - return fields; - } - - @Override - public StructField getStructFieldRef(String s) { - for (StructField field : fields) { - if (field.getFieldName().equals(s)) { - return field; - } - } - return null; - } - - @Override - public Object getStructFieldData(Object object, StructField field) { - Object result = null; - try { - result = ((Tuple) object).get(((Field) field).offset); - } catch (ExecException e) { - throw new RuntimeException(e); - } - return result; - } - - @Override - public List getStructFieldsDataAsList(Object object) { - return ((Tuple) object).getAll(); - } - - @Override - public String getTypeName() { - StringBuilder buffer = new StringBuilder(); - buffer.append("struct<"); - for (int i = 0; i < fields.size(); ++i) { - StructField field = fields.get(i); - if (i != 0) { - buffer.append(","); - } - buffer.append(field.getFieldName()); - buffer.append(":"); - 
buffer.append(field.getFieldObjectInspector().getTypeName()); - } - buffer.append(">"); - return buffer.toString(); - } - - @Override - public Category getCategory() { - return Category.STRUCT; - } - - @Override - public boolean equals(Object o) { - if (o == null || o.getClass() != getClass()) { - return false; - } else if (o == this) { - return true; - } else { - List other = ((PigStructInspector) o).fields; - if (other.size() != fields.size()) { - return false; - } - for (int i = 0; i < fields.size(); ++i) { - StructField left = other.get(i); - StructField right = fields.get(i); - if (!(left.getFieldName().equals(right.getFieldName()) && left - .getFieldObjectInspector().equals( - right.getFieldObjectInspector()))) { - return false; - } - } - return true; - } - } - } - - static class PigMapObjectInspector implements MapObjectInspector { - private ObjectInspector key; - private ObjectInspector value; - - PigMapObjectInspector(MapTypeInfo info) { - key = createObjectInspector(info.getMapKeyTypeInfo()); - value = createObjectInspector(info.getMapValueTypeInfo()); - } - - @Override - public ObjectInspector getMapKeyObjectInspector() { - return key; - } - - @Override - public ObjectInspector getMapValueObjectInspector() { - return value; - } - - @Override - public Object getMapValueElement(Object map, Object key) { - return ((Map) map).get(key); - } - - @Override - public Map getMap(Object map) { - return (Map) map; - } - - @Override - public int getMapSize(Object map) { - return ((Map) map).size(); - } - - @Override - public String getTypeName() { - return "map<" + key.getTypeName() + "," + value.getTypeName() + ">"; - } - - @Override - public Category getCategory() { - return Category.MAP; - } - - @Override - public boolean equals(Object o) { - if (o == null || o.getClass() != getClass()) { - return false; - } else if (o == this) { - return true; - } else { - PigMapObjectInspector other = (PigMapObjectInspector) o; - return other.key.equals(key) && other.value.equals(value); - } - } - } - - static class PigListObjectInspector implements ListObjectInspector { - private ObjectInspector child; - private Object cachedObject; - private int index; - private Iterator iter; - - PigListObjectInspector(ListTypeInfo info) { - child = createObjectInspector(info.getListElementTypeInfo()); - } - - @Override - public ObjectInspector getListElementObjectInspector() { - return child; - } - - @Override - public Object getListElement(Object list, int i) { - if (list!=cachedObject) { - cachedObject = list; - index = -1; - DataBag db = (DataBag)list; - iter = db.iterator(); - } - if (i==index+1) { - index++; - return iter.next(); - } else { - throw new RuntimeException("Only sequential read is supported"); - } - } - - @Override - public int getListLength(Object list) { - return (int)((DataBag)list).size(); - } - - @Override - @SuppressWarnings("unchecked") - public List getList(Object list) { - List result = new ArrayList(); - DataBag bag = (DataBag)list; - for (Tuple t : bag) { - result.add(t); - } - return result; - } - - @Override - public String getTypeName() { - return "array<" + child.getTypeName() + ">"; - } - - @Override - public Category getCategory() { - return Category.LIST; - } - - @Override - public boolean equals(Object o) { - if (o == null || o.getClass() != getClass()) { - return false; - } else if (o == this) { - return true; - } else { - ObjectInspector other = ((PigListObjectInspector) o).child; - return other.equals(child); - } - } - } - - static class PigDataByteArrayObjectInspector extends 
AbstractPrimitiveJavaObjectInspector - implements BinaryObjectInspector { - - PigDataByteArrayObjectInspector() { - super(TypeInfoFactory.binaryTypeInfo); - } - - @Override - public BytesWritable getPrimitiveWritableObject(Object o) { - return o == null ? null : (o instanceof DataByteArray - ? new BytesWritable(((DataByteArray) o).get()) - : new BytesWritable((byte[]) o)); - } - - @Override - public byte[] getPrimitiveJavaObject(Object o) { - return ((DataByteArray) o).get(); - } - - } - - static class PigJodaTimeStampObjectInspector extends - AbstractPrimitiveJavaObjectInspector implements TimestampObjectInspector { - - protected PigJodaTimeStampObjectInspector() { - super(TypeInfoFactory.timestampTypeInfo); - } - - @Override - public TimestampWritable getPrimitiveWritableObject(Object o) { - return o == null ? null : new TimestampWritable(new Timestamp(((DateTime)o).getMillis())); - } - - @Override - public Timestamp getPrimitiveJavaObject(Object o) { - return o == null ? null : new Timestamp(((DateTime)o).getMillis()); - } - } - - static class PigDecimalObjectInspector extends - AbstractPrimitiveJavaObjectInspector implements HiveDecimalObjectInspector { - - protected PigDecimalObjectInspector() { - super(TypeInfoFactory.decimalTypeInfo); - } - - @Override - public HiveDecimalWritable getPrimitiveWritableObject(Object o) { - if (o instanceof BigDecimal) { - return o == null ? null : new HiveDecimalWritable(HiveDecimal.create((BigDecimal)o)); - } else { // BigInteger - return o == null ? null : new HiveDecimalWritable(HiveDecimal.create((BigInteger)o)); - } - } - - @Override - public HiveDecimal getPrimitiveJavaObject(Object o) { - if (o instanceof BigDecimal) { - return o == null ? null : HiveDecimal.create((BigDecimal)o); - } else { // BigInteger - return o == null ? 
null : HiveDecimal.create((BigInteger)o); - } - } - } - - public static ObjectInspector createObjectInspector(TypeInfo info) { - switch (info.getCategory()) { - case PRIMITIVE: - switch (((PrimitiveTypeInfo) info).getPrimitiveCategory()) { - case FLOAT: - return PrimitiveObjectInspectorFactory.javaFloatObjectInspector; - case DOUBLE: - return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector; - case BOOLEAN: - return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector; - case INT: - return PrimitiveObjectInspectorFactory.javaIntObjectInspector; - case LONG: - return PrimitiveObjectInspectorFactory.javaLongObjectInspector; - case STRING: - return PrimitiveObjectInspectorFactory.javaStringObjectInspector; - case TIMESTAMP: - return new PigJodaTimeStampObjectInspector(); - case DECIMAL: - return new PigDecimalObjectInspector(); - case BINARY: - return new PigDataByteArrayObjectInspector(); - case DATE: - case VARCHAR: - case BYTE: - case SHORT: - throw new IllegalArgumentException("Should never happen, " + - (((PrimitiveTypeInfo) info).getPrimitiveCategory()) + - "is not valid Pig primitive data type"); - default: - throw new IllegalArgumentException("Unknown primitive type " + - ((PrimitiveTypeInfo) info).getPrimitiveCategory()); - } - case STRUCT: - return new PigStructInspector((StructTypeInfo) info); - case MAP: - return new PigMapObjectInspector((MapTypeInfo) info); - case LIST: - return new PigListObjectInspector((ListTypeInfo) info); - default: - throw new IllegalArgumentException("Unknown type " + - info.getCategory()); - } - } -} diff --git a/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java b/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java index 0f6f6963bc..4d74ef7ad1 100644 --- a/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java +++ b/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java @@ -527,11 +527,16 @@ public void visit( UserFuncExpression op ) throws FrontendException { //reinitialize input schema from signature if (((POUserFunc)p).getFunc().getInputSchema() == null) { ((POUserFunc)p).setFuncInputSchema(op.getSignature()); + ((EvalFunc) f).setInputSchema(((POUserFunc)p).getFunc().getInputSchema()); } List cacheFiles = ((EvalFunc)f).getCacheFiles(); if (cacheFiles != null) { ((POUserFunc)p).setCacheFiles(cacheFiles); } + List shipFiles = ((EvalFunc)f).getShipFiles(); + if (shipFiles != null) { + ((POUserFunc)p).setShipFiles(shipFiles); + } } else { p = new POUserComparisonFunc(new OperatorKey(DEFAULT_SCOPE, nodeGen .getNextNodeId(DEFAULT_SCOPE)), -1, diff --git a/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java b/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java index 985784a6de..22efbe3228 100644 --- a/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java +++ b/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java @@ -26,6 +26,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.pig.Algebraic; import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; import org.apache.pig.builtin.InvokerGenerator; @@ -241,10 +242,20 @@ public LogicalSchema.LogicalFieldSchema getFieldSchema() throws FrontendExceptio } ef.setUDFContextSignature(signature); - Properties props = UDFContext.getUDFContext().getUDFProperties(ef.getClass()); Schema translatedInputSchema = Util.translateSchema(inputSchema); if(translatedInputSchema != null) { + Properties props = 
UDFContext.getUDFContext().getUDFProperties(ef.getClass()); props.put("pig.evalfunc.inputschema."+signature, translatedInputSchema); + if (ef instanceof Algebraic) { + // In case of Algebraic func, set original inputSchema to Initial, + // Intermed, Final + for (String func : new String[]{((Algebraic)ef).getInitial(), + ((Algebraic)ef).getIntermed(), ((Algebraic)ef).getFinal()}) { + Class c = PigContext.instantiateFuncFromSpec(new FuncSpec(func)).getClass(); + props = UDFContext.getUDFContext().getUDFProperties(c); + props.put("pig.evalfunc.inputschema."+signature, translatedInputSchema); + } + } } // Store inputSchema into the UDF context ef.setInputSchema(translatedInputSchema); diff --git a/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java b/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java index 469129bc87..a53c00ec27 100644 --- a/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java +++ b/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java @@ -422,7 +422,7 @@ public void visit(LORank loRank) throws FrontendException { poSort = new POSort(new OperatorKey(scope, nodeGen .getNextNodeId(scope)), -1, null, newPhysicalPlan, newOrderPlan, null); - poSort.setRequestedParallelism(loRank.getRequestedParallelism()); + //poSort.setRequestedParallelism(loRank.getRequestedParallelism()); poSort.addOriginalLocation(loRank.getAlias(), loRank.getLocation()); diff --git a/test/e2e/pig/build.xml b/test/e2e/pig/build.xml index b7d468db57..f60b1b1bd4 100644 --- a/test/e2e/pig/build.xml +++ b/test/e2e/pig/build.xml @@ -31,6 +31,8 @@ + + @@ -155,7 +157,7 @@ - + @@ -408,6 +410,12 @@ + + + + + diff --git a/test/e2e/pig/tests/nightly.conf b/test/e2e/pig/tests/nightly.conf index e0afd26ec3..78d5dedb6b 100644 --- a/test/e2e/pig/tests/nightly.conf +++ b/test/e2e/pig/tests/nightly.conf @@ -5621,6 +5621,124 @@ store a into ':OUTPATH:';\, \, } ] + }, + { + 'name' => 'HiveUDF', + 'tests' => [ + { + # HiveUDF extends UDF + 'num' => 1, + 'pig' => q\ + define sin HiveUDF('sin'); + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); + B = foreach A generate sin(gpa); + store B into ':OUTPATH:';\, + 'verify_pig_script' => q\ + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); + B = foreach A generate SIN(gpa); + store B into ':OUTPATH:';\, + }, + { + # HiveUDF extends GenericUDF + 'num' => 2, + 'pig' => q\ + define upper HiveUDF('upper'); + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); + B = foreach A generate upper(name); + store B into ':OUTPATH:';\, + 'verify_pig_script' => q\ + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double); + B = foreach A generate UPPER(name); + store B into ':OUTPATH:';\, + }, + { + # HiveUDTF + 'num' => 3, + 'pig' => q\ + define explode HiveUDTF('explode'); + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:chararray, gpa:chararray); + B = foreach A generate TOBAG(name, age, gpa) as b; + C = foreach B generate flatten(explode(b)); + store C into ':OUTPATH:';\, + 'verify_pig_script' => q\ + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:chararray, gpa:chararray); + B = foreach A generate TOBAG(name, age, gpa) as b; + C = foreach B generate flatten(b); + store C into ':OUTPATH:';\, + }, + { + # HiveUDAF extends GenericUDAF, with null handling + 'num' => 4, + 'pig' => q\ + define avg HiveUDAF('avg'); + A = 
LOAD ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa:double); + B = group A by name; + C = foreach B generate group, avg(A.age); + store C into ':OUTPATH:';\, + 'verify_pig_script' => q\ + A = LOAD ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa:double); + B = group A by name; + C = foreach B generate group, AVG(A.age); + store C into ':OUTPATH:';\, + }, + { + # HiveUDAF extends UDAF + 'num' => 5, + 'pig' => q\ + define percentile HiveUDAF('percentile'); + A = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double); + B = foreach A generate name, age, 0.5 as perc; + C = group B by name; + D = foreach C generate group, percentile(B.(age, perc)); + store D into ':OUTPATH:';\, + 'verify_pig_script' => q\ + register :FUNCPATH:/datafu.jar + define Quartile datafu.pig.stats.Quantile('0.5'); + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double); + B = group A by name; + C = foreach B { + sorted = order A by age; + generate group, flatten(Quartile(sorted.age)); + } + store C into ':OUTPATH:';\, + }, + { + # Constant folding and ship jars + 'num' => 6, + 'pig' => q# + sh echo -e "zach young\nzach zipper" > names.txt + define in_file HiveUDF('in_file', '(null, "names.txt")'); + A = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double); + B = foreach A generate in_file(name, 'names.txt'); + store B into ':OUTPATH:';#, + 'verify_pig_script' => q#register :PIGGYBANKJAR: + sh echo -e "zach young\nzach zipper" > names.txt + rmf :INPATH:/singlefile/names.txt + fs -put names.txt :INPATH:/singlefile/names.txt + define LookupInFiles org.apache.pig.piggybank.evaluation.string.LookupInFiles(); + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double); + B = foreach A generate LookupInFiles(name, ':INPATH:/singlefile/names.txt'); + C = foreach B generate (boolean)$0; + store C into ':OUTPATH:'; + fs -rm :INPATH:/singlefile/names.txt# + }, + { + # Custom Hive UDF and MapredContext + 'num' => 7, + 'pig' => q\set mapred.max.split.size '100000000' + register :FUNCPATH:/testudf.jar; + define DummyContextUDF HiveUDF('org.apache.pig.test.udf.evalfunc.DummyContextUDF'); + A = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double); + B = foreach A generate DummyContextUDF(); + store B into ':OUTPATH:';\, + 'verify_pig_script' => q\set mapred.max.split.size '100000000' + A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double); + B = foreach A generate UniqueID(); + C = foreach B generate (int)SUBSTRING($0, 2, 100); + D = foreach C generate (chararray)($0+1); + store D into ':OUTPATH:';\ + } + ] } ], }, diff --git a/test/e2e/pig/udfs/java/org/apache/pig/test/udf/evalfunc/DummyContextUDF.java b/test/e2e/pig/udfs/java/org/apache/pig/test/udf/evalfunc/DummyContextUDF.java new file mode 100644 index 0000000000..8f211d7622 --- /dev/null +++ b/test/e2e/pig/udfs/java/org/apache/pig/test/udf/evalfunc/DummyContextUDF.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test.udf.evalfunc;
+
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.pig.impl.util.UDFContext;
+
+@Description(name = "dummycontextudf",
+value = "_FUNC_(col) - UDF to report MR counter values")
+public class DummyContextUDF extends GenericUDF {
+
+    private MapredContext context;
+    private LongWritable result = new LongWritable();
+
+    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+        return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+    }
+
+    public Object evaluate(DeferredObject[] arguments) throws HiveException {
+        Reporter reporter = context.getReporter();
+        Counters.Counter counter;
+        if (UDFContext.getUDFContext().getJobConf().get("exectype").equals("TEZ")) {
+            counter = reporter.getCounter("org.apache.tez.common.counters.TaskCounter", "INPUT_RECORDS_PROCESSED");
+        } else {
+            counter = reporter.getCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS");
+        }
+        result.set(counter.getValue());
+        return result;
+    }
+
+    public String getDisplayString(String[] children) {
+        return "dummy-func()";
+    }
+
+    @Override
+    public void configure(MapredContext context) {
+        this.context = context;
+    }
+}
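
Usage note: beyond the Pig Latin e2e tests above, the new HiveUDF builtin can also be driven programmatically. The following is a minimal sketch, not part of the patch, assuming a local-mode run; the input file student.txt is illustrative and reuses the (name, age, gpa) schema from the tests.

import java.util.Iterator;

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;

public class HiveUdfSmokeTest {
    public static void main(String[] args) throws Exception {
        // Local-mode PigServer; the input path and schema are illustrative only.
        PigServer pig = new PigServer(ExecType.LOCAL);
        // HiveUDF takes the Hive function name (or a fully qualified UDF class name).
        pig.registerQuery("define sin_h HiveUDF('sin');");
        pig.registerQuery("A = load 'student.txt' as (name:chararray, age:int, gpa:double);");
        pig.registerQuery("B = foreach A generate name, sin_h(gpa);");
        Iterator<Tuple> it = pig.openIterator("B");
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}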
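
Usage note: the Pig-to-Hive schema translation that OrcStorage.checkSchema performs via HiveUtils.getTypeInfo can be exercised in isolation. A minimal sketch, assuming the Pig and Hive serde jars are on the classpath; the schema string is an arbitrary example.

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.impl.util.hive.HiveUtils;

public class PigToHiveTypes {
    public static void main(String[] args) throws Exception {
        // Wrap the relation schema in a tuple-typed field, as OrcStorage.checkSchema does,
        // then ask HiveUtils for the corresponding Hive TypeInfo.
        Schema schema = Utils.getSchemaFromString("name:chararray, age:int, gpa:double");
        ResourceFieldSchema fs = new ResourceFieldSchema();
        fs.setType(DataType.TUPLE);
        fs.setSchema(new ResourceSchema(schema));
        TypeInfo ti = HiveUtils.getTypeInfo(fs);
        // Expected to print something like: struct<name:string,age:int,gpa:double>
        System.out.println(ti.getTypeName());
    }
}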
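
Usage note: in the other direction, HiveUtils.convertHiveToPig maps a Hive value plus its ObjectInspector onto Pig data (struct to tuple, array to bag, map to map). A minimal sketch using a standard Hive struct inspector over plain Java objects; the field names and values are made up for illustration.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.hive.HiveUtils;

public class HiveToPigValues {
    public static void main(String[] args) {
        // A standard Hive struct<name:string, age:int> inspector over plain Java objects.
        StructObjectInspector oi = ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("name", "age"),
                Arrays.<ObjectInspector>asList(
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                        PrimitiveObjectInspectorFactory.javaIntObjectInspector));
        List<Object> row = Arrays.<Object>asList("alice", 20);
        // convertHiveToPig turns the Hive struct into a Pig tuple; passing null for
        // includedColumns keeps every top-level field.
        Tuple t = (Tuple) HiveUtils.convertHiveToPig(row, oi, null);
        System.out.println(t);  // expected: (alice,20)
    }
}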