From bb1f654e86f7ddb23c391e43b5916c9c3664a39c Mon Sep 17 00:00:00 2001
From: miklosgergely
Date: Thu, 25 Jul 2019 10:45:33 +0200
Subject: [PATCH] Revert "HIVE-22028 Clean up Add Partition"

This reverts commit 853bf6252a0d5b0d10f35248b4b4785c08380914.
---
 .../hive/hcatalog/streaming/HiveEndPoint.java |  60 +++-
 .../partition/AlterTableAddPartitionDesc.java | 306 ++++++++++++------
 .../AlterTableAddPartitionOperation.java      | 196 +----------
 .../events/filesystem/FSTableEvent.java       |  54 ++--
 .../bootstrap/load/table/LoadPartitions.java  |   8 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java  | 160 +++++++--
 .../hive/ql/parse/DDLSemanticAnalyzer.java    |  72 +++--
 .../hive/ql/parse/ImportSemanticAnalyzer.java |  92 +++---
 .../clientpositive/add_part_multiple.q.out    |  18 +-
 .../drop_partitions_filter.q.out              |   8 +-
 .../llap/add_part_with_loc.q.out              |   8 +-
 .../spark/add_part_multiple.q.out             |  18 +-
 .../streaming/HiveStreamingConnection.java    |  12 +-
 13 files changed, 533 insertions(+), 479 deletions(-)

diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
index e249b7775e4e..fee7ffc4c73b 100644
--- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
+++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java
@@ -25,10 +25,12 @@
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.LockComponentBuilder;
@@ -40,10 +42,14 @@
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.hcatalog.common.HCatUtil;

 import org.apache.hadoop.security.UserGroupInformation;
@@ -223,6 +229,18 @@ private StreamingConnection newConnectionImpl(UserGroupInformation ugi,
     return new ConnectionImpl(this, ugi, conf, createPartIfNotExists, agentInfo);
   }

