Skip to content

Commit

Permalink
PIG-3294: Allow Pig use Hive UDFs
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1671956 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Jianyong Dai committed Apr 7, 2015
1 parent af47535 commit 8af34f1
Show file tree
Hide file tree
Showing 33 changed files with 2,362 additions and 710 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Expand Up @@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES

IMPROVEMENTS

PIG-3294: Allow Pig use Hive UDFs (daijy)

PIG-4476: Fix logging in AvroStorage* classes and SchemaTuple class (rdsr via rohini)

PIG-4458: Support UDFs in a FOREACH Before a Merge Join (wattsinabox via daijy)
Expand Down
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.serde2.objectinspector.primitive;

import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.io.BooleanWritable;

/**
 * Object inspector for a constant Java {@link Boolean}.
 *
 * <p>The constant is captured at construction time and exposed in its
 * Hadoop writable form through {@link #getWritableConstantValue()}.
 *
 * <p>Backport note: Hive ships this class starting with release 1.2.0
 * (HIVE-9766). This copy should be removed once Pig builds against
 * Hive 1.2.0 or later.
 */
public class JavaConstantBooleanObjectInspector
        extends JavaBooleanObjectInspector
        implements ConstantObjectInspector {

    /** The wrapped constant; may be null. */
    private Boolean value;

    /**
     * Creates an inspector for the given constant.
     *
     * @param value the constant to expose, or null
     */
    public JavaConstantBooleanObjectInspector(Boolean value) {
        this.value = value;
    }

    /**
     * Returns the constant as a writable.
     *
     * @return a new {@link BooleanWritable} holding the constant, or null
     *         when the constant itself is null
     */
    @Override
    public Object getWritableConstantValue() {
        return value == null ? null : new BooleanWritable(value);
    }
}
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.serde2.objectinspector.primitive;

import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;

/**
 * Object inspector for a constant Java {@link Double}.
 *
 * <p>The constant is captured at construction time and exposed in its
 * writable form through {@link #getWritableConstantValue()}.
 *
 * <p>Backport note: Hive ships this class starting with release 1.2.0
 * (HIVE-9766). This copy should be removed once Pig builds against
 * Hive 1.2.0 or later.
 */
public class JavaConstantDoubleObjectInspector
        extends JavaDoubleObjectInspector
        implements ConstantObjectInspector {

    /** The wrapped constant; may be null. */
    private Double value;

    /**
     * Creates an inspector for the given constant.
     *
     * @param value the constant to expose, or null
     */
    public JavaConstantDoubleObjectInspector(Double value) {
        this.value = value;
    }

    /**
     * Returns the constant as a writable.
     *
     * @return a new {@link DoubleWritable} holding the constant, or null
     *         when the constant itself is null
     */
    @Override
    public Object getWritableConstantValue() {
        return value == null ? null : new DoubleWritable(value);
    }
}
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.serde2.objectinspector.primitive;

import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.io.FloatWritable;

/**
 * Object inspector for a constant Java {@link Float}.
 *
 * <p>The constant is captured at construction time and exposed in its
 * Hadoop writable form through {@link #getWritableConstantValue()}.
 *
 * <p>Backport note: Hive ships this class starting with release 1.2.0
 * (HIVE-9766). This copy should be removed once Pig builds against
 * Hive 1.2.0 or later.
 */
public class JavaConstantFloatObjectInspector
        extends JavaFloatObjectInspector
        implements ConstantObjectInspector {

    /** The wrapped constant; may be null. */
    private Float value;

    /**
     * Creates an inspector for the given constant.
     *
     * @param value the constant to expose, or null
     */
    public JavaConstantFloatObjectInspector(Float value) {
        this.value = value;
    }

    /**
     * Returns the constant as a writable.
     *
     * @return a new {@link FloatWritable} holding the constant, or null
     *         when the constant itself is null
     */
    @Override
    public Object getWritableConstantValue() {
        return value == null ? null : new FloatWritable(value);
    }
}
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.serde2.objectinspector.primitive;

import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.io.IntWritable;

