Skip to content

Commit

Permalink
PIG-4359: Port local mode tests to Tez - part4
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1653445 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
Jianyong Dai committed Jan 21, 2015
1 parent 280c9b5 commit 4ecb3c6
Show file tree
Hide file tree
Showing 15 changed files with 814 additions and 483 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Expand Up @@ -44,6 +44,8 @@ PIG-4333: Split BigData tests into multiple groups (rohini)

BUG FIXES

PIG-4359: Port local mode tests to Tez - part4 (daijy)

PIG-4340: PigStorage fails parsing empty map (daijy)

PIG-4366: Port local mode tests to Tez - part5 (daijy)
Expand Down
6 changes: 6 additions & 0 deletions shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
Expand Up @@ -27,7 +27,9 @@
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
import org.apache.pig.ExecType;
import org.apache.pig.backend.hadoop.executionengine.Launcher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;

/**
* This class builds a single instance of itself with the Singleton
Expand Down Expand Up @@ -128,4 +130,8 @@ protected void shutdownMiniMrClusters() {
if (m_mr != null) { m_mr.stop(); }
m_mr = null;
}

static public Launcher getLauncher() {
return new MapReduceLauncher();
}
}
6 changes: 6 additions & 0 deletions shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
Expand Up @@ -33,7 +33,9 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.hadoop.executionengine.Launcher;
import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
import org.apache.pig.backend.hadoop.executionengine.tez.TezLauncher;
import org.apache.pig.backend.hadoop.executionengine.tez.TezSessionManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
Expand Down Expand Up @@ -185,4 +187,8 @@ private void deleteConfFiles() {
YARN_CONF_FILE.delete();
}
}

static public Launcher getLauncher() {
return new TezLauncher();
}
}
Expand Up @@ -368,7 +368,7 @@ else if (predecessors != null && predecessors.size() > 0) {
storeOnlyPhyPlan.addAsLeaf(store);
storeOnlyTezOperator.plan = storeOnlyPhyPlan;
tezPlan.add(storeOnlyTezOperator);
phyToTezOpMap.put(store, storeOnlyTezOperator);
phyToTezOpMap.put(p, storeOnlyTezOperator);

// Create new operator as second splittee
curTezOp = getTezOp();
Expand Down
1 change: 1 addition & 0 deletions test/excluded-tests-20
Expand Up @@ -7,3 +7,4 @@
**/TestGroupConstParallelTez.java
**/TestLoaderStorerShipCacheFilesTez.java
**/TestPigStatsTez.java
**/TestPOPartialAggPlanTez.java
15 changes: 15 additions & 0 deletions test/org/apache/pig/test/MiniGenericCluster.java
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.pig.ExecType;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.Launcher;