+  private static UserGroupInformation getUserGroupInfo(String user)
+      throws ImpersonationFailed {
+    try {
+      return UserGroupInformation.createProxyUser(
+          user, UserGroupInformation.getLoginUser());
+    } catch (IOException e) {
+      LOG.error("Unable to get UserGroupInfo for user : " + user, e);
+      throw new ImpersonationFailed(user,e);
+    }
+  }
+
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -449,10 +467,12 @@ private static void createPartitionIfNotExists(HiveEndPoint ep,
       Map partSpec =
Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), ep.partitionVals); - Path location = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec)); - location = new Path(Utilities.getQualifiedPath(conf, location)); - Partition partition = - org.apache.hadoop.hive.ql.metadata.Partition.createMetaPartitionObject(tableObject, partSpec, location); + AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(ep.database, ep.table, true); + String partLocation = new Path(tableObject.getDataLocation(), + Warehouse.makePartPath(partSpec)).toString(); + addPartitionDesc.addPartition(partSpec, partLocation); + Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, + addPartitionDesc.getPartition(0), conf); msClient.add_partition(partition); } catch (AlreadyExistsException e) { @@ -466,6 +486,36 @@ private static void createPartitionIfNotExists(HiveEndPoint ep, } } + private static boolean runDDL(IDriver driver, String sql) throws QueryFailedException { + if (LOG.isDebugEnabled()) { + LOG.debug("Running Hive Query: " + sql); + } + driver.run(sql); + return true; + } + + private static String partSpecStr(List partKeys, ArrayList partVals) { + if (partKeys.size()!=partVals.size()) { + throw new IllegalArgumentException("Partition values:" + partVals + + ", does not match the partition Keys in table :" + partKeys ); + } + StringBuilder buff = new StringBuilder(partKeys.size()*20); + buff.append(" ( "); + int i=0; + for (FieldSchema schema : partKeys) { + buff.append(schema.getName()); + buff.append("='"); + buff.append(partVals.get(i)); + buff.append("'"); + if (i!=partKeys.size()-1) { + buff.append(","); + } + ++i; + } + buff.append(" )"); + return buff.toString(); + } + private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf, boolean secureMode) throws ConnectionError { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionDesc.java index bc068ed2ec65..72828efaae64 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionDesc.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.ddl.table.partition; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -31,67 +32,45 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level; /** - * DDL task description for ALTER TABLE ... ADD PARTITION ... commands. + * DDL task description for ALTER TABLE ... DROP PARTITION ... commands. */ @Explain(displayName = "Add Partition", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public class AlterTableAddPartitionDesc implements DDLDesc, Serializable { private static final long serialVersionUID = 1L; - /** - * Description of a partition to add. 
- */ - @Explain(displayName = "Partition", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public static class PartitionDesc { - private final Map partitionSpec; - private String location; // TODO: make location final too - private final Map params; - private final String inputFormat; - private final String outputFormat; - private final int numBuckets; - private final List columns; - private final String serializationLib; - private final Map serdeParams; - private final List bucketColumns; - private final List sortColumns; - private final ColumnStatistics columnStats; - private final long writeId; - - public PartitionDesc(Map partitionSpec, String location, Map params) { - this(partitionSpec, location, params, null, null, -1, null, null, null, null, null, null, -1); - } - - public PartitionDesc(Map partitionSpec, String location, Map params, - String inputFormat, String outputFormat, int numBuckets, List columns, String serializationLib, - Map serdeParams, List bucketColumns, List sortColumns, - ColumnStatistics columnStats, long writeId) { - this.partitionSpec = partitionSpec; - this.location = location; - this.params = params; - this.inputFormat = inputFormat; - this.outputFormat = outputFormat; - this.numBuckets = numBuckets; - this.columns = columns; - this.serializationLib = serializationLib; - this.serdeParams = serdeParams; - this.bucketColumns = bucketColumns; - this.sortColumns = sortColumns; - this.columnStats = columnStats; - this.writeId = writeId; + PartitionDesc( + Map partSpec, String location, Map params) { + this(partSpec, location); + this.partParams = params; } - public Map getPartSpec() { - return partitionSpec; + PartitionDesc(Map partSpec, String location) { + this.partSpec = partSpec; + this.location = location; } - @Explain(displayName = "partition spec", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) - public String getPartSpecForExplain() { - return partitionSpec.toString(); + Map partSpec; + Map partParams; + String location; + String inputFormat = null; + String outputFormat = null; + int numBuckets = -1; + List cols = null; + String serializationLib = null; + Map serdeParams = null; + List bucketCols = null; + List sortCols = null; + ColumnStatistics colStats = null; + long writeId = -1; + + public Map getPartSpec() { + return partSpec; } /** * @return location of partition in relation to table */ - @Explain(displayName = "location", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public String getLocation() { return location; } @@ -101,102 +80,241 @@ public void setLocation(String location) { } public Map getPartParams() { - return params; - } - - @Explain(displayName = "params", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) - public String getPartParamsForExplain() { - return params.toString(); + return partParams; } - @Explain(displayName = "input format", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) - public String getInputFormat() { - return inputFormat; - } - - @Explain(displayName = "output format", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) - public String getOutputFormat() { - return outputFormat; + public void setPartParams(Map partParams) { + this.partParams = partParams; } public int getNumBuckets() { return numBuckets; } - @Explain(displayName = "num buckets", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) - public Integer getNumBucketsExplain() { - return numBuckets == -1 ? 
null : numBuckets; + public void setNumBuckets(int numBuckets) { + this.numBuckets = numBuckets; } - @Explain(displayName = "columns", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public List getCols() { - return columns; + return cols; + } + + public void setCols(List cols) { + this.cols = cols; } - @Explain(displayName = "serialization lib", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public String getSerializationLib() { return serializationLib; } - @Explain(displayName = "serde params", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public void setSerializationLib(String serializationLib) { + this.serializationLib = serializationLib; + } + public Map getSerdeParams() { return serdeParams; } - @Explain(displayName = "bucket columns", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public void setSerdeParams(Map serdeParams) { + this.serdeParams = serdeParams; + } + public List getBucketCols() { - return bucketColumns; + return bucketCols; + } + + public void setBucketCols(List bucketCols) { + this.bucketCols = bucketCols; } - @Explain(displayName = "sort columns", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public List getSortCols() { - return sortColumns; + return sortCols; } - @Explain(displayName = "column stats", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) - public ColumnStatistics getColStats() { - return columnStats; + public void setSortCols(List sortCols) { + this.sortCols = sortCols; } - public long getWriteId() { - return writeId; + public String getInputFormat() { + return inputFormat; } + + public void setInputFormat(String inputFormat) { + this.inputFormat = inputFormat; + } + + public String getOutputFormat() { + return outputFormat; + } + + public void setOutputFormat(String outputFormat) { + this.outputFormat = outputFormat; + } + + public ColumnStatistics getColStats() { return colStats; } + + public void setColStats(ColumnStatistics colStats) { this.colStats = colStats; } + + public long getWriteId() { return writeId; } + + public void setWriteId(long writeId) { this.writeId = writeId; } } - private final String dbName; - private final String tableName; - private final boolean ifNotExists; - private final List partitions; + String tableName; + String dbName; + boolean ifNotExists; + List partitions = null; + boolean replaceMode = false; + private ReplicationSpec replicationSpec = null; + - private ReplicationSpec replicationSpec = null; // TODO: make replicationSpec final too + /** + * For serialization only. + */ + public AlterTableAddPartitionDesc() { + } - public AlterTableAddPartitionDesc(String dbName, String tableName, boolean ifNotExists, - List partitions) { + public AlterTableAddPartitionDesc( + String dbName, String tableName, boolean ifNotExists) { + super(); this.dbName = dbName; this.tableName = tableName; this.ifNotExists = ifNotExists; - this.partitions = partitions; } - @Explain(displayName = "db name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + /** + * Legacy single-partition ctor for ImportSemanticAnalyzer + * @param dbName + * database to add to. + * @param tableName + * table to add to. + * @param partSpec + * partition specification. + * @param location + * partition location, relative to table location. + * @param params + * partition parameters. 
+ */ + @Deprecated + public AlterTableAddPartitionDesc(String dbName, String tableName, + Map partSpec, String location, Map params) { + super(); + this.dbName = dbName; + this.tableName = tableName; + this.ifNotExists = true; + addPartition(partSpec, location, params); + } + + public void addPartition(Map partSpec, String location) { + addPartition(partSpec, location, null); + } + + private void addPartition( + Map partSpec, String location, Map params) { + if (this.partitions == null) { + this.partitions = new ArrayList(); + } + this.partitions.add(new PartitionDesc(partSpec, location, params)); + } + + /** + * @return database name + */ public String getDbName() { return dbName; } - @Explain(displayName = "table name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + /** + * @param dbName + * database name + */ + public void setDbName(String dbName) { + this.dbName = dbName; + } + + /** + * @return the table we're going to add the partitions to. + */ public String getTableName() { return tableName; } - @Explain(displayName = "if not exists", displayOnlyOnTrue = true, - explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + /** + * @param tableName + * the table we're going to add the partitions to. + */ + public void setTableName(String tableName) { + this.tableName = tableName; + } + + /** + * @return location of partition in relation to table + */ + @Explain(displayName = "Location") + public String getLocationForExplain() { + if (this.partitions == null || this.partitions.isEmpty()) return ""; + boolean isFirst = true; + StringBuilder sb = new StringBuilder(); + for (PartitionDesc desc : this.partitions) { + if (!isFirst) { + sb.append(", "); + } + isFirst = false; + sb.append(desc.location); + } + return sb.toString(); + } + + @Explain(displayName = "Spec") + public String getPartSpecStringForExplain() { + if (this.partitions == null || this.partitions.isEmpty()) return ""; + boolean isFirst = true; + StringBuilder sb = new StringBuilder(); + for (PartitionDesc desc : this.partitions) { + if (!isFirst) { + sb.append(", "); + } + isFirst = false; + sb.append(desc.partSpec.toString()); + } + return sb.toString(); + } + + /** + * @return if the partition should only be added if it doesn't exist already + */ public boolean isIfNotExists() { - return ifNotExists; + return this.ifNotExists; } - @Explain(displayName = "partitions", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) - public List getPartitions() { - return partitions; + /** + * @param ifNotExists + * if the part should be added only if it doesn't exist + */ + public void setIfNotExists(boolean ifNotExists) { + this.ifNotExists = ifNotExists; + } + + public int getPartitionCount() { + return this.partitions.size(); + } + + public PartitionDesc getPartition(int i) { + return this.partitions.get(i); + } + + /** + * @param replaceMode Determine if this AddPartition should behave like a replace-into alter instead + */ + public void setReplaceMode(boolean replaceMode){ + this.replaceMode = replaceMode; + } + + /** + * @return true if this AddPartition should behave like a replace-into alter instead + */ + public boolean getReplaceMode() { + return this.replaceMode; } /** @@ -213,8 +331,8 @@ public void setReplicationSpec(ReplicationSpec replicationSpec) { */ public ReplicationSpec getReplicationSpec(){ if (replicationSpec == null){ - replicationSpec = new ReplicationSpec(); + this.replicationSpec = new ReplicationSpec(); } - return replicationSpec; + return this.replicationSpec; } } diff 
--git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionOperation.java index d8597a888c19..488fa59cbff4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionOperation.java @@ -18,29 +18,14 @@ package org.apache.hadoop.hive.ql.ddl.table.partition; -import java.util.ArrayList; -import java.util.BitSet; import java.util.List; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.StatsSetupConst; -import org.apache.hadoop.hive.common.TableName; -import org.apache.hadoop.hive.common.ValidReaderWriteIdList; -import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.api.EnvironmentContext; -import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; -import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.ql.ddl.DDLOperation; import org.apache.hadoop.hive.ql.ddl.DDLOperationContext; import org.apache.hadoop.hive.ql.ddl.DDLUtils; -import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.WriteEntity; -import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.parse.ReplicationSpec; +import org.apache.hadoop.hive.ql.metadata.Partition; /** * Operation process of adding a partition to a table. @@ -52,181 +37,10 @@ public AlterTableAddPartitionOperation(DDLOperationContext context, AlterTableAd @Override public int execute() throws HiveException { - // TODO: catalog name everywhere in this method - Table table = context.getDb().getTable(desc.getDbName(), desc.getTableName()); - long writeId = getWriteId(table); - - List partitions = getPartitions(table, writeId); - addPartitions(table, partitions, writeId); - return 0; - } - - private long getWriteId(Table table) throws LockException { - // In case of replication, get the writeId from the source and use valid write Id list for replication. - if (desc.getReplicationSpec().isInReplicationScope() && desc.getPartitions().get(0).getWriteId() > 0) { - return desc.getPartitions().get(0).getWriteId(); - } else { - AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(context.getConf(), table, true); - if (tableSnapshot != null && tableSnapshot.getWriteId() > 0) { - return tableSnapshot.getWriteId(); - } else { - return -1; - } - } - } - - private List getPartitions(Table table, long writeId) throws HiveException { - List partitions = new ArrayList<>(desc.getPartitions().size()); - for (AlterTableAddPartitionDesc.PartitionDesc partitionDesc : desc.getPartitions()) { - Partition partition = convertPartitionSpecToMetaPartition(table, partitionDesc); - if (partition != null && writeId > 0) { - partition.setWriteId(writeId); - } - partitions.add(partition); - } - - return partitions; - } - - private Partition convertPartitionSpecToMetaPartition(Table table, - AlterTableAddPartitionDesc.PartitionDesc partitionSpec) throws HiveException { - Path location = partitionSpec.getLocation() != null ? 
new Path(table.getPath(), partitionSpec.getLocation()) : null; - if (location != null) { - // Ensure that it is a full qualified path (in most cases it will be since tbl.getPath() is full qualified) - location = new Path(Utilities.getQualifiedPath(context.getConf(), location)); - } - - Partition partition = org.apache.hadoop.hive.ql.metadata.Partition.createMetaPartitionObject( - table, partitionSpec.getPartSpec(), location); - - if (partitionSpec.getPartParams() != null) { - partition.setParameters(partitionSpec.getPartParams()); - } - if (partitionSpec.getInputFormat() != null) { - partition.getSd().setInputFormat(partitionSpec.getInputFormat()); - } - if (partitionSpec.getOutputFormat() != null) { - partition.getSd().setOutputFormat(partitionSpec.getOutputFormat()); - } - if (partitionSpec.getNumBuckets() != -1) { - partition.getSd().setNumBuckets(partitionSpec.getNumBuckets()); - } - if (partitionSpec.getCols() != null) { - partition.getSd().setCols(partitionSpec.getCols()); - } - if (partitionSpec.getSerializationLib() != null) { - partition.getSd().getSerdeInfo().setSerializationLib(partitionSpec.getSerializationLib()); - } - if (partitionSpec.getSerdeParams() != null) { - partition.getSd().getSerdeInfo().setParameters(partitionSpec.getSerdeParams()); - } - if (partitionSpec.getBucketCols() != null) { - partition.getSd().setBucketCols(partitionSpec.getBucketCols()); - } - if (partitionSpec.getSortCols() != null) { - partition.getSd().setSortCols(partitionSpec.getSortCols()); - } - if (partitionSpec.getColStats() != null) { - partition.setColStats(partitionSpec.getColStats()); - // Statistics will have an associated write Id for a transactional table. We need it to update column statistics. - partition.setWriteId(partitionSpec.getWriteId()); - } - return partition; - } - - private void addPartitions(Table table, List partitions, long writeId) throws HiveException { - List outPartitions = null; - if (!desc.getReplicationSpec().isInReplicationScope()) { - outPartitions = addPartitionsNoReplication(table, partitions); - } else { - outPartitions = addPartitionsWithReplication(table, partitions, writeId); - } - - for (org.apache.hadoop.hive.ql.metadata.Partition outPartition : outPartitions) { - DDLUtils.addIfAbsentByName(new WriteEntity(outPartition, WriteEntity.WriteType.INSERT), context); - } - } - - private List addPartitionsNoReplication(Table table, - List partitions) throws HiveException { - // TODO: normally, the result is not necessary; might make sense to pass false - List outPartitions = new ArrayList<>(); - for (Partition outPart : context.getDb().addPartition(partitions, desc.isIfNotExists(), true)) { - outPartitions.add(new org.apache.hadoop.hive.ql.metadata.Partition(table, outPart)); - } - return outPartitions; - } - - private List addPartitionsWithReplication(Table table, - List partitions, long writeId) throws HiveException { - // For replication add-ptns, we need to follow a insert-if-not-exist, alter-if-exists scenario. - // TODO : ideally, we should push this mechanism to the metastore, because, otherwise, we have - // no choice but to iterate over the partitions here. 
- - List partitionsToAdd = new ArrayList<>(); - List partitionssToAlter = new ArrayList<>(); - List partitionNames = new ArrayList<>(); - for (Partition partition : partitions){ - partitionNames.add(getPartitionName(table, partition)); - try { - Partition p = context.getDb().getPartition(desc.getDbName(), desc.getTableName(), partition.getValues()); - if (desc.getReplicationSpec().allowReplacementInto(p.getParameters())){ - ReplicationSpec.copyLastReplId(p.getParameters(), partition.getParameters()); - partitionssToAlter.add(partition); - } // else ptn already exists, but we do nothing with it. - } catch (HiveException e){ - if (e.getCause() instanceof NoSuchObjectException) { - // if the object does not exist, we want to add it. - partitionsToAdd.add(partition); - } else { - throw e; - } - } - } - - List outPartitions = new ArrayList<>(); - for (Partition outPartition : context.getDb().addPartition(partitionsToAdd, desc.isIfNotExists(), true)) { - outPartitions.add(new org.apache.hadoop.hive.ql.metadata.Partition(table, outPartition)); - } - - // In case of replication, statistics is obtained from the source, so do not update those on replica. - EnvironmentContext ec = new EnvironmentContext(); - ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); - String validWriteIdList = getValidWriteIdList(table, writeId); - context.getDb().alterPartitions(desc.getDbName(), desc.getTableName(), partitionssToAlter, ec, validWriteIdList, - writeId); - - for (Partition outPartition : context.getDb().getPartitionsByNames(desc.getDbName(), desc.getTableName(), - partitionNames)){ - outPartitions.add(new org.apache.hadoop.hive.ql.metadata.Partition(table, outPartition)); - } - - return outPartitions; - } - - private String getPartitionName(Table table, Partition partition) throws HiveException { - try { - return Warehouse.makePartName(table.getPartitionKeys(), partition.getValues()); - } catch (MetaException e) { - throw new HiveException(e); - } - } - - private String getValidWriteIdList(Table table, long writeId) throws LockException { - if (desc.getReplicationSpec().isInReplicationScope() && desc.getPartitions().get(0).getWriteId() > 0) { - // We need a valid writeId list for a transactional change. During replication we do not - // have a valid writeId list which was used for this on the source. But we know for sure - // that the writeId associated with it was valid then (otherwise the change would have - // failed on the source). So use a valid transaction list with only that writeId. 
- return new ValidReaderWriteIdList(TableName.getDbTable(table.getDbName(), table.getTableName()), - new long[0], new BitSet(), writeId).writeToString(); - } else { - AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(context.getConf(), table, true); - if (tableSnapshot != null && tableSnapshot.getWriteId() > 0) { - return tableSnapshot.getValidWriteIdList(); - } else { - return null; - } + List parts = context.getDb().createPartitions(desc); + for (Partition part : parts) { + DDLUtils.addIfAbsentByName(new WriteEntity(part, WriteEntity.WriteType.INSERT), context); } + return 0; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java index 79a93c34f2ec..64f9af3abaf8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; @@ -42,12 +41,9 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration; -import com.google.common.collect.ImmutableList; - import java.net.URI; import java.util.ArrayList; import java.util.List; -import java.util.Map; import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.getHiveUpdater; @@ -150,7 +146,7 @@ public List partitionDescriptions(ImportTableDesc tb //TODO: if partitions are loaded lazily via the iterator then we will have to avoid conversion of everything here as it defeats the purpose. 
for (Partition partition : metadata.getPartitions()) { // TODO: this should ideally not create AddPartitionDesc per partition - AlterTableAddPartitionDesc partsDesc = addPartitionDesc(fromPath, tblDesc, partition); + AlterTableAddPartitionDesc partsDesc = partitionDesc(fromPath, tblDesc, partition); descs.add(partsDesc); } return descs; @@ -171,42 +167,46 @@ public List partitions(ImportTableDesc tblDesc) return partitions; } - private AlterTableAddPartitionDesc addPartitionDesc(Path fromPath, ImportTableDesc tblDesc, Partition partition) - throws SemanticException { + private AlterTableAddPartitionDesc partitionDesc(Path fromPath, + ImportTableDesc tblDesc, Partition partition) throws SemanticException { try { - Map partitionSpec = EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()); - - StorageDescriptor sd = partition.getSd(); - String location = sd.getLocation(); - if (!tblDesc.isExternal() || replicationSpec().isMigratingToExternalTable()) { + AlterTableAddPartitionDesc partsDesc = + new AlterTableAddPartitionDesc(tblDesc.getDatabaseName(), tblDesc.getTableName(), + EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()), + partition.getSd().getLocation(), partition.getParameters()); + AlterTableAddPartitionDesc.PartitionDesc partDesc = partsDesc.getPartition(0); + partDesc.setInputFormat(partition.getSd().getInputFormat()); + partDesc.setOutputFormat(partition.getSd().getOutputFormat()); + partDesc.setNumBuckets(partition.getSd().getNumBuckets()); + partDesc.setCols(partition.getSd().getCols()); + partDesc.setSerializationLib(partition.getSd().getSerdeInfo().getSerializationLib()); + partDesc.setSerdeParams(partition.getSd().getSerdeInfo().getParameters()); + partDesc.setBucketCols(partition.getSd().getBucketCols()); + partDesc.setSortCols(partition.getSd().getSortCols()); + if (tblDesc.isExternal() && !replicationSpec().isMigratingToExternalTable()) { + // we have to provide the source location so target location can be derived. + partDesc.setLocation(partition.getSd().getLocation()); + } else { /** * this is required for file listing of all files in a partition for managed table as described in * {@link org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator} */ - location = new Path(fromPath, Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString(); + partDesc.setLocation(new Path(fromPath, + Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString()); } + partsDesc.setReplicationSpec(replicationSpec()); - ColumnStatistics columnStatistics = null; - long writeId = -1; if (partition.isSetColStats()) { ColumnStatistics colStats = partition.getColStats(); ColumnStatisticsDesc colStatsDesc = new ColumnStatisticsDesc(colStats.getStatsDesc()); colStatsDesc.setTableName(tblDesc.getTableName()); colStatsDesc.setDbName(tblDesc.getDatabaseName()); - columnStatistics = new ColumnStatistics(colStatsDesc, colStats.getStatsObj()); - writeId = replicationSpec().isMigratingToTxnTable() ? + partDesc.setColStats(new ColumnStatistics(colStatsDesc, colStats.getStatsObj())); + long writeId = replicationSpec().isMigratingToTxnTable() ? 
ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID : partition.getWriteId(); + partDesc.setWriteId(writeId); } - - AlterTableAddPartitionDesc.PartitionDesc partitionDesc = new AlterTableAddPartitionDesc.PartitionDesc( - partitionSpec, location, partition.getParameters(), sd.getInputFormat(), sd.getOutputFormat(), - sd.getNumBuckets(), sd.getCols(), sd.getSerdeInfo().getSerializationLib(), sd.getSerdeInfo().getParameters(), - sd.getBucketCols(), sd.getSortCols(), columnStatistics, writeId); - - AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(tblDesc.getDatabaseName(), - tblDesc.getTableName(), true, ImmutableList.of(partitionDesc)); - addPartitionDesc.setReplicationSpec(replicationSpec()); - return addPartitionDesc; + return partsDesc; } catch (Exception e) { throw new SemanticException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index 40020ed2573e..c728e2d49a86 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -182,7 +182,7 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc */ private Task tasksForAddPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc, Task ptnRootTask) throws MetaException, HiveException { - AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0); + AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartition(0); Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation()); Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec); partSpec.setLocation(replicaWarehousePartitionLocation.toString()); @@ -362,7 +362,7 @@ private TaskTracker forExistingTable(AlterTableAddPartitionDesc lastPartitionRep boolean encounteredTheLastReplicatedPartition = (lastPartitionReplicated == null); Map lastReplicatedPartSpec = null; if (!encounteredTheLastReplicatedPartition) { - lastReplicatedPartSpec = lastPartitionReplicated.getPartitions().get(0).getPartSpec(); + lastReplicatedPartSpec = lastPartitionReplicated.getPartition(0).getPartSpec(); LOG.info("Start processing from partition info spec : {}", StringUtils.mapToString(lastReplicatedPartSpec)); } @@ -370,13 +370,13 @@ private TaskTracker forExistingTable(AlterTableAddPartitionDesc lastPartitionRep Iterator partitionIterator = event.partitionDescriptions(tableDesc).iterator(); while (!encounteredTheLastReplicatedPartition && partitionIterator.hasNext()) { AlterTableAddPartitionDesc addPartitionDesc = partitionIterator.next(); - Map currentSpec = addPartitionDesc.getPartitions().get(0).getPartSpec(); + Map currentSpec = addPartitionDesc.getPartition(0).getPartSpec(); encounteredTheLastReplicatedPartition = lastReplicatedPartSpec.equals(currentSpec); } while (partitionIterator.hasNext() && tracker.canAddMoreTasks()) { AlterTableAddPartitionDesc addPartitionDesc = partitionIterator.next(); - Map partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec(); + Map partSpec = addPartitionDesc.getPartition(0).getPartSpec(); Task ptnRootTask = null; ReplLoadOpType loadPtnType = getLoadPartitionType(partSpec); switch (loadPtnType) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 
a236490e0446..691f3ee2e9b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -83,6 +83,7 @@ import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.permission.FsAction; @@ -111,6 +112,7 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.ErrorMsg; +import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc; import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableDropPartitionDesc; import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; @@ -130,6 +132,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.session.CreateTableAutomaticGrant; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.Deserializer; @@ -2996,46 +2999,143 @@ public Partition createPartition(Table tbl, Map partSpec) throws } } - public List addPartition( - List partitions, boolean ifNotExists, boolean needResults) - throws HiveException { - try { - return getMSC().add_partitions(partitions, ifNotExists, needResults); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - throw new HiveException(e); + public List createPartitions(AlterTableAddPartitionDesc addPartitionDesc) throws HiveException { + // TODO: catalog name everywhere in this method + Table tbl = getTable(addPartitionDesc.getDbName(), addPartitionDesc.getTableName()); + int size = addPartitionDesc.getPartitionCount(); + List in = + new ArrayList(size); + long writeId; + String validWriteIdList; + + // In case of replication, get the writeId from the source and use valid write Id list + // for replication. + if (addPartitionDesc.getReplicationSpec().isInReplicationScope() && + addPartitionDesc.getPartition(0).getWriteId() > 0) { + writeId = addPartitionDesc.getPartition(0).getWriteId(); + // We need a valid writeId list for a transactional change. During replication we do not + // have a valid writeId list which was used for this on the source. But we know for sure + // that the writeId associated with it was valid then (otherwise the change would have + // failed on the source). So use a valid transaction list with only that writeId. 
+ validWriteIdList = new ValidReaderWriteIdList(TableName.getDbTable(tbl.getDbName(), + tbl.getTableName()), + new long[0], new BitSet(), writeId).writeToString(); + } else { + AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true); + if (tableSnapshot != null && tableSnapshot.getWriteId() > 0) { + writeId = tableSnapshot.getWriteId(); + validWriteIdList = tableSnapshot.getValidWriteIdList(); + } else { + writeId = -1; + validWriteIdList = null; + } } - } - - public org.apache.hadoop.hive.metastore.api.Partition getPartition(String dbName, String tableName, - List params) throws HiveException { - try { - return getMSC().getPartition(dbName, tableName, params); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - throw new HiveException(e); + for (int i = 0; i < size; ++i) { + org.apache.hadoop.hive.metastore.api.Partition tmpPart = + convertAddSpecToMetaPartition(tbl, addPartitionDesc.getPartition(i), conf); + if (tmpPart != null && writeId > 0) { + tmpPart.setWriteId(writeId); + } + in.add(tmpPart); } - } - - public void alterPartitions(String dbName, String tableName, - List partitions, EnvironmentContext ec, String validWriteIdList, - long writeId) throws HiveException { + List out = new ArrayList(); try { - getMSC().alter_partitions(dbName, tableName, partitions, ec, validWriteIdList, writeId); + if (!addPartitionDesc.getReplicationSpec().isInReplicationScope()){ + // TODO: normally, the result is not necessary; might make sense to pass false + for (org.apache.hadoop.hive.metastore.api.Partition outPart + : getMSC().add_partitions(in, addPartitionDesc.isIfNotExists(), true)) { + out.add(new Partition(tbl, outPart)); + } + } else { + + // For replication add-ptns, we need to follow a insert-if-not-exist, alter-if-exists scenario. + // TODO : ideally, we should push this mechanism to the metastore, because, otherwise, we have + // no choice but to iterate over the partitions here. + + List partsToAdd = new ArrayList<>(); + List partsToAlter = new ArrayList<>(); + List part_names = new ArrayList<>(); + for (org.apache.hadoop.hive.metastore.api.Partition p: in){ + part_names.add(Warehouse.makePartName(tbl.getPartitionKeys(), p.getValues())); + try { + org.apache.hadoop.hive.metastore.api.Partition ptn = + getMSC().getPartition(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), p.getValues()); + if (addPartitionDesc.getReplicationSpec().allowReplacementInto(ptn.getParameters())){ + ReplicationSpec.copyLastReplId(ptn.getParameters(), p.getParameters()); + partsToAlter.add(p); + } // else ptn already exists, but we do nothing with it. + } catch (NoSuchObjectException nsoe){ + // if the object does not exist, we want to add it. + partsToAdd.add(p); + } + } + for (org.apache.hadoop.hive.metastore.api.Partition outPart + : getMSC().add_partitions(partsToAdd, addPartitionDesc.isIfNotExists(), true)) { + out.add(new Partition(tbl, outPart)); + } + EnvironmentContext ec = new EnvironmentContext(); + // In case of replication, statistics is obtained from the source, so do not update those + // on replica. 
+ ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); + getMSC().alter_partitions(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), + partsToAlter, ec, validWriteIdList, writeId); + + for ( org.apache.hadoop.hive.metastore.api.Partition outPart : + getMSC().getPartitionsByNames(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(),part_names)){ + out.add(new Partition(tbl,outPart)); + } + } } catch (Exception e) { LOG.error(StringUtils.stringifyException(e)); throw new HiveException(e); } + return out; } - public List getPartitionsByNames(String dbName, String tableName, - List partitionNames) throws HiveException { - try { - return getMSC().getPartitionsByNames(dbName, tableName, partitionNames); - } catch (Exception e) { - LOG.error(StringUtils.stringifyException(e)); - throw new HiveException(e); + public static org.apache.hadoop.hive.metastore.api.Partition convertAddSpecToMetaPartition( + Table tbl, AlterTableAddPartitionDesc.PartitionDesc addSpec, final HiveConf conf) throws HiveException { + Path location = addSpec.getLocation() != null + ? new Path(tbl.getPath(), addSpec.getLocation()) : null; + if (location != null) { + // Ensure that it is a full qualified path (in most cases it will be since tbl.getPath() is full qualified) + location = new Path(Utilities.getQualifiedPath(conf, location)); + } + org.apache.hadoop.hive.metastore.api.Partition part = + Partition.createMetaPartitionObject(tbl, addSpec.getPartSpec(), location); + if (addSpec.getPartParams() != null) { + part.setParameters(addSpec.getPartParams()); + } + if (addSpec.getInputFormat() != null) { + part.getSd().setInputFormat(addSpec.getInputFormat()); + } + if (addSpec.getOutputFormat() != null) { + part.getSd().setOutputFormat(addSpec.getOutputFormat()); + } + if (addSpec.getNumBuckets() != -1) { + part.getSd().setNumBuckets(addSpec.getNumBuckets()); + } + if (addSpec.getCols() != null) { + part.getSd().setCols(addSpec.getCols()); + } + if (addSpec.getSerializationLib() != null) { + part.getSd().getSerdeInfo().setSerializationLib(addSpec.getSerializationLib()); + } + if (addSpec.getSerdeParams() != null) { + part.getSd().getSerdeInfo().setParameters(addSpec.getSerdeParams()); + } + if (addSpec.getBucketCols() != null) { + part.getSd().setBucketCols(addSpec.getBucketCols()); + } + if (addSpec.getSortCols() != null) { + part.getSd().setSortCols(addSpec.getSortCols()); + } + if (addSpec.getColStats() != null) { + part.setColStats(addSpec.getColStats()); + // Statistics will have an associated write Id for a transactional table. We need it to + // update column statistics. 
+ part.setWriteId(addSpec.getWriteId()); } + return part; } public Partition getPartition(Table tbl, Map partSpec, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index ddfa0979ea16..f8d906f135b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -146,6 +146,7 @@ import org.apache.hadoop.hive.ql.ddl.table.storage.AlterTableSetSkewedLocationDesc; import org.apache.hadoop.hive.ql.ddl.table.storage.AlterTableSkewedByDesc; import org.apache.hadoop.hive.ql.ddl.table.storage.AlterTableUnarchiveDesc; +import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc.PartitionDesc; import org.apache.hadoop.hive.ql.ddl.view.AlterMaterializedViewRewriteDesc; import org.apache.hadoop.hive.ql.ddl.view.DropMaterializedViewDesc; import org.apache.hadoop.hive.ql.ddl.view.DropViewDesc; @@ -3504,15 +3505,15 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole // ^(TOK_ALTERTABLE_ADDPARTS identifier ifNotExists? alterStatementSuffixAddPartitionsElement+) boolean ifNotExists = ast.getChild(0).getType() == HiveParser.TOK_IFNOTEXISTS; - Table table = getTable(qualified); - boolean isView = table.isView(); - validateAlterTableType(table, AlterTableType.ADDPARTITION, expectView); - outputs.add(new WriteEntity(table, + Table tab = getTable(qualified); + boolean isView = tab.isView(); + validateAlterTableType(tab, AlterTableType.ADDPARTITION, expectView); + outputs.add(new WriteEntity(tab, /*use DDL_EXCLUSIVE to cause X lock to prevent races between concurrent add partition calls with IF NOT EXISTS. w/o this 2 concurrent calls to add the same partition may both add data since for transactional tables creating partition metadata and moving data there are 2 separate actions. */ - ifNotExists && AcidUtils.isTransactionalTable(table) ? WriteType.DDL_EXCLUSIVE + ifNotExists && AcidUtils.isTransactionalTable(tab) ? WriteType.DDL_EXCLUSIVE : WriteEntity.WriteType.DDL_SHARED)); int numCh = ast.getChildCount(); @@ -3521,17 +3522,17 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole String currentLocation = null; Map currentPart = null; // Parser has done some verification, so the order of tokens doesn't need to be verified here. 
- - List partitions = new ArrayList<>(); + AlterTableAddPartitionDesc addPartitionDesc = + new AlterTableAddPartitionDesc(tab.getDbName(), tab.getTableName(), ifNotExists); for (int num = start; num < numCh; num++) { ASTNode child = (ASTNode) ast.getChild(num); switch (child.getToken().getType()) { case HiveParser.TOK_PARTSPEC: if (currentPart != null) { - partitions.add(createPartitionDesc(table, currentLocation, currentPart)); + addPartitionDesc.addPartition(currentPart, currentLocation); currentLocation = null; } - currentPart = getValidatedPartSpec(table, child, conf, true); + currentPart = getValidatedPartSpec(tab, child, conf, true); validatePartitionValues(currentPart); // validate reserved values break; case HiveParser.TOK_PARTITIONLOCATION: @@ -3549,21 +3550,31 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole // add the last one if (currentPart != null) { - partitions.add(createPartitionDesc(table, currentLocation, currentPart)); + addPartitionDesc.addPartition(currentPart, currentLocation); } - if (partitions.isEmpty()) { + if (this.conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) { + for (int index = 0; index < addPartitionDesc.getPartitionCount(); index++) { + PartitionDesc desc = addPartitionDesc.getPartition(index); + if (desc.getLocation() == null) { + if (desc.getPartParams() == null) { + desc.setPartParams(new HashMap()); + } + StatsSetupConst.setStatsStateForCreateTable(desc.getPartParams(), + MetaStoreUtils.getColumnNames(tab.getCols()), StatsSetupConst.TRUE); + } + } + } + + if (addPartitionDesc.getPartitionCount() == 0) { // nothing to do return; } - AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(table.getDbName(), - table.getTableName(), ifNotExists, partitions); - Task ddlTask = TaskFactory.get(new DDLWork(getInputs(), getOutputs(), addPartitionDesc)); rootTasks.add(ddlTask); - handleTransactionalTable(table, addPartitionDesc, ddlTask); + handleTransactionalTable(tab, addPartitionDesc, ddlTask); if (isView) { // Compile internal query to capture underlying table partition dependencies @@ -3574,7 +3585,8 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole cmd.append(HiveUtils.unparseIdentifier(qualified[1])); cmd.append(" WHERE "); boolean firstOr = true; - for (AlterTableAddPartitionDesc.PartitionDesc partitionDesc : partitions) { + for (int i = 0; i < addPartitionDesc.getPartitionCount(); ++i) { + AlterTableAddPartitionDesc.PartitionDesc partitionDesc = addPartitionDesc.getPartition(i); if (firstOr) { firstOr = false; } else { @@ -3607,17 +3619,6 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole } } - private AlterTableAddPartitionDesc.PartitionDesc createPartitionDesc(Table table, String currentLocation, - Map currentPart) { - Map params = null; - if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER) && currentLocation == null) { - params = new HashMap(); - StatsSetupConst.setStatsStateForCreateTable(params, - MetaStoreUtils.getColumnNames(table.getCols()), StatsSetupConst.TRUE); - } - return new AlterTableAddPartitionDesc.PartitionDesc(currentPart, currentLocation, params); - } - /** * Add partition for Transactional tables needs to add (copy/rename) the data so that it lands * in a delta_x_x/ folder in the partition dir. 
@@ -3630,12 +3631,13 @@ private void handleTransactionalTable(Table tab, AlterTableAddPartitionDesc addP Long writeId = null; int stmtId = 0; - for (AlterTableAddPartitionDesc.PartitionDesc partitonDesc : addPartitionDesc.getPartitions()) { - if (partitonDesc.getLocation() != null) { - AcidUtils.validateAcidPartitionLocation(partitonDesc.getLocation(), conf); + for (int index = 0; index < addPartitionDesc.getPartitionCount(); index++) { + PartitionDesc desc = addPartitionDesc.getPartition(index); + if (desc.getLocation() != null) { + AcidUtils.validateAcidPartitionLocation(desc.getLocation(), conf); if(addPartitionDesc.isIfNotExists()) { //Don't add partition data if it already exists - Partition oldPart = getPartition(tab, partitonDesc.getPartSpec(), false); + Partition oldPart = getPartition(tab, desc.getPartSpec(), false); if(oldPart != null) { continue; } @@ -3651,15 +3653,15 @@ private void handleTransactionalTable(Table tab, AlterTableAddPartitionDesc addP } stmtId = getTxnMgr().getStmtIdAndIncrement(); } - LoadTableDesc loadTableWork = new LoadTableDesc(new Path(partitonDesc.getLocation()), - Utilities.getTableDesc(tab), partitonDesc.getPartSpec(), + LoadTableDesc loadTableWork = new LoadTableDesc(new Path(desc.getLocation()), + Utilities.getTableDesc(tab), desc.getPartSpec(), LoadTableDesc.LoadFileType.KEEP_EXISTING, //not relevant - creating new partition writeId); loadTableWork.setStmtId(stmtId); loadTableWork.setInheritTableSpecs(true); try { - partitonDesc.setLocation(new Path(tab.getDataLocation(), - Warehouse.makePartPath(partitonDesc.getPartSpec())).toString()); + desc.setLocation(new Path(tab.getDataLocation(), + Warehouse.makePartPath(desc.getPartSpec())).toString()); } catch (MetaException ex) { throw new SemanticException("Could not determine partition path due to: " diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index e955989d9211..687122a45ff9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -33,7 +33,6 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -71,9 +70,6 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import com.google.common.collect.ImmutableList; - import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import java.io.IOException; @@ -326,17 +322,10 @@ public static boolean prepareImport(boolean isImportCmd, x.getInputs().add(toReadEntity(new Path(parsedLocation), x.getConf())); } - if (StringUtils.isNotBlank(parsedTableName)) { + if ((parsedTableName != null) && (!parsedTableName.isEmpty())) { tblDesc.setTableName(parsedTableName); } - if (tblDesc.getTableName() == null) { - // Either we got the tablename from the IMPORT statement (first priority) or from the export dump. 
- throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg()); - } else { - x.getConf().set("import.destination.table", tblDesc.getTableName()); - } - List partitionDescs = new ArrayList<>(); Iterable partitions = rv.getPartitions(); for (Partition partition : partitions) { @@ -354,7 +343,7 @@ public static boolean prepareImport(boolean isImportCmd, for (Iterator partnIter = partitionDescs .listIterator(); partnIter.hasNext(); ) { AlterTableAddPartitionDesc addPartitionDesc = partnIter.next(); - if (!found && addPartitionDesc.getPartitions().get(0).getPartSpec().equals(parsedPartSpec)) { + if (!found && addPartitionDesc.getPartition(0).getPartSpec().equals(parsedPartSpec)) { found = true; } else { partnIter.remove(); @@ -367,6 +356,17 @@ public static boolean prepareImport(boolean isImportCmd, } } + if (tblDesc.getTableName() == null) { + // Either we got the tablename from the IMPORT statement (first priority) + // or from the export dump. + throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg()); + } else { + x.getConf().set("import.destination.table", tblDesc.getTableName()); + for (AlterTableAddPartitionDesc addPartitionDesc : partitionDescs) { + addPartitionDesc.setTableName(tblDesc.getTableName()); + } + } + Warehouse wh = new Warehouse(x.getConf()); Table table = tableIfExists(tblDesc, x.getHive()); boolean tableExists = false; @@ -410,32 +410,36 @@ public static boolean prepareImport(boolean isImportCmd, return tableExists; } - private static AlterTableAddPartitionDesc getBaseAddPartitionDescFromPartition(Path fromPath, String dbName, - ImportTableDesc tblDesc, Partition partition, ReplicationSpec replicationSpec, HiveConf conf) + private static AlterTableAddPartitionDesc getBaseAddPartitionDescFromPartition( + Path fromPath, String dbName, ImportTableDesc tblDesc, Partition partition, + ReplicationSpec replicationSpec, HiveConf conf) throws MetaException, SemanticException { - Map partitionSpec = EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()); - - StorageDescriptor sd = partition.getSd(); - - String location = null; - if (replicationSpec.isInReplicationScope() && tblDesc.isExternal() && - !replicationSpec.isMigratingToExternalTable()) { - location = ReplExternalTables.externalTableLocation(conf, partition.getSd().getLocation()); - LOG.debug("partition {} has data location: {}", partition, location); + AlterTableAddPartitionDesc partsDesc = new AlterTableAddPartitionDesc(dbName, tblDesc.getTableName(), + EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()), + partition.getSd().getLocation(), partition.getParameters()); + AlterTableAddPartitionDesc.PartitionDesc partDesc = partsDesc.getPartition(0); + partDesc.setInputFormat(partition.getSd().getInputFormat()); + partDesc.setOutputFormat(partition.getSd().getOutputFormat()); + partDesc.setNumBuckets(partition.getSd().getNumBuckets()); + partDesc.setCols(partition.getSd().getCols()); + partDesc.setSerializationLib(partition.getSd().getSerdeInfo().getSerializationLib()); + partDesc.setSerdeParams(partition.getSd().getSerdeInfo().getParameters()); + partDesc.setBucketCols(partition.getSd().getBucketCols()); + partDesc.setSortCols(partition.getSd().getSortCols()); + if (replicationSpec.isInReplicationScope() && tblDesc.isExternal() + && !replicationSpec.isMigratingToExternalTable()) { + String newLocation = ReplExternalTables + .externalTableLocation(conf, partition.getSd().getLocation()); + LOG.debug("partition {} has data location: {}", partition, newLocation); + 
partDesc.setLocation(newLocation); } else { - location = new Path(fromPath, Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString(); + partDesc.setLocation(new Path(fromPath, + Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString()); } - - long writeId = -1; if (tblDesc.getReplWriteId() != null) { - writeId = tblDesc.getReplWriteId(); + partDesc.setWriteId(tblDesc.getReplWriteId()); } - - AlterTableAddPartitionDesc.PartitionDesc partitionDesc = new AlterTableAddPartitionDesc.PartitionDesc( - partitionSpec, location, partition.getParameters(), sd.getInputFormat(), sd.getOutputFormat(), - sd.getNumBuckets(), sd.getCols(), sd.getSerdeInfo().getSerializationLib(), sd.getSerdeInfo().getParameters(), - sd.getBucketCols(), sd.getSortCols(), null, writeId); - return new AlterTableAddPartitionDesc(dbName, tblDesc.getTableName(), true, ImmutableList.of(partitionDesc)); + return partsDesc; } private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName, @@ -562,10 +566,11 @@ private static Task alterSinglePartition( ImportTableDesc tblDesc, Table table, Warehouse wh, AlterTableAddPartitionDesc addPartitionDesc, ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn, EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, IOException, HiveException { + addPartitionDesc.setReplaceMode(true); if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) { addPartitionDesc.setReplicationSpec(replicationSpec); } - AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0); + AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartition(0); if (ptn == null) { fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x); } else if (!externalTablePartition(tblDesc, replicationSpec)) { @@ -579,7 +584,7 @@ private static Task addSinglePartition(ImportTableDesc tblDesc, ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x, Long writeId, int stmtId) throws MetaException, IOException, HiveException { - AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0); + AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartition(0); boolean isAutoPurge = false; boolean needRecycle = false; boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable(); @@ -1033,7 +1038,7 @@ private static void createRegularImportTasks( x.getLOG().debug("table partitioned"); for (AlterTableAddPartitionDesc addPartitionDesc : partitionDescs) { - Map partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec(); + Map partSpec = addPartitionDesc.getPartition(0).getPartSpec(); org.apache.hadoop.hive.ql.metadata.Partition ptn = null; if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) { x.getTasks().add(addSinglePartition( @@ -1230,7 +1235,7 @@ private static void createReplImportTasks( } if (updatedMetadata != null) { updatedMetadata.addPartition(table.getDbName(), table.getTableName(), - addPartitionDesc.getPartitions().get(0).getPartSpec()); + addPartitionDesc.getPartition(0).getPartSpec()); } } } else if (!replicationSpec.isMetadataOnly() @@ -1284,7 +1289,7 @@ private static void createReplImportTasks( x.getLOG().debug("table partitioned"); for (AlterTableAddPartitionDesc addPartitionDesc : partitionDescs) { addPartitionDesc.setReplicationSpec(replicationSpec); - Map partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec(); + Map partSpec = 
addPartitionDesc.getPartition(0).getPartSpec(); org.apache.hadoop.hive.ql.metadata.Partition ptn = null; if (isOldTableValid) { // If existing table is valid but the partition spec is different, then ignore partition @@ -1303,13 +1308,15 @@ private static void createReplImportTasks( x.getTasks().add(addSinglePartition( tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId)); if (updatedMetadata != null) { - updatedMetadata.addPartition(table.getDbName(), table.getTableName(), partSpec); + updatedMetadata.addPartition(table.getDbName(), table.getTableName(), + addPartitionDesc.getPartition(0).getPartSpec()); } } else { x.getTasks().add(alterSinglePartition( tblDesc, table, wh, addPartitionDesc, replicationSpec, null, x)); if (updatedMetadata != null) { - updatedMetadata.addPartition(table.getDbName(), table.getTableName(), partSpec); + updatedMetadata.addPartition(table.getDbName(), table.getTableName(), + addPartitionDesc.getPartition(0).getPartSpec()); } } } else { @@ -1324,7 +1331,8 @@ private static void createReplImportTasks( tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x)); } if (updatedMetadata != null) { - updatedMetadata.addPartition(table.getDbName(), table.getTableName(), partSpec); + updatedMetadata.addPartition(table.getDbName(), table.getTableName(), + addPartitionDesc.getPartition(0).getPartSpec()); } if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){ lockType = WriteEntity.WriteType.DDL_SHARED; diff --git a/ql/src/test/results/clientpositive/add_part_multiple.q.out b/ql/src/test/results/clientpositive/add_part_multiple.q.out index 8fbbe2ae0b0e..81454f7dc5a7 100644 --- a/ql/src/test/results/clientpositive/add_part_multiple.q.out +++ b/ql/src/test/results/clientpositive/add_part_multiple.q.out @@ -30,22 +30,8 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Add Partition - db name: default - partitions: - Partition - location: A - partition spec: {ds=2010-01-01} - Partition - location: B - partition spec: {ds=2010-02-01} - Partition - params: {totalSize=0, numRows=0, rawDataSize=0, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}, numFiles=0, numFilesErasureCoded=0} - partition spec: {ds=2010-03-01} - Partition - location: C - partition spec: {ds=2010-04-01} - table name: add_part_test_n1 - if not exists: true +#### A masked pattern was here #### + Spec: {ds=2010-01-01}, {ds=2010-02-01}, {ds=2010-03-01}, {ds=2010-04-01} PREHOOK: query: ALTER TABLE add_part_test_n1 ADD IF NOT EXISTS PARTITION (ds='2010-01-01') location 'A' diff --git a/ql/src/test/results/clientpositive/drop_partitions_filter.q.out b/ql/src/test/results/clientpositive/drop_partitions_filter.q.out index edfbbf7127e8..457fa0bc2fc2 100644 --- a/ql/src/test/results/clientpositive/drop_partitions_filter.q.out +++ b/ql/src/test/results/clientpositive/drop_partitions_filter.q.out @@ -33,12 +33,8 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Add Partition - db name: default - partitions: - Partition - params: {totalSize=0, numRows=0, rawDataSize=0, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true","COLUMN_STATS":{"a":"true","b":"true"}}, numFiles=0, numFilesErasureCoded=0} - partition spec: {c=US, d=1} - table name: ptestfilter_n1 +#### A masked pattern was here #### + Spec: {c=US, d=1} PREHOOK: query: alter table ptestfilter_n1 add partition (c='US', d=1) PREHOOK: type: ALTERTABLE_ADDPARTS diff --git a/ql/src/test/results/clientpositive/llap/add_part_with_loc.q.out b/ql/src/test/results/clientpositive/llap/add_part_with_loc.q.out index 
1c58917d6732..b307c021aba5 100644 --- a/ql/src/test/results/clientpositive/llap/add_part_with_loc.q.out +++ b/ql/src/test/results/clientpositive/llap/add_part_with_loc.q.out @@ -31,12 +31,8 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Add Partition - db name: default - partitions: - Partition - location: hdfs://### HDFS PATH ### - partition spec: {day=20110102} - table name: supply +#### A masked pattern was here #### + Spec: {day=20110102} Stage: Stage-1 Move Operator diff --git a/ql/src/test/results/clientpositive/spark/add_part_multiple.q.out b/ql/src/test/results/clientpositive/spark/add_part_multiple.q.out index 8fbbe2ae0b0e..81454f7dc5a7 100644 --- a/ql/src/test/results/clientpositive/spark/add_part_multiple.q.out +++ b/ql/src/test/results/clientpositive/spark/add_part_multiple.q.out @@ -30,22 +30,8 @@ STAGE DEPENDENCIES: STAGE PLANS: Stage: Stage-0 Add Partition - db name: default - partitions: - Partition - location: A - partition spec: {ds=2010-01-01} - Partition - location: B - partition spec: {ds=2010-02-01} - Partition - params: {totalSize=0, numRows=0, rawDataSize=0, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}, numFiles=0, numFilesErasureCoded=0} - partition spec: {ds=2010-03-01} - Partition - location: C - partition spec: {ds=2010-04-01} - table name: add_part_test_n1 - if not exists: true +#### A masked pattern was here #### + Spec: {ds=2010-01-01}, {ds=2010-02-01}, {ds=2010-03-01}, {ds=2010-04-01} PREHOOK: query: ALTER TABLE add_part_test_n1 ADD IF NOT EXISTS PARTITION (ds='2010-01-01') location 'A' diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java index f4e71f915b22..f2beafea9ee3 100644 --- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java +++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java @@ -48,7 +48,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.Table; -import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hive.common.util.ShutdownHookManager; import org.apache.thrift.TException; @@ -437,13 +437,11 @@ public PartitionInfo createPartitionIfNotExists(final List partitionValu try { Map partSpec = Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), partitionValues); - - Path location = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec)); - location = new Path(Utilities.getQualifiedPath(conf, location)); - partLocation = location.toString(); + AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(database, table, true); partName = Warehouse.makePartName(tableObject.getPartitionKeys(), partitionValues); - Partition partition = - org.apache.hadoop.hive.ql.metadata.Partition.createMetaPartitionObject(tableObject, partSpec, location); + partLocation = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec)).toString(); + addPartitionDesc.addPartition(partSpec, partLocation); + Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, addPartitionDesc.getPartition(0), conf); if (getMSC() == null) { // We assume it doesn't exist if we can't check it