From 3b2c8c10176a84e2c1b07325920f0e289d52d641 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Sun, 22 Mar 2015 23:49:45 +0900 Subject: [PATCH 1/6] TAJO-1284: Add alter partition method to CatalogStore. --- .../tajo/catalog/AbstractCatalogClient.java | 45 ++- .../src/main/proto/CatalogProtocol.proto | 7 +- .../apache/tajo/catalog/AlterTableDesc.java | 21 +- .../apache/tajo/catalog/AlterTableType.java | 2 +- .../apache/tajo/catalog/CatalogConstants.java | 5 + .../apache/tajo/catalog/CatalogService.java | 9 +- .../AlreadyExistsPartitionException.java | 33 ++ .../exception/NoSuchPartitionException.java | 39 ++ .../tajo/catalog/partition/PartitionDesc.java | 92 +++-- .../tajo/catalog/partition/PartitionKey.java | 148 ++++++++ .../src/main/proto/CatalogProtos.proto | 28 +- .../tajo/catalog/store/HCatalogStore.java | 125 +++++-- .../tajo/catalog/store/TestHCatalogStore.java | 77 +++- .../apache/tajo/catalog/CatalogServer.java | 100 ++++-- .../tajo/catalog/store/AbstractDBStore.java | 337 ++++++++++++------ .../store/AbstractMySQLMariaDBStore.java | 14 + .../tajo/catalog/store/CatalogStore.java | 14 +- .../apache/tajo/catalog/store/MemStore.java | 27 +- .../main/resources/schemas/derby/derby.xml | 25 +- .../schemas/mariadb/partition_keys.sql | 6 + .../resources/schemas/mariadb/partitions.sql | 13 +- .../schemas/mysql/partition_keys.sql | 6 + .../resources/schemas/mysql/partitions.sql | 13 +- .../main/resources/schemas/oracle/oracle.xml | 43 ++- .../schemas/postgresql/postgresql.xml | 1 - .../org/apache/tajo/catalog/TestCatalog.java | 73 +++- .../NonForwardQueryResultSystemScanner.java | 2 - 27 files changed, 1020 insertions(+), 285 deletions(-) create mode 100644 tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsPartitionException.java create mode 100644 tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java create mode 100644 tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java create mode 100644 tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/partition_keys.sql create mode 100644 tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partition_keys.sql diff --git a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java index d8350a38ff..458d6e0489 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java +++ b/tajo-catalog/tajo-catalog-client/src/main/java/org/apache/tajo/catalog/AbstractCatalogClient.java @@ -410,7 +410,50 @@ public Boolean call(NettyClientBase client) throws ServiceException { return false; } } - + + @Override + public final PartitionDescProto getPartition(final String databaseName, final String tableName, + final String partitionName) { + try { + return new ServerCallable(this.pool, getCatalogServerAddr(), CatalogProtocol.class, false) { + public PartitionDescProto call(NettyClientBase client) throws ServiceException { + + PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder(); + builder.setDatabaseName(databaseName); + builder.setTableName(tableName); + builder.setPartitionName(partitionName); + + CatalogProtocolService.BlockingInterface stub = getStub(client); + return stub.getPartitionByPartitionName(null, builder.build()); + } + }.withRetries(); + } catch (ServiceException e) { + LOG.error(e.getMessage(), e); + return null; + } + } + + @Override + public final List getPartitions(final String databaseName, final String tableName) { + try { + return new ServerCallable>(this.pool, getCatalogServerAddr(), CatalogProtocol.class, + false) { + public List call(NettyClientBase client) throws ServiceException { + + PartitionIdentifierProto.Builder builder = PartitionIdentifierProto.newBuilder(); + builder.setDatabaseName(databaseName); + builder.setTableName(tableName); + + CatalogProtocolService.BlockingInterface stub = getStub(client); + PartitionsProto response = stub.getPartitionsByTableName(null, builder.build()); + return response.getPartitionList(); + } + }.withRetries(); + } catch (ServiceException e) { + LOG.error(e.getMessage(), e); + return null; + } + } @Override public List getAllPartitions() { try { diff --git a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto index cae5d8892e..5ace32e7a4 100644 --- a/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto +++ b/tajo-catalog/tajo-catalog-client/src/main/proto/CatalogProtocol.proto @@ -56,11 +56,8 @@ service CatalogProtocolService { rpc existPartitionMethod(TableIdentifierProto) returns (BoolProto); rpc dropPartitionMethod(TableIdentifierProto) returns (BoolProto); - rpc addPartitions(PartitionsProto) returns (BoolProto); - rpc addPartition(PartitionDescProto) returns (BoolProto); - rpc getPartitionByPartitionName(StringProto) returns (PartitionDescProto); - rpc getPartitionsByTableName(StringProto) returns (PartitionsProto); - rpc delAllPartitions(StringProto) returns (PartitionsProto); + rpc getPartitionByPartitionName(PartitionIdentifierProto) returns (PartitionDescProto); + rpc getPartitionsByTableName(PartitionIdentifierProto) returns (PartitionsProto); rpc getAllPartitions(NullProto) returns (GetTablePartitionsProto); rpc createIndex(IndexDescProto) returns (BoolProto); diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableDesc.java index 69d5be44a8..f1265fbdd9 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableDesc.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableDesc.java @@ -21,6 +21,7 @@ import com.google.gson.GsonBuilder; import com.google.gson.annotations.Expose; import org.apache.tajo.catalog.json.CatalogGsonHelper; +import org.apache.tajo.catalog.partition.PartitionDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.json.GsonObject; @@ -40,7 +41,9 @@ public class AlterTableDesc implements ProtoObject, GsonObj @Expose protected String newColumnName; //optional @Expose - protected Column addColumn = null; //optiona + protected Column addColumn = null; //optional + @Expose + protected PartitionDesc partitionDesc; //optional public AlterTableDesc() { } @@ -94,6 +97,10 @@ public void setAlterTableType(AlterTableType alterTableType) { this.alterTableType = alterTableType; } + public PartitionDesc getPartitionDesc() { return partitionDesc; } + + public void setPartitionDesc(PartitionDesc partitionDesc) { this.partitionDesc = partitionDesc; } + @Override public String toString() { Gson gson = new GsonBuilder().setPrettyPrinting(). @@ -109,6 +116,7 @@ public AlterTableDesc clone() throws CloneNotSupportedException { newAlter.newTableName = newTableName; newAlter.columnName = newColumnName; newAlter.addColumn = addColumn; + newAlter.partitionDesc = partitionDesc; return newAlter; } @@ -147,8 +155,19 @@ public AlterTableDescProto getProto() { case ADD_COLUMN: builder.setAlterTableType(CatalogProtos.AlterTableType.ADD_COLUMN); break; + case ADD_PARTITION: + builder.setAlterTableType(CatalogProtos.AlterTableType.ADD_PARTITION); + break; + case DROP_PARTITION: + builder.setAlterTableType(CatalogProtos.AlterTableType.DROP_PARTITION); + break; default: } + + if (null != this.partitionDesc) { + builder.setPartitionDesc(partitionDesc.getProto()); + } + return builder.build(); } diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableType.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableType.java index 0b7639cf93..7e3be91c52 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableType.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/AlterTableType.java @@ -18,5 +18,5 @@ package org.apache.tajo.catalog; public enum AlterTableType { - RENAME_TABLE, RENAME_COLUMN, ADD_COLUMN + RENAME_TABLE, RENAME_COLUMN, ADD_COLUMN, ADD_PARTITION, DROP_PARTITION } diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java index a8c5c9b3c9..946c5041c0 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java @@ -48,11 +48,16 @@ public class CatalogConstants { public static final String TB_STATISTICS = "STATS"; public static final String TB_PARTITION_METHODS = "PARTITION_METHODS"; public static final String TB_PARTTIONS = "PARTITIONS"; + public static final String TB_PARTTION_KEYS = "PARTITION_KEYS"; public static final String COL_TABLESPACE_PK = "SPACE_ID"; public static final String COL_DATABASES_PK = "DB_ID"; public static final String COL_TABLES_PK = "TID"; public static final String COL_TABLES_NAME = "TABLE_NAME"; + + public static final String COL_PARTITIONS_PK = "PID"; + public static final String COL_COLUMN_NAME = "COLUMN_NAME"; + public static final String COL_PARTITION_VALUE = "PARTITION_VALUE"; public static final String INFORMATION_SCHEMA_DB_NAME = "information_schema"; } diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java index 2a5d890ad2..86b773b704 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogService.java @@ -19,6 +19,7 @@ package org.apache.tajo.catalog; import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto; import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto; @@ -183,7 +184,11 @@ public interface CatalogService { PartitionMethodDesc getPartitionMethod(String databaseName, String tableName); boolean existPartitionMethod(String databaseName, String tableName); - + + CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName, String partitionName); + + List getPartitions(String databaseName, String tableName); + List getAllPartitions(); boolean createIndex(IndexDesc index); @@ -221,4 +226,6 @@ public interface CatalogService { boolean updateTableStats(UpdateTableStatsProto stats); + + } diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsPartitionException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsPartitionException.java new file mode 100644 index 0000000000..ab6144f26a --- /dev/null +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/AlreadyExistsPartitionException.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.exception; + +public class AlreadyExistsPartitionException extends RuntimeException { + + private static final long serialVersionUID = 277182608283894930L; + + public AlreadyExistsPartitionException(String message) { + super(message); + } + + public AlreadyExistsPartitionException(String databaseName, String tableName, String partitionName) { + super(String.format("ERROR: \"%s already exist in \"%s.%s\"", partitionName, databaseName, tableName)); + } + +} diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java new file mode 100644 index 0000000000..45c92990bf --- /dev/null +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/exception/NoSuchPartitionException.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.exception; + +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.function.FunctionUtil; +import org.codehaus.jackson.schema.JsonSerializableSchema; + +import java.util.Collection; + +public class NoSuchPartitionException extends RuntimeException { + + private static final long serialVersionUID = 277182608283894938L; + + public NoSuchPartitionException(String message) { + super(message); + } + + public NoSuchPartitionException(String databaseName, String tableName, String partitionName) { + super(String.format("ERROR: \"%s\" does not exist in \"%s.%s\".", partitionName, databaseName, tableName)); + } + +} diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java index d775ba8520..7abfa2cdb3 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java @@ -19,20 +19,44 @@ package org.apache.tajo.catalog.partition; import com.google.common.base.Objects; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.google.gson.annotations.Expose; +import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.json.CatalogGsonHelper; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.json.GsonObject; +import java.util.ArrayList; +import java.util.List; + /** - * PartitionDesc presents a table partition. + * This presents each partitions of column partitioned table. + * Each partitions can have a own name, partition path, colum name and partition value pairs. + * + * For example, consider you have a partitioned table as follows: + * + * create external table table1 (id text, name text) PARTITION BY COLUMN (dt text, phone text, + * gender text) USING RCFILE LOCATION '/tajo/data/table1'; + * + * Then, its data will be stored on HDFS as follows: + * - /tajo/data/table1/dt=20150301/phone=1300/gender=m + * - /tajo/data/table1/dt=20150301/phone=1300/gender=f + * - /tajo/data/table1/dt=20150302/phone=1500/gender=m + * - /tajo/data/table1/dt=20150302/phone=1500/gender=f + * + * In such as above, first directory can be presented with this class as follows: + * - partitionName : dt=20150301/phone=1300/gender=m + * - path: /tajo/data/table1/dt=20150301/phone=1300/gender=m + * - partitionKeys: + * dt=20150301, phone=1300, gender=m + * */ public class PartitionDesc implements ProtoObject, Cloneable, GsonObject { - @Expose protected String partitionName; // optional - @Expose protected int ordinalPosition; // required - @Expose protected String partitionValue; // optional - @Expose protected String path; // optional + @Expose protected String partitionName; + @Expose protected List partitionKeys; + @Expose protected String path; //optional private CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder(); @@ -41,8 +65,7 @@ public PartitionDesc() { public PartitionDesc(PartitionDesc partition) { this.partitionName = partition.partitionName; - this.ordinalPosition = partition.ordinalPosition; - this.partitionValue = partition.partitionValue; + this.partitionKeys = partition.partitionKeys; this.path = partition.path; } @@ -50,15 +73,21 @@ public PartitionDesc(CatalogProtos.PartitionDescProto proto) { if(proto.hasPartitionName()) { this.partitionName = proto.getPartitionName(); } - this.ordinalPosition = proto.getOrdinalPosition(); - if(proto.hasPartitionValue()) { - this.partitionValue = proto.getPartitionValue(); + + this.partitionKeys = new ArrayList(); + for(CatalogProtos.PartitionKeyProto keyProto : proto.getPartitionKeysList()) { + PartitionKey partitionKey = new PartitionKey(keyProto); + this.partitionKeys.add(partitionKey); } + if(proto.hasPath()) { this.path = proto.getPath(); } } + public String getPartitionName() { return partitionName; } + public void setPartitionName(String partitionName) { this.partitionName = partitionName; } + public void setName(String partitionName) { this.partitionName = partitionName; } @@ -66,20 +95,9 @@ public String getName() { return partitionName; } + public List getPartitionKeys() { return partitionKeys; } - public void setOrdinalPosition(int ordinalPosition) { - this.ordinalPosition = ordinalPosition; - } - public int getOrdinalPosition() { - return ordinalPosition; - } - - public void setPartitionValue(String partitionValue) { - this.partitionValue = partitionValue; - } - public String getPartitionValue() { - return partitionValue; - } + public void setPartitionKeys(List partitionKeys) { this.partitionKeys = partitionKeys; } public void setPath(String path) { this.path = path; @@ -89,7 +107,7 @@ public String getPath() { } public int hashCode() { - return Objects.hashCode(partitionName, ordinalPosition, partitionValue, path); + return Objects.hashCode(partitionName, partitionKeys, path); } public boolean equals(Object o) { @@ -98,10 +116,9 @@ public boolean equals(Object o) { boolean eq = ((partitionName != null && another.partitionName != null && partitionName.equals(another.partitionName)) || (partitionName == null && another.partitionName == null)); - eq = eq && (ordinalPosition == another.ordinalPosition); - eq = eq && ((partitionValue != null && another.partitionValue != null - && partitionValue.equals(another.partitionValue)) - || (partitionValue == null && another.partitionValue == null)); + eq = eq && ((partitionKeys != null && another.partitionKeys != null + && partitionKeys.equals(another.partitionKeys)) + || (partitionKeys == null && another.partitionKeys == null)); eq = eq && ((path != null && another.path != null && path.equals(another.path)) || (path == null && another.path == null)); return eq; @@ -117,13 +134,14 @@ public CatalogProtos.PartitionDescProto getProto() { } if(this.partitionName != null) { - builder.setPartitionName(partitionName); + builder.setPartitionName(this.partitionName); } - builder.setOrdinalPosition(this.ordinalPosition); - - if (this.partitionValue != null) { - builder.setPartitionValue(this.partitionValue); + builder.clearPartitionKeys(); + if (this.partitionKeys != null) { + for(PartitionKey partitionKey : this.partitionKeys) { + builder.addPartitionKeys(partitionKey.getProto()); + } } if(this.path != null) { @@ -134,8 +152,9 @@ public CatalogProtos.PartitionDescProto getProto() { } public String toString() { - StringBuilder sb = new StringBuilder("name: " + partitionName); - return sb.toString(); + Gson gson = new GsonBuilder().setPrettyPrinting(). + excludeFieldsWithoutExposeAnnotation().create(); + return gson.toJson(this); } @Override @@ -151,8 +170,7 @@ public Object clone() throws CloneNotSupportedException { PartitionDesc desc = (PartitionDesc) super.clone(); desc.builder = CatalogProtos.PartitionDescProto.newBuilder(); desc.partitionName = partitionName; - desc.ordinalPosition = ordinalPosition; - desc.partitionValue = partitionValue; + desc.partitionKeys = partitionKeys; desc.path = path; return desc; diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java new file mode 100644 index 0000000000..4a7103e7eb --- /dev/null +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.partition; + +import com.google.common.base.Objects; +import com.google.gson.annotations.Expose; +import org.apache.tajo.catalog.json.CatalogGsonHelper; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.common.ProtoObject; +import org.apache.tajo.json.GsonObject; + + +/** + * This presents column name and partition value pairs of column partitioned table. + * + * For example, consider you have a partitioned table as follows: + * + * create external table table1 (id text, name text) PARTITION BY COLUMN (dt text, phone text, + * gender text) USING RCFILE LOCATION '/tajo/data/table1'; + * + * Then, its data will be stored on HDFS as follows: + * - /tajo/data/table1/dt=20150301/phone=1300/gender=m + * - /tajo/data/table1/dt=20150301/phone=1300/gender=f + * - /tajo/data/table1/dt=20150302/phone=1500/gender=m + * - /tajo/data/table1/dt=20150302/phone=1500/gender=f + * + * In such as above, first directory can be presented with this class as follows: + * The first pair: column name = dt, partition value = 20150301 + * The second pair: column name = phone, partition value = 1300 + * The thris pair: column name = gender, partition value = m + * + */ +public class PartitionKey implements ProtoObject, Cloneable, GsonObject { + @Expose protected String columnName; // required + @Expose protected String partitionValue; // required + + private CatalogProtos.PartitionKeyProto.Builder builder = CatalogProtos.PartitionKeyProto.newBuilder(); + + public PartitionKey() { + } + + public PartitionKey(String columnName, String partitionValue) { + this.columnName = columnName; + this.partitionValue = partitionValue; + } + + public PartitionKey(PartitionKey partition) { + this.columnName = partition.columnName; + this.partitionValue = partition.partitionValue; + } + + public PartitionKey(CatalogProtos.PartitionKeyProto proto) { + if (proto.hasColumnName()) { + this.columnName = proto.getColumnName(); + } + if (proto.hasPartitionValue()) { + this.partitionValue = proto.getPartitionValue(); + } + } + + public String getPartitionValue() { + return partitionValue; + } + + public void setPartitionValue(String partitionValue) { + this.partitionValue = partitionValue; + } + + public String getColumnName() { return columnName; } + + public void setColumnName(String columnName) { this.columnName = columnName; } + + public int hashCode() { + return Objects.hashCode(partitionValue, columnName); + } + + public boolean equals(Object o) { + if (o instanceof PartitionKey) { + PartitionKey another = (PartitionKey) o; + boolean eq = ((columnName != null && another.columnName != null + && columnName.equals(another.columnName)) || + (columnName == null && another.columnName == null)); + eq = eq && ((partitionValue != null && another.partitionValue != null + && partitionValue.equals(another.partitionValue)) || + (partitionValue == null && another.partitionValue == null)); + return eq; + } + return false; + } + + + @Override + public CatalogProtos.PartitionKeyProto getProto() { + if (builder == null) { + builder = CatalogProtos.PartitionKeyProto.newBuilder(); + } + + if (this.columnName != null) { + builder.setColumnName(this.columnName); + } + + if (this.partitionValue != null) { + builder.setPartitionValue(this.partitionValue); + } + + return builder.build(); + } + + public String toString() { + StringBuilder sb = new StringBuilder("name: " + partitionValue); + return sb.toString(); + } + + @Override + public String toJson() { + return CatalogGsonHelper.toJson(this, PartitionKey.class); + } + + public static PartitionKey fromJson(String strVal) { + return strVal != null ? CatalogGsonHelper.fromJson(strVal, PartitionKey.class) : null; + } + + public Object clone() throws CloneNotSupportedException { + PartitionKey desc = (PartitionKey) super.clone(); + desc.builder = CatalogProtos.PartitionKeyProto.newBuilder(); + desc.partitionValue = partitionValue; + desc.columnName = columnName; + + return desc; + } + +} \ No newline at end of file diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index a204685f63..83d8cf2036 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -58,6 +58,8 @@ enum AlterTableType { RENAME_TABLE = 0; RENAME_COLUMN = 1; ADD_COLUMN = 2; + ADD_PARTITION = 3; + DROP_PARTITION = 4; } message ColumnProto { @@ -187,8 +189,7 @@ message TablePartitionProto { required int32 pid = 1; required int32 tid = 2; optional string partitionName = 3; - required int32 ordinalPosition = 4; - optional string path = 5; + optional string path = 4; } message GetIndexByColumnRequest { @@ -281,8 +282,7 @@ message SortSpecProto { message PartitionsProto { - required TableIdentifierProto tableIdentifier = 1; - repeated PartitionDescProto partition = 2; + repeated PartitionDescProto partition = 21; } message PartitionMethodProto { @@ -293,10 +293,21 @@ message PartitionMethodProto { } message PartitionDescProto { - optional string partitionName = 2; - required int32 ordinalPosition = 3; - optional string partitionValue = 4; - optional string path = 5; + required string partitionName = 1; + repeated PartitionKeyProto partitionKeys = 32; + optional string path = 3; +} + +message PartitionKeyProto { + required string columnName = 1; + required string partitionValue = 2; +} + + +message PartitionIdentifierProto { + required string databaseName = 1; + required string tableName = 2; + optional string partitionName = 3; } message TablespaceProto { @@ -345,6 +356,7 @@ message AlterTableDescProto { optional ColumnProto addColumn = 3; optional AlterColumnProto alterColumnName = 4; required AlterTableType alterTableType = 5; + optional PartitionDescProto partitionDesc = 6; } message AlterColumnProto { diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java index 2c3fc6ac1a..276151796a 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java @@ -608,7 +608,8 @@ public void alterTable(final CatalogProtos.AlterTableDescProto alterTableDescPro final String databaseName = split[0]; final String tableName = split[1]; - + String partitionName = null; + CatalogProtos.PartitionDescProto partitionDesc = null; switch (alterTableDescProto.getAlterTableType()) { case RENAME_TABLE: @@ -629,6 +630,22 @@ public void alterTable(final CatalogProtos.AlterTableDescProto alterTableDescPro } addNewColumn(databaseName, tableName, alterTableDescProto.getAddColumn()); break; + case ADD_PARTITION: + partitionName = alterTableDescProto.getPartitionDesc().getPartitionName(); + partitionDesc = getPartition(databaseName, tableName, partitionName); + if(partitionDesc != null) { + throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName); + } + addPartition(databaseName, tableName, alterTableDescProto.getPartitionDesc()); + break; + case DROP_PARTITION: + partitionName = alterTableDescProto.getPartitionDesc().getPartitionName(); + partitionDesc = getPartition(databaseName, tableName, partitionName); + if(partitionDesc == null) { + throw new NoSuchPartitionException(databaseName, tableName, partitionName); + } + dropPartition(databaseName, tableName, partitionDesc); + break; default: //TODO } @@ -701,6 +718,59 @@ private void addNewColumn(String databaseName, String tableName, CatalogProtos.C } } + private void addPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto + partitionDescProto) { + HCatalogStoreClientPool.HCatalogStoreClient client = null; + try { + + client = clientPool.getClient(); + + Partition partition = new Partition(); + partition.setDbName(databaseName); + partition.setTableName(tableName); + + List values = Lists.newArrayList(); + for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) { + values.add(keyProto.getPartitionValue()); + } + partition.setValues(values); + + Table table = client.getHiveClient().getTable(databaseName, tableName); + StorageDescriptor sd = table.getSd(); + sd.setLocation(partitionDescProto.getPath()); + partition.setSd(sd); + + client.getHiveClient().add_partition(partition); + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + private void dropPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto + partitionDescProto) { + HCatalogStoreClientPool.HCatalogStoreClient client = null; + try { + + client = clientPool.getClient(); + + List values = Lists.newArrayList(); + for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) { + values.add(keyProto.getPartitionValue()); + } + client.getHiveClient().dropPartition(databaseName, tableName, values, true); + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + @Override public void addPartitionMethod(CatalogProtos.PartitionMethodProto partitionMethodProto) throws CatalogException { // TODO - not implemented yet @@ -723,35 +793,48 @@ public void dropPartitionMethod(String databaseName, String tableName) throws Ca } @Override - public void addPartitions(CatalogProtos.PartitionsProto partitionsProto) throws CatalogException { - // TODO - not implemented yet + public List getPartitions(String databaseName, + String tableName) throws CatalogException { + throw new UnsupportedOperationException(); } - @Override - public void addPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto partitionDescProto) throws CatalogException { - - } @Override - public CatalogProtos.PartitionsProto getPartitions(String tableName) throws CatalogException { - return null; // TODO - not implemented yet - } + public CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName, + String partitionName) throws CatalogException { + HCatalogStoreClientPool.HCatalogStoreClient client = null; + CatalogProtos.PartitionDescProto.Builder builder = null; - @Override - public CatalogProtos.PartitionDescProto getPartition(String partitionName) throws CatalogException { - return null; // TODO - not implemented yet - } + try { + client = clientPool.getClient(); - @Override - public void delPartition(String partitionName) throws CatalogException { - // TODO - not implemented yet - } + Partition partition = client.getHiveClient().getPartition(databaseName, tableName, partitionName); + builder = CatalogProtos.PartitionDescProto.newBuilder(); + builder.setPartitionName(partitionName); + builder.setPath(partition.getSd().getLocation()); - @Override - public void dropPartitions(String tableName) throws CatalogException { + String[] partitionNames = partitionName.split("/"); - } + for (int i = 0; i < partition.getValues().size(); i++) { + String value = partition.getValues().get(i); + CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder(); + String columnName = partitionNames[i].split("=")[0]; + keyBuilder.setColumnName(columnName); + keyBuilder.setPartitionValue(value); + builder.addPartitionKeys(keyBuilder); + } + } catch (NoSuchObjectException e) { + return null; + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + return builder.build(); + } @Override public final void addFunction(final FunctionDesc func) throws CatalogException { diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java index 725f665394..a8d38ca4a1 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java @@ -24,10 +24,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.partition.PartitionDesc; +import org.apache.tajo.catalog.partition.PartitionKey; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.TajoDataTypes; @@ -40,8 +39,11 @@ import org.junit.Test; import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; import java.util.List; +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.junit.Assert.*; /** @@ -232,11 +234,12 @@ public void testAddTableByPartition() throws Exception { org.apache.tajo.catalog.Schema expressionSchema = new org.apache.tajo.catalog.Schema(); expressionSchema.addColumn("n_nationkey", TajoDataTypes.Type.INT4); + expressionSchema.addColumn("n_date", TajoDataTypes.Type.TEXT); PartitionMethodDesc partitions = new PartitionMethodDesc( DB_NAME, NATION, - CatalogProtos.PartitionType.COLUMN, expressionSchema.getColumn(0).getQualifiedName(), expressionSchema); + CatalogProtos.PartitionType.COLUMN, "n_nationkey,n_date", expressionSchema); table.setPartitionMethod(partitions); store.createTable(table.getProto()); @@ -250,18 +253,80 @@ public void testAddTableByPartition() throws Exception { assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); } - Schema partitionSchema = table.getPartitionMethod().getExpressionSchema(); Schema partitionSchema1 = table1.getPartitionMethod().getExpressionSchema(); assertEquals(partitionSchema.size(), partitionSchema1.size()); + for (int i = 0; i < partitionSchema.size(); i++) { assertEquals(partitionSchema.getColumn(i).getSimpleName(), partitionSchema1.getColumn(i).getSimpleName()); } + testAddPartition(table1.getPath(), NATION, "n_nationkey=10/n_date=20150101"); + testAddPartition(table1.getPath(), NATION, "n_nationkey=20/n_date=20150102"); + + testDropPartition(NATION, "n_nationkey=10/n_date=20150101"); + testDropPartition(NATION, "n_nationkey=20/n_date=20150102"); + + CatalogProtos.PartitionDescProto partition = store.getPartition(DB_NAME, NATION, "n_nationkey=10/n_date=20150101"); + assertNull(partition); + + partition = store.getPartition(DB_NAME, NATION, "n_nationkey=20/n_date=20150102"); + assertNull(partition); + store.dropTable(DB_NAME, NATION); } + private void testAddPartition(URI uri, String tableName, String partitionName) throws Exception { + AlterTableDesc alterTableDesc = new AlterTableDesc(); + alterTableDesc.setTableName(DB_NAME + "." + tableName); + alterTableDesc.setAlterTableType(AlterTableType.ADD_PARTITION); + + Path path = new Path(uri.getPath(), partitionName); + + PartitionDesc partitionDesc = new PartitionDesc(); + partitionDesc.setName(partitionName); + + List partitionKeyList = new ArrayList(); + String[] partitionNames = partitionName.split("/"); + for(int i = 0; i < partitionNames.length; i++) { + String[] eachPartitionName = partitionNames[i].split("="); + partitionKeyList.add(new PartitionKey(eachPartitionName[0], eachPartitionName[1])); + } + partitionDesc.setPartitionKeys(partitionKeyList); + partitionDesc.setPath(path.toString()); + + alterTableDesc.setPartitionDesc(partitionDesc); + + store.alterTable(alterTableDesc.getProto()); + + CatalogProtos.PartitionDescProto resultDesc = store.getPartition(DB_NAME, NATION, partitionName); + assertNotNull(resultDesc); + assertEquals(resultDesc.getPartitionName(), partitionName); + assertEquals(resultDesc.getPath(), uri.toString() + "/" + partitionName); + assertEquals(resultDesc.getPartitionKeysCount(), 2); + + for (int i = 0; i < resultDesc.getPartitionKeysCount(); i++) { + CatalogProtos.PartitionKeyProto keyProto = resultDesc.getPartitionKeys(i); + String[] eachName = partitionNames[i].split("="); + assertEquals(keyProto.getPartitionValue(), eachName[1]); + } + } + + + private void testDropPartition(String tableName, String partitionName) throws Exception { + AlterTableDesc alterTableDesc = new AlterTableDesc(); + alterTableDesc.setTableName(DB_NAME + "." + tableName); + alterTableDesc.setAlterTableType(AlterTableType.DROP_PARTITION); + + PartitionDesc partitionDesc = new PartitionDesc(); + partitionDesc.setName(partitionName); + + alterTableDesc.setPartitionDesc(partitionDesc); + + store.alterTable(alterTableDesc.getProto()); + } + @Test public void testGetAllTableNames() throws Exception{ TableMeta meta = new TableMeta(CatalogProtos.StoreType.CSV, new KeyValueSet()); diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java index c34b4d2c1f..e9fb177cd4 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java @@ -33,6 +33,7 @@ import org.apache.tajo.catalog.CatalogProtocol.CatalogProtocolService; import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary; import org.apache.tajo.catalog.exception.*; +import org.apache.tajo.catalog.partition.PartitionDesc; import org.apache.tajo.catalog.proto.CatalogProtos.*; import org.apache.tajo.catalog.store.CatalogStore; import org.apache.tajo.catalog.store.DerbyStore; @@ -69,7 +70,6 @@ */ @ThreadSafe public class CatalogServer extends AbstractService { - private final static String DEFAULT_NAMESPACE = "public"; private final static Log LOG = LogFactory.getLog(CatalogServer.class); @@ -821,34 +821,90 @@ public BoolProto dropPartitionMethod(RpcController controller, TableIdentifierPr } @Override - public BoolProto addPartitions(RpcController controller, PartitionsProto request) throws ServiceException { - return ProtoUtil.TRUE; - } + public PartitionDescProto getPartitionByPartitionName(RpcController controller, PartitionIdentifierProto request) + throws ServiceException { + String databaseName = request.getDatabaseName(); + String tableName = request.getTableName(); + String partitionName = request.getPartitionName(); - @Override - public BoolProto addPartition(RpcController controller, PartitionDescProto request) throws ServiceException { - return ProtoUtil.TRUE; - } + if (metaDictionary.isSystemDatabase(databaseName)) { + throw new ServiceException(databaseName + " is a system databsae. It does not contain any partitioned tables."); + } - @Override - public PartitionDescProto getPartitionByPartitionName(RpcController controller, StringProto request) - throws ServiceException { - return null; - } + rlock.lock(); + try { + boolean contain; - @Override - public PartitionsProto getPartitionsByTableName(RpcController controller, - StringProto request) - throws ServiceException { - return null; + contain = store.existDatabase(databaseName); + if (contain) { + contain = store.existTable(databaseName, tableName); + if (contain) { + if (store.existPartitionMethod(databaseName, tableName)) { + PartitionDescProto partitionDesc = store.getPartition(databaseName, tableName, partitionName); + if (partitionDesc != null) { + return partitionDesc; + } else { + throw new NoSuchPartitionException(databaseName, tableName, partitionName); + } + } else { + throw new NoPartitionedTableException(databaseName, tableName); + } + } else { + throw new NoSuchTableException(tableName); + } + } else { + throw new NoSuchDatabaseException(databaseName); + } + } catch (Exception e) { + LOG.error(e); + throw new ServiceException(e); + } finally { + rlock.unlock(); + } } @Override - public PartitionsProto delAllPartitions(RpcController controller, StringProto request) - throws ServiceException { - return null; + public PartitionsProto getPartitionsByTableName(RpcController controller, PartitionIdentifierProto request) + throws ServiceException { + String databaseName = request.getDatabaseName(); + String tableName = request.getTableName(); + + if (metaDictionary.isSystemDatabase(databaseName)) { + throw new ServiceException(databaseName + " is a system databsae. It does not contain any partitioned tables."); + } + + rlock.lock(); + try { + boolean contain; + + contain = store.existDatabase(databaseName); + if (contain) { + contain = store.existTable(databaseName, tableName); + if (contain) { + if (store.existPartitionMethod(databaseName, tableName)) { + List partitions = store.getPartitions(databaseName, tableName); + PartitionsProto.Builder builder = PartitionsProto.newBuilder(); + for(PartitionDescProto partition : partitions) { + builder.addPartition(partition); + } + return builder.build(); + } else { + throw new NoPartitionedTableException(databaseName, tableName); + } + } else { + throw new NoSuchTableException(tableName); + } + } else { + throw new NoSuchDatabaseException(databaseName); + } + } catch (Exception e) { + LOG.error(e); + throw new ServiceException(e); + } finally { + rlock.unlock(); + } } - + @Override public GetTablePartitionsProto getAllPartitions(RpcController controller, NullProto request) throws ServiceException { rlock.lock(); diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java index be6bf1c594..9127b9ed2f 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java @@ -973,7 +973,8 @@ public void alterTable(CatalogProtos.AlterTableDescProto alterTableDescProto) th } String databaseName = splitted[0]; String tableName = splitted[1]; - + String partitionName = null; + CatalogProtos.PartitionDescProto partitionDesc = null; try { int databaseId = getDatabaseId(databaseName); @@ -998,6 +999,22 @@ public void alterTable(CatalogProtos.AlterTableDescProto alterTableDescProto) th } addNewColumn(tableId, alterTableDescProto.getAddColumn()); break; + case ADD_PARTITION: + partitionName = alterTableDescProto.getPartitionDesc().getPartitionName(); + partitionDesc = getPartition(databaseName, tableName, partitionName); + if(partitionDesc != null) { + throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName); + } + addPartition(tableId, alterTableDescProto.getPartitionDesc()); + break; + case DROP_PARTITION: + partitionName = alterTableDescProto.getPartitionDesc().getPartitionName(); + partitionDesc = getPartition(databaseName, tableName, partitionName); + if(partitionDesc == null) { + throw new NoSuchPartitionException(databaseName, tableName, partitionName); + } + dropPartition(tableId, alterTableDescProto.getPartitionDesc().getPartitionName()); + break; default: } } catch (SQLException sqlException) { @@ -1158,6 +1175,120 @@ private void addNewColumn(int tableId, CatalogProtos.ColumnProto columnProto) th } } + public void addPartition(int tableId, CatalogProtos.PartitionDescProto partition) throws CatalogException { + Connection conn = null; + PreparedStatement pstmt = null; + final String ADD_PARTITION_SQL = + "INSERT INTO " + TB_PARTTIONS + + " (" + COL_TABLES_PK + ", PARTITION_NAME, PATH) VALUES (?,?,?)"; + + final String ADD_PARTITION_KEYS_SQL = + "INSERT INTO " + TB_PARTTION_KEYS + " (" + COL_PARTITIONS_PK + ", " + COL_COLUMN_NAME + ", " + + COL_PARTITION_VALUE + ") VALUES (?,?,?)"; + + try { + + if (LOG.isDebugEnabled()) { + LOG.debug(ADD_PARTITION_SQL); + } + + conn = getConnection(); + pstmt = conn.prepareStatement(ADD_PARTITION_SQL); + + pstmt.setInt(1, tableId); + pstmt.setString(2, partition.getPartitionName()); + pstmt.setString(3, partition.getPath()); + pstmt.executeUpdate(); + + if (partition.getPartitionKeysCount() > 0) { + pstmt = conn.prepareStatement(ADD_PARTITION_KEYS_SQL); + int partitionId = getPartitionId(tableId, partition.getPartitionName()); + addPartitionKeys(pstmt, partitionId, partition); + pstmt.executeBatch(); + } + } catch (SQLException se) { + throw new CatalogException(se); + } finally { + CatalogUtil.closeQuietly(pstmt); + } + } + + public int getPartitionId(int tableId, String partitionName) throws CatalogException { + Connection conn = null; + ResultSet res = null; + PreparedStatement pstmt = null; + int retValue = -1; + + try { + String sql = "SELECT " + COL_PARTITIONS_PK + " FROM " + TB_PARTTIONS + + " WHERE " + COL_TABLES_PK + " = ? AND PARTITION_NAME = ? "; + + if (LOG.isDebugEnabled()) { + LOG.debug(sql); + } + + conn = getConnection(); + pstmt = conn.prepareStatement(sql); + pstmt.setInt(1, tableId); + pstmt.setString(2, partitionName); + res = pstmt.executeQuery(); + + if (res.next()) { + retValue = res.getInt(1); + } + } catch (SQLException se) { + throw new CatalogException(se); + } finally { + CatalogUtil.closeQuietly(pstmt, res); + } + return retValue; + } + + private void addPartitionKeys(PreparedStatement pstmt, int partitionId, PartitionDescProto partition) throws + SQLException { + for (int i = 0; i < partition.getPartitionKeysCount(); i++) { + PartitionKeyProto partitionKey = partition.getPartitionKeys(i); + + pstmt.setInt(1, partitionId); + pstmt.setString(2, partitionKey.getColumnName()); + pstmt.setString(3, partitionKey.getPartitionValue()); + + pstmt.addBatch(); + pstmt.clearParameters(); + } + } + + + private void dropPartition(int tableId, String partitionName) throws CatalogException { + Connection conn = null; + PreparedStatement pstmt = null; + + try { + int partitionId = getPartitionId(tableId, partitionName); + + String sqlDeletePartitionKeys = "DELETE FROM " + TB_PARTTION_KEYS + " WHERE " + COL_PARTITIONS_PK + " = ? "; + String sqlDeletePartition = "DELETE FROM " + TB_PARTTIONS + " WHERE " + COL_PARTITIONS_PK + " = ? "; + + if (LOG.isDebugEnabled()) { + LOG.debug(sqlDeletePartitionKeys); + } + + conn = getConnection(); + pstmt = conn.prepareStatement(sqlDeletePartitionKeys); + pstmt.setInt(1, partitionId); + pstmt.executeUpdate(); + + pstmt = conn.prepareStatement(sqlDeletePartition); + pstmt.setInt(1, partitionId); + pstmt.executeUpdate(); + + } catch (SQLException se) { + throw new CatalogException(se); + } finally { + CatalogUtil.closeQuietly(pstmt); + } + } + private int getDatabaseId(String databaseName) throws SQLException { String sql = String.format("SELECT DB_ID from %s WHERE DB_NAME = ?", TB_DATABASES); @@ -1260,6 +1391,19 @@ public void dropTableInternal(Connection conn, String databaseName, final String pstmt.executeUpdate(); pstmt.close(); + sql = "DELETE FROM " + TB_PARTTION_KEYS + + " WHERE " + COL_PARTITIONS_PK + + " IN (SELECT " + COL_PARTITIONS_PK + " FROM " + TB_PARTTIONS + " WHERE " + COL_TABLES_PK + "= ? )"; + + if (LOG.isDebugEnabled()) { + LOG.debug(sql); + } + + pstmt = conn.prepareStatement(sql); + pstmt.setInt(1, tableId); + pstmt.executeUpdate(); + pstmt.close(); + sql = "DELETE FROM " + TB_PARTTIONS + " WHERE " + COL_TABLES_PK + " = ? "; if (LOG.isDebugEnabled()) { @@ -1698,66 +1842,14 @@ public List getAllColumns() throws CatalogException { return columns; } - private static final String ADD_PARTITION_SQL = - "INSERT INTO " + TB_PARTTIONS + " (TID, PARTITION_NAME, ORDINAL_POSITION, PATH) VALUES (?,?,?,?)"; - - - @Override - public void addPartitions(CatalogProtos.PartitionsProto partitionsProto) throws CatalogException { - Connection conn = null; - PreparedStatement pstmt = null; - - try { - if (LOG.isDebugEnabled()) { - LOG.debug(ADD_PARTITION_SQL); - } - - String databaseName = partitionsProto.getTableIdentifier().getDatabaseName(); - String tableName = partitionsProto.getTableIdentifier().getTableName(); - - int databaseId = getDatabaseId(databaseName); - int tableId = getTableId(databaseId, databaseName, tableName); - - conn = getConnection(); - pstmt = conn.prepareStatement(ADD_PARTITION_SQL); - - for (CatalogProtos.PartitionDescProto partition : partitionsProto.getPartitionList()) { - addPartitionInternal(pstmt, tableId, partition); - } - pstmt.executeBatch(); - conn.commit(); - } catch (SQLException se) { - if (conn != null) { - try { - conn.rollback(); - } catch (SQLException e) { - LOG.error(e, e); - } - } - throw new CatalogException(se); - } finally { - CatalogUtil.closeQuietly(pstmt); - } - } - - private static void addPartitionInternal(PreparedStatement pstmt, int tableId, PartitionDescProto partition) throws - SQLException { - pstmt.setInt(1, tableId); - pstmt.setString(2, partition.getPartitionName()); - pstmt.setInt(3, partition.getOrdinalPosition()); - pstmt.setString(4, partition.getPath()); - pstmt.addBatch(); - pstmt.clearParameters(); - } - @Override public void addPartitionMethod(CatalogProtos.PartitionMethodProto proto) throws CatalogException { Connection conn = null; PreparedStatement pstmt = null; try { - String sql = "INSERT INTO " + TB_PARTITION_METHODS + " (TID, PARTITION_TYPE, EXPRESSION, EXPRESSION_SCHEMA) " + - "VALUES (?,?,?,?)"; + String sql = "INSERT INTO " + TB_PARTITION_METHODS + + " (" + COL_TABLES_PK + ", PARTITION_TYPE, EXPRESSION, EXPRESSION_SCHEMA) VALUES (?,?,?,?)"; if (LOG.isDebugEnabled()) { LOG.debug(sql); @@ -1789,15 +1881,18 @@ public void dropPartitionMethod(String databaseName, String tableName) throws Ca PreparedStatement pstmt = null; try { - String sql = "DELETE FROM " + TB_PARTITION_METHODS + " WHERE " + COL_TABLES_NAME + " = ? "; + String sql = "DELETE FROM " + TB_PARTITION_METHODS + " WHERE " + COL_TABLES_PK + " = ? "; if (LOG.isDebugEnabled()) { LOG.debug(sql); } + int databaseId = getDatabaseId(databaseName); + int tableId = getTableId(databaseId, databaseName, tableName); + conn = getConnection(); pstmt = conn.prepareStatement(sql); - pstmt.setString(1, tableName); + pstmt.setInt(1, tableId); pstmt.executeUpdate(); } catch (SQLException se) { throw new CatalogException(se); @@ -1815,15 +1910,18 @@ public CatalogProtos.PartitionMethodProto getPartitionMethod(String databaseName try { String sql = "SELECT partition_type, expression, expression_schema FROM " + TB_PARTITION_METHODS + - " WHERE " + COL_TABLES_NAME + " = ? "; + " WHERE " + COL_TABLES_PK + " = ? "; if (LOG.isDebugEnabled()) { LOG.debug(sql); } + int databaseId = getDatabaseId(databaseName); + int tableId = getTableId(databaseId, databaseName, tableName); + conn = getConnection(); pstmt = conn.prepareStatement(sql); - pstmt.setString(1, tableName); + pstmt.setInt(1, tableId); res = pstmt.executeQuery(); if (res.next()) { @@ -1848,15 +1946,18 @@ public boolean existPartitionMethod(String databaseName, String tableName) throw try { String sql = "SELECT partition_type, expression, expression_schema FROM " + TB_PARTITION_METHODS + - " WHERE " + COL_TABLES_NAME + "= ?"; + " WHERE " + COL_TABLES_PK + "= ?"; if (LOG.isDebugEnabled()) { LOG.debug(sql); } + int databaseId = getDatabaseId(databaseName); + int tableId = getTableId(databaseId, databaseName, tableName); + conn = getConnection(); pstmt = conn.prepareStatement(sql); - pstmt.setString(1, tableName); + pstmt.setInt(1, tableId); res = pstmt.executeQuery(); exist = res.next(); @@ -1869,90 +1970,113 @@ public boolean existPartitionMethod(String databaseName, String tableName) throw } @Override - public void addPartition(String databaseName, String tableName, - CatalogProtos.PartitionDescProto partition) throws CatalogException { + public CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName, + String partitionName) throws CatalogException { Connection conn = null; + ResultSet res = null; PreparedStatement pstmt = null; + PartitionDescProto.Builder builder = null; try { + String sql = "SELECT PATH, " + COL_PARTITIONS_PK + " FROM " + TB_PARTTIONS + + " WHERE " + COL_TABLES_PK + " = ? AND PARTITION_NAME = ? "; + if (LOG.isDebugEnabled()) { - LOG.debug(ADD_PARTITION_SQL); + LOG.debug(sql); } int databaseId = getDatabaseId(databaseName); int tableId = getTableId(databaseId, databaseName, tableName); conn = getConnection(); - pstmt = conn.prepareStatement(ADD_PARTITION_SQL); - addPartitionInternal(pstmt, tableId, partition); - pstmt.executeUpdate(); + pstmt = conn.prepareStatement(sql); + pstmt.setInt(1, tableId); + pstmt.setString(2, partitionName); + res = pstmt.executeQuery(); + + if (res.next()) { + builder = PartitionDescProto.newBuilder(); + builder.setPath(res.getString("PATH")); + builder.setPartitionName(partitionName); + setPartitionKeys(res.getInt(COL_PARTITIONS_PK), builder); + } else { + return null; + } } catch (SQLException se) { throw new CatalogException(se); } finally { - CatalogUtil.closeQuietly(pstmt); + CatalogUtil.closeQuietly(pstmt, res); } + return builder.build(); } - @Override - public CatalogProtos.PartitionDescProto getPartition(String partitionName) throws CatalogException { - // TODO - throw new UnimplementedException("getPartition is not implemented"); - } - - - @Override - public CatalogProtos.PartitionsProto getPartitions(String tableName) throws CatalogException { - // TODO - throw new UnimplementedException("getPartitions is not implemented"); - } - - - @Override - public void delPartition(String partitionName) throws CatalogException { + private void setPartitionKeys(int pid, PartitionDescProto.Builder partitionDesc) throws + CatalogException { Connection conn = null; + ResultSet res = null; PreparedStatement pstmt = null; try { - String sql = "DELETE FROM " + TB_PARTTIONS + " WHERE PARTITION_NAME = ? "; - - if (LOG.isDebugEnabled()) { - LOG.debug(sql); - } + String sql = "SELECT "+ COL_COLUMN_NAME + " , "+ COL_PARTITION_VALUE + + " FROM " + TB_PARTTION_KEYS + " WHERE " + COL_PARTITIONS_PK + " = ? "; conn = getConnection(); pstmt = conn.prepareStatement(sql); - pstmt.setString(1, partitionName); - pstmt.executeUpdate(); + pstmt.setInt(1, pid); + res = pstmt.executeQuery(); + + while (res.next()) { + PartitionKeyProto.Builder builder = PartitionKeyProto.newBuilder(); + builder.setColumnName(res.getString(COL_COLUMN_NAME)); + builder.setPartitionValue(res.getString(COL_PARTITION_VALUE)); + partitionDesc.addPartitionKeys(builder); + } } catch (SQLException se) { throw new CatalogException(se); } finally { - CatalogUtil.closeQuietly(pstmt); + CatalogUtil.closeQuietly(pstmt, res); } } @Override - public void dropPartitions(String tableName) throws CatalogException { + public List getPartitions(String databaseName, String tableName) throws CatalogException { Connection conn = null; + ResultSet res = null; PreparedStatement pstmt = null; + PartitionDescProto.Builder builder = null; + List partitions = new ArrayList(); try { - String sql = "DELETE FROM " + TB_PARTTIONS + " WHERE " + COL_TABLES_NAME + "= ? "; + String sql = "SELECT PATH, PARTITION_NAME, " + COL_PARTITIONS_PK + " FROM " + + TB_PARTTIONS +" WHERE " + COL_TABLES_PK + " = ? "; if (LOG.isDebugEnabled()) { LOG.debug(sql); } + int databaseId = getDatabaseId(databaseName); + int tableId = getTableId(databaseId, databaseName, tableName); + conn = getConnection(); pstmt = conn.prepareStatement(sql); - pstmt.setString(1, tableName); - pstmt.executeUpdate(); + pstmt.setInt(1, tableId); + res = pstmt.executeQuery(); + + while (res.next()) { + builder = PartitionDescProto.newBuilder(); + builder.setPath(res.getString("PATH")); + builder.setPartitionName(res.getString("PARTITION_NAME")); + setPartitionKeys(res.getInt(COL_PARTITIONS_PK), builder); + partitions.add(builder.build()); + } } catch (SQLException se) { throw new CatalogException(se); } finally { - CatalogUtil.closeQuietly(pstmt); + CatalogUtil.closeQuietly(pstmt, res); } + return partitions; } - + @Override public List getAllPartitions() throws CatalogException { Connection conn = null; @@ -1962,20 +2086,20 @@ public List getAllPartitions() throws CatalogException { List partitions = new ArrayList(); try { - String sql = "SELECT PID, TID, PARTITION_NAME, ORDINAL_POSITION, PATH FROM " + TB_PARTTIONS; + String sql = "SELECT " + COL_PARTITIONS_PK + ", " + COL_TABLES_PK + ", PARTITION_NAME, " + + " PATH FROM " + TB_PARTTIONS; conn = getConnection(); stmt = conn.createStatement(); resultSet = stmt.executeQuery(sql); while (resultSet.next()) { TablePartitionProto.Builder builder = TablePartitionProto.newBuilder(); - - builder.setPid(resultSet.getInt("PID")); - builder.setTid(resultSet.getInt("TID")); + + builder.setPid(resultSet.getInt(COL_PARTITIONS_PK)); + builder.setTid(resultSet.getInt(COL_TABLES_PK)); builder.setPartitionName(resultSet.getString("PARTITION_NAME")); - builder.setOrdinalPosition(resultSet.getInt("ORDINAL_POSITION")); builder.setPath(resultSet.getString("PATH")); - + partitions.add(builder.build()); } } catch (SQLException se) { @@ -1983,7 +2107,7 @@ public List getAllPartitions() throws CatalogException { } finally { CatalogUtil.closeQuietly(stmt, resultSet); } - + return partitions; } @@ -2003,7 +2127,8 @@ public void createIndex(final IndexDescProto proto) throws CatalogException { String sql = "INSERT INTO " + TB_INDEXES + " (" + COL_DATABASES_PK + ", " + COL_TABLES_PK + ", INDEX_NAME, " + - "COLUMN_NAME, DATA_TYPE, INDEX_TYPE, IS_UNIQUE, IS_CLUSTERED, IS_ASCENDING) VALUES (?,?,?,?,?,?,?,?,?)"; + "" + COL_COLUMN_NAME + ", DATA_TYPE, INDEX_TYPE, IS_UNIQUE, IS_CLUSTERED, IS_ASCENDING) " + + "VALUES (?,?,?,?,?,?,?,?,?)"; if (LOG.isDebugEnabled()) { LOG.debug(sql); diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractMySQLMariaDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractMySQLMariaDBStore.java index 6d0876f2d6..be9727e751 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractMySQLMariaDBStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractMySQLMariaDBStore.java @@ -204,6 +204,19 @@ protected void createBaseTable() throws CatalogException { baseTableMaps.put(TB_PARTTIONS, true); } + // PARTITION_KEYS + if (!baseTableMaps.get(TB_PARTTION_KEYS)) { + String sql = readSchemaFile("partition_params.sql"); + + if (LOG.isDebugEnabled()) { + LOG.debug(sql.toString()); + } + + stmt.executeUpdate(sql.toString()); + LOG.info("Table '" + TB_PARTTION_KEYS + "' is created."); + baseTableMaps.put(TB_PARTTION_KEYS, true); + } + insertSchemaVersion(); } catch (SQLException se) { @@ -270,6 +283,7 @@ protected boolean isInitialized() throws CatalogException { baseTableMaps.put(TB_INDEXES, false); baseTableMaps.put(TB_PARTITION_METHODS, false); baseTableMaps.put(TB_PARTTIONS, false); + baseTableMaps.put(TB_PARTTION_KEYS, false); if (res.wasNull()) return false; diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java index ed6fedc790..57ee74f88a 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/CatalogStore.java @@ -102,25 +102,17 @@ PartitionMethodProto getPartitionMethod(String databaseName, String tableName) /************************** PARTITIONS *****************************/ - void addPartitions(CatalogProtos.PartitionsProto partitionsProto) throws CatalogException; - - void addPartition(String databaseName, String tableName, - CatalogProtos.PartitionDescProto partitionDescProto) throws CatalogException; - /** * Get all partitions of a table * @param tableName the table name * @return * @throws CatalogException */ - CatalogProtos.PartitionsProto getPartitions(String tableName) throws CatalogException; + List getPartitions(String databaseName, String tableName) throws CatalogException; - CatalogProtos.PartitionDescProto getPartition(String partitionName) throws CatalogException; + CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName, + String partitionName) throws CatalogException; - void delPartition(String partitionName) throws CatalogException; - - void dropPartitions(String tableName) throws CatalogException; - List getAllPartitions() throws CatalogException; /**************************** INDEX *******************************/ diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java index e37efe6225..8be3825a4b 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java @@ -498,37 +498,16 @@ public void dropPartitionMethod(String databaseName, String tableName) throws Ca } @Override - public void addPartitions(CatalogProtos.PartitionsProto partitionDescList) throws CatalogException { + public List getPartitions(String databaseName, String tableName) throws CatalogException { throw new RuntimeException("not supported!"); } @Override - public void addPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto - partitionDescProto) throws CatalogException { + public CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName, + String partitionName) throws CatalogException { throw new RuntimeException("not supported!"); } - @Override - public CatalogProtos.PartitionsProto getPartitions(String tableName) throws CatalogException { - throw new RuntimeException("not supported!"); - } - - @Override - public CatalogProtos.PartitionDescProto getPartition(String partitionName) throws CatalogException { - throw new RuntimeException("not supported!"); - } - - @Override - public void delPartition(String partitionName) throws CatalogException { - throw new RuntimeException("not supported!"); - } - - @Override - public void dropPartitions(String tableName) throws CatalogException { - throw new RuntimeException("not supported!"); - } - - @Override public List getAllPartitions() throws CatalogException { throw new UnsupportedOperationException(); } diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml index a0bd9cda61..a797876dfe 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml @@ -153,19 +153,28 @@ CREATE TABLE PARTITIONS ( PID INT NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1), TID INT NOT NULL REFERENCES TABLES (TID) ON DELETE CASCADE, - PARTITION_NAME VARCHAR(255), - ORDINAL_POSITION INT NOT NULL, - PARTITION_VALUE VARCHAR(1024), + PARTITION_NAME VARCHAR(767), PATH VARCHAR(1024), - CONSTRAINT C_PARTITION_PK PRIMARY KEY (PID), - CONSTRAINT C_PARTITION_UNIQUE UNIQUE (TID, PARTITION_NAME) + CONSTRAINT C_PARTITION_PK PRIMARY KEY (PID) )]]> - - + + - + + + + + + + + diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/partition_keys.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/partition_keys.sql new file mode 100644 index 0000000000..56ccfe6041 --- /dev/null +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/partition_keys.sql @@ -0,0 +1,6 @@ +CREATE TABLE PARTITION_KEYS ( + PID INT NOT NULL, + COLUMN_NAME VARCHAR(255) BINARY NOT NULL, + PARTITION_VALUE VARCHAR(255) NOT NULL, + UNIQUE INDEX PARTITION_KEYS_IDX (PID, COLUMN_NAME, PARTITION_VALUE), + FOREIGN KEY (PID) REFERENCES PARTITIONS (PID) ON DELETE CASCADE) \ No newline at end of file diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/partitions.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/partitions.sql index c2672a5c75..10e2b9a530 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/partitions.sql +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/partitions.sql @@ -1,12 +1,7 @@ CREATE TABLE PARTITIONS ( - PID INT NOT NULL PRIMARY KEY, + PID INT NOT NULL AUTO_INCREMENT PRIMARY KEY, TID INT NOT NULL, - PARTITION_NAME VARCHAR(128) BINARY, - ORDINAL_POSITION INT NOT NULL, - PARTITION_VALUE VARCHAR(1024), + PARTITION_NAME VARCHAR(767) BINARY, PATH VARCHAR(4096), - FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE, - CONSTRAINT C_PARTITION_UNIQUE UNIQUE (TID, PARTITION_NAME), - INDEX IDX_TID (TID), - UNIQUE INDEX IDX_TID_NAME (TID, PARTITION_NAME) -) + UNIQUE INDEX PARTITIONS_IDX (PID, TID, PARTITION_NAME), + FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE) \ No newline at end of file diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partition_keys.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partition_keys.sql new file mode 100644 index 0000000000..56ccfe6041 --- /dev/null +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partition_keys.sql @@ -0,0 +1,6 @@ +CREATE TABLE PARTITION_KEYS ( + PID INT NOT NULL, + COLUMN_NAME VARCHAR(255) BINARY NOT NULL, + PARTITION_VALUE VARCHAR(255) NOT NULL, + UNIQUE INDEX PARTITION_KEYS_IDX (PID, COLUMN_NAME, PARTITION_VALUE), + FOREIGN KEY (PID) REFERENCES PARTITIONS (PID) ON DELETE CASCADE) \ No newline at end of file diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partitions.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partitions.sql index c2672a5c75..10e2b9a530 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partitions.sql +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partitions.sql @@ -1,12 +1,7 @@ CREATE TABLE PARTITIONS ( - PID INT NOT NULL PRIMARY KEY, + PID INT NOT NULL AUTO_INCREMENT PRIMARY KEY, TID INT NOT NULL, - PARTITION_NAME VARCHAR(128) BINARY, - ORDINAL_POSITION INT NOT NULL, - PARTITION_VALUE VARCHAR(1024), + PARTITION_NAME VARCHAR(767) BINARY, PATH VARCHAR(4096), - FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE, - CONSTRAINT C_PARTITION_UNIQUE UNIQUE (TID, PARTITION_NAME), - INDEX IDX_TID (TID), - UNIQUE INDEX IDX_TID_NAME (TID, PARTITION_NAME) -) + UNIQUE INDEX PARTITIONS_IDX (PID, TID, PARTITION_NAME), + FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE) \ No newline at end of file diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml index 880a14ece8..0773cf3211 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml @@ -188,18 +188,45 @@ CREATE TABLE PARTITIONS ( PID INT NOT NULL PRIMARY KEY, TID INT NOT NULL, - PARTITION_NAME VARCHAR2(128), - ORDINAL_POSITION INT NOT NULL, - PARTITION_VALUE VARCHAR2(1024), + PARTITION_NAME VARCHAR2(767), PATH VARCHAR2(4000), - FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE, - CONSTRAINT C_PARTITION_UNIQUE UNIQUE (TID, PARTITION_NAME) + FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE )]]> - - - + + + + + + + + + + + + + + + + + + diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml index 821527bd53..aef1bee3bc 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml @@ -151,7 +151,6 @@ xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition. PID INT NOT NULL PRIMARY KEY, TID INT NOT NULL, PARTITION_NAME VARCHAR(128), - ORDINAL_POSITION INT NOT NULL, PARTITION_VALUE VARCHAR(1024), PATH VARCHAR(4096), FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE, diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java index c3bfc99bb0..8e561e72b1 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java @@ -25,6 +25,8 @@ import org.apache.tajo.catalog.dictionary.InfoSchemaMetadataDictionary; import org.apache.tajo.catalog.exception.CatalogException; import org.apache.tajo.catalog.exception.NoSuchFunctionException; +import org.apache.tajo.catalog.partition.PartitionDesc; +import org.apache.tajo.catalog.partition.PartitionKey; import org.apache.tajo.catalog.store.PostgreSQLStore; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; @@ -57,7 +59,7 @@ import static org.junit.Assert.*; public class TestCatalog { - static final String FieldName1="f1"; + static final String FieldName1="f1"; static final String FieldName2="f2"; static final String FieldName3="f3"; @@ -886,15 +888,16 @@ public final void testAddAndDeleteTablePartitionByColumn() throws Exception { Schema partSchema = new Schema(); partSchema.addColumn("id", Type.INT4); + partSchema.addColumn("name", Type.TEXT); - PartitionMethodDesc partitionDesc = + PartitionMethodDesc partitionMethodDesc = new PartitionMethodDesc(DEFAULT_DATABASE_NAME, tableName, - CatalogProtos.PartitionType.COLUMN, "id", partSchema); + CatalogProtos.PartitionType.COLUMN, "id,name", partSchema); TableDesc desc = new TableDesc(tableName, schema, meta, new Path(CommonTestingUtil.getTestDir(), "addedtable").toUri()); - desc.setPartitionMethod(partitionDesc); + desc.setPartitionMethod(partitionMethodDesc); assertFalse(catalog.existsTable(tableName)); catalog.createTable(desc); assertTrue(catalog.existsTable(tableName)); @@ -905,10 +908,72 @@ public final void testAddAndDeleteTablePartitionByColumn() throws Exception { assertEquals(retrieved.getPartitionMethod().getPartitionType(), CatalogProtos.PartitionType.COLUMN); assertEquals(retrieved.getPartitionMethod().getExpressionSchema().getColumn(0).getSimpleName(), "id"); + testAddPartition(tableName, "id=10/name=aaa"); + testAddPartition(tableName, "id=20/name=bbb"); + + List partitions = catalog.getPartitions(DEFAULT_DATABASE_NAME, "addedtable"); + assertNotNull(partitions); + assertEquals(partitions.size(), 2); + + testDropPartition(tableName, "id=10/name=aaa"); + testDropPartition(tableName, "id=20/name=bbb"); + + partitions = catalog.getPartitions(DEFAULT_DATABASE_NAME, "addedtable"); + assertNotNull(partitions); + assertEquals(partitions.size(), 0); + catalog.dropTable(tableName); assertFalse(catalog.existsTable(tableName)); } + private void testAddPartition(String tableName, String partitionName) throws Exception { + AlterTableDesc alterTableDesc = new AlterTableDesc(); + alterTableDesc.setTableName(tableName); + alterTableDesc.setAlterTableType(AlterTableType.ADD_PARTITION); + + PartitionDesc partitionDesc = new PartitionDesc(); + partitionDesc.setName(partitionName); + + String[] partitionNames = partitionName.split("/"); + + List partitionKeyList = new ArrayList(); + for(int i = 0; i < partitionNames.length; i++) { + String columnName = partitionNames[i].split("=")[0]; + partitionKeyList.add(new PartitionKey(partitionNames[i], columnName)); + } + + partitionDesc.setPartitionKeys(partitionKeyList); + + partitionDesc.setPath("hdfs://xxx.com/warehouse/" + partitionName); + + alterTableDesc.setPartitionDesc(partitionDesc); + + catalog.alterTable(alterTableDesc); + + CatalogProtos.PartitionDescProto resultDesc = catalog.getPartition(DEFAULT_DATABASE_NAME, + "addedtable", partitionName); + + assertNotNull(resultDesc); + assertEquals(resultDesc.getPartitionName(), partitionName); + assertEquals(resultDesc.getPath(), "hdfs://xxx.com/warehouse/" + partitionName); + + assertEquals(resultDesc.getPartitionKeysCount(), 2); + } + + + private void testDropPartition(String tableName, String partitionName) throws Exception { + AlterTableDesc alterTableDesc = new AlterTableDesc(); + alterTableDesc.setTableName(tableName); + alterTableDesc.setAlterTableType(AlterTableType.DROP_PARTITION); + + PartitionDesc partitionDesc = new PartitionDesc(); + partitionDesc.setName(partitionName); + + alterTableDesc.setPartitionDesc(partitionDesc); + + catalog.alterTable(alterTableDesc); + } + @Test public void testAlterTableName () throws Exception { diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index e44d8bef13..9e091f1c84 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@ -423,8 +423,6 @@ private List getAllPartitions(Schema outSchema) { } else { aTuple.put(fieldId, DatumFactory.createNullDatum()); } - } else if ("ordinal_position".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(partition.getOrdinalPosition())); } else if ("path".equalsIgnoreCase(column.getSimpleName())) { aTuple.put(fieldId, DatumFactory.createText(partition.getPath())); } From 9fe430794d9b51ffcef09f0c14d94be473214dba Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Tue, 24 Mar 2015 00:30:58 +0900 Subject: [PATCH 2/6] Fixed some typos and applied TUtil::checkEquals --- .../apache/tajo/catalog/partition/PartitionKey.java | 11 +++-------- .../src/main/proto/CatalogProtos.proto | 4 ++-- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java index 4a7103e7eb..ffd4855b7b 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java @@ -24,6 +24,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.ProtoObject; import org.apache.tajo.json.GsonObject; +import org.apache.tajo.util.TUtil; /** @@ -93,18 +94,12 @@ public int hashCode() { public boolean equals(Object o) { if (o instanceof PartitionKey) { PartitionKey another = (PartitionKey) o; - boolean eq = ((columnName != null && another.columnName != null - && columnName.equals(another.columnName)) || - (columnName == null && another.columnName == null)); - eq = eq && ((partitionValue != null && another.partitionValue != null - && partitionValue.equals(another.partitionValue)) || - (partitionValue == null && another.partitionValue == null)); - return eq; + return TUtil.checkEquals(columnName, another.columnName) && + TUtil.checkEquals(partitionValue, another.partitionValue); } return false; } - @Override public CatalogProtos.PartitionKeyProto getProto() { if (builder == null) { diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index 83d8cf2036..fe59f123a7 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -282,7 +282,7 @@ message SortSpecProto { message PartitionsProto { - repeated PartitionDescProto partition = 21; + repeated PartitionDescProto partition = 1; } message PartitionMethodProto { @@ -294,7 +294,7 @@ message PartitionMethodProto { message PartitionDescProto { required string partitionName = 1; - repeated PartitionKeyProto partitionKeys = 32; + repeated PartitionKeyProto partitionKeys = 2; optional string path = 3; } From 78b8c721cf74d2d021b5171e69bdf26ddde2b3a2 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Wed, 25 Mar 2015 08:10:45 +0900 Subject: [PATCH 3/6] Move brackets according to Tajo coding convention. --- .../tajo/catalog/partition/PartitionDesc.java | 19 +++++++++++-------- .../tajo/catalog/partition/PartitionKey.java | 8 ++++++-- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java index 7abfa2cdb3..b6d883d150 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionDesc.java @@ -85,23 +85,26 @@ public PartitionDesc(CatalogProtos.PartitionDescProto proto) { } } - public String getPartitionName() { return partitionName; } - public void setPartitionName(String partitionName) { this.partitionName = partitionName; } + public String getPartitionName() { + return partitionName; + } - public void setName(String partitionName) { + public void setPartitionName(String partitionName) { this.partitionName = partitionName; } - public String getName() { - return partitionName; - } - public List getPartitionKeys() { return partitionKeys; } + public List getPartitionKeys() { + return partitionKeys; + } - public void setPartitionKeys(List partitionKeys) { this.partitionKeys = partitionKeys; } + public void setPartitionKeys(List partitionKeys) { + this.partitionKeys = partitionKeys; + } public void setPath(String path) { this.path = path; } + public String getPath() { return path; } diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java index ffd4855b7b..085598b7ec 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/partition/PartitionKey.java @@ -83,9 +83,13 @@ public void setPartitionValue(String partitionValue) { this.partitionValue = partitionValue; } - public String getColumnName() { return columnName; } + public String getColumnName() { + return columnName; + } - public void setColumnName(String columnName) { this.columnName = columnName; } + public void setColumnName(String columnName) { + this.columnName = columnName; + } public int hashCode() { return Objects.hashCode(partitionValue, columnName); From 43a1a3782007c04a3fcec674c0aea6e2bcb1daa1 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Wed, 25 Mar 2015 15:16:16 +0900 Subject: [PATCH 4/6] Rename partition id column name. --- .../org/apache/tajo/catalog/CatalogConstants.java | 2 +- .../src/main/proto/CatalogProtos.proto | 2 +- .../tajo/catalog/store/TestHCatalogStore.java | 4 ++-- .../tajo/catalog/store/AbstractDBStore.java | 2 +- .../src/main/resources/schemas/derby/derby.xml | 10 +++++----- .../resources/schemas/mariadb/partition_keys.sql | 4 ++-- .../main/resources/schemas/mariadb/partitions.sql | 4 ++-- .../resources/schemas/mysql/partition_keys.sql | 6 +++--- .../main/resources/schemas/mysql/partitions.sql | 4 ++-- .../src/main/resources/schemas/oracle/oracle.xml | 12 ++++++------ .../resources/schemas/postgresql/postgresql.xml | 15 ++++++++++++++- .../java/org/apache/tajo/catalog/TestCatalog.java | 4 ++-- .../exec/NonForwardQueryResultSystemScanner.java | 4 ++-- 13 files changed, 43 insertions(+), 30 deletions(-) diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java index 946c5041c0..8265e38f14 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java @@ -55,7 +55,7 @@ public class CatalogConstants { public static final String COL_TABLES_PK = "TID"; public static final String COL_TABLES_NAME = "TABLE_NAME"; - public static final String COL_PARTITIONS_PK = "PID"; + public static final String COL_PARTITIONS_PK = "PARTITION_ID"; public static final String COL_COLUMN_NAME = "COLUMN_NAME"; public static final String COL_PARTITION_VALUE = "PARTITION_VALUE"; diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index fe59f123a7..3abd840fa4 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -186,7 +186,7 @@ message TableOptionProto { } message TablePartitionProto { - required int32 pid = 1; + required int32 partition_id = 1; required int32 tid = 2; optional string partitionName = 3; optional string path = 4; diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java index a8d38ca4a1..32ab674481 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java @@ -285,7 +285,7 @@ private void testAddPartition(URI uri, String tableName, String partitionName) t Path path = new Path(uri.getPath(), partitionName); PartitionDesc partitionDesc = new PartitionDesc(); - partitionDesc.setName(partitionName); + partitionDesc.setPartitionName(partitionName); List partitionKeyList = new ArrayList(); String[] partitionNames = partitionName.split("/"); @@ -320,7 +320,7 @@ private void testDropPartition(String tableName, String partitionName) throws E alterTableDesc.setAlterTableType(AlterTableType.DROP_PARTITION); PartitionDesc partitionDesc = new PartitionDesc(); - partitionDesc.setName(partitionName); + partitionDesc.setPartitionName(partitionName); alterTableDesc.setPartitionDesc(partitionDesc); diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java index 9127b9ed2f..518b499e62 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java @@ -2095,7 +2095,7 @@ public List getAllPartitions() throws CatalogException { while (resultSet.next()) { TablePartitionProto.Builder builder = TablePartitionProto.newBuilder(); - builder.setPid(resultSet.getInt(COL_PARTITIONS_PK)); + builder.setPartitionId(resultSet.getInt(COL_PARTITIONS_PK)); builder.setTid(resultSet.getInt(COL_TABLES_PK)); builder.setPartitionName(resultSet.getString("PARTITION_NAME")); builder.setPath(resultSet.getString("PATH")); diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml index a797876dfe..1e60d151eb 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/derby/derby.xml @@ -151,28 +151,28 @@ - + - + diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/partition_keys.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/partition_keys.sql index 56ccfe6041..dd7f2b5fb9 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/partition_keys.sql +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/partition_keys.sql @@ -1,6 +1,6 @@ CREATE TABLE PARTITION_KEYS ( - PID INT NOT NULL, + PARTITION_ID INT NOT NULL, COLUMN_NAME VARCHAR(255) BINARY NOT NULL, PARTITION_VALUE VARCHAR(255) NOT NULL, UNIQUE INDEX PARTITION_KEYS_IDX (PID, COLUMN_NAME, PARTITION_VALUE), - FOREIGN KEY (PID) REFERENCES PARTITIONS (PID) ON DELETE CASCADE) \ No newline at end of file + FOREIGN KEY (PID) REFERENCES PARTITIONS (PARTITION_ID) ON DELETE CASCADE) \ No newline at end of file diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/partitions.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/partitions.sql index 10e2b9a530..7b279afb0e 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/partitions.sql +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mariadb/partitions.sql @@ -1,7 +1,7 @@ CREATE TABLE PARTITIONS ( - PID INT NOT NULL AUTO_INCREMENT PRIMARY KEY, + PARTITION_ID INT NOT NULL AUTO_INCREMENT PRIMARY KEY, TID INT NOT NULL, PARTITION_NAME VARCHAR(767) BINARY, PATH VARCHAR(4096), - UNIQUE INDEX PARTITIONS_IDX (PID, TID, PARTITION_NAME), + UNIQUE INDEX PARTITIONS_IDX (PARTITION_ID, TID, PARTITION_NAME), FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE) \ No newline at end of file diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partition_keys.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partition_keys.sql index 56ccfe6041..a85b12f0dc 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partition_keys.sql +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partition_keys.sql @@ -1,6 +1,6 @@ CREATE TABLE PARTITION_KEYS ( - PID INT NOT NULL, + PARTITION_ID INT NOT NULL, COLUMN_NAME VARCHAR(255) BINARY NOT NULL, PARTITION_VALUE VARCHAR(255) NOT NULL, - UNIQUE INDEX PARTITION_KEYS_IDX (PID, COLUMN_NAME, PARTITION_VALUE), - FOREIGN KEY (PID) REFERENCES PARTITIONS (PID) ON DELETE CASCADE) \ No newline at end of file + UNIQUE INDEX PARTITION_KEYS_IDX (PARTITION_ID, COLUMN_NAME, PARTITION_VALUE), + FOREIGN KEY (PID) REFERENCES PARTITIONS (PARTITION_ID) ON DELETE CASCADE) \ No newline at end of file diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partitions.sql b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partitions.sql index 10e2b9a530..7b279afb0e 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partitions.sql +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/mysql/partitions.sql @@ -1,7 +1,7 @@ CREATE TABLE PARTITIONS ( - PID INT NOT NULL AUTO_INCREMENT PRIMARY KEY, + PARTITION_ID INT NOT NULL AUTO_INCREMENT PRIMARY KEY, TID INT NOT NULL, PARTITION_NAME VARCHAR(767) BINARY, PATH VARCHAR(4096), - UNIQUE INDEX PARTITIONS_IDX (PID, TID, PARTITION_NAME), + UNIQUE INDEX PARTITIONS_IDX (PARTITION_ID, TID, PARTITION_NAME), FOREIGN KEY (TID) REFERENCES TABLES (TID) ON DELETE CASCADE) \ No newline at end of file diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml index 0773cf3211..84a92fbc14 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/oracle/oracle.xml @@ -186,7 +186,7 @@ - + - + diff --git a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml index aef1bee3bc..0f49f8314f 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml +++ b/tajo-catalog/tajo-catalog-server/src/main/resources/schemas/postgresql/postgresql.xml @@ -148,7 +148,7 @@ xsi:schemaLocation="http://tajo.apache.org/catalogstore ../DBMSSchemaDefinition. + + + + + + + diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java index 8e561e72b1..48a11ee0d6 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java @@ -932,7 +932,7 @@ private void testAddPartition(String tableName, String partitionName) throws Exc alterTableDesc.setAlterTableType(AlterTableType.ADD_PARTITION); PartitionDesc partitionDesc = new PartitionDesc(); - partitionDesc.setName(partitionName); + partitionDesc.setPartitionName(partitionName); String[] partitionNames = partitionName.split("/"); @@ -967,7 +967,7 @@ private void testDropPartition(String tableName, String partitionName) throws Ex alterTableDesc.setAlterTableType(AlterTableType.DROP_PARTITION); PartitionDesc partitionDesc = new PartitionDesc(); - partitionDesc.setName(partitionName); + partitionDesc.setPartitionName(partitionName); alterTableDesc.setPartitionDesc(partitionDesc); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index 9e091f1c84..c2ccf341dc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@ -413,8 +413,8 @@ private List getAllPartitions(Schema outSchema) { for (int fieldId = 0; fieldId < columns.size(); fieldId++) { Column column = columns.get(fieldId); - if ("pid".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createInt4(partition.getPid())); + if ("partition_id".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createInt4(partition.getPartitionId())); } else if ("tid".equalsIgnoreCase(column.getSimpleName())) { aTuple.put(fieldId, DatumFactory.createInt4(partition.getTid())); } else if ("partition_name".equalsIgnoreCase(column.getSimpleName())) { From 4435c89ec798c22f8a9f54e3fb192f0f5bba604f Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Wed, 25 Mar 2015 15:53:50 +0900 Subject: [PATCH 5/6] Added the table for partition keys to information_schema. --- .../InfoSchemaMetadataDictionary.java | 2 + .../PartitionKeysTableDescriptor.java | 46 +++++++++++++++++++ .../dictionary/PartitionsTableDescriptor.java | 3 +- .../jdbc/util/TestQueryStringDecoder.java | 14 +++--- 4 files changed, 56 insertions(+), 9 deletions(-) create mode 100644 tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionKeysTableDescriptor.java diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java index 0ac0a544b7..1bb8bc503f 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java @@ -40,6 +40,7 @@ private static enum DEFINED_TABLES { TABLEOPTIONS, TABLESTATS, PARTITIONS, + PARTITION_KEYS, CLUSTER, MAX_TABLE; } @@ -60,6 +61,7 @@ private void createSystemTableDescriptors() { schemaInfoTableDescriptors.set(DEFINED_TABLES.TABLEOPTIONS.ordinal(), new TableOptionsTableDescriptor(this)); schemaInfoTableDescriptors.set(DEFINED_TABLES.TABLESTATS.ordinal(), new TableStatsTableDescriptor(this)); schemaInfoTableDescriptors.set(DEFINED_TABLES.PARTITIONS.ordinal(), new PartitionsTableDescriptor(this)); + schemaInfoTableDescriptors.set(DEFINED_TABLES.PARTITION_KEYS.ordinal(), new PartitionKeysTableDescriptor(this)); schemaInfoTableDescriptors.set(DEFINED_TABLES.CLUSTER.ordinal(), new ClusterTableDescriptor(this)); } diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionKeysTableDescriptor.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionKeysTableDescriptor.java new file mode 100644 index 0000000000..ea35cefbb7 --- /dev/null +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionKeysTableDescriptor.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.dictionary; + +import org.apache.tajo.common.TajoDataTypes.Type; + +class PartitionKeysTableDescriptor extends AbstractTableDescriptor { + + private static final String TABLENAME = "partition_keys"; + private final ColumnDescriptor[] columns = new ColumnDescriptor[] { + new ColumnDescriptor("partition_id", Type.INT4, 0), + new ColumnDescriptor("column_name", Type.TEXT, 0), + new ColumnDescriptor("partition_value", Type.TEXT, 0), + }; + + public PartitionKeysTableDescriptor(InfoSchemaMetadataDictionary metadataDictionary) { + super(metadataDictionary); + } + + @Override + public String getTableNameString() { + return TABLENAME; + } + + @Override + protected ColumnDescriptor[] getColumnDescriptors() { + return columns; + } + +} diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionsTableDescriptor.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionsTableDescriptor.java index d69c93ea42..a6725c0e89 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionsTableDescriptor.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/PartitionsTableDescriptor.java @@ -24,10 +24,9 @@ class PartitionsTableDescriptor extends AbstractTableDescriptor { private static final String TABLENAME = "partitions"; private final ColumnDescriptor[] columns = new ColumnDescriptor[] { - new ColumnDescriptor("pid", Type.INT4, 0), + new ColumnDescriptor("partition_id", Type.INT4, 0), new ColumnDescriptor("tid", Type.INT4, 0), new ColumnDescriptor("partition_name", Type.TEXT, 0), - new ColumnDescriptor("ordinal_position", Type.INT4, 0), new ColumnDescriptor("path", Type.TEXT, 0) }; diff --git a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/util/TestQueryStringDecoder.java b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/util/TestQueryStringDecoder.java index 31a09d5e49..ca60fdb3c3 100644 --- a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/util/TestQueryStringDecoder.java +++ b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/util/TestQueryStringDecoder.java @@ -50,13 +50,13 @@ public void testSingleQueries() throws Exception { QueryStringDecoder decoder = null; String rawUriStr = ""; - rawUriStr = "http://127.0.0.1:26200/?qid=1234&tid=2345&pid=4567"; + rawUriStr = "http://127.0.0.1:26200/?qid=1234&tid=2345&partition_id=4567"; decoder = new QueryStringDecoder(rawUriStr); - assertThat(decoder.getQueries(), is("qid=1234&tid=2345&pid=4567")); + assertThat(decoder.getQueries(), is("qid=1234&tid=2345&partition_id=4567")); assertThat(decoder.getParameters(), is(notNullValue())); assertThat(decoder.getParameters().size(), is(3)); assertThat(decoder.getParameters().get("qid").get(0), is("1234")); - assertThat(decoder.getParameters().get("pid").get(0), is("4567")); + assertThat(decoder.getParameters().get("partition_id").get(0), is("4567")); rawUriStr = "http://127.0.0.1:26200/?tid=2345"; decoder = new QueryStringDecoder(rawUriStr); @@ -71,9 +71,9 @@ public void testMultipleQueries() throws Exception { QueryStringDecoder decoder = null; String rawUriStr = ""; - rawUriStr = "http://127.0.0.1:26200/?qid=1234&tid=2345&pid=4567&tid=4890"; + rawUriStr = "http://127.0.0.1:26200/?qid=1234&tid=2345&partition_id=4567&tid=4890"; decoder = new QueryStringDecoder(rawUriStr); - assertThat(decoder.getQueries(), is("qid=1234&tid=2345&pid=4567&tid=4890")); + assertThat(decoder.getQueries(), is("qid=1234&tid=2345&partition_id=4567&tid=4890")); assertThat(decoder.getParameters(), is(notNullValue())); assertThat(decoder.getParameters().size(), is(3)); assertThat(decoder.getParameters().get("tid").size(), is(2)); @@ -86,9 +86,9 @@ public void testMalformedURI() throws Exception { QueryStringDecoder decoder = null; String rawUriStr = ""; - rawUriStr = "http://127.0.0.1:26200/?=1234&tid=&pid=4567"; + rawUriStr = "http://127.0.0.1:26200/?=1234&tid=&partition_id=4567"; decoder = new QueryStringDecoder(rawUriStr); - assertThat(decoder.getQueries(), is("=1234&tid=&pid=4567")); + assertThat(decoder.getQueries(), is("=1234&tid=&partition_id=4567")); decoder.getParameters(); } } From 0d05d94e4119161ad084d3ac85bd3aa06a5249fe Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Wed, 25 Mar 2015 17:19:18 +0900 Subject: [PATCH 6/6] Implemented related functions at MemStore. --- .../apache/tajo/catalog/store/MemStore.java | 86 ++++++++++++++++++- 1 file changed, 82 insertions(+), 4 deletions(-) diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java index 8be3825a4b..470f09d928 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java @@ -29,6 +29,7 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.FunctionDesc; import org.apache.tajo.catalog.exception.*; +import org.apache.tajo.catalog.partition.PartitionDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto; @@ -58,6 +59,7 @@ public class MemStore implements CatalogStore { private final Map functions = Maps.newHashMap(); private final Map> indexes = Maps.newHashMap(); private final Map> indexesByColumn = Maps.newHashMap(); + private final Map> partitions = Maps.newHashMap(); public MemStore(Configuration conf) { } @@ -67,6 +69,7 @@ public void close() throws IOException { databases.clear(); functions.clear(); indexes.clear(); + partitions.clear(); } @Override @@ -270,6 +273,8 @@ public void alterTable(CatalogProtos.AlterTableDescProto alterTableDescProto) th final CatalogProtos.TableDescProto tableDescProto = database.get(tableName); CatalogProtos.TableDescProto newTableDescProto; CatalogProtos.SchemaProto schemaProto; + String partitionName = null; + CatalogProtos.PartitionDescProto partitionDesc = null; switch (alterTableDescProto.getAlterTableType()) { case RENAME_TABLE: @@ -304,11 +309,52 @@ public void alterTable(CatalogProtos.AlterTableDescProto alterTableDescProto) th newTableDescProto = tableDescProto.toBuilder().setSchema(newSchemaProto).build(); database.put(tableName, newTableDescProto); break; + case ADD_PARTITION: + partitionDesc = alterTableDescProto.getPartitionDesc(); + partitionName = partitionDesc.getPartitionName(); + + if (partitions.containsKey(tableName) && partitions.get(tableName).containsKey(partitionName)) { + throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName); + } else { + CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder(); + builder.setPartitionName(partitionName); + builder.setPath(partitionDesc.getPath()); + + if (partitionDesc.getPartitionKeysCount() > 0) { + int i = 0; + for (CatalogProtos.PartitionKeyProto eachKey : partitionDesc.getPartitionKeysList()) { + CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder(); + keyBuilder.setColumnName(eachKey.getColumnName()); + keyBuilder.setPartitionValue(eachKey.getPartitionValue()); + builder.setPartitionKeys(i, keyBuilder.build()); + i++; + } + } + + Map protoMap = null; + if (!partitions.containsKey(tableName)) { + protoMap = Maps.newHashMap(); + } else { + protoMap = partitions.get(tableName); + } + protoMap.put(partitionName, builder.build()); + partitions.put(tableName, protoMap); + } + break; + case DROP_PARTITION: + partitionDesc = alterTableDescProto.getPartitionDesc(); + partitionName = partitionDesc.getPartitionName(); + if(!partitions.containsKey(tableName)) { + throw new NoSuchPartitionException(databaseName, tableName, partitionName); + } else { + partitions.remove(partitionName); + } + break; default: - //TODO } } + private int getIndexOfColumnToBeRenamed(List fieldList, String columnName) { int fieldCount = fieldList.size(); for (int index = 0; index < fieldCount; index++) { @@ -499,17 +545,49 @@ public void dropPartitionMethod(String databaseName, String tableName) throws Ca @Override public List getPartitions(String databaseName, String tableName) throws CatalogException { - throw new RuntimeException("not supported!"); + List protos = new ArrayList(); + + if (partitions.containsKey(tableName)) { + for (CatalogProtos.PartitionDescProto proto : partitions.get(tableName).values()) { + protos.add(proto); + } + } + return protos; } @Override public CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName, String partitionName) throws CatalogException { - throw new RuntimeException("not supported!"); + if (partitions.containsKey(tableName) && partitions.get(tableName).containsKey(partitionName)) { + return partitions.get(tableName).get(partitionName); + } else { + throw new NoSuchPartitionException(partitionName); + } } public List getAllPartitions() throws CatalogException { - throw new UnsupportedOperationException(); + List protos = new ArrayList(); + Set tables = partitions.keySet(); + for (String table : tables) { + Map entryMap = partitions.get(table); + for (Map.Entry proto : entryMap.entrySet()) { + CatalogProtos.PartitionDescProto partitionDescProto = proto.getValue(); + + TablePartitionProto.Builder builder = TablePartitionProto.newBuilder(); + + builder.setPartitionName(partitionDescProto.getPartitionName()); + builder.setPath(partitionDescProto.getPath()); + + // PARTITION_ID and TID is always necessary variables. In other CatalogStore excepting MemStore, + // all partitions would have PARTITION_ID and TID. But MemStore doesn't contain these variable values because + // it is implemented for test purpose. Thus, we need to set each variables to 0. + builder.setPartitionId(0); + builder.setTid(0); + + protos.add(builder.build()); + } + } + return protos; } /* (non-Javadoc)