Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.DeflateCodec;
import org.apache.tajo.QueryId;
import org.apache.tajo.QueryTestCaseBase;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.TajoTestingCluster;
import org.apache.tajo.*;
import org.apache.tajo.catalog.CatalogService;
import org.apache.tajo.catalog.CatalogUtil;
import org.apache.tajo.catalog.Schema;
Expand Down Expand Up @@ -1298,4 +1295,45 @@ private void verifyPartitionDirectoryFromCatalog(String databaseName, String tab
assertEquals(numRows, new Long(rowCount));
}
}

/**
 * Verifies that partitions reported more than once by a single query are registered in the
 * catalog only once.
 *
 * <p>The query unions {@code lineitem} with {@code lineitem2}, a copy of it, so the same
 * partition values are produced by both branches. After the query finishes, the catalog is
 * expected to hold exactly two partitions for the target table: {@code key=N} and {@code key=R}.
 *
 * @throws Exception if any statement fails or the catalog lookup fails
 */
@Test
public final void testDuplicatedPartitions() throws Exception {
  String tableName = CatalogUtil.normalizeIdentifier("testDuplicatedPartitions");

  try {
    executeString("CREATE TABLE lineitem2 as select * from lineitem").close();

    // Execute a UNION ALL statement so that the query creates multiple output files.
    if (nodeType == NodeType.INSERT) {
      executeString(
          "create table " + tableName + " (col1 int4, col2 int4) partition by column(key text) ").close();

      executeString(
          "insert overwrite into " + tableName
              + " select a.l_orderkey, a.l_partkey, a.l_returnflag from lineitem a union all"
              + " select b.l_orderkey, b.l_partkey, b.l_returnflag from lineitem2 b"
      ).close();
    } else {
      executeString(
          "create table " + tableName + "(col1 int4, col2 int4) partition by column(key text) as "
              + " select a.l_orderkey, a.l_partkey, a.l_returnflag from lineitem a union all"
              + " select b.l_orderkey, b.l_partkey, b.l_returnflag from lineitem2 b"
      ).close();
    }

    // If duplicated partitions have been removed, the catalog contains only the 'key=N' and
    // 'key=R' partitions. Previously, Query and Stage collected partitions in a List, so
    // duplicates were not removed. To reproduce the duplicated partitions, switch those
    // collections back from Set to List and run this test with DerbyStore.
    List<PartitionDescProto> partitions = catalog.getPartitions(DEFAULT_DATABASE_NAME, tableName);
    assertEquals(2, partitions.size());

    PartitionDescProto firstPartition = catalog.getPartition(DEFAULT_DATABASE_NAME, tableName, "key=N");
    assertNotNull(firstPartition);
    PartitionDescProto secondPartition = catalog.getPartition(DEFAULT_DATABASE_NAME, tableName, "key=R");
    assertNotNull(secondPartition);
  } finally {
    // Close the results of the cleanup statements as well, like every other statement above.
    executeString("DROP TABLE lineitem2 PURGE").close();
    executeString("DROP TABLE " + tableName + " PURGE").close();
  }
}
}
56 changes: 30 additions & 26 deletions tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.tajo.querymaster;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -333,14 +334,17 @@ private QueryHistory makeQueryHistory() {
}

public List<PartitionDescProto> getPartitions() {
List<PartitionDescProto> partitions = new ArrayList<PartitionDescProto>();
Set<PartitionDescProto> partitions = TUtil.newHashSet();
for(Stage eachStage : getStages()) {
if (!eachStage.getPartitions().isEmpty()) {
partitions.addAll(eachStage.getPartitions());
}
partitions.addAll(eachStage.getPartitions());
}
return Lists.newArrayList(partitions);
}

return partitions;
/**
 * Clears the partitions of every stage of this query by delegating to
 * {@code Stage#clearPartitions()} for each stage returned by {@code getStages()}.
 */
public void clearPartitions() {
  for (Stage stage : getStages()) {
    stage.clearPartitions();
  }
}

