
Merge pull request #40 from dvryaboy/spork_dvrdev

Fix lists import. Add group cache test.
dvryaboy committed Jul 13, 2012
2 parents 0df5039 + 9931e89 commit d6d93375dd6bc6525614d39f6e1f5e31bb056879
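
The dropped import, org.python.google.common.collect.Lists, appears to be Jython's repackaged copy of Guava; the replacement, com.google.common.collect.Lists, is the canonical Guava class. A minimal sketch of the corrected import in use follows; the class and variable names here are illustrative and not part of the patch:

import java.util.List;

// Canonical Guava Lists, rather than Jython's org.python.google.common repackaging.
import com.google.common.collect.Lists;

public class ListsImportSketch {
    public static void main(String[] args) {
        // Lists.newArrayList(...) is a standard Guava factory method.
        List<String> ops = Lists.newArrayList("LOAD", "GROUP", "STORE");
        System.out.println(ops);
    }
}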
@@ -49,11 +49,12 @@
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.SparkStats;
-import org.python.google.common.collect.Lists;
import spark.RDD;
import spark.SparkContext;
+import com.google.common.collect.Lists;
+
/**
* @author billg
*/
@@ -192,7 +193,7 @@ private void physicalToRDD(PhysicalPlan plan, PhysicalOperator physicalOperator,
}
LOG.info("Converting operator " + physicalOperator.getClass().getSimpleName()+" "+physicalOperator);
- nextRDD = (RDD<Tuple>)converter.convert(predecessorRdds, physicalOperator);
+ nextRDD = converter.convert(predecessorRdds, physicalOperator);
if (POStore.class.equals(physicalOperator.getClass())) {
return;
@@ -12,7 +12,6 @@
import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
-import org.python.google.common.collect.Lists;
import scala.Tuple2;
import scala.collection.JavaConversions;
@@ -23,6 +22,8 @@
import spark.HashPartitioner;
import spark.RDD;
+import com.google.common.collect.Lists;
+
@SuppressWarnings({ "serial"})
public class GlobalRearrangeConverter implements POConverter<Tuple, Tuple, POGlobalRearrange> {
private static final Log LOG = LogFactory.getLog(GlobalRearrangeConverter.class);
@@ -74,6 +75,7 @@
private static class GetKeyFunction extends AbstractFunction1<Tuple, Object> implements Serializable {
+ @Override
public Object apply(Tuple t) {
try {
LOG.debug("GetKeyFunction in "+t);
@@ -89,6 +91,7 @@ public Object apply(Tuple t) {
private static class GroupTupleFunction extends AbstractFunction1<Tuple2<Object, Seq<Tuple>>, Tuple> implements Serializable {
+ @Override
public Tuple apply(Tuple2<Object, Seq<Tuple>> v1) {
try {
LOG.debug("GroupTupleFunction in "+v1);
@@ -1,5 +1,10 @@
package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
@@ -14,17 +19,14 @@
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.util.ObjectSerializer;
-import org.python.google.common.collect.Lists;
+
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import spark.RDD;
import spark.SparkContext;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.collect.Lists;
/**
* Converter that loads data via POLoad and converts it to RDD<Tuple>. Abuses the interface a bit
@@ -63,12 +65,13 @@ public LoadConverter(PigContext pigContext, PhysicalPlan physicalPlan, SparkCont
Text.class, Tuple.class, loadJobConf);
// map to get just RDD<Tuple>
- return (RDD<Tuple>)hadoopRDD.map(TO_TUPLE_FUNCTION, SparkUtil.getManifest(Tuple.class));
+ return hadoopRDD.map(TO_TUPLE_FUNCTION, SparkUtil.getManifest(Tuple.class));
}
private static class ToTupleFunction extends AbstractFunction1<Tuple2<Text, Tuple>, Tuple>
implements Function1<Tuple2<Text, Tuple>, Tuple>, Serializable {
+ @Override
public Tuple apply(Tuple2<Text, Tuple> v1) {
return v1._2();
}
@@ -16,13 +16,14 @@
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.ObjectSerializer;
-import org.python.google.common.collect.Lists;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
import spark.PairRDDFunctions;
import spark.RDD;
+import com.google.common.collect.Lists;
+
/**
* Converter that takes a POStore and stores its content.
*
@@ -625,6 +625,14 @@ public void testCachingJoin() throws Exception {
"STORE A INTO 'output' using mock.Storage;");
}
+ @Test
+ public void testCachingGroup() throws Exception {
+ testCaching("A = LOAD 'input' using mock.Storage; " +
+ "A = GROUP A by LOWER($0); " +
+ "CACHE A; " +
+ "STORE A INTO 'output' using mock.Storage;");
+ }
+
@Test
public void testIgnoreWrongUDFCache() throws Exception {
testIgnoreCache(
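
Besides the import swap, the patch drops two (RDD<Tuple>) casts, presumably redundant because the converter interface already carries the element type in its generics (as in POConverter<Tuple, Tuple, POGlobalRearrange> above). A hedged sketch of that pattern, using a simplified stand-in interface rather than the real POConverter contract:

import java.util.List;

// Simplified stand-in for a typed converter; not the actual POConverter signature.
interface TypedConverter<IN, OUT, OP> {
    List<OUT> convert(List<IN> predecessors, OP operator);
}

public class CastFreeCallSketch {
    // Because convert declares OUT in its return type, no (List<OUT>) cast is needed at the call site.
    static <IN, OUT, OP> List<OUT> run(TypedConverter<IN, OUT, OP> converter, List<IN> inputs, OP op) {
        return converter.convert(inputs, op);
    }
}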