/**
 * Object inspector for a constant Java {@link Integer}.
 *
 * <p>The constant is captured at construction time and exposed in its
 * Hadoop writable form through {@link #getWritableConstantValue()}.
 *
 * <p>Backport note: Hive ships this class starting with release 1.2.0
 * (HIVE-9766). This copy should be removed once Pig builds against
 * Hive 1.2.0 or later.
 */
public class JavaConstantIntObjectInspector
        extends JavaIntObjectInspector
        implements ConstantObjectInspector {

    /** The wrapped constant; may be null. */
    private Integer value;

    /**
     * Creates an inspector for the given constant.
     *
     * @param value the constant to expose, or null
     */
    public JavaConstantIntObjectInspector(Integer value) {
        this.value = value;
    }

    /**
     * Returns the constant as a writable.
     *
     * @return a new {@link IntWritable} holding the constant, or null
     *         when the constant itself is null
     */
    @Override
    public Object getWritableConstantValue() {
        return value == null ? null : new IntWritable(value);
    }
}
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.serde2.objectinspector.primitive;

import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.io.LongWritable;

/**
 * Object inspector for a constant Java {@link Long}.
 *
 * <p>The constant is captured at construction time and exposed in its
 * Hadoop writable form through {@link #getWritableConstantValue()}.
 *
 * <p>Backport note: Hive ships this class starting with release 1.2.0
 * (HIVE-9766). This copy should be removed once Pig builds against
 * Hive 1.2.0 or later.
 */
public class JavaConstantLongObjectInspector
        extends JavaLongObjectInspector
        implements ConstantObjectInspector {

    /** The wrapped constant; may be null. */
    private Long value;

    /**
     * Creates an inspector for the given constant.
     *
     * @param value the constant to expose, or null
     */
    public JavaConstantLongObjectInspector(Long value) {
        this.value = value;
    }

    /**
     * Returns the constant as a writable.
     *
     * @return a new {@link LongWritable} holding the constant, or null
     *         when the constant itself is null
     */
    @Override
    public Object getWritableConstantValue() {
        return value == null ? null : new LongWritable(value);
    }
}
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.serde2.objectinspector.primitive;

import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.io.Text;

/**
 * Object inspector for a constant Java {@link String}.
 *
 * <p>The constant is captured at construction time and exposed in its
 * Hadoop writable form through {@link #getWritableConstantValue()}.
 *
 * <p>Backport note: Hive ships this class starting with release 1.2.0
 * (HIVE-9766). This copy should be removed once Pig builds against
 * Hive 1.2.0 or later.
 */
public class JavaConstantStringObjectInspector
        extends JavaStringObjectInspector
        implements ConstantObjectInspector {

    /** The wrapped constant; may be null. */
    private String value;

    /**
     * Creates an inspector for the given constant.
     *
     * @param value the constant to expose, or null
     */
    public JavaConstantStringObjectInspector(String value) {
        super();
        this.value = value;
    }

    /**
     * Returns the constant as a writable.
     *
     * @return a new {@link Text} holding the constant, or null when the
     *         constant itself is null
     */
    @Override
    public Object getWritableConstantValue() {
        return value == null ? null : new Text(value);
    }
}
7 changes: 7 additions & 0 deletions src/org/apache/pig/EvalFunc.java
Expand Up @@ -362,4 +362,11 @@ public SchemaType getSchemaType() {
public boolean allowCompileTimeCalculation() {
// Default answer is false.
// NOTE(review): presumably subclasses override this to opt in to
// compile-time evaluation - confirm against the Javadoc above this hunk.
return false;
}

/**
* Reports whether this function needs end-of-all-input processing.
* The default implementation always returns false.
* NOTE(review): presumably a subclass returns true to request a
* {@link #setEndOfAllInput(boolean)} notification once its input is
* exhausted - confirm against the callers.
*
* @return false in this default implementation
*/
public boolean needEndOfAllInputProcessing() {
return false;
}

/**
* Receives the end-of-all-input flag. The default implementation is a
* no-op and ignores the argument.
* NOTE(review): presumably overridden by functions that answer true from
* {@link #needEndOfAllInputProcessing()} - confirm against the callers.
*
* @param endOfAllInput the flag value; presumably true once all input has
* been delivered (TODO confirm)
*/
public void setEndOfAllInput(boolean endOfAllInput) {
}
}
Expand Up @@ -643,7 +643,11 @@ private Job getJob(MROperPlan plan, MapReduceOper mro, Configuration config, Pig
}
}
if (!predeployed) {
putJarOnClassPathThroughDistributedCache(pigContext, conf, jar);
if (jar.getFile().toLowerCase().endsWith(".jar")) {
putJarOnClassPathThroughDistributedCache(pigContext, conf, jar);
} else {
setupDistributedCache(pigContext, conf, new String[] {jar.getPath()}, true);
}
}
}