/**
* This class builds a single instance of itself with the Singleton
Expand Down Expand Up @@ -146,4 +147,18 @@ private void errorIfNotSetup(){
String msg = "function called on MiniCluster that has been shutdown";
throw new RuntimeException(msg);
}

static public Launcher getLauncher() {
String execType = System.getProperty("test.exec.type");
if (execType == null) {
System.setProperty("test.exec.type", EXECTYPE_MR);
}
if (execType.equalsIgnoreCase(EXECTYPE_MR)) {
return MiniCluster.getLauncher();
} else if (execType.equalsIgnoreCase(EXECTYPE_TEZ)) {
return TezMiniCluster.getLauncher();
} else {
throw new RuntimeException("Unknown test.exec.type: " + execType);
}
}
}
74 changes: 53 additions & 21 deletions test/org/apache/pig/test/TestMultiQueryLocal.java
Expand Up @@ -19,8 +19,8 @@

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -29,17 +29,19 @@

import junit.framework.Assert;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.Launcher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRExecutionEngine;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.builtin.PigStorage;
Expand All @@ -56,17 +58,18 @@
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

public class TestMultiQueryLocal {

private PigServer myPig;
protected PigServer myPig;
private String TMP_DIR;

@Before
public void setUp() throws Exception {
PigContext context = new PigContext(ExecType.LOCAL, new Properties());
PigContext context = new PigContext(Util.getLocalTestMode(), new Properties());
context.getProperties().setProperty(PigConfiguration.PIG_OPT_MULTIQUERY, ""+true);
myPig = new PigServer(context);
myPig.getPigContext().getProperties().setProperty("pig.usenewlogicalplan", "false");
Expand Down Expand Up @@ -351,22 +354,33 @@ public void testMultiQueryWithNoStore2() {

public static class PigStorageWithConfig extends PigStorage {

private static final String key = "test.key";
private static final String key1 = "test.key1";
private static final String key2 = "test.key2";
private String suffix;
private String myKey;

public PigStorageWithConfig(String s) {
public PigStorageWithConfig(String key, String s) {
this.suffix = s;
this.myKey = key;
}

@Override
public void setStoreLocation(String location, Job job) throws IOException {
super.setStoreLocation(location, job);
Assert.assertNull(job.getConfiguration().get(key));
if (myKey.equals(key1)) {
Assert.assertNull(job.getConfiguration().get(key2));
} else {
Assert.assertNull(job.getConfiguration().get(key1));
}
}

@Override
public OutputFormat getOutputFormat() {
return new PigTextOutputFormatWithConfig();
if (myKey.equals(key1)) {
return new PigTextOutputFormatWithConfig1();
} else {
return new PigTextOutputFormatWithConfig2();
}
}

@Override
Expand All @@ -384,16 +398,30 @@ public void putNext(Tuple f) throws IOException {
}
}

private static class PigTextOutputFormatWithConfig extends PigTextOutputFormat {
private static class PigTextOutputFormatWithConfig1 extends PigTextOutputFormat {

public PigTextOutputFormatWithConfig1() {
super((byte) '\t');
}

@Override
public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException {
context.getConfiguration().set(PigStorageWithConfig.key1, MRConfiguration.WORK_OUPUT_DIR);
return super.getOutputCommitter(context);
}
}

private static class PigTextOutputFormatWithConfig2 extends PigTextOutputFormat {

public PigTextOutputFormatWithConfig() {
public PigTextOutputFormatWithConfig2() {
super((byte) '\t');
}

@Override
public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException {
context.getConfiguration().set(PigStorageWithConfig.key, MRConfiguration.WORK_OUPUT_DIR);
context.getConfiguration().set(PigStorageWithConfig.key2, MRConfiguration.WORK_OUPUT_DIR);
return super.getOutputCommitter(context);
}
}
Expand All @@ -411,17 +439,20 @@ public void testMultiStoreWithConfig() {
"using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int,gid:int);");
myPig.registerQuery("b = filter a by uid < 5;");
myPig.registerQuery("c = filter a by uid > 5;");
myPig.registerQuery("store b into '" + TMP_DIR + "/Pig-TestMultiQueryLocal1' using " + PigStorageWithConfig.class.getName() + "('a');");
myPig.registerQuery("store c into '" + TMP_DIR + "/Pig-TestMultiQueryLocal2' using " + PigStorageWithConfig.class.getName() + "('b');");
myPig.registerQuery("store b into '" + TMP_DIR + "/Pig-TestMultiQueryLocal1' using " + PigStorageWithConfig.class.getName() + "('test.key1', 'a');");
myPig.registerQuery("store c into '" + TMP_DIR + "/Pig-TestMultiQueryLocal2' using " + PigStorageWithConfig.class.getName() + "('test.key2', 'b');");

myPig.executeBatch();
myPig.discardBatch();
BufferedReader reader = new BufferedReader(new FileReader(TMP_DIR + "/Pig-TestMultiQueryLocal1/part-m-00000"));
FileSystem fs = FileSystem.getLocal(new Configuration());
BufferedReader reader = new BufferedReader(new InputStreamReader
(fs.open(Util.getFirstPartFile(new Path(TMP_DIR + "/Pig-TestMultiQueryLocal1")))));
String line;
while ((line = reader.readLine())!=null) {
Assert.assertTrue(line.endsWith("a"));
}
reader = new BufferedReader(new FileReader(TMP_DIR + "/Pig-TestMultiQueryLocal2/part-m-00000"));
reader = new BufferedReader(new InputStreamReader
(fs.open(Util.getFirstPartFile(new Path(TMP_DIR + "/Pig-TestMultiQueryLocal2")))));
while ((line = reader.readLine())!=null) {
Assert.assertTrue(line.endsWith("b"));
}
Expand Down Expand Up @@ -505,8 +536,9 @@ public void testMultiQueryWithDescribe() {
}

@Test
public void testMultiQueryWithIllustrate() {
public void testMultiQueryWithIllustrate() throws Exception {

Assume.assumeTrue("illustrate does not work in tez (PIG-3993)", !Util.getLocalTestMode().toString().startsWith("TEZ"));
System.out.println("===== test multi-query with illustrate =====");

try {
Expand Down Expand Up @@ -626,7 +658,7 @@ private PhysicalPlan checkPhysicalPlan(LogicalPlan lp, int expectedRoots,
lp.optimize(myPig.getPigContext());
System.out.println("===== check physical plan =====");

PhysicalPlan pp = ((MRExecutionEngine)myPig.getPigContext().getExecutionEngine()).compile(
PhysicalPlan pp = ((HExecutionEngine)myPig.getPigContext().getExecutionEngine()).compile(
lp, null);

Assert.assertEquals(expectedRoots, pp.getRoots().size());
Expand All @@ -638,9 +670,9 @@ private PhysicalPlan checkPhysicalPlan(LogicalPlan lp, int expectedRoots,
return pp;
}

private boolean executePlan(PhysicalPlan pp) throws IOException {
protected boolean executePlan(PhysicalPlan pp) throws IOException {
boolean failed = true;
MapReduceLauncher launcher = new MapReduceLauncher();
Launcher launcher = MiniGenericCluster.getLauncher();
PigStats stats = null;
try {
stats = launcher.launchPig(pp, "execute", myPig.getPigContext());
Expand Down
21 changes: 9 additions & 12 deletions test/org/apache/pig/test/TestPOPartialAggPlan.java
Expand Up @@ -23,28 +23,28 @@

import java.util.Iterator;

import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.impl.PigContext;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

/**
* Test POPartialAgg runtime
*/
@Ignore
public class TestPOPartialAggPlan {
private static PigContext pc;
private static PigServer ps;
protected static PigContext pc;
protected static PigServer ps;

@Before
public void setUp() throws ExecException {
ps = new PigServer(ExecType.LOCAL);
public void setUp() throws Exception {
ps = new PigServer(Util.getLocalTestMode());
pc = ps.getPigContext();
pc.connect();
}
Expand Down Expand Up @@ -89,7 +89,7 @@ private Object findPOPartialAgg(MROperPlan mrp) {
return findPOPartialAgg(mapPlan);
}

private String getGByQuery() {
protected String getGByQuery() {
return "l = load 'x' as (a,b,c);" +
"g = group l by a;" +
"f = foreach g generate group, COUNT(l.b);";
Expand Down Expand Up @@ -122,8 +122,8 @@ public void testMapAggNotCombinable() throws Exception{
assertNull("POPartialAgg should be absent", findPOPartialAgg(mrp));
}

private PhysicalOperator findPOPartialAgg(PhysicalPlan mapPlan) {
Iterator<PhysicalOperator> it = mapPlan.iterator();
protected PhysicalOperator findPOPartialAgg(PhysicalPlan plan) {
Iterator<PhysicalOperator> it = plan.iterator();
while(it.hasNext()){
PhysicalOperator op = it.next();
if(op instanceof POPartialAgg){
Expand All @@ -132,7 +132,4 @@ private PhysicalOperator findPOPartialAgg(PhysicalPlan mapPlan) {
}
return null;
}



}

0 comments on commit 4ecb3c6

Please sign in to comment.