From fdba6a66a8bb71fee14468929251cb87c624ff27 Mon Sep 17 00:00:00 2001 From: Keuntae Park Date: Mon, 6 Apr 2015 15:41:38 +0900 Subject: [PATCH 1/2] Implement SortIntersectAllExec, sort based INTERSECT ALL physical operator --- .../planner/physical/SetTupleComparator.java | 68 ++++++ .../physical/SortIntersectAllExec.java | 69 ++++++ .../physical/TestSortIntersectAllExec.java | 224 ++++++++++++++++++ 3 files changed, 361 insertions(+) create mode 100644 tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SetTupleComparator.java create mode 100644 tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortIntersectAllExec.java create mode 100644 tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectAllExec.java diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SetTupleComparator.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SetTupleComparator.java new file mode 100644 index 0000000000..a8d2bf8178 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SetTupleComparator.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import com.google.common.base.Preconditions; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SortSpec; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.storage.Tuple; + +import java.util.Comparator; + +public class SetTupleComparator implements Comparator { + private int numCompKey; + + private Datum outer; + private Datum inner; + private int compVal; + + public SetTupleComparator(Schema leftschema, Schema rightschema) { + Preconditions.checkArgument(leftschema.size() == rightschema.size(), + "The size of both side schema must be equals, but they are different: " + + leftschema.size() + " and " + rightschema.size()); + + this.numCompKey = leftschema.size(); // because it is guaranteed that the size of both schemas are the same + } + + @Override + public int compare(Tuple outerTuple, Tuple innerTuple) { + for (int i = 0; i < numCompKey; i++) { + outer = (outerTuple == null) ? NullDatum.get() : outerTuple.get(i); + inner = (innerTuple == null) ? 
NullDatum.get() : innerTuple.get(i); + + if (outer.isNull()) { + // NullDatum can handle comparison with all types of Datum + compVal = outer.compareTo(inner); + } else if(inner.isNull()) { + // NullDatum is greater than any non NullDatums in Tajo + compVal = -1; + } else { + // Both tuple are not NullDatum + compVal = outer.compareTo(inner); + } + + if (compVal != 0) { + return compVal; + } + } + return 0; + } +} \ No newline at end of file diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortIntersectAllExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortIntersectAllExec.java new file mode 100644 index 0000000000..89f3da4222 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortIntersectAllExec.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.SchemaUtil; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.plan.InvalidQueryException; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.worker.TaskAttemptContext; + +import java.io.IOException; +import java.util.Arrays; + +public class SortIntersectAllExec extends BinaryPhysicalExec { + SetTupleComparator comparator; + public SortIntersectAllExec(TaskAttemptContext context, PhysicalExec left, PhysicalExec right) { + super(context, left.getSchema(), right.getSchema(), left, right); + TajoDataTypes.DataType[] leftTypes = SchemaUtil.toDataTypes(left.getSchema()); + TajoDataTypes.DataType[] rightTypes = SchemaUtil.toDataTypes(right.getSchema()); + if (!CatalogUtil.isMatchedFunction(Arrays.asList(leftTypes), Arrays.asList(rightTypes))) { + throw new InvalidQueryException( + "The both schemas are not compatible"); + } + comparator = new SetTupleComparator(left.getSchema(), right.getSchema()); + } + + @Override + public Tuple next() throws IOException { + while (!context.isStopped()) { + Tuple leftTuple = leftChild.next(); + Tuple rightTuple = rightChild.next(); + if (leftTuple == null || rightTuple == null) { + return null; + } + // At this point, Both Tuples are not null + do { + int compVal = comparator.compare(leftTuple, rightTuple); + + if (compVal > 0) { + rightTuple = rightChild.next(); + } else if (compVal < 0) { + leftTuple = leftChild.next(); + } else { + return leftTuple; + } + } while (leftTuple != null && rightTuple != null); + + return null; + } + return null; + } +} diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectAllExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectAllExec.java new file mode 100644 index 0000000000..518985e1b9 --- /dev/null +++ 
b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectAllExec.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.engine.parser.SQLAnalyzer; +import org.apache.tajo.engine.planner.PhysicalPlanner; +import org.apache.tajo.engine.planner.PhysicalPlannerImpl; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.PlanningException; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.SortNode; +import 
org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.TaskAttemptContext; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; +import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestSortIntersectAllExec { + private TajoConf conf; + private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestSortIntersectAllExec"; + private TajoTestingCluster util; + private CatalogService catalog; + private SQLAnalyzer analyzer; + private LogicalPlanner planner; + private LogicalOptimizer optimizer; + private Path testDir; + + private TableDesc employee1; + private TableDesc employee2; + + private int[] leftNum = new int[] {1, 2, 3, 3, 9, 9, 3, 0, 3}; + private int[] rightNum = new int[] {3, 7, 3, 5}; + private int[] answerNum = new int[] {3, 3}; // this should be set as leftNum intersect all rightNum + + @Before + public void setUp() throws Exception { + util = new TajoTestingCluster(); + util.initTestDir(); + catalog = util.startCatalogCluster().getCatalog(); + testDir = CommonTestingUtil.getTestDir(TEST_PATH); + catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString()); + catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME); + conf = util.getConfiguration(); + + Schema employeeSchema1 = new Schema(); + employeeSchema1.addColumn("managerid", TajoDataTypes.Type.INT4); + employeeSchema1.addColumn("empid", TajoDataTypes.Type.INT4); + employeeSchema1.addColumn("memid", TajoDataTypes.Type.INT4); + employeeSchema1.addColumn("deptname", TajoDataTypes.Type.TEXT); + + TableMeta employeeMeta1 = 
CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV); + Path employeePath1 = new Path(testDir, "employee1.csv"); + Appender appender = ((FileStorageManager) StorageManager.getFileStorageManager(conf)) + .getAppender(employeeMeta1, employeeSchema1, employeePath1); + appender.init(); + Tuple tuple = new VTuple(employeeSchema1.size()); + + for (int i : leftNum) { + tuple.put(new Datum[] { + DatumFactory.createInt4(i), + DatumFactory.createInt4(i), // empid [0-8] + DatumFactory.createInt4(10 + i), + DatumFactory.createText("dept_" + i) }); + appender.addTuple(tuple); + } + + appender.flush(); + appender.close(); + employee1 = CatalogUtil.newTableDesc("default.employee1", employeeSchema1, employeeMeta1, employeePath1); + catalog.createTable(employee1); + + Schema employeeSchema2 = new Schema(); + employeeSchema2.addColumn("managerid", TajoDataTypes.Type.INT4); + employeeSchema2.addColumn("empid", TajoDataTypes.Type.INT4); + employeeSchema2.addColumn("memid", TajoDataTypes.Type.INT4); + employeeSchema2.addColumn("deptname", TajoDataTypes.Type.TEXT); + + TableMeta employeeMeta2 = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV); + Path employeePath2 = new Path(testDir, "employee2.csv"); + Appender appender2 = ((FileStorageManager) StorageManager.getFileStorageManager(conf)) + .getAppender(employeeMeta2, employeeSchema2, employeePath2); + appender2.init(); + Tuple tuple2 = new VTuple(employeeSchema2.size()); + + for (int i : rightNum) { + tuple2.put(new Datum[]{ + DatumFactory.createInt4(i), + DatumFactory.createInt4(i), // empid [1-9] + DatumFactory.createInt4(10 + i), + DatumFactory.createText("dept_" + i)}); + appender2.addTuple(tuple2); + } + + appender2.flush(); + appender2.close(); + employee2 = CatalogUtil.newTableDesc("default.employee2", employeeSchema2, employeeMeta2, employeePath2); + catalog.createTable(employee2); + + + analyzer = new SQLAnalyzer(); + planner = new LogicalPlanner(catalog); + optimizer = new LogicalOptimizer(conf); + } + + @After + public 
void tearDown() throws Exception { + util.shutdownCatalogCluster(); + } + + + // relation descriptions + // employee1 (managerid, empid, memid, deptname) + // employee2 (managerid, empid, memid, deptname) + + String[] QUERIES = { + "select * from employee1 as e1, employee2 as e2 where e1.empId = e2.empId" + }; + + @Test + public final void testSortIntersectAll() throws IOException, PlanningException { + FileFragment[] empFrags1 = FileStorageManager.splitNG(conf, "default.e1", employee1.getMeta(), + new Path(employee1.getPath()), Integer.MAX_VALUE); + FileFragment[] empFrags2 = FileStorageManager.splitNG(conf, "default.e2", employee2.getMeta(), + new Path(employee2.getPath()), Integer.MAX_VALUE); + + FileFragment[] merged = TUtil.concat(empFrags1, empFrags2); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testSortIntersectAll"); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[0]); + LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); + optimizer.optimize(plan); + LogicalNode rootNode = plan.getRootBlock().getRoot(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + + // replace an equal join with sort intersect all . 
+ if (exec instanceof MergeJoinExec) { + MergeJoinExec join = (MergeJoinExec) exec; + exec = new SortIntersectAllExec(ctx, join.getLeftChild(), join.getRightChild()); + } else if (exec instanceof HashJoinExec) { + // we need to sort the results from both left and right children + HashJoinExec join = (HashJoinExec) exec; + SortSpec[] sortSpecsLeft = PlannerUtil.schemaToSortSpecs(join.getLeftChild().getSchema()); + SortSpec[] sortSpecsRight = PlannerUtil.schemaToSortSpecs(join.getRightChild().getSchema()); + + SortNode leftSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); + leftSortNode.setSortSpecs(sortSpecsLeft); + leftSortNode.setInSchema(join.getLeftChild().getSchema()); + leftSortNode.setOutSchema(join.getLeftChild().getSchema()); + ExternalSortExec leftSort = new ExternalSortExec(ctx, leftSortNode, join.getLeftChild()); + + SortNode rightSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); + rightSortNode.setSortSpecs(sortSpecsRight); + rightSortNode.setInSchema(join.getRightChild().getSchema()); + rightSortNode.setOutSchema(join.getRightChild().getSchema()); + ExternalSortExec rightSort = new ExternalSortExec(ctx, rightSortNode, join.getRightChild()); + + exec = new SortIntersectAllExec(ctx, leftSort, rightSort); + } + + Tuple tuple; + int count = 0; + int i = 0; + exec.init(); + + while ((tuple = exec.next()) != null) { + count++; + int answer = answerNum[i]; + assertTrue(answer == tuple.get(0).asInt4()); + assertTrue(answer == tuple.get(1).asInt4()); + assertTrue(10 + answer == tuple.get(2).asInt4()); + assertTrue(("dept_" + answer).equals(tuple.get(3).asChars())); + + i++; + } + exec.close(); + assertEquals(answerNum.length , count); + } +} From 734cd261ac81003203f2b362d844c51f6de55ce9 Mon Sep 17 00:00:00 2001 From: Keuntae Park Date: Wed, 8 Apr 2015 14:31:27 +0900 Subject: [PATCH 2/2] I think it is better to implement INTERSECT and INTERSECT ALL as one physical operator rather than to implement INTERSECT AS INTERSECT ALL + distinct --- 
...ectAllExec.java => SortIntersectExec.java} | 28 ++++++- ...llExec.java => TestSortIntersectExec.java} | 80 +++++++++++++++++-- 2 files changed, 98 insertions(+), 10 deletions(-) rename tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/{SortIntersectAllExec.java => SortIntersectExec.java} (75%) rename tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/{TestSortIntersectAllExec.java => TestSortIntersectExec.java} (71%) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortIntersectAllExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortIntersectExec.java similarity index 75% rename from tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortIntersectAllExec.java rename to tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortIntersectExec.java index 89f3da4222..db77f85b28 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortIntersectAllExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortIntersectExec.java @@ -28,9 +28,11 @@ import java.io.IOException; import java.util.Arrays; -public class SortIntersectAllExec extends BinaryPhysicalExec { +public class SortIntersectExec extends BinaryPhysicalExec { SetTupleComparator comparator; - public SortIntersectAllExec(TaskAttemptContext context, PhysicalExec left, PhysicalExec right) { + Tuple lastReturned = null; + boolean isDistinct = false; + public SortIntersectExec(TaskAttemptContext context, PhysicalExec left, PhysicalExec right, boolean isDistinct) { super(context, left.getSchema(), right.getSchema(), left, right); TajoDataTypes.DataType[] leftTypes = SchemaUtil.toDataTypes(left.getSchema()); TajoDataTypes.DataType[] rightTypes = SchemaUtil.toDataTypes(right.getSchema()); @@ -39,16 +41,28 @@ public SortIntersectAllExec(TaskAttemptContext context, PhysicalExec left, Physi "The both schemas are not compatible"); } comparator = new 
SetTupleComparator(left.getSchema(), right.getSchema()); + this.isDistinct = isDistinct; } @Override public Tuple next() throws IOException { - while (!context.isStopped()) { + if (!context.isStopped()) { Tuple leftTuple = leftChild.next(); Tuple rightTuple = rightChild.next(); if (leftTuple == null || rightTuple == null) { return null; } + + // handling routine for INTERSECT without ALL + // it eliminates duplicated return of the same row values + if (isDistinct && lastReturned != null) { + while (comparator.compare(leftTuple, lastReturned) == 0) { + leftTuple = leftChild.next(); + if (leftTuple == null) + return null; + } + } + // At this point, Both Tuples are not null do { int compVal = comparator.compare(leftTuple, rightTuple); @@ -58,6 +72,7 @@ public Tuple next() throws IOException { } else if (compVal < 0) { leftTuple = leftChild.next(); } else { + lastReturned = leftTuple; return leftTuple; } } while (leftTuple != null && rightTuple != null); @@ -66,4 +81,11 @@ public Tuple next() throws IOException { } return null; } + + @Override + public void rescan() throws IOException { + super.rescan(); + + lastReturned = null; + } } diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectAllExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectExec.java similarity index 71% rename from tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectAllExec.java rename to tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectExec.java index 518985e1b9..9d052fa2b8 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectAllExec.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortIntersectExec.java @@ -56,9 +56,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -public class TestSortIntersectAllExec { +public class TestSortIntersectExec { 
private TajoConf conf; - private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestSortIntersectAllExec"; + private final String TEST_PATH = TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/TestSortIntersectExec"; private TajoTestingCluster util; private CatalogService catalog; private SQLAnalyzer analyzer; @@ -71,7 +71,8 @@ public class TestSortIntersectAllExec { private int[] leftNum = new int[] {1, 2, 3, 3, 9, 9, 3, 0, 3}; private int[] rightNum = new int[] {3, 7, 3, 5}; - private int[] answerNum = new int[] {3, 3}; // this should be set as leftNum intersect all rightNum + private int[] answerAllNum = new int[] {3, 3}; // this should be set as leftNum intersect all rightNum + order by + private int[] answerDistinctNum = new int[] {3}; // this should be set as leftNum intersect rightNum + order by @Before public void setUp() throws Exception { @@ -181,7 +182,7 @@ public final void testSortIntersectAll() throws IOException, PlanningException { // replace an equal join with sort intersect all . 
if (exec instanceof MergeJoinExec) { MergeJoinExec join = (MergeJoinExec) exec; - exec = new SortIntersectAllExec(ctx, join.getLeftChild(), join.getRightChild()); + exec = new SortIntersectExec(ctx, join.getLeftChild(), join.getRightChild(), false); } else if (exec instanceof HashJoinExec) { // we need to sort the results from both left and right children HashJoinExec join = (HashJoinExec) exec; @@ -200,7 +201,7 @@ public final void testSortIntersectAll() throws IOException, PlanningException { rightSortNode.setOutSchema(join.getRightChild().getSchema()); ExternalSortExec rightSort = new ExternalSortExec(ctx, rightSortNode, join.getRightChild()); - exec = new SortIntersectAllExec(ctx, leftSort, rightSort); + exec = new SortIntersectExec(ctx, leftSort, rightSort, false); } Tuple tuple; @@ -210,7 +211,7 @@ public final void testSortIntersectAll() throws IOException, PlanningException { while ((tuple = exec.next()) != null) { count++; - int answer = answerNum[i]; + int answer = answerAllNum[i]; assertTrue(answer == tuple.get(0).asInt4()); assertTrue(answer == tuple.get(1).asInt4()); assertTrue(10 + answer == tuple.get(2).asInt4()); @@ -219,6 +220,71 @@ public final void testSortIntersectAll() throws IOException, PlanningException { i++; } exec.close(); - assertEquals(answerNum.length , count); + assertEquals(answerAllNum.length , count); + } + + @Test + public final void testSortIntersect() throws IOException, PlanningException { + FileFragment[] empFrags1 = FileStorageManager.splitNG(conf, "default.e1", employee1.getMeta(), + new Path(employee1.getPath()), Integer.MAX_VALUE); + FileFragment[] empFrags2 = FileStorageManager.splitNG(conf, "default.e2", employee2.getMeta(), + new Path(employee2.getPath()), Integer.MAX_VALUE); + + FileFragment[] merged = TUtil.concat(empFrags1, empFrags2); + + Path workDir = CommonTestingUtil.getTestDir(TajoTestingCluster.DEFAULT_TEST_DIRECTORY + "/testSortIntersect"); + TaskAttemptContext ctx = new TaskAttemptContext(new 
QueryContext(conf), + LocalTajoTestingUtility.newTaskAttemptId(), merged, workDir); + ctx.setEnforcer(new Enforcer()); + Expr expr = analyzer.parse(QUERIES[0]); + LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr); + optimizer.optimize(plan); + LogicalNode rootNode = plan.getRootBlock().getRoot(); + + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf); + PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); + + // replace an equal join with sort intersect all . + if (exec instanceof MergeJoinExec) { + MergeJoinExec join = (MergeJoinExec) exec; + exec = new SortIntersectExec(ctx, join.getLeftChild(), join.getRightChild(), true); + } else if (exec instanceof HashJoinExec) { + // we need to sort the results from both left and right children + HashJoinExec join = (HashJoinExec) exec; + SortSpec[] sortSpecsLeft = PlannerUtil.schemaToSortSpecs(join.getLeftChild().getSchema()); + SortSpec[] sortSpecsRight = PlannerUtil.schemaToSortSpecs(join.getRightChild().getSchema()); + + SortNode leftSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); + leftSortNode.setSortSpecs(sortSpecsLeft); + leftSortNode.setInSchema(join.getLeftChild().getSchema()); + leftSortNode.setOutSchema(join.getLeftChild().getSchema()); + ExternalSortExec leftSort = new ExternalSortExec(ctx, leftSortNode, join.getLeftChild()); + + SortNode rightSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); + rightSortNode.setSortSpecs(sortSpecsRight); + rightSortNode.setInSchema(join.getRightChild().getSchema()); + rightSortNode.setOutSchema(join.getRightChild().getSchema()); + ExternalSortExec rightSort = new ExternalSortExec(ctx, rightSortNode, join.getRightChild()); + + exec = new SortIntersectExec(ctx, leftSort, rightSort, true); + } + + Tuple tuple; + int count = 0; + int i = 0; + exec.init(); + + while ((tuple = exec.next()) != null) { + count++; + int answer = answerDistinctNum[i]; + assertTrue(answer == tuple.get(0).asInt4()); + 
assertTrue(answer == tuple.get(1).asInt4()); + assertTrue(10 + answer == tuple.get(2).asInt4()); + assertTrue(("dept_" + answer).equals(tuple.get(3).asChars())); + + i++; + } + exec.close(); + assertEquals(answerDistinctNum.length , count); } }