Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.tajo.DataTypeUtil;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.partition.PartitionKey;
import org.apache.tajo.catalog.partition.PartitionMethodDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
Expand Down Expand Up @@ -792,6 +794,15 @@ public static AlterTableDesc setProperty(String tableName, KeyValueSet params, A
return alterTableDesc;
}

public static AlterTableDesc addPartitionAndDropPartition(String tableName, PartitionDesc partitionDesc,
AlterTableType alterTableType) {
final AlterTableDesc alterTableDesc = new AlterTableDesc();
alterTableDesc.setTableName(tableName);
alterTableDesc.setPartitionDesc(partitionDesc);
alterTableDesc.setAlterTableType(alterTableType);
return alterTableDesc;
}

/* It is the relationship graph of type conversions. */
public static final Map<Type, Map<Type, Type>> OPERATION_CASTING_MAP = Maps.newHashMap();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.partition.PartitionDesc;

import java.util.List;

Expand Down Expand Up @@ -49,7 +50,6 @@ public static StatSet aggregateStatSet(List<StatSet> statSets) {
* @param stats The TableStats to be aggregated
*/
public static void aggregateTableStat(TableStats result, TableStats stats) {

if (stats.getColumnStats().size() > 0) {
if (result.getColumnStats().size() == 0) {
for (int i = 0; i < stats.getColumnStats().size(); i++) {
Expand Down Expand Up @@ -83,6 +83,14 @@ public static void aggregateTableStat(TableStats result, TableStats stats) {
}
}

// If there is partitions
if (stats.getPartitions().size() > 0) {
// Aggregate partitions for each table
for (PartitionDesc partitionDesc : stats.getPartitions()) {
result.addPartition(partitionDesc);
}
}

result.setNumRows(result.getNumRows() + stats.getNumRows());
result.setNumBytes(result.getNumBytes() + stats.getNumBytes());
result.setReadBytes(result.getReadBytes() + stats.getReadBytes());
Expand Down Expand Up @@ -112,6 +120,14 @@ public static TableStats aggregateTableStat(List<TableStats> tableStatses) {
}

for (TableStats ts : tableStatses) {
// If there is partitions
if (ts.getPartitions().size() > 0) {
// Aggregate partitions for each table
for (PartitionDesc partitionDesc : ts.getPartitions()) {
aggregated.addPartition(partitionDesc);
}
}

// if there is empty stats
if (ts.getColumnStats().size() > 0) {
// aggregate column stats for each table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import com.google.common.base.Objects;
import com.google.gson.Gson;
import com.google.gson.annotations.Expose;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.json.GsonObject;
Expand All @@ -36,13 +39,16 @@
import java.util.List;

public class TableStats implements ProtoObject<TableStatsProto>, Cloneable, GsonObject {
private static Log LOG = LogFactory.getLog(TableStats.class);

@Expose private Long numRows = null; // required
@Expose private Long numBytes = null; // required
@Expose private Integer numBlocks = null; // optional
@Expose private Integer numShuffleOutputs = null; // optional
@Expose private Long avgRows = null; // optional
@Expose private Long readBytes = null; //optional
@Expose private List<ColumnStats> columnStatses = null; // repeated
@Expose private List<PartitionDesc> partitions = null; // repeated

public TableStats() {
reset();
Expand All @@ -56,6 +62,7 @@ public void reset() {
avgRows = 0l;
readBytes = 0l;
columnStatses = TUtil.newList();
partitions = TUtil.newList();
}

public TableStats(CatalogProtos.TableStatsProto proto) {
Expand Down Expand Up @@ -90,6 +97,11 @@ public TableStats(CatalogProtos.TableStatsProto proto) {
}
columnStatses.add(new ColumnStats(colProto));
}

this.partitions = TUtil.newList();
for (CatalogProtos.PartitionDescProto partitionProto : proto.getPartitionsList()) {
partitions.add(new PartitionDesc(partitionProto));
}
}

public Long getNumRows() {
Expand Down Expand Up @@ -148,10 +160,22 @@ public void setColumnStats(List<ColumnStats> columnStatses) {
this.columnStatses = new ArrayList<ColumnStats>(columnStatses);
}

public List<PartitionDesc> getPartitions() {
return partitions;
}

public void setPartitions(List<PartitionDesc> partitions) {
this.partitions = partitions;
}

public void addColumnStat(ColumnStats columnStats) {
this.columnStatses.add(columnStats);
}

public void addPartition(PartitionDesc partitionDesc) {
this.partitions.add(partitionDesc);
}

public boolean equals(Object obj) {
if (obj instanceof TableStats) {
TableStats other = (TableStats) obj;
Expand All @@ -163,6 +187,7 @@ public boolean equals(Object obj) {
eq = eq && TUtil.checkEquals(this.avgRows, other.avgRows);
eq = eq && TUtil.checkEquals(this.readBytes, other.readBytes);
eq = eq && TUtil.checkEquals(this.columnStatses, other.columnStatses);
eq = eq && TUtil.checkEquals(this.partitions, other.partitions);
return eq;
} else {
return false;
Expand All @@ -171,7 +196,7 @@ public boolean equals(Object obj) {

public int hashCode() {
return Objects.hashCode(numRows, numBytes,
numBlocks, numShuffleOutputs, columnStatses);
numBlocks, numShuffleOutputs, columnStatses, partitions);
}

public Object clone() throws CloneNotSupportedException {
Expand All @@ -185,6 +210,8 @@ public Object clone() throws CloneNotSupportedException {

stat.columnStatses = new ArrayList<ColumnStats>(this.columnStatses);

stat.partitions = new ArrayList<PartitionDesc>(this.partitions);

return stat;
}

Expand Down Expand Up @@ -260,6 +287,11 @@ public TableStatsProto getProto() {
builder.addColStat(colStat.getProto());
}
}
if (this.partitions != null) {
for (PartitionDesc partitionDesc: partitions) {
builder.addPartitions(partitionDesc.getProto());
}
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ message TableStatsProto {
optional int64 readBytes = 7;
repeated ColumnStatsProto colStat = 8;
optional int32 tid = 9;
repeated PartitionDescProto partitions = 10;
}

message ColumnStatsProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ public void alterTable(final CatalogProtos.AlterTableDescProto alterTableDescPro
partitionName = alterTableDescProto.getPartitionDesc().getPartitionName();
partitionDesc = getPartition(databaseName, tableName, partitionName);
if(partitionDesc != null) {
throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName);
dropPartition(databaseName, tableName, partitionDesc);
}
addPartition(databaseName, tableName, alterTableDescProto.getPartitionDesc());
break;
Expand All @@ -630,8 +630,9 @@ public void alterTable(final CatalogProtos.AlterTableDescProto alterTableDescPro
partitionDesc = getPartition(databaseName, tableName, partitionName);
if(partitionDesc == null) {
throw new NoSuchPartitionException(databaseName, tableName, partitionName);
} else {
dropPartition(databaseName, tableName, partitionDesc);
}
dropPartition(databaseName, tableName, partitionDesc);
break;
case SET_PROPERTY:
// TODO - not implemented yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ public void alterTable(CatalogProtos.AlterTableDescProto alterTableDescProto) th
partitionName = alterTableDescProto.getPartitionDesc().getPartitionName();
partitionDesc = getPartition(databaseName, tableName, partitionName);
if(partitionDesc != null) {
throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName);
dropPartition(tableId, alterTableDescProto.getPartitionDesc().getPartitionName());
}
addPartition(tableId, alterTableDescProto.getPartitionDesc());
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,41 +315,13 @@ public void alterTable(CatalogProtos.AlterTableDescProto alterTableDescProto) th
partitionName = partitionDesc.getPartitionName();

if (partitions.containsKey(tableName) && partitions.get(tableName).containsKey(partitionName)) {
throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName);
} else {
CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder();
builder.setPartitionName(partitionName);
builder.setPath(partitionDesc.getPath());

if (partitionDesc.getPartitionKeysCount() > 0) {
int i = 0;
for (CatalogProtos.PartitionKeyProto eachKey : partitionDesc.getPartitionKeysList()) {
CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder();
keyBuilder.setColumnName(eachKey.getColumnName());
keyBuilder.setPartitionValue(eachKey.getPartitionValue());
builder.setPartitionKeys(i, keyBuilder.build());
i++;
}
}

Map<String, CatalogProtos.PartitionDescProto> protoMap = null;
if (!partitions.containsKey(tableName)) {
protoMap = Maps.newHashMap();
} else {
protoMap = partitions.get(tableName);
}
protoMap.put(partitionName, builder.build());
partitions.put(tableName, protoMap);
dropPartition(databaseName, tableName, partitionName);
}
addPartition(partitionDesc, tableName, partitionName);
break;
case DROP_PARTITION:
partitionDesc = alterTableDescProto.getPartitionDesc();
partitionName = partitionDesc.getPartitionName();
if(!partitions.containsKey(tableName)) {
throw new NoSuchPartitionException(databaseName, tableName, partitionName);
} else {
partitions.remove(partitionName);
}
dropPartition(databaseName, tableName, partitionName);
break;
case SET_PROPERTY:
KeyValueSet properties = new KeyValueSet(tableDescProto.getMeta().getParams());
Expand All @@ -369,6 +341,42 @@ public void alterTable(CatalogProtos.AlterTableDescProto alterTableDescProto) th
}
}

private void addPartition(CatalogProtos.PartitionDescProto partitionDesc, String tableName, String partitionName) {
Map<String, CatalogProtos.PartitionDescProto> protoMap = null;

CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder();
builder.setPartitionName(partitionName);
builder.setPath(partitionDesc.getPath());

if (partitionDesc.getPartitionKeysCount() > 0) {
for (CatalogProtos.PartitionKeyProto eachKey : partitionDesc.getPartitionKeysList()) {
CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder();
keyBuilder.setColumnName(eachKey.getColumnName());
keyBuilder.setPartitionValue(eachKey.getPartitionValue());
builder.addPartitionKeys(keyBuilder.build());
}
}

if (!partitions.containsKey(tableName)) {
protoMap = Maps.newHashMap();
} else {
protoMap = partitions.get(tableName);
}
protoMap.put(partitionName, builder.build());
partitions.put(tableName, protoMap);
}

private void dropPartition(String databaseName, String tableName, String partitionName) {
Map<String, CatalogProtos.PartitionDescProto> protoMap = null;

if(partitions.containsKey(tableName)) {
protoMap = partitions.get(tableName);
protoMap.remove(partitionName);
partitions.put(tableName, protoMap);
} else {
throw new NoSuchPartitionException(databaseName, tableName, partitionName);
}
}

private int getIndexOfColumnToBeRenamed(List<CatalogProtos.ColumnProto> fieldList, String columnName) {
int fieldCount = fieldList.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,21 @@
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.partition.PartitionDesc;
import org.apache.tajo.catalog.partition.PartitionKey;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.plan.logical.CreateTableNode;
import org.apache.tajo.plan.logical.InsertNode;
import org.apache.tajo.plan.logical.NodeType;
import org.apache.tajo.plan.logical.StoreTableNode;
import org.apache.tajo.storage.*;
import org.apache.tajo.unit.StorageUnit;
import org.apache.tajo.util.TUtil;
import org.apache.tajo.worker.TaskAttemptContext;

import java.io.IOException;
import java.net.URI;
import java.util.List;

public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
private static Log LOG = LogFactory.getLog(ColPartitionStoreExec.class);
Expand Down Expand Up @@ -156,9 +161,45 @@ protected Appender getNextPartitionAppender(String partition) throws IOException

openAppender(0);

addPartition(partition);

return appender;
}

/**
* Add partition information to TableStats for storing to CatalogStore.
*
* @param partition partition name
* @throws IOException
*/
private void addPartition(String partition) throws IOException {
PartitionDesc partitionDesc = new PartitionDesc();
partitionDesc.setPartitionName(partition);

String[] partitionKeyPairs = partition.split("/");
List<PartitionKey> partitionKeyList = TUtil.newList();
for(String partitionKeyPair: partitionKeyPairs) {
String[] keyValue = partitionKeyPair.split("=");
PartitionKey partitionKey = new PartitionKey(keyValue[0], keyValue[1]);
partitionKeyList.add(partitionKey);
}
partitionDesc.setPartitionKeys(partitionKeyList);

if (this.plan.getUri() == null) {
// In CTAS, the uri would be null. So,
String[] split = CatalogUtil.splitTableName(plan.getTableName());
int endIndex = storeTablePath.toString().indexOf(split[1]) + split[1].length();
String outputPath = storeTablePath.toString().substring(0, endIndex);
partitionDesc.setPath(outputPath + "/" + partition);
} else {
partitionDesc.setPath(this.plan.getUri().toString() + "/" + partition);
}

if(!appender.getStats().getPartitions().contains(partitionDesc)) {
appender.getStats().addPartition(partitionDesc);
}
}

public void openAppender(int suffixId) throws IOException {
Path actualFilePath = lastFileName;
if (suffixId > 0) {
Expand Down
Loading