diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 294c423d665c..be5eb484467f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1292,7 +1292,7 @@ public static enum ConfVars {
      * @deprecated Use MetastoreConf.BATCH_RETRIEVE_MAX
      */
     @Deprecated
-    METASTORE_BATCH_RETRIEVE_MAX("hive.metastore.batch.retrieve.max", 300,
+    METASTORE_BATCH_RETRIEVE_MAX("hive.metastore.batch.retrieve.max", 1000,
         "Maximum number of objects (tables/partitions) can be retrieved from metastore in one batch. \n" +
         "The higher the number, the less the number of round trips is needed to the Hive metastore server, \n" +
         "but it may also cause higher memory requirement at the client side."),
@@ -3021,6 +3021,13 @@
     HIVE_ZOOKEEPER_KILLQUERY_NAMESPACE("hive.zookeeper.killquery.namespace", "killQueries",
         "When kill query coordination is enabled, uses this namespace for registering queries to kill with zookeeper"),
+    HIVE_GETPARTITIONS_MAX_RETRIES("hive.getpartitions.max.retries", 5,
+        "Maximum number of retries for the Hive.getAllPartitionsOf call when getting partitions in batches.\n" +
+        "If the value is greater than zero, getting partitions is retried until the maximum\n" +
+        "number of attempts is reached or the batch size is reduced to 0, whichever comes first.\n" +
+        "In each retry attempt the batch size is reduced by a factor of 2 until it reaches zero.\n" +
+        "If the value is set to zero, it retries until the batch size becomes zero as described above."),
+
 
     // Transactions
     HIVE_TXN_MANAGER("hive.txn.manager",
         "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager",
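A rough, self-contained sketch of the retry/decay behavior that the new hive.getpartitions.max.retries option above describes: the batch size is divided by the decaying factor after every failed attempt, and the operation stops once the batch size reaches zero or the configured maximum number of attempts is used up. This is illustrative only; FetchBatch and runWithDecay are hypothetical stand-ins, not Hive's RetryUtilities.ExponentiallyDecayingBatchWork API.

import java.util.ArrayList;
import java.util.List;

public class DecayingBatchFetchSketch {

  /** Hypothetical callback standing in for one metastore round trip of at most batchSize objects. */
  interface FetchBatch {
    void fetch(int batchSize) throws Exception;
  }

  /**
   * Retries fetch with an exponentially decaying batch size.
   * maxRetries == 0 means "retry until the batch size decays to zero".
   */
  static void runWithDecay(int batchSize, int decayingFactor, int maxRetries, FetchBatch work)
      throws Exception {
    int attempts = 0;
    int size = batchSize;
    while (size > 0) {
      try {
        work.fetch(size);
        return;                       // success: stop retrying
      } catch (Exception e) {
        attempts++;
        if (maxRetries > 0 && attempts >= maxRetries) {
          throw e;                    // retry budget exhausted
        }
        size = size / decayingFactor; // shrink the batch size and try again
      }
    }
    throw new Exception("Batch size decayed to zero, giving up");
  }

  public static void main(String[] args) throws Exception {
    List<Integer> attemptedSizes = new ArrayList<>();
    try {
      // Always fail, as in the testRetriesExhaustedBatchSize test below: sizes 30, 15, 7, 3, 1 are attempted.
      runWithDecay(30, 2, 0, size -> {
        attemptedSizes.add(size);
        throw new Exception("simulated metastore failure");
      });
    } catch (Exception expected) {
      System.out.println("Attempted batch sizes: " + attemptedSizes); // [30, 15, 7, 3, 1]
    }
  }
}
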
* @param tbl table for which partitions are needed * @return list of partition objects */ public Set getAllPartitionsOf(Table tbl) throws HiveException { + int batchSize= MetastoreConf.getIntVar( + Hive.get().getConf(), MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX); + int maxRetries = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_GETPARTITIONS_MAX_RETRIES); + return getAllPartitionsOf(tbl, batchSize, 2, maxRetries); + } + + public Set getAllPartitionsOf(Table tbl, int batchSize, int decayingFactor, + int maxRetries) throws HiveException { if (!tbl.isPartitioned()) { return Sets.newHashSet(new Partition(tbl)); } - - List tParts; + Set result = new LinkedHashSet<>(); + RetryUtilities.ExponentiallyDecayingBatchWork batchTask = new RetryUtilities + .ExponentiallyDecayingBatchWork(batchSize, decayingFactor, maxRetries) { + @Override + public Void execute(int size) throws HiveException { + try { + result.clear(); + new PartitionIterable(Hive.get(), tbl, null, size).forEach(result::add); + return null; + } catch (HiveException e) { + throw e; + } + } + }; try { - tParts = getMSC().listPartitions(tbl.getDbName(), tbl.getTableName(), (short)-1); + batchTask.run(); } catch (Exception e) { - LOG.error("Failed getAllPartitionsOf", e); throw new HiveException(e); } - Set parts = new LinkedHashSet(tParts.size()); - for (org.apache.hadoop.hive.metastore.api.Partition tpart : tParts) { - parts.add(new Partition(tbl, tpart)); - } - return parts; + return result; } /** diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetPartitionInBatches.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetPartitionInBatches.java new file mode 100644 index 000000000000..3bb56913f233 --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetPartitionInBatches.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.client.builder.PartitionBuilder;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.stats.StatsUtils;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.*;
+import org.mockito.ArgumentCaptor;
+
+import java.util.*;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.times;
+
+public class TestGetPartitionInBatches {
+
+  private final String catName = "hive";
+  private final String dbName = "default";
+  private final String tableName = "test_partition_batch";
+  private static HiveConf hiveConf;
+  private static HiveMetaStoreClient db;
+  private static Hive hive;
+  private Table table;
+
+  @BeforeClass
+  public static void setupClass() throws HiveException {
+    hiveConf = new HiveConf(TestMsckCreatePartitionsInBatches.class);
+    hive = Hive.get();
+    SessionState.start(hiveConf);
+    try {
+      db = new HiveMetaStoreClient(hiveConf);
+    } catch (MetaException e) {
+      throw new HiveException(e);
+    }
+  }
+
+  @Before
+  public void before() throws Exception {
+    createPartitionedTable(catName, dbName, tableName);
+    table = db.getTable(catName, dbName, tableName);
+    addPartitions(dbName, tableName);
+  }
+
+  @After
+  public void after() throws Exception {
+    cleanUpTableQuietly(catName, dbName, tableName);
+  }
+
+  private Table createPartitionedTable(String catName, String dbName, String tableName) throws Exception {
+    try {
+      db.dropTable(catName, dbName, tableName);
+      Table table = new Table();
+      table.setCatName(catName);
+      table.setDbName(dbName);
+      table.setTableName(tableName);
+      FieldSchema col1 = new FieldSchema("key", "string", "");
+      FieldSchema col2 = new FieldSchema("value", "int", "");
+      FieldSchema col3 = new FieldSchema("city", "string", "");
+      StorageDescriptor sd = new StorageDescriptor();
+      sd.setSerdeInfo(new SerDeInfo());
+      sd.setInputFormat(TextInputFormat.class.getCanonicalName());
+      sd.setOutputFormat(HiveIgnoreKeyTextOutputFormat.class.getCanonicalName());
+      sd.setCols(Arrays.asList(col1, col2));
+      table.setPartitionKeys(Arrays.asList(col3));
+      table.setSd(sd);
+      db.createTable(table);
+      return db.getTable(catName, dbName, tableName);
+    } catch (Exception exception) {
+      fail("Unable to drop and create table " + StatsUtils.getFullyQualifiedTableName(dbName, tableName) + " because "
+          + StringUtils.stringifyException(exception));
+      throw exception;
+    }
+  }
+
+  private void addPartitions(String dbName, String tableName) throws Exception {
+    List<Partition> partitions = new ArrayList<>();
+    for (int i = 0; i < 30; i++) {
+      partitions.add(buildPartition(dbName, tableName, String.valueOf(i), table.getSd().getLocation() + "/city=" + String.valueOf(i)));
+    }
+    db.add_partitions(partitions, true, true);
+  }
+
+  protected Partition buildPartition(String dbName, String tableName, String value,
+      String location) throws MetaException {
+    return new PartitionBuilder()
+        .setDbName(dbName)
+        .setTableName(tableName)
+        .addValue(value)
+        .addCol("test_id", "int", "test col id")
+        .addCol("test_value", "string", "test col value")
+        .setLocation(location)
+        .build(hiveConf);
+  }
+
+  private void cleanUpTableQuietly(String catName, String dbName, String tableName) {
+    try {
+      db.dropTable(catName, dbName, tableName);
+    } catch (Exception exception) {
+      fail("Unexpected exception: " + StringUtils.stringifyException(exception));
+    }
+  }
+
+  /**
+   * Tests the number of partitions received from the HMS.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testgetAllPartitionsOf() throws Exception {
+    Set<org.apache.hadoop.hive.ql.metadata.Partition> part = hive.getAllPartitionsOf(hive.getTable(dbName, tableName));
+    Assert.assertEquals(part.size(), 30);
+  }
+
+  /**
+   * Tests the number of Hive.getAllPartitionsOf metastore calls when the total number of
+   * partitions is exactly divisible by the batch size.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testNumberOfGetPartitionCalls() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(db);
+    hive.setMSC(spyMSC);
+    // test with a batch size of 10 and decaying factor of 2
+    hive.getAllPartitionsOf(hive.getTable(dbName, tableName), 10, 2, 0);
+    ArgumentCaptor<GetPartitionsByNamesRequest> req = ArgumentCaptor.forClass(GetPartitionsByNamesRequest.class);
+    // there should be 3 calls to get partitions
+    verify(spyMSC, times(3)).getPartitionsByNames(req.capture());
+    Assert.assertEquals(10, req.getValue().getNames().size());
+  }
+
+  /**
+   * Tests the number of Hive.getAllPartitionsOf metastore calls when the total number of
+   * partitions is not exactly divisible by the batch size.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testUnevenNumberOfGetPartitionCalls() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(db);
+    hive.setMSC(spyMSC);
+    // there should be 2 calls to get partitions with batch sizes of 19, 11
+    hive.getAllPartitionsOf(hive.getTable(dbName, tableName), 19, 2, 0);
+    ArgumentCaptor<GetPartitionsByNamesRequest> req = ArgumentCaptor.forClass(GetPartitionsByNamesRequest.class);
+    // there should be 2 calls to get partitions
+    verify(spyMSC, times(2)).getPartitionsByNames(req.capture());
+    // confirm the batch sizes were 19, 11 in the two calls to get partitions
+    List<GetPartitionsByNamesRequest> apds = req.getAllValues();
+    Assert.assertEquals(19, apds.get(0).getNames().size());
+    Assert.assertEquals(11, apds.get(1).getNames().size());
+  }
+
+  /**
+   * Tests the number of Hive.getAllPartitionsOf metastore calls when the total number of
+   * partitions is less than the batch size.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSmallNumberOfPartitions() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(db);
+    hive.setMSC(spyMSC);
+    hive.getAllPartitionsOf(hive.getTable(dbName, tableName), 100, 2, 0);
+    ArgumentCaptor<GetPartitionsByNamesRequest> req = ArgumentCaptor.forClass(GetPartitionsByNamesRequest.class);
+    // there should be 1 call to get partitions
+    verify(spyMSC, times(1)).getPartitionsByNames(req.capture());
+    Assert.assertEquals(30, req.getValue().getNames().size());
+  }
+
+  /**
+   * Tests the retries-exhausted case, where the getAllPartitionsOf call always keeps throwing
+   * HiveException.
+   * The batch sizes should decrease exponentially based on the decaying factor, ultimately
+   * giving up when the batch size reaches 0.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRetriesExhaustedBatchSize() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(db);
+    hive.setMSC(spyMSC);
+    doThrow(MetaException.class).when(spyMSC).getPartitionsByNames(any());
+    try {
+      hive.getAllPartitionsOf(hive.getTable(dbName, tableName), 30, 2, 0);
+    } catch (Exception ignored) {}
+    ArgumentCaptor<GetPartitionsByNamesRequest> req = ArgumentCaptor.forClass(GetPartitionsByNamesRequest.class);
+    // there should be 5 calls to get partitions with batch sizes of 30, 15, 7, 3, 1
+    verify(spyMSC, times(5)).getPartitionsByNames(req.capture());
+    List<GetPartitionsByNamesRequest> apds = req.getAllValues();
+    Assert.assertEquals(5, apds.size());
+
+    Assert.assertEquals(30, apds.get(0).getNames().size());
+    Assert.assertEquals(15, apds.get(1).getNames().size());
+    Assert.assertEquals(7, apds.get(2).getNames().size());
+    Assert.assertEquals(3, apds.get(3).getNames().size());
+    Assert.assertEquals(1, apds.get(4).getNames().size());
+  }
+
+  /**
+   * Tests the maximum number of retry attempts provided by configuration.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testMaxRetriesReached() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(db);
+    hive.setMSC(spyMSC);
+    doThrow(MetaException.class).when(spyMSC).getPartitionsByNames(any());
+    try {
+      hive.getAllPartitionsOf(hive.getTable(dbName, tableName), 30, 2, 2);
+    } catch (Exception ignored) {}
+    ArgumentCaptor<GetPartitionsByNamesRequest> req = ArgumentCaptor.forClass(GetPartitionsByNamesRequest.class);
+    // there should be 2 calls to get partitions with batch sizes of 30, 15
+    verify(spyMSC, times(2)).getPartitionsByNames(req.capture());
+    List<GetPartitionsByNamesRequest> apds = req.getAllValues();
+    Assert.assertEquals(2, apds.size());
+
+    Assert.assertEquals(30, apds.get(0).getNames().size());
+    Assert.assertEquals(15, apds.get(1).getNames().size());
+  }
+}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 4e0c136fb5d0..a2d310ead8a4 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -349,7 +349,7 @@ public enum ConfVars {
         "Auto creates necessary schema on a startup if one doesn't exist. Set this to false, after creating it once." +
         "To enable auto create also set hive.metastore.schema.verification=false. Auto creation is not " +
         "recommended for production use cases, run schematool command instead." ),
-    BATCH_RETRIEVE_MAX("metastore.batch.retrieve.max", "hive.metastore.batch.retrieve.max", 300,
+    BATCH_RETRIEVE_MAX("metastore.batch.retrieve.max", "hive.metastore.batch.retrieve.max", 1000,
         "Maximum number of objects (tables/partitions) can be retrieved from metastore in one batch. \n" +
         "The higher the number, the less the number of round trips is needed to the Hive metastore server, \n" +
         "but it may also cause higher memory requirement at the client side."),
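
A rough end-to-end usage sketch of the new batched retrieval, under stated assumptions: a running metastore, and a partitioned table named like the one in the test above. The table and database names are illustrative; the config setters shown are the standard HiveConf/MetastoreConf helpers, but exact tuning values are only examples.

import java.util.Set;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.session.SessionState;

public class GetAllPartitionsInBatchesExample {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // Cap each metastore round trip at 500 partitions instead of the new default of 1000.
    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX, 500);
    // Allow at most 3 shrinking retry attempts before giving up.
    HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_GETPARTITIONS_MAX_RETRIES, 3);

    SessionState.start(conf);
    Hive hive = Hive.get(conf);
    // Partitions are now fetched in batches of at most 500 per metastore call.
    Set<Partition> parts = hive.getAllPartitionsOf(hive.getTable("default", "test_partition_batch"));
    System.out.println("Fetched " + parts.size() + " partitions");
  }
}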