Skip to content

Commit

Permalink
PIG-5048: HiveUDTF fail if it is the first expression in projection
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1769003 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Jianyong Dai committed Nov 9, 2016
1 parent 27a7a0c commit 9f98857
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 2 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Expand Up @@ -59,6 +59,8 @@ OPTIMIZATIONS

BUG FIXES

PIG-5048: HiveUDTF fail if it is the first expression in projection (nkollar via daijy)

PIG-5049: Cleanup e2e tests turing_jython.conf (Daniel Dai)

PIG-5033: MultiQueryOptimizerTez creates bad plan with union, split and FRJoin (rohini,tmwoordruff via rohini)
Expand Down
2 changes: 2 additions & 0 deletions ivy.xml
Expand Up @@ -427,6 +427,8 @@
conf="compile->master" />
<dependency org="org.apache.hive.shims" name="hive-shims-common" rev="${hive.version}" changing="true"
conf="compile->master" />
<dependency org="org.apache.hive" name="hive-contrib" rev="${hive.version}" changing="true"
conf="test->master" />
<dependency org="org.apache.hive.shims" name="hive-shims-0.23" rev="${hive.version}" changing="true"
conf="hadoop23->master" />
<dependency org="org.apache.hive.shims" name="hive-shims-0.20S" rev="${hive.version}" changing="true"
Expand Down
Expand Up @@ -55,6 +55,7 @@
@SuppressWarnings("unchecked")
public class POForEach extends PhysicalOperator {
private static final long serialVersionUID = 1L;
private static final Result UNLIMITED_NULL_RESULT = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple());

protected List<PhysicalPlan> inputPlans;

Expand Down Expand Up @@ -264,7 +265,7 @@ public Result getNextTuple() throws ExecException {
if (inp.returnStatus == POStatus.STATUS_EOP) {
if (parentPlan!=null && parentPlan.endOfAllInput && !endOfAllInputProcessed && endOfAllInputProcessing) {
// continue pull one more output
inp = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple());
inp = UNLIMITED_NULL_RESULT;
} else {
return inp;
}
Expand Down
2 changes: 1 addition & 1 deletion src/org/apache/pig/data/UnlimitedNullTuple.java
Expand Up @@ -28,7 +28,7 @@ public class UnlimitedNullTuple extends AbstractTuple {

@Override
public int size() {
throw new RuntimeException("Unimplemented");
return Integer.MAX_VALUE;
}

@Override
Expand Down
117 changes: 117 additions & 0 deletions test/org/apache/pig/impl/builtin/TestHiveUDTF.java
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.impl.builtin;

import org.apache.commons.collections4.IteratorUtils;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.test.MiniGenericCluster;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import static org.apache.pig.builtin.mock.Storage.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class TestHiveUDTF {
private static PigServer pigServer = null;
private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();

@BeforeClass
public static void oneTimeSetup() throws ExecException {
pigServer = new PigServer(ExecType.LOCAL);
}

@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}

@Test
public void testHiveUDTFOnBagInput() throws IOException {
Data data = resetData(pigServer);

Tuple tuple = tuple(bag(tuple("a"), tuple("b"), tuple("c")));

data.set("TestHiveUDTF", tuple);

pigServer.registerQuery("define posexplode HiveUDTF('posexplode');");
pigServer.registerQuery("A = load 'TestHiveUDTF' USING mock.Storage() as (a0:{(b0:chararray)});");
pigServer.registerQuery("B = foreach A generate posexplode(a0);");

Iterator<Tuple> result = pigServer.openIterator("B");
List<Tuple> out = IteratorUtils.toList(result);

assertEquals(2, out.size());
assertTrue("Result doesn't contain the HiveUDTF output",
out.contains(tuple(bag(tuple(0, "a"), tuple(1, "b"), tuple(2, "c")))));
assertTrue("Result doesn't contain an empty bag",
out.contains(tuple(bag())));
}

@Test
public void testHiveUDTFOnBagInputWithTwoProjection() throws IOException {
Data data = resetData(pigServer);

Tuple tuple = tuple(bag(tuple("a"), tuple("b"), tuple("c")));

data.set("TestHiveUDTF", tuple);

pigServer.registerQuery("define posexplode HiveUDTF('posexplode');");
pigServer.registerQuery("A = load 'TestHiveUDTF' USING mock.Storage() as (a0:{(b0:chararray)});");
pigServer.registerQuery("B = foreach A generate a0, posexplode(a0);");

Iterator<Tuple> result = pigServer.openIterator("B");
List<Tuple> out = IteratorUtils.toList(result);

assertEquals(2, out.size());
assertTrue("Result doesn't contain the HiveUDTF output",
out.contains(tuple(bag(tuple("a"), tuple("b"), tuple("c")), bag(tuple(0, "a"), tuple(1, "b"), tuple(2, "c")))));
assertTrue("Result doesn't contain an empty bag",
out.contains(tuple(null, bag())));
}

@Test
public void testHiveUDTFOnClose() throws IOException {
Data data = resetData(pigServer);

List<Tuple> tuples = Arrays.asList(tuple("a", 1), tuple("a", 2), tuple("a", 3));

data.set("TestHiveUDTF", tuples);

pigServer.registerQuery("define COUNT2 HiveUDTF('org.apache.hadoop.hive.contrib.udtf.example.GenericUDTFCount2');");
pigServer.registerQuery("a = load 'TestHiveUDTF' USING mock.Storage() as (name:chararray, id:int);");
pigServer.registerQuery("b = foreach a generate flatten(COUNT2(name));");

Iterator<Tuple> result = pigServer.openIterator("b");
List<Tuple> out = IteratorUtils.toList(result);

assertEquals(2, out.size());
assertEquals(tuple(3), out.get(0));
assertEquals(tuple(3), out.get(1));
}

}

0 comments on commit 9f98857

Please sign in to comment.