From e391b87215d139be0309e1b120dda412b70d9e9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=ED=98=95=EC=A4=80?= Date: Tue, 22 Jul 2014 20:23:55 +0900 Subject: [PATCH] TAJO-972: Broadcast join with left outer join returns duplicated rows. --- .../engine/planner/global/GlobalPlanner.java | 4 +- .../planner/global/TestBroadcastJoinPlan.java | 94 +++++++++++-------- .../tajo/engine/query/TestJoinBroadcast.java | 47 +++++++++- 3 files changed, 101 insertions(+), 44 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java index 69ecd022cf..2daf799a9c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/GlobalPlanner.java @@ -293,7 +293,7 @@ private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNod // Checking Left Side of Join if (ScanNode.isScanNode(leftNode)) { ScanNode scanNode = (ScanNode)leftNode; - if (getTableVolume(scanNode) >= broadcastThreshold) { + if (joinNode.getJoinType() == JoinType.LEFT_OUTER || getTableVolume(scanNode) >= broadcastThreshold) { numLargeTables++; } else { leftBroadcast = true; @@ -306,7 +306,7 @@ private ExecutionBlock buildJoinPlan(GlobalPlanContext context, JoinNode joinNod // Checking Right Side OF Join if (ScanNode.isScanNode(rightNode)) { ScanNode scanNode = (ScanNode)rightNode; - if (getTableVolume(scanNode) >= broadcastThreshold) { + if (joinNode.getJoinType() == JoinType.RIGHT_OUTER || getTableVolume(scanNode) >= broadcastThreshold) { numLargeTables++; } else { rightBroadcast = true; diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java index fd07ae4494..ec39609862 100644 --- 
a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java @@ -495,11 +495,13 @@ public final void testLeftOuterJoinCase1() throws IOException, PlanningException // ((((default.small1 ⟕ default.small2) ⟕ default.small3) ⟕ default.large1) ⟕ default.large2) /* - |-eb_1402495213549_0000_000007 - |-eb_1402495213549_0000_000006 (GROUP BY) - |-eb_1402495213549_0000_000005 (JOIN) - |-eb_1402495213549_0000_000004 (LEAF, large2) - |-eb_1402495213549_0000_000003 (LEAF, broadcast JOIN small1, small2, small3, large1) + |-eb_1406022243130_0000_000009 + |-eb_1406022243130_0000_000008 + |-eb_1406022243130_0000_000007 (join) + |-eb_1406022243130_0000_000006 (scan large2) + |-eb_1406022243130_0000_000005 (join) + |-eb_1406022243130_0000_000004 (scan large1) + |-eb_1406022243130_0000_000003 (scan small1, broadcast join small2, small3) */ ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan); @@ -508,9 +510,9 @@ public final void testLeftOuterJoinCase1() throws IOException, PlanningException ExecutionBlock eb = ebCursor.nextBlock(); if(index == 0) { Collection broadcastTables = eb.getBroadcastTables(); - assertEquals(3, broadcastTables.size()); + assertEquals(2, broadcastTables.size()); - assertTrue(broadcastTables.contains("default.small1")); + assertTrue(!broadcastTables.contains("default.small1")); assertTrue(broadcastTables.contains("default.small2")); assertTrue(broadcastTables.contains("default.small3")); } else if(index == 1 || index == 2 || index == 3) { @@ -520,7 +522,7 @@ public final void testLeftOuterJoinCase1() throws IOException, PlanningException index++; } - assertEquals(5, index); + assertEquals(7, index); } @Test @@ -712,9 +714,9 @@ public final void testLeftOuterJoinCase4() throws IOException, PlanningException globalPlanner.build(masterPlan); /* - |-eb_1402500846700_0000_000007 - |-eb_1402500846700_0000_000006 - 
|-eb_1402500846700_0000_000005 (LEAF, broadcast join small1, small2, small3) + |-eb_1406022971444_0000_000005 + |-eb_1406022971444_0000_000004 (group by) + |-eb_1406022971444_0000_000003 (scan small1, broadcast join small2, small3) */ ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan); @@ -735,7 +737,10 @@ public final void testLeftOuterJoinCase4() throws IOException, PlanningException assertEquals("default.small2", scanNode.getCanonicalName()); Collection broadcastTables = eb.getBroadcastTables(); - assertEquals(3, broadcastTables.size()); + assertEquals(2, broadcastTables.size()); + + assertTrue(broadcastTables.contains("default.small2")); + assertTrue(broadcastTables.contains("default.small3")); } else if(index == 1) { Collection broadcastTables = eb.getBroadcastTables(); assertEquals(0, broadcastTables.size()); @@ -769,9 +774,11 @@ public final void testLeftOuterJoinCase5() throws IOException, PlanningException //(((default.small1 ⟕ default.small2) ⟕ default.large1) ⟕ default.small3) /* - |-eb_1402642709028_0000_000005 - |-eb_1402642709028_0000_000004 (GROUP BY) - |-eb_1402642709028_0000_000003 (LEAF, broadcast JOIN small1, small2, small3, large1) + |-eb_1406023347983_0000_000007 + |-eb_1406023347983_0000_000006 + |-eb_1406023347983_0000_000005 (join, broadcast small3) + |-eb_1406023347983_0000_000004 (scan large1) + |-eb_1406023347983_0000_000003 (scan small1, broadcast join small2) */ ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan); @@ -780,19 +787,20 @@ public final void testLeftOuterJoinCase5() throws IOException, PlanningException ExecutionBlock eb = ebCursor.nextBlock(); if(index == 0) { Collection broadcastTables = eb.getBroadcastTables(); - assertEquals(3, broadcastTables.size()); - - assertTrue(broadcastTables.contains("default.small1")); + assertEquals(1, broadcastTables.size()); assertTrue(broadcastTables.contains("default.small2")); + } else if (index == 2) { + Collection broadcastTables = 
eb.getBroadcastTables(); + assertEquals(1, broadcastTables.size()); assertTrue(broadcastTables.contains("default.small3")); - } else if(index == 1 || index == 2 || index == 3) { + } else if(index == 1 || index == 3) { Collection broadcastTables = eb.getBroadcastTables(); assertEquals(0, broadcastTables.size()); } index++; } - assertEquals(3, index); + assertEquals(5, index); } @Test @@ -820,11 +828,13 @@ public final void testLeftOuterJoinCase6() throws IOException, PlanningException // ((((default.small1 ⟕ default.small2) ⟕ default.large1) ⟕ default.large2) ⟕ default.small3) /* - |-eb_1404125948432_0000_000007 - |-eb_1404125948432_0000_000006 - |-eb_1404125948432_0000_000005 (JOIN broadcast small3) - |-eb_1404125948432_0000_000004 (LEAF, scan large2) - |-eb_1404125948432_0000_000003 (LEAF, scan large1, broadcast small1, small2) + |-eb_1406023537578_0000_000009 + |-eb_1406023537578_0000_000008 + |-eb_1406023537578_0000_000007 (join, broadcast small3) + |-eb_1406023537578_0000_000006 (scan large2) + |-eb_1406023537578_0000_000005 (join) + |-eb_1406023537578_0000_000004 (scan large1) + |-eb_1406023537578_0000_000003 (scan small1, broadcast join small2) */ ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan); int index = 0; @@ -835,26 +845,34 @@ public final void testLeftOuterJoinCase6() throws IOException, PlanningException assertEquals(NodeType.JOIN, node.getType()); JoinNode joinNode = (JoinNode)node; - JoinNode joinNode2 = joinNode.getLeftChild(); + ScanNode scanNode1 = joinNode.getLeftChild(); ScanNode scanNode2 = joinNode.getRightChild(); - assertEquals("default.large1", scanNode2.getCanonicalName()); - - ScanNode scanNode3 = joinNode2.getLeftChild(); - ScanNode scanNode4 = joinNode2.getRightChild(); - assertEquals("default.small1", scanNode3.getCanonicalName()); - assertEquals("default.small2", scanNode4.getCanonicalName()); + assertEquals("default.small1", scanNode1.getCanonicalName()); + assertEquals("default.small2", 
scanNode2.getCanonicalName()); Collection broadcastTables = eb.getBroadcastTables(); - assertEquals(2, broadcastTables.size()); + assertEquals(1, broadcastTables.size()); + assertTrue(broadcastTables.contains("default.small2")); } else if (index == 1) { LogicalNode node = eb.getPlan(); assertEquals(NodeType.SCAN, node.getType()); - ScanNode scanNode = (ScanNode)node; + ScanNode scanNode = (ScanNode) node; + assertEquals("default.large1", scanNode.getCanonicalName()); + + Collection broadcastTables = eb.getBroadcastTables(); + assertEquals(0, broadcastTables.size()); + } else if (index == 2) { + LogicalNode node = eb.getPlan(); + assertEquals(NodeType.JOIN, node.getType()); + } else if (index == 3) { + LogicalNode node = eb.getPlan(); + assertEquals(NodeType.SCAN, node.getType()); + ScanNode scanNode = (ScanNode) node; assertEquals("default.large2", scanNode.getCanonicalName()); Collection broadcastTables = eb.getBroadcastTables(); assertEquals(0, broadcastTables.size()); - } else if(index == 2) { + } else if(index == 4) { LogicalNode node = eb.getPlan(); assertEquals(NodeType.GROUP_BY, node.getType()); @@ -866,8 +884,8 @@ public final void testLeftOuterJoinCase6() throws IOException, PlanningException ScanNode scanNode2 = joinNode1.getLeftChild(); ScanNode scanNode3 = joinNode1.getRightChild(); - assertTrue(scanNode2.getCanonicalName().indexOf("0000_000003") > 0); - assertTrue(scanNode3.getCanonicalName().indexOf("0000_000004") > 0); + assertTrue(scanNode2.getCanonicalName().indexOf("0000_000005") > 0); + assertTrue(scanNode3.getCanonicalName().indexOf("0000_000006") > 0); Collection broadcastTables = eb.getBroadcastTables(); assertEquals(1, broadcastTables.size()); @@ -875,7 +893,7 @@ public final void testLeftOuterJoinCase6() throws IOException, PlanningException index++; } - assertEquals(5, index); + assertEquals(7, index); } @Test diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java 
b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
index e01b3c5442..9cc65bc8de 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java
@@ -23,6 +23,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.*;
 import org.apache.tajo.catalog.*;
