Fix lists import. Add group cache test.

commit 60af1923e66eec1df266276dd8a171c7c64bf545 1 parent 78dfef6
@dvryaboy authored
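
The "lists import" fix swaps Guava's Lists helper from the copy that Jython repackages under org.python.google.common.collect for the canonical com.google.common.collect class. Below is a minimal sketch of the intended usage, assuming Guava itself is on the compile classpath; the class name ListsImportExample is illustrative only and not part of this commit.

import java.util.List;

// Canonical Guava import; org.python.google.common.collect.Lists only exists
// because Jython bundles its own repackaged Guava, and depending on it is fragile.
import com.google.common.collect.Lists;

public class ListsImportExample {
    public static void main(String[] args) {
        // Call sites are unchanged by the fix; only the import line differs.
        List<String> ops = Lists.newArrayList("load", "group", "store");
        System.out.println(ops);
    }
}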
26 changes: src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
@@ -1,13 +1,14 @@
package org.apache.pig.backend.hadoop.executionengine.spark;
-import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
-import java.util.*;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
import org.apache.pig.PigException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
@@ -25,26 +26,27 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter;
-import org.apache.pig.backend.hadoop.executionengine.spark.converter.POConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.CacheConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.FilterConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.ForEachConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter;
-import org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
-import org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.POConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
+import org.apache.pig.backend.hadoop.executionengine.spark.converter.StoreConverter;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.tools.pigstats.*;
-import org.python.google.common.collect.Lists;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.SparkStats;
import spark.RDD;
import spark.SparkContext;
+import com.google.common.collect.Lists;
+
/**
* @author billg
*/
@@ -132,7 +134,7 @@ private static void startSparkIfNeeded() throws PigException {
throw new PigException("MESOS_NATIVE_LIBRARY is not set");
}
}
-
+
// Tell Spark to use Mesos in coarse-grained mode (only affects Spark 0.6+; no impact on others)
System.setProperty("spark.mesos.coarse", "true");
@@ -179,7 +181,7 @@ private void physicalToRDD(PhysicalPlan plan, PhysicalOperator physicalOperator,
}
LOG.info("Converting operator " + physicalOperator.getClass().getSimpleName()+" "+physicalOperator);
- nextRDD = (RDD<Tuple>)converter.convert(predecessorRdds, physicalOperator);
+ nextRDD = converter.convert(predecessorRdds, physicalOperator);
if (POStore.class.equals(physicalOperator.getClass())) {
return;
5 changes: ...g/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
@@ -12,7 +12,6 @@
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.python.google.common.collect.Lists;
import scala.Tuple2;
import scala.collection.JavaConversions;
@@ -23,6 +22,8 @@
import spark.HashPartitioner;
import spark.RDD;
+import com.google.common.collect.Lists;
+
public class GlobalRearrangeConverter implements POConverter<Tuple, Tuple, POGlobalRearrange> {
private static final Log LOG = LogFactory.getLog(GlobalRearrangeConverter.class);
@@ -76,6 +77,7 @@
private static class GetKeyFunction extends AbstractFunction1<Tuple, Object> implements Serializable {
+ @Override
public Object apply(Tuple t) {
try {
LOG.debug("GetKeyFunction in "+t);
@@ -91,6 +93,7 @@ public Object apply(Tuple t) {
private static class GroupTupleFunction extends AbstractFunction1<Tuple2<Object, Seq<Tuple>>, Tuple> implements Serializable {
+ @Override
public Tuple apply(Tuple2<Object, Seq<Tuple>> v1) {
try {
LOG.debug("GroupTupleFunction in "+v1);
15 changes: src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
@@ -1,5 +1,10 @@
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
@@ -14,17 +19,14 @@
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.ObjectSerializer;
-import org.python.google.common.collect.Lists;
+
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import spark.RDD;
import spark.SparkContext;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.collect.Lists;
/**
* Converter that loads data via POLoad and converts it to RRD&lt;Tuple>. Abuses the interface a bit
@@ -62,12 +64,13 @@ public LoadConverter(PigContext pigContext, PhysicalPlan physicalPlan, SparkCont
Text.class, Tuple.class, loadJobConf);
// map to get just RDD<Tuple>
- return (RDD<Tuple>)hadoopRDD.map(TO_TUPLE_FUNCTION, SparkUtil.getManifest(Tuple.class));
+ return hadoopRDD.map(TO_TUPLE_FUNCTION, SparkUtil.getManifest(Tuple.class));
}
private static class ToTupleFunction extends AbstractFunction1<Tuple2<Text, Tuple>, Tuple>
implements Function1<Tuple2<Text, Tuple>, Tuple>, Serializable {
+ @Override
public Tuple apply(Tuple2<Text, Tuple> v1) {
return v1._2();
}
13 changes: src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
@@ -1,5 +1,10 @@
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.StoreFuncInterface;
@@ -11,17 +16,14 @@
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
-import org.python.google.common.collect.Lists;
+
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import spark.PairRDDFunctions;
import spark.RDD;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.collect.Lists;
/**
* Converter that takes a POStore and stores it's content.
@@ -77,6 +79,7 @@ private static POStore configureStorer(JobConf jobConf,
private static Text EMPTY_TEXT = new Text();
+ @Override
public Tuple2 apply(Tuple v1) {
return new Tuple2(EMPTY_TEXT, v1);
}
9 changes: test/org/apache/pig/spark/TestSpark.java
@@ -8,6 +8,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
import java.util.Properties;
@@ -360,6 +361,14 @@ public void testCachingJoin() throws Exception {
}
@Test
+ public void testCachingGroup() throws Exception {
+ testCaching("A = LOAD 'input' using mock.Storage; " +
+ "A = GROUP A by LOWER($0); " +
+ "CACHE A; " +
+ "STORE A INTO 'output' using mock.Storage;");
+ }
+
+ @Test
public void testIgnoreWrongUDFCache() throws Exception {
testIgnoreCache(
"A = LOAD 'input' using mock.Storage; " +