Expand Down Expand Up @@ -804,6 +808,7 @@ else if (mapStores.size() + reduceStores.size() > 0) { // multi store case
// set parent plan in all operators in map and reduce plans
// currently the parent plan is really used only when POStream is present in the plan
new PhyPlanSetter(mro.mapPlan).visit();
new PhyPlanSetter(mro.combinePlan).visit();
new PhyPlanSetter(mro.reducePlan).visit();

// this call modifies the ReplFiles names of POFRJoin operators
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.NullableTuple;
Expand Down Expand Up @@ -98,6 +99,7 @@ protected void setup(Context context) throws IOException, InterruptedException {
pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
if (pigContext.getLog4jProperties()!=null)
PropertyConfigurator.configure(pigContext.getLog4jProperties());
MapRedUtil.setupUDFContext(context.getConfiguration());

cp = (PhysicalPlan) ObjectSerializer.deserialize(jConf
.get("pig.combinePlan"));
Expand Down
Expand Up @@ -112,7 +112,7 @@ public void cleanup(Context context) throws IOException, InterruptedException {
return;
}

if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true")) {
if(PigMapReduce.sJobConfInternal.get().get(JobControlCompiler.END_OF_INP_IN_MAP, "false").equals("true") && !mp.isEmpty()) {
// If there is a stream in the pipeline or if this map job belongs to merge-join we could
// potentially have more to process - so lets
// set the flag stating that all map input has been sent
Expand Down
Expand Up @@ -609,7 +609,7 @@ protected void cleanup(Context context) throws IOException, InterruptedException
return;
}

if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true")) {
if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true") && !rp.isEmpty()) {
// If there is a stream in the pipeline we could
// potentially have more to process - so lets
// set the flag stating that all map input has been sent
Expand Down
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.VisitorException;

/**
 * Physical-plan visitor that records whether any {@link POUserFunc} in the
 * visited plan reports that it needs end-of-all-input processing.
 *
 * <p>Run the visitor over the plan, then query
 * {@link #needEndOfAllInputProcessing()} for the result.
 */
public class UDFEndOfAllInputNeededVisitor extends PhyPlanVisitor {

    /** Becomes true once any visited user function asks for end-of-all-input processing. */
    private boolean endOfAllInputNeeded;

    /**
     * Creates a visitor that walks the given plan in dependency order.
     *
     * @param plan the physical plan to inspect
     */
    public UDFEndOfAllInputNeededVisitor(PhysicalPlan plan) {
        super(plan, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(plan));
    }

    /**
     * Delegates to the parent visitor, then folds this function's answer
     * into the accumulated flag.
     *
     * @param userFunc the user-function operator being visited
     * @throws VisitorException if the parent visit fails
     */
    @Override
    public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
        super.visitUserFunc(userFunc);
        // Non-short-circuit OR: the operator is always queried, and the flag is never reset.
        endOfAllInputNeeded |= userFunc.needEndOfAllInputProcessing();
    }

    /**
     * Returns the accumulated result of the visit.
     *
     * @return true if at least one visited user function needs
     *         end-of-all-input processing, false otherwise
     */
    public boolean needEndOfAllInputProcessing() {
        return endOfAllInputNeeded;
    }
}

0 comments on commit 8af34f1

Please sign in to comment.