Permalink
Browse files

Merge pull request #32 from dvryaboy/spork_dvrdev

Adds caching support for joins and fixes cache key generation.
  (branch information unavailable)
2 parents eb58945 + c88bf62 commit 587eadb3dc5edfb0c993c5cf33815f3e95ab2df1 @billonahill billonahill committed Jul 12, 2012
@@ -291,7 +291,7 @@ private Result getNext() throws ExecException {
// so that udf gets called both when isAccumStarted
// is first true and then set to false, even
//when the input relation is empty.
- // so the STATUS_EOP has to be sent from POUserFunc,
+ // so the STATUS_EOP has to be sent from POUserFunc,
// after the results have been sent.
result.result = null;
result.returnStatus = POStatus.STATUS_EOP;
@@ -481,7 +481,7 @@ public Boolean isAsynchronous() {
@Override
public String name() {
- return "POUserFunc" + "(" + func.getClass().getName() + ")" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
+ return "POUserFunc" + "(" + funcSpec.toString() + ")" + "[" + DataType.findTypeName(resultType) + "]" + " - " + mKey.toString();
}
@Override
@@ -555,16 +555,16 @@ public void setAccumStart() {
public void setResultType(byte resultType) {
this.resultType = resultType;
}
-
+
@Override
public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
return (Tuple) out;
}
-
+
public EvalFunc getFunc() {
return func;
}
-
+
public void setSignature(String signature) {
this.signature = signature;
if (this.func!=null) {
@@ -20,6 +20,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import org.apache.commons.logging.Log;
@@ -34,6 +35,7 @@
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
import org.apache.pig.pen.util.ExampleTuple;
public class POCache extends PhysicalOperator {
@@ -121,6 +123,9 @@ public String computeCacheKey() throws IOException {
}
private String computeRawCacheKey(List<PhysicalOperator> preds) throws IOException {
+ if (preds == null) {
+ return "";
+ }
StringBuilder sb = new StringBuilder();
for (PhysicalOperator operator : preds) {
if (operator instanceof POLoad) {
@@ -157,12 +162,41 @@ private String computeRawCacheKey(List<PhysicalOperator> preds) throws IOExcepti
return null;
} else {
sb.append(inputKey);
- LOG.info("Input key: " + inputKey);
+ }
+ } else if (operator instanceof POLocalRearrange) {
+ POLocalRearrange localRearrange = (POLocalRearrange) operator;
+ sb.append("LocRearrange");
+ sb.append("ProjCol");
+ for (Map.Entry<Integer, Integer> entry : localRearrange.getProjectedColsMap().entrySet()) {
+ sb.append(entry.getKey() + "+" + entry.getValue());
+ }
+ sb.append("SecProjCol");
+ for (Map.Entry<Integer, Integer> entry : localRearrange.getSecondaryProjectedColsMap().entrySet()) {
+ sb.append(entry.getKey() + "+" + entry.getValue());
+ }
+ sb.append(localRearrange.getIndex());
+ sb.append(localRearrange.getKeyType());
+ for (PhysicalPlan plan : localRearrange.getPlans()) {
+ sb.append(innerPlanKey(plan));
+ }
+ } else if (operator instanceof POGlobalRearrange) {
+ sb.append("POGLOBALREARRANGE");
+ } else if (operator instanceof POPackage) {
+ POPackage pkg = (POPackage) operator;
+ sb.append("POPakage");
+ for (Map.Entry<Integer, Pair<Boolean, Map<Integer, Integer>>> entry : pkg.getKeyInfo().entrySet()) {
+ sb.append(entry.getKey()).append("-").append(entry.getValue().first);
+ sb.append("->");
+ for (Map.Entry<Integer, Integer> valentry : entry.getValue().second.entrySet()) {
+ sb.append(valentry.getKey()).append("-").append(valentry.getValue());
+ }
+ sb.append(".");
}
} else {
LOG.info("Don't know how to generate cache key for " + operator.getClass() + "; not caching");
return null;
}
+ sb.append(computeRawCacheKey(operator.getInputs()));
}
return sb.toString();
}
@@ -11,6 +11,8 @@
import java.util.List;
import java.util.Properties;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.log4j.Level;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
@@ -19,11 +21,13 @@
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.Tuple;
+import org.junit.Assert;
import org.junit.Test;
public class TestSpark {
private static final ExecType MODE = ExecType.SPARK;
+ private static final Log LOG = LogFactory.getLog(TestSpark.class);
static {
org.apache.log4j.Logger.getLogger("org.apache.pig.backend.hadoop.executionengine.spark").setLevel(Level.DEBUG);
@@ -97,7 +101,7 @@ public void testGroupBy() throws Exception {
Arrays.asList(
tuple("key1", bag(tuple("foo", "key1", "test1"), tuple("bar", "key1", "test2"))),
tuple("key2", bag(tuple("baz", "key2", "test3")))),
- sortByIndex(data.get("output"), 0));
+ sortByIndex(data.get("output"), 0));
}
private List<Tuple> sortByIndex(List<Tuple> out, final int i) {
@@ -106,9 +110,9 @@ public void testGroupBy() throws Exception {
@Override
public int compare(Tuple o1, Tuple o2) {
try {
- Comparable c1 = (Comparable)o1.get(i);
- Comparable c2 = (Comparable)o2.get(i);
- return c1.compareTo(c2);
+ Comparable c1 = (Comparable)o1.get(i);
+ Comparable c2 = (Comparable)o2.get(i);
+ return c1.compareTo(c2);
} catch (ExecException e) {
throw new RuntimeException(e);
}
@@ -288,7 +292,7 @@ public void testCoGroup() throws Exception {
tuple(2,bag(tuple("foo", 2,"b")),bag(tuple("bar", 2,"f"))),
tuple(3,bag(tuple("foo", 3,"c")),bag())
),
- sortByIndex(data.get("output"), 0));
+ sortByIndex(data.get("output"), 0));
}
@Test
@@ -318,7 +322,7 @@ public void testJoin() throws Exception {
tuple(1, "d", 1, "g"),
tuple(2, "b", 2, "f")
),
- data.get("output"));
+ data.get("output"));
}
@Test
@@ -346,12 +350,78 @@ public void testCachingWithFilter() throws Exception {
"STORE A INTO 'output' using mock.Storage;");
}
+ @Test
+ public void testCachingJoin() throws Exception {
+ testCaching("A = LOAD 'input' using mock.Storage; " +
+ "B = LOAD 'input' using mock.Storage; " +
+ "A = JOIN A by $0, B by LOWER($0); " +
+ "CACHE A; " +
+ "STORE A INTO 'output' using mock.Storage;");
+ }
+
+ @Test
+ public void testIgnoreWrongUDFCache() throws Exception {
+ testIgnoreCache(
+ "A = LOAD 'input' using mock.Storage; " +
+ "B = LOAD 'input' using mock.Storage; " +
+ "A = JOIN A by $0, B by LOWER($0); " +
+ "CACHE A; " +
+ "STORE A INTO 'output' using mock.Storage;",
+ "A = LOAD 'input' using mock.Storage; " +
+ "B = LOAD 'input' using mock.Storage; " +
+ "A = JOIN A by $0, B by UPPER($0); " +
+ "CACHE A; " +
+ "STORE A INTO 'output' using mock.Storage;");
+ }
+
+ @Test
+ public void testIgnoreDiffFilterCache() throws Exception {
+ testIgnoreCache("A = LOAD 'input' using mock.Storage;" +
+ "A = FILTER A by $0 == 'test1';" +
+ "CACHE A;" +
+ "STORE A INTO 'output' using mock.Storage;",
+ "A = LOAD 'input' using mock.Storage;" +
+ "A = FILTER A by $0 == 'test2';" +
+ "CACHE A;" +
+ "STORE A INTO 'output' using mock.Storage;");
+
+ }
+
+ public void testIgnoreCache(String query1, String query2) throws Exception {
+ PigServer pigServer = newPigServer();
+
+ Data data = Storage.resetData(pigServer);
+ data.set("input",
+ tuple("test1"),
+ tuple("test2"));
+
+ pigServer.setBatchOn();
+ pigServer.registerQuery(query1);
+ pigServer.executeBatch();
+
+ List<Tuple> originalOutput = data.get("output");
+ LOG.debug("After first query: " + originalOutput);
+
+ data = Storage.resetData(pigServer);
+ data.set("input",
+ tuple("test3"),
+ tuple("test4"));
+ pigServer.registerQuery(query2);
+ pigServer.executeBatch();
+
+ LOG.debug("After second query: " + data.get("output"));
+
+ Assert.assertFalse(
+ originalOutput.equals(
+ data.get("output")));
+ }
+
/**
* Kind of a hack: To test whether caching is happening, we modify a file on disk after caching
* it in Spark.
*/
private void testCaching(String query) throws Exception {
- PigServer pigServer = newPigServer();
+ PigServer pigServer = newPigServer();
Data data = Storage.resetData(pigServer);
data.set("input",
@@ -362,11 +432,8 @@ private void testCaching(String query) throws Exception {
pigServer.registerQuery(query);
pigServer.executeBatch();
- System.out.println("After first query: " + data.get("output"));
-
- assertEquals(
- Arrays.asList(tuple("test1"), tuple("test2")),
- data.get("output"));
+ LOG.debug("After first query: " + data.get("output"));
+ List<Tuple> originalOutput = data.get("output");
data = Storage.resetData(pigServer);
data.set("input",
@@ -376,10 +443,10 @@ private void testCaching(String query) throws Exception {
pigServer.registerQuery("STORE A INTO 'output' using mock.Storage;");
pigServer.executeBatch();
- System.out.println("After second query: " + data.get("output"));
+ LOG.debug("After second query: " + data.get("output"));
assertEquals(
- Arrays.asList(tuple("test1"), tuple("test2")),
+ originalOutput,
data.get("output"));
}
}

0 comments on commit 587eadb

Please sign in to comment.