public List<String> getDiagnostics() {
Expand Down Expand Up @@ -505,30 +509,30 @@ private QueryState finalizeQuery(Query query, QueryCompletedEvent event) {
QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext());
hookExecutor.execute(query.context.getQueryContext(), query, event.getExecutionBlockId(), finalOutputDir);

TableDesc desc = query.getResultDesc();

// If there is partitions
List<PartitionDescProto> partitions = query.getPartitions();
if (partitions!= null && !partitions.isEmpty()) {

String databaseName, simpleTableName;

if (CatalogUtil.isFQTableName(desc.getName())) {
String[] split = CatalogUtil.splitFQTableName(desc.getName());
databaseName = split[0];
simpleTableName = split[1];
// Add dynamic partitions to catalog for partition table.
if (queryContext.hasOutputTableUri() && queryContext.hasPartition()) {
List<PartitionDescProto> partitions = query.getPartitions();
if (partitions != null) {
String databaseName, simpleTableName;

if (CatalogUtil.isFQTableName(tableDesc.getName())) {
String[] split = CatalogUtil.splitFQTableName(tableDesc.getName());
databaseName = split[0];
simpleTableName = split[1];
} else {
databaseName = queryContext.getCurrentDatabase();
simpleTableName = tableDesc.getName();
}

// Store partitions to CatalogStore using alter table statement.
catalog.addPartitions(databaseName, simpleTableName, partitions, true);
LOG.info("Added partitions to catalog (total=" + partitions.size() + ")");
} else {
databaseName = queryContext.getCurrentDatabase();
simpleTableName = desc.getName();
LOG.info("Can't find partitions for adding.");
}

// Store partitions to CatalogStore using alter table statement.
catalog.addPartitions(databaseName, simpleTableName, partitions, true);
} else {
LOG.info("Can't find partitions for adding.");
query.clearPartitions();
}

} catch (Exception e) {
} catch (Throwable e) {
query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e)));
return QueryState.QUERY_ERROR;
}
Expand Down
13 changes: 10 additions & 3 deletions tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java
Original file line number Diff line number Diff line change
Expand Up @@ -487,9 +487,8 @@ private StageHistory makeStageHistory() {
return stageHistory;
}

public List<PartitionDescProto> getPartitions() {
List<PartitionDescProto> partitions = TUtil.newList();

public Set<PartitionDescProto> getPartitions() {
Set<PartitionDescProto> partitions = TUtil.newHashSet();
for(Task eachTask : getTasks()) {
if (eachTask.getLastAttempt() != null && !eachTask.getLastAttempt().getPartitions().isEmpty()) {
partitions.addAll(eachTask.getLastAttempt().getPartitions());
Expand All @@ -499,6 +498,14 @@ public List<PartitionDescProto> getPartitions() {
return partitions;
}

/**
 * Empties the partition collection held by the last attempt of each task in this stage.
 * Tasks without a last attempt, or whose last attempt holds no partitions, are skipped.
 */
public void clearPartitions() {
  for (Task task : getTasks()) {
    // Nothing to clear when the task has never been attempted.
    if (task.getLastAttempt() == null) {
      continue;
    }
    // Skip attempts that did not report any partition.
    if (task.getLastAttempt().getPartitions().isEmpty()) {
      continue;
    }
    task.getLastAttempt().getPartitions().clear();
  }
}

/**
* It finalizes this stage. It is only invoked when the stage is finalizing.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@
import org.apache.tajo.querymaster.Task.PullHost;
import org.apache.tajo.util.TUtil;

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -69,7 +67,7 @@ public class TaskAttempt implements EventHandler<TaskAttemptEvent> {
private CatalogProtos.TableStatsProto inputStats;
private CatalogProtos.TableStatsProto resultStats;

private List<PartitionDescProto> partitions;
private Set<PartitionDescProto> partitions;

protected static final StateMachineFactory
<TaskAttempt, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
Expand Down Expand Up @@ -194,8 +192,7 @@ public TaskAttempt(final TaskAttemptScheduleContext scheduleContext,
this.writeLock = readWriteLock.writeLock();

stateMachine = stateMachineFactory.make(this);

this.partitions = TUtil.newList();
this.partitions = TUtil.newHashSet();
}

public TaskAttemptState getState() {
Expand Down Expand Up @@ -258,12 +255,12 @@ public TableStats getResultStats() {
return new TableStats(resultStats);
}

public List<PartitionDescProto> getPartitions() {
public Set<PartitionDescProto> getPartitions() {
return partitions;
}

public void setPartitions(List<PartitionDescProto> partitions) {
this.partitions = partitions;
public void addPartitions(List<PartitionDescProto> partitions) {
this.partitions.addAll(partitions);
}

private void fillTaskStatistics(TaskCompletionReport report) {
Expand Down Expand Up @@ -407,7 +404,7 @@ public void transition(TaskAttempt taskAttempt,

try {
if (report.getPartitionsCount() > 0) {
taskAttempt.setPartitions(report.getPartitionsList());
taskAttempt.addPartitions(report.getPartitionsList());
}

taskAttempt.fillTaskStatistics(report);
Expand Down