From 853bf6252a0d5b0d10f35248b4b4785c08380914 Mon Sep 17 00:00:00 2001 From: miklosgergely Date: Mon, 22 Jul 2019 02:24:14 +0200 Subject: [PATCH] HIVE-22028 Clean up Add Partition --- .../hive/hcatalog/streaming/HiveEndPoint.java | 60 +--- .../partition/AlterTableAddPartitionDesc.java | 306 ++++++------------ .../AlterTableAddPartitionOperation.java | 196 ++++++++++- .../events/filesystem/FSTableEvent.java | 54 ++-- .../bootstrap/load/table/LoadPartitions.java | 8 +- .../apache/hadoop/hive/ql/metadata/Hive.java | 160 ++------- .../hive/ql/parse/DDLSemanticAnalyzer.java | 72 ++--- .../hive/ql/parse/ImportSemanticAnalyzer.java | 92 +++--- .../clientpositive/add_part_multiple.q.out | 18 +- .../drop_partitions_filter.q.out | 8 +- .../llap/add_part_with_loc.q.out | 8 +- .../spark/add_part_multiple.q.out | 18 +- .../streaming/HiveStreamingConnection.java | 12 +- 13 files changed, 479 insertions(+), 533 deletions(-) diff --git a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java index fee7ffc4c73b..e249b7775e4e 100644 --- a/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java +++ b/hcatalog/streaming/src/java/org/apache/hive/hcatalog/streaming/HiveEndPoint.java @@ -25,12 +25,10 @@ import org.apache.hadoop.hive.metastore.api.DataOperationType; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.ql.io.AcidUtils; -import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.cli.CliSessionState; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.LockComponentBuilder; @@ -42,14 +40,10 @@ import org.apache.hadoop.hive.metastore.api.LockResponse; import org.apache.hadoop.hive.metastore.api.LockState; import org.apache.hadoop.hive.metastore.api.MetaException; -import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TxnAbortedException; import org.apache.hadoop.hive.metastore.api.TxnToWriteId; -import org.apache.hadoop.hive.ql.DriverFactory; -import org.apache.hadoop.hive.ql.IDriver; -import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hive.hcatalog.common.HCatUtil; import org.apache.hadoop.security.UserGroupInformation; @@ -229,18 +223,6 @@ private StreamingConnection newConnectionImpl(UserGroupInformation ugi, return new ConnectionImpl(this, ugi, conf, createPartIfNotExists, agentInfo); } - private static UserGroupInformation getUserGroupInfo(String user) - throws ImpersonationFailed { - try { - return UserGroupInformation.createProxyUser( - user, UserGroupInformation.getLoginUser()); - } catch (IOException e) { - LOG.error("Unable to get UserGroupInfo for user : " + user, e); - throw new ImpersonationFailed(user,e); - } - } - - @Override public boolean equals(Object o) { if (this == o) { @@ -467,12 +449,10 @@ private static void createPartitionIfNotExists(HiveEndPoint ep, Map partSpec = Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), ep.partitionVals); - AlterTableAddPartitionDesc 
addPartitionDesc = new AlterTableAddPartitionDesc(ep.database, ep.table, true); - String partLocation = new Path(tableObject.getDataLocation(), - Warehouse.makePartPath(partSpec)).toString(); - addPartitionDesc.addPartition(partSpec, partLocation); - Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, - addPartitionDesc.getPartition(0), conf); + Path location = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec)); + location = new Path(Utilities.getQualifiedPath(conf, location)); + Partition partition = + org.apache.hadoop.hive.ql.metadata.Partition.createMetaPartitionObject(tableObject, partSpec, location); msClient.add_partition(partition); } catch (AlreadyExistsException e) { @@ -486,36 +466,6 @@ private static void createPartitionIfNotExists(HiveEndPoint ep, } } - private static boolean runDDL(IDriver driver, String sql) throws QueryFailedException { - if (LOG.isDebugEnabled()) { - LOG.debug("Running Hive Query: " + sql); - } - driver.run(sql); - return true; - } - - private static String partSpecStr(List partKeys, ArrayList partVals) { - if (partKeys.size()!=partVals.size()) { - throw new IllegalArgumentException("Partition values:" + partVals + - ", does not match the partition Keys in table :" + partKeys ); - } - StringBuilder buff = new StringBuilder(partKeys.size()*20); - buff.append(" ( "); - int i=0; - for (FieldSchema schema : partKeys) { - buff.append(schema.getName()); - buff.append("='"); - buff.append(partVals.get(i)); - buff.append("'"); - if (i!=partKeys.size()-1) { - buff.append(","); - } - ++i; - } - buff.append(" )"); - return buff.toString(); - } - private static IMetaStoreClient getMetaStoreClient(HiveEndPoint endPoint, HiveConf conf, boolean secureMode) throws ConnectionError { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionDesc.java index 72828efaae64..bc068ed2ec65 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionDesc.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.ddl.table.partition; import java.io.Serializable; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -32,45 +31,67 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level; /** - * DDL task description for ALTER TABLE ... DROP PARTITION ... commands. + * DDL task description for ALTER TABLE ... ADD PARTITION ... commands. */ @Explain(displayName = "Add Partition", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public class AlterTableAddPartitionDesc implements DDLDesc, Serializable { private static final long serialVersionUID = 1L; + /** + * Description of a partition to add. 
+ */ + @Explain(displayName = "Partition", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public static class PartitionDesc { - PartitionDesc( - Map partSpec, String location, Map params) { - this(partSpec, location); - this.partParams = params; - } - - PartitionDesc(Map partSpec, String location) { - this.partSpec = partSpec; + private final Map partitionSpec; + private String location; // TODO: make location final too + private final Map params; + private final String inputFormat; + private final String outputFormat; + private final int numBuckets; + private final List columns; + private final String serializationLib; + private final Map serdeParams; + private final List bucketColumns; + private final List sortColumns; + private final ColumnStatistics columnStats; + private final long writeId; + + public PartitionDesc(Map partitionSpec, String location, Map params) { + this(partitionSpec, location, params, null, null, -1, null, null, null, null, null, null, -1); + } + + public PartitionDesc(Map partitionSpec, String location, Map params, + String inputFormat, String outputFormat, int numBuckets, List columns, String serializationLib, + Map serdeParams, List bucketColumns, List sortColumns, + ColumnStatistics columnStats, long writeId) { + this.partitionSpec = partitionSpec; this.location = location; + this.params = params; + this.inputFormat = inputFormat; + this.outputFormat = outputFormat; + this.numBuckets = numBuckets; + this.columns = columns; + this.serializationLib = serializationLib; + this.serdeParams = serdeParams; + this.bucketColumns = bucketColumns; + this.sortColumns = sortColumns; + this.columnStats = columnStats; + this.writeId = writeId; } - Map partSpec; - Map partParams; - String location; - String inputFormat = null; - String outputFormat = null; - int numBuckets = -1; - List cols = null; - String serializationLib = null; - Map serdeParams = null; - List bucketCols = null; - List sortCols = null; - ColumnStatistics colStats = null; - long writeId = -1; - public Map getPartSpec() { - return partSpec; + return partitionSpec; + } + + @Explain(displayName = "partition spec", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public String getPartSpecForExplain() { + return partitionSpec.toString(); } /** * @return location of partition in relation to table */ + @Explain(displayName = "location", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public String getLocation() { return location; } @@ -80,241 +101,102 @@ public void setLocation(String location) { } public Map getPartParams() { - return partParams; + return params; + } + + @Explain(displayName = "params", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public String getPartParamsForExplain() { + return params.toString(); } - public void setPartParams(Map partParams) { - this.partParams = partParams; + @Explain(displayName = "input format", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public String getInputFormat() { + return inputFormat; + } + + @Explain(displayName = "output format", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public String getOutputFormat() { + return outputFormat; } public int getNumBuckets() { return numBuckets; } - public void setNumBuckets(int numBuckets) { - this.numBuckets = numBuckets; + @Explain(displayName = "num buckets", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public Integer getNumBucketsExplain() { + return numBuckets == -1 ? 
null : numBuckets; } + @Explain(displayName = "columns", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public List getCols() { - return cols; - } - - public void setCols(List cols) { - this.cols = cols; + return columns; } + @Explain(displayName = "serialization lib", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public String getSerializationLib() { return serializationLib; } - public void setSerializationLib(String serializationLib) { - this.serializationLib = serializationLib; - } - + @Explain(displayName = "serde params", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public Map getSerdeParams() { return serdeParams; } - public void setSerdeParams(Map serdeParams) { - this.serdeParams = serdeParams; - } - + @Explain(displayName = "bucket columns", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public List getBucketCols() { - return bucketCols; - } - - public void setBucketCols(List bucketCols) { - this.bucketCols = bucketCols; + return bucketColumns; } + @Explain(displayName = "sort columns", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public List getSortCols() { - return sortCols; + return sortColumns; } - public void setSortCols(List sortCols) { - this.sortCols = sortCols; + @Explain(displayName = "column stats", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public ColumnStatistics getColStats() { + return columnStats; } - public String getInputFormat() { - return inputFormat; + public long getWriteId() { + return writeId; } - - public void setInputFormat(String inputFormat) { - this.inputFormat = inputFormat; - } - - public String getOutputFormat() { - return outputFormat; - } - - public void setOutputFormat(String outputFormat) { - this.outputFormat = outputFormat; - } - - public ColumnStatistics getColStats() { return colStats; } - - public void setColStats(ColumnStatistics colStats) { this.colStats = colStats; } - - public long getWriteId() { return writeId; } - - public void setWriteId(long writeId) { this.writeId = writeId; } } - String tableName; - String dbName; - boolean ifNotExists; - List partitions = null; - boolean replaceMode = false; - private ReplicationSpec replicationSpec = null; - + private final String dbName; + private final String tableName; + private final boolean ifNotExists; + private final List partitions; - /** - * For serialization only. - */ - public AlterTableAddPartitionDesc() { - } + private ReplicationSpec replicationSpec = null; // TODO: make replicationSpec final too - public AlterTableAddPartitionDesc( - String dbName, String tableName, boolean ifNotExists) { - super(); + public AlterTableAddPartitionDesc(String dbName, String tableName, boolean ifNotExists, + List partitions) { this.dbName = dbName; this.tableName = tableName; this.ifNotExists = ifNotExists; + this.partitions = partitions; } - /** - * Legacy single-partition ctor for ImportSemanticAnalyzer - * @param dbName - * database to add to. - * @param tableName - * table to add to. - * @param partSpec - * partition specification. - * @param location - * partition location, relative to table location. - * @param params - * partition parameters. 
- */ - @Deprecated - public AlterTableAddPartitionDesc(String dbName, String tableName, - Map partSpec, String location, Map params) { - super(); - this.dbName = dbName; - this.tableName = tableName; - this.ifNotExists = true; - addPartition(partSpec, location, params); - } - - public void addPartition(Map partSpec, String location) { - addPartition(partSpec, location, null); - } - - private void addPartition( - Map partSpec, String location, Map params) { - if (this.partitions == null) { - this.partitions = new ArrayList(); - } - this.partitions.add(new PartitionDesc(partSpec, location, params)); - } - - /** - * @return database name - */ + @Explain(displayName = "db name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public String getDbName() { return dbName; } - /** - * @param dbName - * database name - */ - public void setDbName(String dbName) { - this.dbName = dbName; - } - - /** - * @return the table we're going to add the partitions to. - */ + @Explain(displayName = "table name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public String getTableName() { return tableName; } - /** - * @param tableName - * the table we're going to add the partitions to. - */ - public void setTableName(String tableName) { - this.tableName = tableName; - } - - /** - * @return location of partition in relation to table - */ - @Explain(displayName = "Location") - public String getLocationForExplain() { - if (this.partitions == null || this.partitions.isEmpty()) return ""; - boolean isFirst = true; - StringBuilder sb = new StringBuilder(); - for (PartitionDesc desc : this.partitions) { - if (!isFirst) { - sb.append(", "); - } - isFirst = false; - sb.append(desc.location); - } - return sb.toString(); - } - - @Explain(displayName = "Spec") - public String getPartSpecStringForExplain() { - if (this.partitions == null || this.partitions.isEmpty()) return ""; - boolean isFirst = true; - StringBuilder sb = new StringBuilder(); - for (PartitionDesc desc : this.partitions) { - if (!isFirst) { - sb.append(", "); - } - isFirst = false; - sb.append(desc.partSpec.toString()); - } - return sb.toString(); - } - - /** - * @return if the partition should only be added if it doesn't exist already - */ + @Explain(displayName = "if not exists", displayOnlyOnTrue = true, + explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) public boolean isIfNotExists() { - return this.ifNotExists; + return ifNotExists; } - /** - * @param ifNotExists - * if the part should be added only if it doesn't exist - */ - public void setIfNotExists(boolean ifNotExists) { - this.ifNotExists = ifNotExists; - } - - public int getPartitionCount() { - return this.partitions.size(); - } - - public PartitionDesc getPartition(int i) { - return this.partitions.get(i); - } - - /** - * @param replaceMode Determine if this AddPartition should behave like a replace-into alter instead - */ - public void setReplaceMode(boolean replaceMode){ - this.replaceMode = replaceMode; - } - - /** - * @return true if this AddPartition should behave like a replace-into alter instead - */ - public boolean getReplaceMode() { - return this.replaceMode; + @Explain(displayName = "partitions", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public List getPartitions() { + return partitions; } /** @@ -331,8 +213,8 @@ public void setReplicationSpec(ReplicationSpec replicationSpec) { */ public ReplicationSpec getReplicationSpec(){ if (replicationSpec == null){ - this.replicationSpec = new ReplicationSpec(); + 
replicationSpec = new ReplicationSpec(); } - return this.replicationSpec; + return replicationSpec; } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionOperation.java index 488fa59cbff4..d8597a888c19 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/AlterTableAddPartitionOperation.java @@ -18,14 +18,29 @@ package org.apache.hadoop.hive.ql.ddl.table.partition; +import java.util.ArrayList; +import java.util.BitSet; import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.TableName; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.api.EnvironmentContext; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.ql.ddl.DDLOperation; import org.apache.hadoop.hive.ql.ddl.DDLOperationContext; import org.apache.hadoop.hive.ql.ddl.DDLUtils; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.parse.ReplicationSpec; /** * Operation process of adding a partition to a table. @@ -37,10 +52,181 @@ public AlterTableAddPartitionOperation(DDLOperationContext context, AlterTableAd @Override public int execute() throws HiveException { - List parts = context.getDb().createPartitions(desc); - for (Partition part : parts) { - DDLUtils.addIfAbsentByName(new WriteEntity(part, WriteEntity.WriteType.INSERT), context); - } + // TODO: catalog name everywhere in this method + Table table = context.getDb().getTable(desc.getDbName(), desc.getTableName()); + long writeId = getWriteId(table); + + List partitions = getPartitions(table, writeId); + addPartitions(table, partitions, writeId); return 0; } + + private long getWriteId(Table table) throws LockException { + // In case of replication, get the writeId from the source and use valid write Id list for replication. 
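The comment above summarizes the write-ID selection added in this hunk: during replication the write ID recorded on the source is reused, otherwise it comes from the current table snapshot. A minimal sketch of that decision as a standalone helper, with the dependencies passed in explicitly (the class and parameter names are illustrative; the AcidUtils calls follow the usage in this hunk):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.hive.ql.lockmgr.LockException;
    import org.apache.hadoop.hive.ql.metadata.Table;

    final class WriteIdSelectionSketch {
      private WriteIdSelectionSketch() {}

      /** Reuse the source write ID when replicating, otherwise fall back to the table snapshot. */
      static long selectWriteId(HiveConf conf, Table table, boolean inReplicationScope,
          long replicatedWriteId) throws LockException {
        if (inReplicationScope && replicatedWriteId > 0) {
          return replicatedWriteId;  // write ID captured on the source cluster
        }
        AcidUtils.TableSnapshot snapshot = AcidUtils.getTableSnapshot(conf, table, true);
        return (snapshot != null && snapshot.getWriteId() > 0) ? snapshot.getWriteId() : -1;
      }
    }
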
+ if (desc.getReplicationSpec().isInReplicationScope() && desc.getPartitions().get(0).getWriteId() > 0) { + return desc.getPartitions().get(0).getWriteId(); + } else { + AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(context.getConf(), table, true); + if (tableSnapshot != null && tableSnapshot.getWriteId() > 0) { + return tableSnapshot.getWriteId(); + } else { + return -1; + } + } + } + + private List getPartitions(Table table, long writeId) throws HiveException { + List partitions = new ArrayList<>(desc.getPartitions().size()); + for (AlterTableAddPartitionDesc.PartitionDesc partitionDesc : desc.getPartitions()) { + Partition partition = convertPartitionSpecToMetaPartition(table, partitionDesc); + if (partition != null && writeId > 0) { + partition.setWriteId(writeId); + } + partitions.add(partition); + } + + return partitions; + } + + private Partition convertPartitionSpecToMetaPartition(Table table, + AlterTableAddPartitionDesc.PartitionDesc partitionSpec) throws HiveException { + Path location = partitionSpec.getLocation() != null ? new Path(table.getPath(), partitionSpec.getLocation()) : null; + if (location != null) { + // Ensure that it is a full qualified path (in most cases it will be since tbl.getPath() is full qualified) + location = new Path(Utilities.getQualifiedPath(context.getConf(), location)); + } + + Partition partition = org.apache.hadoop.hive.ql.metadata.Partition.createMetaPartitionObject( + table, partitionSpec.getPartSpec(), location); + + if (partitionSpec.getPartParams() != null) { + partition.setParameters(partitionSpec.getPartParams()); + } + if (partitionSpec.getInputFormat() != null) { + partition.getSd().setInputFormat(partitionSpec.getInputFormat()); + } + if (partitionSpec.getOutputFormat() != null) { + partition.getSd().setOutputFormat(partitionSpec.getOutputFormat()); + } + if (partitionSpec.getNumBuckets() != -1) { + partition.getSd().setNumBuckets(partitionSpec.getNumBuckets()); + } + if (partitionSpec.getCols() != null) { + partition.getSd().setCols(partitionSpec.getCols()); + } + if (partitionSpec.getSerializationLib() != null) { + partition.getSd().getSerdeInfo().setSerializationLib(partitionSpec.getSerializationLib()); + } + if (partitionSpec.getSerdeParams() != null) { + partition.getSd().getSerdeInfo().setParameters(partitionSpec.getSerdeParams()); + } + if (partitionSpec.getBucketCols() != null) { + partition.getSd().setBucketCols(partitionSpec.getBucketCols()); + } + if (partitionSpec.getSortCols() != null) { + partition.getSd().setSortCols(partitionSpec.getSortCols()); + } + if (partitionSpec.getColStats() != null) { + partition.setColStats(partitionSpec.getColStats()); + // Statistics will have an associated write Id for a transactional table. We need it to update column statistics. 
+ partition.setWriteId(partitionSpec.getWriteId()); + } + return partition; + } + + private void addPartitions(Table table, List partitions, long writeId) throws HiveException { + List outPartitions = null; + if (!desc.getReplicationSpec().isInReplicationScope()) { + outPartitions = addPartitionsNoReplication(table, partitions); + } else { + outPartitions = addPartitionsWithReplication(table, partitions, writeId); + } + + for (org.apache.hadoop.hive.ql.metadata.Partition outPartition : outPartitions) { + DDLUtils.addIfAbsentByName(new WriteEntity(outPartition, WriteEntity.WriteType.INSERT), context); + } + } + + private List addPartitionsNoReplication(Table table, + List partitions) throws HiveException { + // TODO: normally, the result is not necessary; might make sense to pass false + List outPartitions = new ArrayList<>(); + for (Partition outPart : context.getDb().addPartition(partitions, desc.isIfNotExists(), true)) { + outPartitions.add(new org.apache.hadoop.hive.ql.metadata.Partition(table, outPart)); + } + return outPartitions; + } + + private List addPartitionsWithReplication(Table table, + List partitions, long writeId) throws HiveException { + // For replication add-ptns, we need to follow a insert-if-not-exist, alter-if-exists scenario. + // TODO : ideally, we should push this mechanism to the metastore, because, otherwise, we have + // no choice but to iterate over the partitions here. + + List partitionsToAdd = new ArrayList<>(); + List partitionssToAlter = new ArrayList<>(); + List partitionNames = new ArrayList<>(); + for (Partition partition : partitions){ + partitionNames.add(getPartitionName(table, partition)); + try { + Partition p = context.getDb().getPartition(desc.getDbName(), desc.getTableName(), partition.getValues()); + if (desc.getReplicationSpec().allowReplacementInto(p.getParameters())){ + ReplicationSpec.copyLastReplId(p.getParameters(), partition.getParameters()); + partitionssToAlter.add(partition); + } // else ptn already exists, but we do nothing with it. + } catch (HiveException e){ + if (e.getCause() instanceof NoSuchObjectException) { + // if the object does not exist, we want to add it. + partitionsToAdd.add(partition); + } else { + throw e; + } + } + } + + List outPartitions = new ArrayList<>(); + for (Partition outPartition : context.getDb().addPartition(partitionsToAdd, desc.isIfNotExists(), true)) { + outPartitions.add(new org.apache.hadoop.hive.ql.metadata.Partition(table, outPartition)); + } + + // In case of replication, statistics is obtained from the source, so do not update those on replica. 
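As the comment above notes, statistics for replicated partitions come from the source and must not be recomputed on the replica, so the alter call below carries an EnvironmentContext flag. A short sketch of that flag in isolation (grounded in the lines that follow; the variable name is illustrative):

    import org.apache.hadoop.hive.common.StatsSetupConst;
    import org.apache.hadoop.hive.metastore.api.EnvironmentContext;

    // Ask the metastore not to recompute statistics for this alter_partitions call.
    EnvironmentContext noStatsUpdate = new EnvironmentContext();
    noStatsUpdate.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
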
+ EnvironmentContext ec = new EnvironmentContext(); + ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); + String validWriteIdList = getValidWriteIdList(table, writeId); + context.getDb().alterPartitions(desc.getDbName(), desc.getTableName(), partitionssToAlter, ec, validWriteIdList, + writeId); + + for (Partition outPartition : context.getDb().getPartitionsByNames(desc.getDbName(), desc.getTableName(), + partitionNames)){ + outPartitions.add(new org.apache.hadoop.hive.ql.metadata.Partition(table, outPartition)); + } + + return outPartitions; + } + + private String getPartitionName(Table table, Partition partition) throws HiveException { + try { + return Warehouse.makePartName(table.getPartitionKeys(), partition.getValues()); + } catch (MetaException e) { + throw new HiveException(e); + } + } + + private String getValidWriteIdList(Table table, long writeId) throws LockException { + if (desc.getReplicationSpec().isInReplicationScope() && desc.getPartitions().get(0).getWriteId() > 0) { + // We need a valid writeId list for a transactional change. During replication we do not + // have a valid writeId list which was used for this on the source. But we know for sure + // that the writeId associated with it was valid then (otherwise the change would have + // failed on the source). So use a valid transaction list with only that writeId. + return new ValidReaderWriteIdList(TableName.getDbTable(table.getDbName(), table.getTableName()), + new long[0], new BitSet(), writeId).writeToString(); + } else { + AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(context.getConf(), table, true); + if (tableSnapshot != null && tableSnapshot.getWriteId() > 0) { + return tableSnapshot.getValidWriteIdList(); + } else { + return null; + } + } + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java index 64f9af3abaf8..79a93c34f2ec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc; import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; @@ -41,9 +42,12 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration; +import com.google.common.collect.ImmutableList; + import java.net.URI; import java.util.ArrayList; import java.util.List; +import java.util.Map; import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.getHiveUpdater; @@ -146,7 +150,7 @@ public List partitionDescriptions(ImportTableDesc tb //TODO: if partitions are loaded lazily via the iterator then we will have to avoid conversion of everything here as it defeats the purpose. 
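Each iteration of the loop below now produces one immutable AlterTableAddPartitionDesc (see the rewritten addPartitionDesc further down). For orientation, a minimal sketch of that construction using the new constructors; the database, table, spec values and location here are placeholders:

    import java.util.HashMap;
    import java.util.Map;
    import com.google.common.collect.ImmutableList;
    import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc;

    final class AddPartitionDescSketch {
      private AddPartitionDescSketch() {}

      static AlterTableAddPartitionDesc singlePartition() {
        Map<String, String> partitionSpec = new HashMap<>();
        partitionSpec.put("ds", "2010-01-01");

        // Short constructor: spec, location and parameters; storage details use their defaults.
        AlterTableAddPartitionDesc.PartitionDesc partitionDesc =
            new AlterTableAddPartitionDesc.PartitionDesc(
                partitionSpec, "/warehouse/t/ds=2010-01-01", /* params */ null);

        // The descriptor is now built with all of its partitions up front.
        return new AlterTableAddPartitionDesc("default", "t", /* ifNotExists */ true,
            ImmutableList.of(partitionDesc));
      }
    }
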
for (Partition partition : metadata.getPartitions()) { // TODO: this should ideally not create AddPartitionDesc per partition - AlterTableAddPartitionDesc partsDesc = partitionDesc(fromPath, tblDesc, partition); + AlterTableAddPartitionDesc partsDesc = addPartitionDesc(fromPath, tblDesc, partition); descs.add(partsDesc); } return descs; @@ -167,46 +171,42 @@ public List partitions(ImportTableDesc tblDesc) return partitions; } - private AlterTableAddPartitionDesc partitionDesc(Path fromPath, - ImportTableDesc tblDesc, Partition partition) throws SemanticException { + private AlterTableAddPartitionDesc addPartitionDesc(Path fromPath, ImportTableDesc tblDesc, Partition partition) + throws SemanticException { try { - AlterTableAddPartitionDesc partsDesc = - new AlterTableAddPartitionDesc(tblDesc.getDatabaseName(), tblDesc.getTableName(), - EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()), - partition.getSd().getLocation(), partition.getParameters()); - AlterTableAddPartitionDesc.PartitionDesc partDesc = partsDesc.getPartition(0); - partDesc.setInputFormat(partition.getSd().getInputFormat()); - partDesc.setOutputFormat(partition.getSd().getOutputFormat()); - partDesc.setNumBuckets(partition.getSd().getNumBuckets()); - partDesc.setCols(partition.getSd().getCols()); - partDesc.setSerializationLib(partition.getSd().getSerdeInfo().getSerializationLib()); - partDesc.setSerdeParams(partition.getSd().getSerdeInfo().getParameters()); - partDesc.setBucketCols(partition.getSd().getBucketCols()); - partDesc.setSortCols(partition.getSd().getSortCols()); - if (tblDesc.isExternal() && !replicationSpec().isMigratingToExternalTable()) { - // we have to provide the source location so target location can be derived. - partDesc.setLocation(partition.getSd().getLocation()); - } else { + Map partitionSpec = EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()); + + StorageDescriptor sd = partition.getSd(); + String location = sd.getLocation(); + if (!tblDesc.isExternal() || replicationSpec().isMigratingToExternalTable()) { /** * this is required for file listing of all files in a partition for managed table as described in * {@link org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator} */ - partDesc.setLocation(new Path(fromPath, - Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString()); + location = new Path(fromPath, Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString(); } - partsDesc.setReplicationSpec(replicationSpec()); + ColumnStatistics columnStatistics = null; + long writeId = -1; if (partition.isSetColStats()) { ColumnStatistics colStats = partition.getColStats(); ColumnStatisticsDesc colStatsDesc = new ColumnStatisticsDesc(colStats.getStatsDesc()); colStatsDesc.setTableName(tblDesc.getTableName()); colStatsDesc.setDbName(tblDesc.getDatabaseName()); - partDesc.setColStats(new ColumnStatistics(colStatsDesc, colStats.getStatsObj())); - long writeId = replicationSpec().isMigratingToTxnTable() ? + columnStatistics = new ColumnStatistics(colStatsDesc, colStats.getStatsObj()); + writeId = replicationSpec().isMigratingToTxnTable() ? 
ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID : partition.getWriteId(); - partDesc.setWriteId(writeId); } - return partsDesc; + + AlterTableAddPartitionDesc.PartitionDesc partitionDesc = new AlterTableAddPartitionDesc.PartitionDesc( + partitionSpec, location, partition.getParameters(), sd.getInputFormat(), sd.getOutputFormat(), + sd.getNumBuckets(), sd.getCols(), sd.getSerdeInfo().getSerializationLib(), sd.getSerdeInfo().getParameters(), + sd.getBucketCols(), sd.getSortCols(), columnStatistics, writeId); + + AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(tblDesc.getDatabaseName(), + tblDesc.getTableName(), true, ImmutableList.of(partitionDesc)); + addPartitionDesc.setReplicationSpec(replicationSpec()); + return addPartitionDesc; } catch (Exception e) { throw new SemanticException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java index c728e2d49a86..40020ed2573e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java @@ -182,7 +182,7 @@ private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc */ private Task tasksForAddPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc, Task ptnRootTask) throws MetaException, HiveException { - AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartition(0); + AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0); Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation()); Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec); partSpec.setLocation(replicaWarehousePartitionLocation.toString()); @@ -362,7 +362,7 @@ private TaskTracker forExistingTable(AlterTableAddPartitionDesc lastPartitionRep boolean encounteredTheLastReplicatedPartition = (lastPartitionReplicated == null); Map lastReplicatedPartSpec = null; if (!encounteredTheLastReplicatedPartition) { - lastReplicatedPartSpec = lastPartitionReplicated.getPartition(0).getPartSpec(); + lastReplicatedPartSpec = lastPartitionReplicated.getPartitions().get(0).getPartSpec(); LOG.info("Start processing from partition info spec : {}", StringUtils.mapToString(lastReplicatedPartSpec)); } @@ -370,13 +370,13 @@ private TaskTracker forExistingTable(AlterTableAddPartitionDesc lastPartitionRep Iterator partitionIterator = event.partitionDescriptions(tableDesc).iterator(); while (!encounteredTheLastReplicatedPartition && partitionIterator.hasNext()) { AlterTableAddPartitionDesc addPartitionDesc = partitionIterator.next(); - Map currentSpec = addPartitionDesc.getPartition(0).getPartSpec(); + Map currentSpec = addPartitionDesc.getPartitions().get(0).getPartSpec(); encounteredTheLastReplicatedPartition = lastReplicatedPartSpec.equals(currentSpec); } while (partitionIterator.hasNext() && tracker.canAddMoreTasks()) { AlterTableAddPartitionDesc addPartitionDesc = partitionIterator.next(); - Map partSpec = addPartitionDesc.getPartition(0).getPartSpec(); + Map partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec(); Task ptnRootTask = null; ReplLoadOpType loadPtnType = getLoadPartitionType(partSpec); switch (loadPtnType) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 
691f3ee2e9b4..a236490e0446 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -83,7 +83,6 @@ import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.permission.FsAction; @@ -112,7 +111,6 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.ErrorMsg; -import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc; import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableDropPartitionDesc; import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; @@ -132,7 +130,6 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType; -import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.session.CreateTableAutomaticGrant; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.Deserializer; @@ -2999,143 +2996,46 @@ public Partition createPartition(Table tbl, Map partSpec) throws } } - public List createPartitions(AlterTableAddPartitionDesc addPartitionDesc) throws HiveException { - // TODO: catalog name everywhere in this method - Table tbl = getTable(addPartitionDesc.getDbName(), addPartitionDesc.getTableName()); - int size = addPartitionDesc.getPartitionCount(); - List in = - new ArrayList(size); - long writeId; - String validWriteIdList; - - // In case of replication, get the writeId from the source and use valid write Id list - // for replication. - if (addPartitionDesc.getReplicationSpec().isInReplicationScope() && - addPartitionDesc.getPartition(0).getWriteId() > 0) { - writeId = addPartitionDesc.getPartition(0).getWriteId(); - // We need a valid writeId list for a transactional change. During replication we do not - // have a valid writeId list which was used for this on the source. But we know for sure - // that the writeId associated with it was valid then (otherwise the change would have - // failed on the source). So use a valid transaction list with only that writeId. 
- validWriteIdList = new ValidReaderWriteIdList(TableName.getDbTable(tbl.getDbName(), - tbl.getTableName()), - new long[0], new BitSet(), writeId).writeToString(); - } else { - AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true); - if (tableSnapshot != null && tableSnapshot.getWriteId() > 0) { - writeId = tableSnapshot.getWriteId(); - validWriteIdList = tableSnapshot.getValidWriteIdList(); - } else { - writeId = -1; - validWriteIdList = null; - } - } - for (int i = 0; i < size; ++i) { - org.apache.hadoop.hive.metastore.api.Partition tmpPart = - convertAddSpecToMetaPartition(tbl, addPartitionDesc.getPartition(i), conf); - if (tmpPart != null && writeId > 0) { - tmpPart.setWriteId(writeId); - } - in.add(tmpPart); - } - List out = new ArrayList(); + public List addPartition( + List partitions, boolean ifNotExists, boolean needResults) + throws HiveException { try { - if (!addPartitionDesc.getReplicationSpec().isInReplicationScope()){ - // TODO: normally, the result is not necessary; might make sense to pass false - for (org.apache.hadoop.hive.metastore.api.Partition outPart - : getMSC().add_partitions(in, addPartitionDesc.isIfNotExists(), true)) { - out.add(new Partition(tbl, outPart)); - } - } else { - - // For replication add-ptns, we need to follow a insert-if-not-exist, alter-if-exists scenario. - // TODO : ideally, we should push this mechanism to the metastore, because, otherwise, we have - // no choice but to iterate over the partitions here. - - List partsToAdd = new ArrayList<>(); - List partsToAlter = new ArrayList<>(); - List part_names = new ArrayList<>(); - for (org.apache.hadoop.hive.metastore.api.Partition p: in){ - part_names.add(Warehouse.makePartName(tbl.getPartitionKeys(), p.getValues())); - try { - org.apache.hadoop.hive.metastore.api.Partition ptn = - getMSC().getPartition(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), p.getValues()); - if (addPartitionDesc.getReplicationSpec().allowReplacementInto(ptn.getParameters())){ - ReplicationSpec.copyLastReplId(ptn.getParameters(), p.getParameters()); - partsToAlter.add(p); - } // else ptn already exists, but we do nothing with it. - } catch (NoSuchObjectException nsoe){ - // if the object does not exist, we want to add it. - partsToAdd.add(p); - } - } - for (org.apache.hadoop.hive.metastore.api.Partition outPart - : getMSC().add_partitions(partsToAdd, addPartitionDesc.isIfNotExists(), true)) { - out.add(new Partition(tbl, outPart)); - } - EnvironmentContext ec = new EnvironmentContext(); - // In case of replication, statistics is obtained from the source, so do not update those - // on replica. - ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE); - getMSC().alter_partitions(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(), - partsToAlter, ec, validWriteIdList, writeId); - - for ( org.apache.hadoop.hive.metastore.api.Partition outPart : - getMSC().getPartitionsByNames(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(),part_names)){ - out.add(new Partition(tbl,outPart)); - } - } + return getMSC().add_partitions(partitions, ifNotExists, needResults); } catch (Exception e) { LOG.error(StringUtils.stringifyException(e)); throw new HiveException(e); } - return out; } - public static org.apache.hadoop.hive.metastore.api.Partition convertAddSpecToMetaPartition( - Table tbl, AlterTableAddPartitionDesc.PartitionDesc addSpec, final HiveConf conf) throws HiveException { - Path location = addSpec.getLocation() != null - ? 
new Path(tbl.getPath(), addSpec.getLocation()) : null; - if (location != null) { - // Ensure that it is a full qualified path (in most cases it will be since tbl.getPath() is full qualified) - location = new Path(Utilities.getQualifiedPath(conf, location)); - } - org.apache.hadoop.hive.metastore.api.Partition part = - Partition.createMetaPartitionObject(tbl, addSpec.getPartSpec(), location); - if (addSpec.getPartParams() != null) { - part.setParameters(addSpec.getPartParams()); - } - if (addSpec.getInputFormat() != null) { - part.getSd().setInputFormat(addSpec.getInputFormat()); - } - if (addSpec.getOutputFormat() != null) { - part.getSd().setOutputFormat(addSpec.getOutputFormat()); - } - if (addSpec.getNumBuckets() != -1) { - part.getSd().setNumBuckets(addSpec.getNumBuckets()); - } - if (addSpec.getCols() != null) { - part.getSd().setCols(addSpec.getCols()); - } - if (addSpec.getSerializationLib() != null) { - part.getSd().getSerdeInfo().setSerializationLib(addSpec.getSerializationLib()); - } - if (addSpec.getSerdeParams() != null) { - part.getSd().getSerdeInfo().setParameters(addSpec.getSerdeParams()); - } - if (addSpec.getBucketCols() != null) { - part.getSd().setBucketCols(addSpec.getBucketCols()); + public org.apache.hadoop.hive.metastore.api.Partition getPartition(String dbName, String tableName, + List params) throws HiveException { + try { + return getMSC().getPartition(dbName, tableName, params); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + throw new HiveException(e); } - if (addSpec.getSortCols() != null) { - part.getSd().setSortCols(addSpec.getSortCols()); + } + + public void alterPartitions(String dbName, String tableName, + List partitions, EnvironmentContext ec, String validWriteIdList, + long writeId) throws HiveException { + try { + getMSC().alter_partitions(dbName, tableName, partitions, ec, validWriteIdList, writeId); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + throw new HiveException(e); } - if (addSpec.getColStats() != null) { - part.setColStats(addSpec.getColStats()); - // Statistics will have an associated write Id for a transactional table. We need it to - // update column statistics. 
- part.setWriteId(addSpec.getWriteId()); + } + + public List getPartitionsByNames(String dbName, String tableName, + List partitionNames) throws HiveException { + try { + return getMSC().getPartitionsByNames(dbName, tableName, partitionNames); + } catch (Exception e) { + LOG.error(StringUtils.stringifyException(e)); + throw new HiveException(e); } - return part; } public Partition getPartition(Table tbl, Map partSpec, diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index f8d906f135b4..ddfa0979ea16 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -146,7 +146,6 @@ import org.apache.hadoop.hive.ql.ddl.table.storage.AlterTableSetSkewedLocationDesc; import org.apache.hadoop.hive.ql.ddl.table.storage.AlterTableSkewedByDesc; import org.apache.hadoop.hive.ql.ddl.table.storage.AlterTableUnarchiveDesc; -import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc.PartitionDesc; import org.apache.hadoop.hive.ql.ddl.view.AlterMaterializedViewRewriteDesc; import org.apache.hadoop.hive.ql.ddl.view.DropMaterializedViewDesc; import org.apache.hadoop.hive.ql.ddl.view.DropViewDesc; @@ -3505,15 +3504,15 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole // ^(TOK_ALTERTABLE_ADDPARTS identifier ifNotExists? alterStatementSuffixAddPartitionsElement+) boolean ifNotExists = ast.getChild(0).getType() == HiveParser.TOK_IFNOTEXISTS; - Table tab = getTable(qualified); - boolean isView = tab.isView(); - validateAlterTableType(tab, AlterTableType.ADDPARTITION, expectView); - outputs.add(new WriteEntity(tab, + Table table = getTable(qualified); + boolean isView = table.isView(); + validateAlterTableType(table, AlterTableType.ADDPARTITION, expectView); + outputs.add(new WriteEntity(table, /*use DDL_EXCLUSIVE to cause X lock to prevent races between concurrent add partition calls with IF NOT EXISTS. w/o this 2 concurrent calls to add the same partition may both add data since for transactional tables creating partition metadata and moving data there are 2 separate actions. */ - ifNotExists && AcidUtils.isTransactionalTable(tab) ? WriteType.DDL_EXCLUSIVE + ifNotExists && AcidUtils.isTransactionalTable(table) ? WriteType.DDL_EXCLUSIVE : WriteEntity.WriteType.DDL_SHARED)); int numCh = ast.getChildCount(); @@ -3522,17 +3521,17 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole String currentLocation = null; Map currentPart = null; // Parser has done some verification, so the order of tokens doesn't need to be verified here. 
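In the rewritten loop below, each parsed spec becomes a PartitionDesc via the new createPartitionDesc helper; when stats auto-gathering is enabled and no explicit location was given, the partition parameters are seeded with the accurate-stats marker for its columns (the params visible in the q.out changes further down). A sketch of just that seeding step, assuming a Table named table is in scope:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hive.common.StatsSetupConst;
    import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;

    // Mark the freshly added (empty) partition as having accurate basic/column stats.
    Map<String, String> params = new HashMap<>();
    StatsSetupConst.setStatsStateForCreateTable(params,
        MetaStoreUtils.getColumnNames(table.getCols()), StatsSetupConst.TRUE);
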
- AlterTableAddPartitionDesc addPartitionDesc = - new AlterTableAddPartitionDesc(tab.getDbName(), tab.getTableName(), ifNotExists); + + List partitions = new ArrayList<>(); for (int num = start; num < numCh; num++) { ASTNode child = (ASTNode) ast.getChild(num); switch (child.getToken().getType()) { case HiveParser.TOK_PARTSPEC: if (currentPart != null) { - addPartitionDesc.addPartition(currentPart, currentLocation); + partitions.add(createPartitionDesc(table, currentLocation, currentPart)); currentLocation = null; } - currentPart = getValidatedPartSpec(tab, child, conf, true); + currentPart = getValidatedPartSpec(table, child, conf, true); validatePartitionValues(currentPart); // validate reserved values break; case HiveParser.TOK_PARTITIONLOCATION: @@ -3550,31 +3549,21 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole // add the last one if (currentPart != null) { - addPartitionDesc.addPartition(currentPart, currentLocation); + partitions.add(createPartitionDesc(table, currentLocation, currentPart)); } - if (this.conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) { - for (int index = 0; index < addPartitionDesc.getPartitionCount(); index++) { - PartitionDesc desc = addPartitionDesc.getPartition(index); - if (desc.getLocation() == null) { - if (desc.getPartParams() == null) { - desc.setPartParams(new HashMap()); - } - StatsSetupConst.setStatsStateForCreateTable(desc.getPartParams(), - MetaStoreUtils.getColumnNames(tab.getCols()), StatsSetupConst.TRUE); - } - } - } - - if (addPartitionDesc.getPartitionCount() == 0) { + if (partitions.isEmpty()) { // nothing to do return; } + AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(table.getDbName(), + table.getTableName(), ifNotExists, partitions); + Task ddlTask = TaskFactory.get(new DDLWork(getInputs(), getOutputs(), addPartitionDesc)); rootTasks.add(ddlTask); - handleTransactionalTable(tab, addPartitionDesc, ddlTask); + handleTransactionalTable(table, addPartitionDesc, ddlTask); if (isView) { // Compile internal query to capture underlying table partition dependencies @@ -3585,8 +3574,7 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole cmd.append(HiveUtils.unparseIdentifier(qualified[1])); cmd.append(" WHERE "); boolean firstOr = true; - for (int i = 0; i < addPartitionDesc.getPartitionCount(); ++i) { - AlterTableAddPartitionDesc.PartitionDesc partitionDesc = addPartitionDesc.getPartition(i); + for (AlterTableAddPartitionDesc.PartitionDesc partitionDesc : partitions) { if (firstOr) { firstOr = false; } else { @@ -3619,6 +3607,17 @@ private void analyzeAlterTableAddParts(String[] qualified, CommonTree ast, boole } } + private AlterTableAddPartitionDesc.PartitionDesc createPartitionDesc(Table table, String currentLocation, + Map currentPart) { + Map params = null; + if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER) && currentLocation == null) { + params = new HashMap(); + StatsSetupConst.setStatsStateForCreateTable(params, + MetaStoreUtils.getColumnNames(table.getCols()), StatsSetupConst.TRUE); + } + return new AlterTableAddPartitionDesc.PartitionDesc(currentPart, currentLocation, params); + } + /** * Add partition for Transactional tables needs to add (copy/rename) the data so that it lands * in a delta_x_x/ folder in the partition dir. 
@@ -3631,13 +3630,12 @@ private void handleTransactionalTable(Table tab, AlterTableAddPartitionDesc addP Long writeId = null; int stmtId = 0; - for (int index = 0; index < addPartitionDesc.getPartitionCount(); index++) { - PartitionDesc desc = addPartitionDesc.getPartition(index); - if (desc.getLocation() != null) { - AcidUtils.validateAcidPartitionLocation(desc.getLocation(), conf); + for (AlterTableAddPartitionDesc.PartitionDesc partitonDesc : addPartitionDesc.getPartitions()) { + if (partitonDesc.getLocation() != null) { + AcidUtils.validateAcidPartitionLocation(partitonDesc.getLocation(), conf); if(addPartitionDesc.isIfNotExists()) { //Don't add partition data if it already exists - Partition oldPart = getPartition(tab, desc.getPartSpec(), false); + Partition oldPart = getPartition(tab, partitonDesc.getPartSpec(), false); if(oldPart != null) { continue; } @@ -3653,15 +3651,15 @@ private void handleTransactionalTable(Table tab, AlterTableAddPartitionDesc addP } stmtId = getTxnMgr().getStmtIdAndIncrement(); } - LoadTableDesc loadTableWork = new LoadTableDesc(new Path(desc.getLocation()), - Utilities.getTableDesc(tab), desc.getPartSpec(), + LoadTableDesc loadTableWork = new LoadTableDesc(new Path(partitonDesc.getLocation()), + Utilities.getTableDesc(tab), partitonDesc.getPartSpec(), LoadTableDesc.LoadFileType.KEEP_EXISTING, //not relevant - creating new partition writeId); loadTableWork.setStmtId(stmtId); loadTableWork.setInheritTableSpecs(true); try { - desc.setLocation(new Path(tab.getDataLocation(), - Warehouse.makePartPath(desc.getPartSpec())).toString()); + partitonDesc.setLocation(new Path(tab.getDataLocation(), + Warehouse.makePartPath(partitonDesc.getPartSpec())).toString()); } catch (MetaException ex) { throw new SemanticException("Could not determine partition path due to: " diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 687122a45ff9..e955989d9211 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.metastore.txn.TxnUtils; @@ -70,6 +71,9 @@ import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableList; + import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import java.io.IOException; @@ -322,10 +326,17 @@ public static boolean prepareImport(boolean isImportCmd, x.getInputs().add(toReadEntity(new Path(parsedLocation), x.getConf())); } - if ((parsedTableName != null) && (!parsedTableName.isEmpty())) { + if (StringUtils.isNotBlank(parsedTableName)) { tblDesc.setTableName(parsedTableName); } + if (tblDesc.getTableName() == null) { + // Either we got the tablename from the IMPORT statement (first priority) or from the export dump. 
+      throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg());
+    } else {
+      x.getConf().set("import.destination.table", tblDesc.getTableName());
+    }
+
     List<AlterTableAddPartitionDesc> partitionDescs = new ArrayList<>();
     Iterable<Partition> partitions = rv.getPartitions();
     for (Partition partition : partitions) {
@@ -343,7 +354,7 @@ public static boolean prepareImport(boolean isImportCmd,
       for (Iterator<AlterTableAddPartitionDesc> partnIter = partitionDescs
           .listIterator(); partnIter.hasNext(); ) {
         AlterTableAddPartitionDesc addPartitionDesc = partnIter.next();
-        if (!found && addPartitionDesc.getPartition(0).getPartSpec().equals(parsedPartSpec)) {
+        if (!found && addPartitionDesc.getPartitions().get(0).getPartSpec().equals(parsedPartSpec)) {
           found = true;
         } else {
           partnIter.remove();
@@ -356,17 +367,6 @@ public static boolean prepareImport(boolean isImportCmd,
       }
     }
 
-    if (tblDesc.getTableName() == null) {
-      // Either we got the tablename from the IMPORT statement (first priority)
-      // or from the export dump.
-      throw new SemanticException(ErrorMsg.NEED_TABLE_SPECIFICATION.getMsg());
-    } else {
-      x.getConf().set("import.destination.table", tblDesc.getTableName());
-      for (AlterTableAddPartitionDesc addPartitionDesc : partitionDescs) {
-        addPartitionDesc.setTableName(tblDesc.getTableName());
-      }
-    }
-
     Warehouse wh = new Warehouse(x.getConf());
     Table table = tableIfExists(tblDesc, x.getHive());
     boolean tableExists = false;
@@ -410,36 +410,32 @@ public static boolean prepareImport(boolean isImportCmd,
     return tableExists;
   }
 
-  private static AlterTableAddPartitionDesc getBaseAddPartitionDescFromPartition(
-      Path fromPath, String dbName, ImportTableDesc tblDesc, Partition partition,
-      ReplicationSpec replicationSpec, HiveConf conf)
+  private static AlterTableAddPartitionDesc getBaseAddPartitionDescFromPartition(Path fromPath, String dbName,
+      ImportTableDesc tblDesc, Partition partition, ReplicationSpec replicationSpec, HiveConf conf)
       throws MetaException, SemanticException {
-    AlterTableAddPartitionDesc partsDesc = new AlterTableAddPartitionDesc(dbName, tblDesc.getTableName(),
-        EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues()),
-        partition.getSd().getLocation(), partition.getParameters());
-    AlterTableAddPartitionDesc.PartitionDesc partDesc = partsDesc.getPartition(0);
-    partDesc.setInputFormat(partition.getSd().getInputFormat());
-    partDesc.setOutputFormat(partition.getSd().getOutputFormat());
-    partDesc.setNumBuckets(partition.getSd().getNumBuckets());
-    partDesc.setCols(partition.getSd().getCols());
-    partDesc.setSerializationLib(partition.getSd().getSerdeInfo().getSerializationLib());
-    partDesc.setSerdeParams(partition.getSd().getSerdeInfo().getParameters());
-    partDesc.setBucketCols(partition.getSd().getBucketCols());
-    partDesc.setSortCols(partition.getSd().getSortCols());
-    if (replicationSpec.isInReplicationScope() && tblDesc.isExternal()
-        && !replicationSpec.isMigratingToExternalTable()) {
-      String newLocation = ReplExternalTables
-          .externalTableLocation(conf, partition.getSd().getLocation());
-      LOG.debug("partition {} has data location: {}", partition, newLocation);
-      partDesc.setLocation(newLocation);
+    Map<String, String> partitionSpec = EximUtil.makePartSpec(tblDesc.getPartCols(), partition.getValues());
+
+    StorageDescriptor sd = partition.getSd();
+
+    String location = null;
+    if (replicationSpec.isInReplicationScope() && tblDesc.isExternal() &&
+        !replicationSpec.isMigratingToExternalTable()) {
+      location = ReplExternalTables.externalTableLocation(conf, partition.getSd().getLocation());
+      LOG.debug("partition {} has data location: {}", partition, location);
     } else {
-      partDesc.setLocation(new Path(fromPath,
-          Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
+      location = new Path(fromPath, Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString();
     }
+
+    long writeId = -1;
     if (tblDesc.getReplWriteId() != null) {
-      partDesc.setWriteId(tblDesc.getReplWriteId());
+      writeId = tblDesc.getReplWriteId();
    }
-    return partsDesc;
+
+    AlterTableAddPartitionDesc.PartitionDesc partitionDesc = new AlterTableAddPartitionDesc.PartitionDesc(
+        partitionSpec, location, partition.getParameters(), sd.getInputFormat(), sd.getOutputFormat(),
+        sd.getNumBuckets(), sd.getCols(), sd.getSerdeInfo().getSerializationLib(), sd.getSerdeInfo().getParameters(),
+        sd.getBucketCols(), sd.getSortCols(), null, writeId);
+    return new AlterTableAddPartitionDesc(dbName, tblDesc.getTableName(), true, ImmutableList.of(partitionDesc));
   }
 
   private static ImportTableDesc getBaseCreateTableDescFromTable(String dbName,
@@ -566,11 +562,10 @@ private static Task alterSinglePartition(
       ImportTableDesc tblDesc, Table table, Warehouse wh, AlterTableAddPartitionDesc addPartitionDesc,
       ReplicationSpec replicationSpec, org.apache.hadoop.hive.ql.metadata.Partition ptn,
       EximUtil.SemanticAnalyzerWrapperContext x) throws MetaException, IOException, HiveException {
-    addPartitionDesc.setReplaceMode(true);
     if ((replicationSpec != null) && (replicationSpec.isInReplicationScope())) {
       addPartitionDesc.setReplicationSpec(replicationSpec);
     }
-    AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartition(0);
+    AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0);
     if (ptn == null) {
       fixLocationInPartSpec(tblDesc, table, wh, replicationSpec, partSpec, x);
     } else if (!externalTablePartition(tblDesc, replicationSpec)) {
@@ -584,7 +579,7 @@ private static Task addSinglePartition(ImportTableDesc tblDesc,
       ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext x,
       Long writeId, int stmtId)
       throws MetaException, IOException, HiveException {
-    AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartition(0);
+    AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0);
     boolean isAutoPurge = false;
     boolean needRecycle = false;
     boolean copyToMigratedTxnTable = replicationSpec.isMigratingToTxnTable();
@@ -1038,7 +1033,7 @@ private static void createRegularImportTasks(
       x.getLOG().debug("table partitioned");
 
       for (AlterTableAddPartitionDesc addPartitionDesc : partitionDescs) {
-        Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
+        Map<String, String> partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec();
         org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
         if ((ptn = x.getHive().getPartition(table, partSpec, false)) == null) {
           x.getTasks().add(addSinglePartition(
@@ -1235,7 +1230,7 @@ private static void createReplImportTasks(
         }
         if (updatedMetadata != null) {
           updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
-              addPartitionDesc.getPartition(0).getPartSpec());
+              addPartitionDesc.getPartitions().get(0).getPartSpec());
         }
       }
     } else if (!replicationSpec.isMetadataOnly()
@@ -1289,7 +1284,7 @@ private static void createReplImportTasks(
       x.getLOG().debug("table partitioned");
       for (AlterTableAddPartitionDesc addPartitionDesc : partitionDescs) {
         addPartitionDesc.setReplicationSpec(replicationSpec);
-        Map<String, String> partSpec = addPartitionDesc.getPartition(0).getPartSpec();
+        Map<String, String> partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec();
         org.apache.hadoop.hive.ql.metadata.Partition ptn = null;
         if (isOldTableValid) {
           // If existing table is valid but the partition spec is different, then ignore partition
@@ -1308,15 +1303,13 @@ private static void createReplImportTasks(
             x.getTasks().add(addSinglePartition(
                 tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
             if (updatedMetadata != null) {
-              updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
-                  addPartitionDesc.getPartition(0).getPartSpec());
+              updatedMetadata.addPartition(table.getDbName(), table.getTableName(), partSpec);
             }
           } else {
             x.getTasks().add(alterSinglePartition(
                 tblDesc, table, wh, addPartitionDesc, replicationSpec, null, x));
             if (updatedMetadata != null) {
-              updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
-                  addPartitionDesc.getPartition(0).getPartSpec());
+              updatedMetadata.addPartition(table.getDbName(), table.getTableName(), partSpec);
             }
           }
         } else {
@@ -1331,8 +1324,7 @@ private static void createReplImportTasks(
                   tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
             }
             if (updatedMetadata != null) {
-              updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
-                  addPartitionDesc.getPartition(0).getPartSpec());
+              updatedMetadata.addPartition(table.getDbName(), table.getTableName(), partSpec);
             }
             if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
               lockType = WriteEntity.WriteType.DDL_SHARED;
diff --git a/ql/src/test/results/clientpositive/add_part_multiple.q.out b/ql/src/test/results/clientpositive/add_part_multiple.q.out
index 81454f7dc5a7..8fbbe2ae0b0e 100644
--- a/ql/src/test/results/clientpositive/add_part_multiple.q.out
+++ b/ql/src/test/results/clientpositive/add_part_multiple.q.out
@@ -30,8 +30,22 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-0
     Add Partition
-#### A masked pattern was here ####
-      Spec: {ds=2010-01-01}, {ds=2010-02-01}, {ds=2010-03-01}, {ds=2010-04-01}
+      db name: default
+      partitions:
+          Partition
+            location: A
+            partition spec: {ds=2010-01-01}
+          Partition
+            location: B
+            partition spec: {ds=2010-02-01}
+          Partition
+            params: {totalSize=0, numRows=0, rawDataSize=0, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}, numFiles=0, numFilesErasureCoded=0}
+            partition spec: {ds=2010-03-01}
+          Partition
+            location: C
+            partition spec: {ds=2010-04-01}
+      table name: add_part_test_n1
+      if not exists: true
 
 PREHOOK: query: ALTER TABLE add_part_test_n1 ADD IF NOT EXISTS PARTITION (ds='2010-01-01') location 'A'
diff --git a/ql/src/test/results/clientpositive/drop_partitions_filter.q.out b/ql/src/test/results/clientpositive/drop_partitions_filter.q.out
index 457fa0bc2fc2..edfbbf7127e8 100644
--- a/ql/src/test/results/clientpositive/drop_partitions_filter.q.out
+++ b/ql/src/test/results/clientpositive/drop_partitions_filter.q.out
@@ -33,8 +33,12 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-0
     Add Partition
-#### A masked pattern was here ####
-      Spec: {c=US, d=1}
+      db name: default
+      partitions:
+          Partition
+            params: {totalSize=0, numRows=0, rawDataSize=0, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true","COLUMN_STATS":{"a":"true","b":"true"}}, numFiles=0, numFilesErasureCoded=0}
+            partition spec: {c=US, d=1}
+      table name: ptestfilter_n1
 
 PREHOOK: query: alter table ptestfilter_n1 add partition (c='US', d=1)
 PREHOOK: type: ALTERTABLE_ADDPARTS
diff --git a/ql/src/test/results/clientpositive/llap/add_part_with_loc.q.out b/ql/src/test/results/clientpositive/llap/add_part_with_loc.q.out
index b307c021aba5..1c58917d6732 100644
--- a/ql/src/test/results/clientpositive/llap/add_part_with_loc.q.out
+++ b/ql/src/test/results/clientpositive/llap/add_part_with_loc.q.out
@@ -31,8 +31,12 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-0
     Add Partition
-#### A masked pattern was here ####
-      Spec: {day=20110102}
+      db name: default
+      partitions:
+          Partition
+            location: hdfs://### HDFS PATH ###
+            partition spec: {day=20110102}
+      table name: supply
 
   Stage: Stage-1
     Move Operator
diff --git a/ql/src/test/results/clientpositive/spark/add_part_multiple.q.out b/ql/src/test/results/clientpositive/spark/add_part_multiple.q.out
index 81454f7dc5a7..8fbbe2ae0b0e 100644
--- a/ql/src/test/results/clientpositive/spark/add_part_multiple.q.out
+++ b/ql/src/test/results/clientpositive/spark/add_part_multiple.q.out
@@ -30,8 +30,22 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-0
     Add Partition
-#### A masked pattern was here ####
-      Spec: {ds=2010-01-01}, {ds=2010-02-01}, {ds=2010-03-01}, {ds=2010-04-01}
+      db name: default
+      partitions:
+          Partition
+            location: A
+            partition spec: {ds=2010-01-01}
+          Partition
+            location: B
+            partition spec: {ds=2010-02-01}
+          Partition
+            params: {totalSize=0, numRows=0, rawDataSize=0, COLUMN_STATS_ACCURATE={"BASIC_STATS":"true","COLUMN_STATS":{"key":"true","value":"true"}}, numFiles=0, numFilesErasureCoded=0}
+            partition spec: {ds=2010-03-01}
+          Partition
+            location: C
+            partition spec: {ds=2010-04-01}
+      table name: add_part_test_n1
+      if not exists: true
 
 PREHOOK: query: ALTER TABLE add_part_test_n1 ADD IF NOT EXISTS PARTITION (ds='2010-01-01') location 'A'
diff --git a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
index f2beafea9ee3..f4e71f915b22 100644
--- a/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
+++ b/streaming/src/java/org/apache/hive/streaming/HiveStreamingConnection.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.ddl.table.partition.AlterTableAddPartitionDesc;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.thrift.TException;
@@ -437,11 +437,13 @@ public PartitionInfo createPartitionIfNotExists(final List partitionValu
 
     try {
       Map<String, String> partSpec = Warehouse.makeSpecFromValues(tableObject.getPartitionKeys(), partitionValues);
-      AlterTableAddPartitionDesc addPartitionDesc = new AlterTableAddPartitionDesc(database, table, true);
+
+      Path location = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec));
+      location = new Path(Utilities.getQualifiedPath(conf, location));
+      partLocation = location.toString();
       partName = Warehouse.makePartName(tableObject.getPartitionKeys(), partitionValues);
-      partLocation = new Path(tableObject.getDataLocation(), Warehouse.makePartPath(partSpec)).toString();
-      addPartitionDesc.addPartition(partSpec, partLocation);
-      Partition partition = Hive.convertAddSpecToMetaPartition(tableObject, addPartitionDesc.getPartition(0), conf);
+      Partition partition =
+          org.apache.hadoop.hive.ql.metadata.Partition.createMetaPartitionObject(tableObject, partSpec, location);
       if (getMSC() == null) {
         // We assume it doesn't exist if we can't check it