From cb69c69a278fe23e54631986ddb7a8cf75db95ac Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Tue, 25 Aug 2015 18:07:15 +0900 Subject: [PATCH 1/5] TAJO-1798: Query execution is not finished even though it actually is done. --- .../src/main/java/org/apache/tajo/querymaster/Query.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index b09d5fd7f8..6dd7847c92 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -18,6 +18,9 @@ package org.apache.tajo.querymaster; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; @@ -332,7 +335,7 @@ private QueryHistory makeQueryHistory() { return queryHistory; } - public List getPartitions() { + public ImmutableList getPartitions() { List partitions = new ArrayList(); for(Stage eachStage : getStages()) { if (!eachStage.getPartitions().isEmpty()) { @@ -340,7 +343,7 @@ public List getPartitions() { } } - return partitions; + return ImmutableSet.copyOf(partitions).asList(); } public List getDiagnostics() { From 17907b80492674a96c2bd1fccf16a9c67eacf8df Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Tue, 25 Aug 2015 23:46:39 +0900 Subject: [PATCH 2/5] Update List to Set. --- .../org/apache/tajo/querymaster/Query.java | 54 ++++++++++--------- .../org/apache/tajo/querymaster/Stage.java | 12 ++++- .../apache/tajo/querymaster/TaskAttempt.java | 17 +++--- 3 files changed, 47 insertions(+), 36 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index 6dd7847c92..bcc44af744 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -335,15 +335,20 @@ private QueryHistory makeQueryHistory() { return queryHistory; } - public ImmutableList getPartitions() { - List partitions = new ArrayList(); + public Set getPartitions() { + Set partitions = TUtil.newHashSet(); for(Stage eachStage : getStages()) { if (!eachStage.getPartitions().isEmpty()) { partitions.addAll(eachStage.getPartitions()); } } + return partitions; + } - return ImmutableSet.copyOf(partitions).asList(); + public void clearPartitions() { + for(Stage eachStage : getStages()) { + eachStage.clearPartitions(); + } } public List getDiagnostics() { @@ -508,30 +513,31 @@ private QueryState finalizeQuery(Query query, QueryCompletedEvent event) { QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext()); hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir); - TableDesc desc = query.getResultDesc(); - - // If there is partitions - List partitions = query.getPartitions(); - if (partitions!= null && !partitions.isEmpty()) { - - String databaseName, simpleTableName; - - if (CatalogUtil.isFQTableName(desc.getName())) { - String[] split = CatalogUtil.splitFQTableName(desc.getName()); - databaseName = split[0]; - simpleTableName = split[1]; + // Add dynamic partitions to catalog for partition table. + if (queryContext.hasOutputTableUri() && queryContext.hasPartition()) { + Set partitions = query.getPartitions(); + if (partitions != null) { + String databaseName, simpleTableName; + + if (CatalogUtil.isFQTableName(tableDesc.getName())) { + String[] split = CatalogUtil.splitFQTableName(tableDesc.getName()); + databaseName = split[0]; + simpleTableName = split[1]; + } else { + databaseName = queryContext.getCurrentDatabase(); + simpleTableName = tableDesc.getName(); + } + + // Store partitions to CatalogStore using alter table statement. + catalog.addPartitions(databaseName, simpleTableName, TUtil.newList(partitions), true); + LOG.info("Added partitions to catalog (total=" + partitions.size() + ")"); } else { - databaseName = queryContext.getCurrentDatabase(); - simpleTableName = desc.getName(); + LOG.info("Can't find partitions for adding."); } - - // Store partitions to CatalogStore using alter table statement. - catalog.addPartitions(databaseName, simpleTableName, partitions, true); - } else { - LOG.info("Can't find partitions for adding."); + query.clearPartitions(); + partitions.clear(); } - - } catch (Exception e) { + } catch (Throwable e) { query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e))); return QueryState.QUERY_ERROR; } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index cf5cdbded0..86f2c7af92 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -487,8 +487,8 @@ private StageHistory makeStageHistory() { return stageHistory; } - public List getPartitions() { - List partitions = TUtil.newList(); + public Set getPartitions() { + Set partitions = TUtil.newHashSet(); for(Task eachTask : getTasks()) { if (eachTask.getLastAttempt() != null && !eachTask.getLastAttempt().getPartitions().isEmpty()) { @@ -499,6 +499,14 @@ public List getPartitions() { return partitions; } + public void clearPartitions() { + for(Task eachTask : getTasks()) { + if (eachTask.getLastAttempt() != null && !eachTask.getLastAttempt().getPartitions().isEmpty()) { + eachTask.getLastAttempt().getPartitions().clear(); + } + } + } + /** * It finalizes this stage. It is only invoked when the stage is finalizing. */ diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java index f5fcfa7a7f..cda62a4154 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java @@ -38,9 +38,7 @@ import org.apache.tajo.querymaster.Task.PullHost; import org.apache.tajo.util.TUtil; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.List; +import java.util.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -69,7 +67,7 @@ public class TaskAttempt implements EventHandler { private CatalogProtos.TableStatsProto inputStats; private CatalogProtos.TableStatsProto resultStats; - private List partitions; + private Set partitions; protected static final StateMachineFactory @@ -194,8 +192,7 @@ public TaskAttempt(final TaskAttemptScheduleContext scheduleContext, this.writeLock = readWriteLock.writeLock(); stateMachine = stateMachineFactory.make(this); - - this.partitions = TUtil.newList(); + this.partitions = TUtil.newHashSet(); } public TaskAttemptState getState() { @@ -258,12 +255,12 @@ public TableStats getResultStats() { return new TableStats(resultStats); } - public List getPartitions() { + public Set getPartitions() { return partitions; } - public void setPartitions(List partitions) { - this.partitions = partitions; + public void addPartitions(List partitions) { + this.partitions.addAll(partitions); } private void fillTaskStatistics(TaskCompletionReport report) { @@ -407,7 +404,7 @@ public void transition(TaskAttempt taskAttempt, try { if (report.getPartitionsCount() > 0) { - taskAttempt.setPartitions(report.getPartitionsList()); + taskAttempt.addPartitions(report.getPartitionsList()); } taskAttempt.fillTaskStatistics(report); From 984848fa707d6de192342e9190bf0a076cc08fa1 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Tue, 25 Aug 2015 23:49:26 +0900 Subject: [PATCH 3/5] Remove unused codes --- tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index bcc44af744..a0b381a9c8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -18,9 +18,6 @@ package org.apache.tajo.querymaster; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; From 5648679ca39dea8757ad91ad436a071cc18ac24c Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Wed, 26 Aug 2015 15:28:42 +0900 Subject: [PATCH 4/5] Convert List to Set and add unit test case. --- .../engine/query/TestTablePartitions.java | 47 +++++++++++++++++-- .../org/apache/tajo/querymaster/Query.java | 14 +++--- .../org/apache/tajo/querymaster/Stage.java | 1 - 3 files changed, 49 insertions(+), 13 deletions(-) diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index 6eb28411bb..a7b843fdd3 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -23,10 +23,7 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.DeflateCodec; -import org.apache.tajo.QueryId; -import org.apache.tajo.QueryTestCaseBase; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.*; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; @@ -1298,4 +1295,46 @@ private void verifyPartitionDirectoryFromCatalog(String databaseName, String tab assertEquals(numRows, new Long(rowCount)); } } + + @Test + public final void testDuplicatedPartitions() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("testDuplicatedPartitions"); + + try { + executeString("CREATE TABLE lineitem2 as select * from lineitem").close(); + + // Execute UNION ALL statement for creating multiple output files. + if (nodeType == NodeType.INSERT) { + executeString( + "create table " + tableName + " (col1 int4, col2 int4) partition by column(key text) ").close(); + + executeString( + "insert overwrite into " + tableName + + " select a.l_orderkey, a.l_partkey, a.l_returnflag from lineitem a union all" + + " select b.l_orderkey, b.l_partkey, b.l_returnflag from lineitem2 b" + ).close(); + } else { + executeString( + "create table " + tableName + "(col1 int4, col2 int4) partition by column(key text) as " + + " select a.l_orderkey, a.l_partkey, a.l_returnflag from lineitem a union all" + + " select b.l_orderkey, b.l_partkey, b.l_returnflag from lineitem2 b" + ).close(); + } + + // If duplicated partitions had been removed, partitions just will contain 'KEY=N' partition and 'KEY=R' + // partition. In previous Query and Stage, duplicated partitions were not deleted because they had been in List. + // If you want to verify duplicated partitions, you need to use List instead of Set with DerbyStore. + List partitions = catalog.getPartitions(DEFAULT_DATABASE_NAME, tableName); + assertEquals(2, partitions.size()); + + PartitionDescProto firstPartition = catalog.getPartition(DEFAULT_DATABASE_NAME, tableName, "key=N"); + assertNotNull(firstPartition); + PartitionDescProto secondPartition = catalog.getPartition(DEFAULT_DATABASE_NAME, tableName, "key=R"); + assertNotNull(secondPartition); + } finally { + executeString("DROP TABLE lineitem2 PURGE"); + executeString("DROP TABLE " + tableName + " PURGE"); + } + } + } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index a0b381a9c8..956035354c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -18,6 +18,7 @@ package org.apache.tajo.querymaster; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; @@ -332,14 +333,12 @@ private QueryHistory makeQueryHistory() { return queryHistory; } - public Set getPartitions() { + public List getPartitions() { Set partitions = TUtil.newHashSet(); for(Stage eachStage : getStages()) { - if (!eachStage.getPartitions().isEmpty()) { - partitions.addAll(eachStage.getPartitions()); - } + partitions.addAll(eachStage.getPartitions()); } - return partitions; + return Lists.newArrayList(partitions); } public void clearPartitions() { @@ -512,7 +511,7 @@ private QueryState finalizeQuery(Query query, QueryCompletedEvent event) { // Add dynamic partitions to catalog for partition table. if (queryContext.hasOutputTableUri() && queryContext.hasPartition()) { - Set partitions = query.getPartitions(); + List partitions = query.getPartitions(); if (partitions != null) { String databaseName, simpleTableName; @@ -526,13 +525,12 @@ private QueryState finalizeQuery(Query query, QueryCompletedEvent event) { } // Store partitions to CatalogStore using alter table statement. - catalog.addPartitions(databaseName, simpleTableName, TUtil.newList(partitions), true); + catalog.addPartitions(databaseName, simpleTableName, partitions, true); LOG.info("Added partitions to catalog (total=" + partitions.size() + ")"); } else { LOG.info("Can't find partitions for adding."); } query.clearPartitions(); - partitions.clear(); } } catch (Throwable e) { query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e))); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 86f2c7af92..f6c9cdb47e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -489,7 +489,6 @@ private StageHistory makeStageHistory() { public Set getPartitions() { Set partitions = TUtil.newHashSet(); - for(Task eachTask : getTasks()) { if (eachTask.getLastAttempt() != null && !eachTask.getLastAttempt().getPartitions().isEmpty()) { partitions.addAll(eachTask.getLastAttempt().getPartitions()); From 98beccf60d83c842c39373743b01a7599b6cf731 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Wed, 26 Aug 2015 16:22:04 +0900 Subject: [PATCH 5/5] Trigger for Travis CI build --- .../java/org/apache/tajo/engine/query/TestTablePartitions.java | 1 - 1 file changed, 1 deletion(-) diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index a7b843fdd3..952e26a38b 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -1336,5 +1336,4 @@ public final void testDuplicatedPartitions() throws Exception { executeString("DROP TABLE " + tableName + " PURGE"); } } - }