+import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.Int4Datum;
@@ -32,11 +33,9 @@
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.jdbc.TajoResultSet;
 import org.apache.tajo.master.querymaster.QueryMasterTask;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManagerFactory;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.*;
 import org.apache.tajo.util.FileUtil;
+import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.worker.TajoWorker;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -576,4 +575,59 @@ private void createMultiFile(String tableName, int numRowsEachFile, TupleCreator
     appender.flush();
     appender.close();
   }
+
+  /**
+   * Regression test for TAJO-972: a LEFT OUTER JOIN whose left side is a small table must return
+   * each left row exactly once, with a null name for left rows that have no match on the right.
+   */
+  @Test
+  public final void testLeftOuterJoinLeftSideSmallTable() throws Exception {
+    // Shared by the generated rows and the expected output so the two cannot drift apart.
+    final String namePrefix =
+        "this is testLeftOuterJoinLeftSideSmallTabletestLeftOuterJoinLeftSideSmallTable";
+
+    KeyValueSet tableOptions = new KeyValueSet();
+    tableOptions.put(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER);
+    tableOptions.put(StorageConstants.CSVFILE_NULL, "\\\\N");
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("name", Type.TEXT);
+    String[] data = new String[]{ "1000000|a", "1000001|b", "2|c", "3|d", "4|e" };
+    TajoTestingCluster.createTable("table1", schema, tableOptions, data, 1);
+
+    data = new String[10000];
+    for (int i = 0; i < data.length; i++) {
+      data[i] = i + "|" + namePrefix + i;
+    }
+    TajoTestingCluster.createTable("table_large", schema, tableOptions, data, 2);
+
+    try {
+      ResultSet res = executeString(
+          "select a.id, b.name from table1 a left outer join table_large b on a.id = b.id order by a.id"
+      );
+      try {
+        // ids 2..4 exist in table_large (ids 0..9999); 1000000 and 1000001 do not, so name is null.
+        String expected = "id,name\n" +
+            "-------------------------------\n" +
+            "2," + namePrefix + "2\n" +
+            "3," + namePrefix + "3\n" +
+            "4," + namePrefix + "4\n" +
+            "1000000,null\n" +
+            "1000001,null\n";
+
+        assertEquals(expected, resultSetToString(res));
+      } finally {
+        // Run the query cleanup even when the assertion above fails.
+        cleanupQuery(res);
+      }
+    } finally {
+      // Attempt to drop the second table even if dropping the first one throws.
+      try {
+        executeString("DROP TABLE table1 PURGE").close();
+      } finally {
+        executeString("DROP TABLE table_large PURGE").close();
+      }
+    }
+  }
 }