diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 294c423d665c..b33bf3a54a7e 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3021,6 +3021,13 @@ public static enum ConfVars {
     HIVE_ZOOKEEPER_KILLQUERY_NAMESPACE("hive.zookeeper.killquery.namespace", "killQueries",
         "When kill query coordination is enabled, uses this namespace for registering queries to kill with zookeeper"),
 
+    HIVE_GETPARTITIONS_MAX_RETRIES("hive.getpartitions.max.retries", 5,
+        "Maximum number of retries for the Hive.getAllPartitionsOf call when getting partitions in batches.\n" +
+        "If the value is greater than zero, it will retry getting partitions until the maximum\n" +
+        "number of attempts is reached or the batch size is reduced to 0, whichever comes first.\n" +
+        "In each retry attempt it will reduce the batch size by a factor of 2 until it reaches zero.\n" +
+        "If the value is set to zero, it will retry until the batch size becomes zero as described above."),
+
     // Transactions
     HIVE_TXN_MANAGER("hive.txn.manager",
         "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager",
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index e013db7db266..f9a331efda8b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -117,6 +117,8 @@
 import org.apache.hadoop.hive.metastore.api.SourceTable;
 import org.apache.hadoop.hive.metastore.api.UpdateTransactionalStatsRequest;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.utils.RetryUtilities;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
 import org.apache.hadoop.hive.ql.io.HdfsUtils;
 import org.apache.hadoop.hive.metastore.HiveMetaException;
@@ -4143,11 +4145,11 @@ public List<Partition> getPartitions(Table tbl) throws HiveException {
    * @param tbl table for which partitions are needed
    * @return list of partition objects
    */
-  public Set<Partition> getAllPartitionsOf(Table tbl) throws HiveException {
+  public Set<Partition> getAllPartitions(Table tbl) throws HiveException {
     if (!tbl.isPartitioned()) {
       return Sets.newHashSet(new Partition(tbl));
     }
-
+
     List<org.apache.hadoop.hive.metastore.api.Partition> tParts;
     try {
       tParts = getMSC().listPartitions(tbl.getDbName(), tbl.getTableName(), (short)-1);
@@ -4162,6 +4164,50 @@ public Set<Partition> getAllPartitionsOf(Table tbl) throws HiveException {
     return parts;
   }
 
+  /**
+   * Get all the partitions. Do it in batches if batchSize is greater than 0; otherwise get them in one call.
+   * @param tbl table for which partitions are needed
+   * @return list of partition objects
+   */
+  public Set<Partition> getAllPartitionsOf(Table tbl) throws HiveException {
+    int defaultDecayingFactor = 2;
+    int batchSize = MetastoreConf.getIntVar(
+        Hive.get().getConf(), MetastoreConf.ConfVars.BATCH_RETRIEVE_MAX);
+    if (batchSize > 0) {
+      return getAllPartitionsInBatches(tbl, batchSize, defaultDecayingFactor, HiveConf
+          .getIntVar(conf, HiveConf.ConfVars.HIVE_GETPARTITIONS_MAX_RETRIES));
+    } else {
+      return getAllPartitions(tbl);
+    }
+  }
+
+  public Set<Partition> getAllPartitionsInBatches(Table tbl, int batchSize, int decayingFactor,
+      int maxRetries) throws HiveException {
+    if (!tbl.isPartitioned()) {
+      return Sets.newHashSet(new Partition(tbl));
+    }
+    Set<Partition> result = new LinkedHashSet<>();
+    RetryUtilities.ExponentiallyDecayingBatchWork<Void> batchTask = new RetryUtilities
+        .ExponentiallyDecayingBatchWork<Void>(batchSize, decayingFactor, maxRetries) {
+      @Override
+      public Void execute(int size) throws HiveException {
+        try {
+          result.clear();
+          new PartitionIterable(Hive.get(), tbl, null, size).forEach(result::add);
+          return null;
+        } catch (HiveException e) {
+          throw e;
+        }
+      }
+    };
+    try {
+      batchTask.run();
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+    return result;
+  }
+
   /**
    * get all the partitions of the table that matches the given partial
    * specification. partition columns whose value is can be anything should be
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetPartitionInBatches.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetPartitionInBatches.java
new file mode 100644
index 000000000000..ed2408f38bb3
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetPartitionInBatches.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.client.builder.PartitionBuilder;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class TestGetPartitionInBatches {
+
+  private final String catName = "hive";
+  private final String dbName = "default";
+  private final String tableName = "test_partition_batch";
+  private static HiveConf hiveConf;
+  private static HiveMetaStoreClient db;
+  private static Hive hive;
+  private Table table;
+
+  @BeforeClass
+  public static void setupClass() throws HiveException {
+    hiveConf = new HiveConf(TestGetPartitionInBatches.class);
+    hive = Hive.get();
+    SessionState.start(hiveConf);
+    try {
+      db = new HiveMetaStoreClient(hiveConf);
+    } catch (MetaException e) {
+      throw new HiveException(e);
+    }
+  }
+
+  @Before
+  public void before() throws Exception {
+    TestMsckCreatePartitionsInBatches.createPartitionedTable(db, catName, dbName, tableName);
+    table = db.getTable(catName, dbName, tableName);
+    addPartitions(dbName, tableName);
+  }
+
+  @After
+  public void after() throws Exception {
+    TestMsckCreatePartitionsInBatches.cleanUpTableQuietly(db, catName, dbName, tableName);
+  }
+
+  private void addPartitions(String dbName, String tableName) throws Exception {
+    List<Partition> partitions = new ArrayList<>();
+    for (int i = 0; i < 30; i++) {
+      partitions.add(buildPartition(dbName, tableName, String.valueOf(i), table.getSd().getLocation() + "/city=" + i));
+    }
+    db.add_partitions(partitions, true, true);
+  }
+
+  protected Partition buildPartition(String dbName, String tableName, String value,
+      String location) throws MetaException {
+    return new PartitionBuilder()
+        .setDbName(dbName)
+        .setTableName(tableName)
+        .addValue(value)
+        .addCol("test_id", "int", "test col id")
+        .addCol("test_value", "string", "test col value")
+        .setLocation(location)
+        .build(hiveConf);
+  }
+
+  /**
+   * Tests the number of partitions received from the HMS.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testgetAllPartitionsOf() throws Exception {
+    Set<org.apache.hadoop.hive.ql.metadata.Partition> part = hive.getAllPartitionsOf(hive.getTable(dbName, tableName));
+    Assert.assertEquals(30, part.size());
+  }
+
+  /**
+   * Tests the number of getPartitionsByNames calls made by Hive.getAllPartitionsInBatches when the
+   * total number of partitions is exactly divisible by the batch size.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testNumberOfGetPartitionCalls() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(db);
+    hive.setMSC(spyMSC);
+    // test with a batch size of 10 and decaying factor of 2
+    hive.getAllPartitionsInBatches(hive.getTable(dbName, tableName), 10, 2, 0);
+    ArgumentCaptor<GetPartitionsByNamesRequest> req = ArgumentCaptor.forClass(GetPartitionsByNamesRequest.class);
+    // there should be 3 calls to get partitions
+    verify(spyMSC, times(3)).getPartitionsByNames(req.capture());
+    Assert.assertEquals(10, req.getValue().getNames().size());
+  }
+
+  /**
+   * Tests the number of getPartitionsByNames calls made by Hive.getAllPartitionsInBatches when the
+   * total number of partitions is not exactly divisible by the batch size.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testUnevenNumberOfGetPartitionCalls() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(db);
+    hive.setMSC(spyMSC);
+    // there should be 2 calls to get partitions with batch sizes of 19, 11
+    hive.getAllPartitionsInBatches(hive.getTable(dbName, tableName), 19, 2, 0);
+    ArgumentCaptor<GetPartitionsByNamesRequest> req = ArgumentCaptor.forClass(GetPartitionsByNamesRequest.class);
+    // there should be 2 calls to get partitions
+    verify(spyMSC, times(2)).getPartitionsByNames(req.capture());
+    // confirm the batch sizes were 19, 11 in the two calls to get partitions
+    List<GetPartitionsByNamesRequest> apds = req.getAllValues();
+    Assert.assertEquals(19, apds.get(0).getNames().size());
+    Assert.assertEquals(11, apds.get(1).getNames().size());
+  }
+
+  /**
+   * Tests the number of getPartitionsByNames calls made by Hive.getAllPartitionsInBatches when the
+   * total number of partitions is less than the batch size.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSmallNumberOfPartitions() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(db);
+    hive.setMSC(spyMSC);
+    hive.getAllPartitionsInBatches(hive.getTable(dbName, tableName), 100, 2, 0);
+    ArgumentCaptor<GetPartitionsByNamesRequest> req = ArgumentCaptor.forClass(GetPartitionsByNamesRequest.class);
+    // there should be 1 call to get partitions
+    verify(spyMSC, times(1)).getPartitionsByNames(req.capture());
+    Assert.assertEquals(30, req.getValue().getNames().size());
+  }
+
+  /**
+   * Tests the retries-exhausted case, when the getAllPartitionsOf call always keeps throwing HiveException.
+   * The batch size should decrease exponentially based on the decaying factor, and the method should
+   * ultimately give up when the batch size reaches 0.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRetriesExhaustedBatchSize() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(db);
+    hive.setMSC(spyMSC);
+    doThrow(MetaException.class).when(spyMSC).getPartitionsByNames(any());
+    try {
+      hive.getAllPartitionsInBatches(hive.getTable(dbName, tableName), 30, 2, 0);
+    } catch (Exception ignored) {}
+    ArgumentCaptor<GetPartitionsByNamesRequest> req = ArgumentCaptor.forClass(GetPartitionsByNamesRequest.class);
+    // there should be 5 calls to get partitions, with batch sizes of 30, 15, 7, 3, 1
+    verify(spyMSC, times(5)).getPartitionsByNames(req.capture());
+    List<GetPartitionsByNamesRequest> apds = req.getAllValues();
+    Assert.assertEquals(5, apds.size());
+
+    Assert.assertEquals(30, apds.get(0).getNames().size());
+    Assert.assertEquals(15, apds.get(1).getNames().size());
+    Assert.assertEquals(7, apds.get(2).getNames().size());
+    Assert.assertEquals(3, apds.get(3).getNames().size());
+    Assert.assertEquals(1, apds.get(4).getNames().size());
+  }
+
+  /**
+   * Tests that the maximum number of retry attempts provided by configuration is honored.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testMaxRetriesReached() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(db);
+    hive.setMSC(spyMSC);
+    doThrow(MetaException.class).when(spyMSC).getPartitionsByNames(any());
+    try {
+      hive.getAllPartitionsInBatches(hive.getTable(dbName, tableName), 30, 2, 2);
+    } catch (Exception ignored) {}
+    ArgumentCaptor<GetPartitionsByNamesRequest> req = ArgumentCaptor.forClass(GetPartitionsByNamesRequest.class);
+    // there should be 2 calls to get partitions, with batch sizes of 30, 15
+    verify(spyMSC, times(2)).getPartitionsByNames(req.capture());
+    List<GetPartitionsByNamesRequest> apds = req.getAllValues();
+    Assert.assertEquals(2, apds.size());
+
+    Assert.assertEquals(30, apds.get(0).getNames().size());
+    Assert.assertEquals(15, apds.get(1).getNames().size());
+  }
+
+  /**
+   * Tests the number of calls to getPartitions and the respective batch sizes when the first call
+   * to getPartitions throws HiveException. The batch size should be reduced by the decayingFactor
+   * and the subsequent calls should fetch all the results.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testBatchingWhenException() throws Exception {
+    HiveMetaStoreClient spyMSC = spy(db);
+    hive.setMSC(spyMSC);
+    // This will throw an exception only on the first call.
+    Mockito.doThrow(new MetaException()).doCallRealMethod()
+        .when(spyMSC).getPartitionsByNames(any());
+    hive.getAllPartitionsInBatches(hive.getTable(dbName, tableName), 30, 2, 3);
+    ArgumentCaptor<GetPartitionsByNamesRequest> req = ArgumentCaptor.forClass(GetPartitionsByNamesRequest.class);
+    // The first call, with a batch size of 30, will fail; the remaining two calls will have a batch size of 15 each.
+    // Total 3 calls.
+    verify(spyMSC, times(3)).getPartitionsByNames(req.capture());
+    List<GetPartitionsByNamesRequest> apds = req.getAllValues();
+    Assert.assertEquals(3, apds.size());
+
+    Assert.assertEquals(30, apds.get(0).getNames().size());
+    Assert.assertEquals(15, apds.get(1).getNames().size());
+    Assert.assertEquals(15, apds.get(2).getNames().size());
+
+    Set<String> partNames = new HashSet<>(apds.get(1).getNames());
+    partNames.addAll(apds.get(2).getNames());
+    assert(partNames.size() == 30);
+
+    List<String> partitionNames = hive.getPartitionNames(table.getDbName(), table.getTableName(), (short) -1);
+    assert(partitionNames.size() == 30);
+    partitionNames.forEach(partNames::remove);
+    // In case hive.getAllPartitionsInBatches returns any duplicate/incomplete list, the below assertion will fail
+    assert(partNames.size() == 0);
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckCreatePartitionsInBatches.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckCreatePartitionsInBatches.java
index 30c893223825..a762aac430ef 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckCreatePartitionsInBatches.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckCreatePartitionsInBatches.java
@@ -88,17 +88,18 @@ public static void setupClass() throws HiveException, MetaException {
 
   @Before
   public void before() throws Exception {
-    createPartitionedTable(catName, dbName, tableName);
+    createPartitionedTable(db, catName, dbName, tableName);
     table = db.getTable(catName, dbName, tableName);
     repairOutput = new ArrayList<String>();
   }
 
   @After
   public void after() throws Exception {
-    cleanUpTableQuietly(catName, dbName, tableName);
+    cleanUpTableQuietly(db, catName, dbName, tableName);
   }
 
-  private Table createPartitionedTable(String catName, String dbName, String tableName) throws Exception {
+  public static Table createPartitionedTable(IMetaStoreClient db, String catName, String dbName, String tableName)
+      throws Exception {
     try {
       db.dropTable(catName, dbName, tableName);
       Table table = new Table();
@@ -124,7 +125,7 @@ private Table createPartitionedTable(String catName, String dbName, String table
     }
   }
 
-  private void cleanUpTableQuietly(String catName, String dbName, String tableName) {
+  public static void cleanUpTableQuietly(IMetaStoreClient db, String catName, String dbName, String tableName) {
     try {
       db.dropTable(catName, dbName, tableName);
     } catch (Exception exception) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckDropPartitionsInBatches.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckDropPartitionsInBatches.java
index a61e066266a0..74368a9f5030 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckDropPartitionsInBatches.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestMsckDropPartitionsInBatches.java
@@ -25,17 +25,10 @@
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.Msck;
 import org.apache.hadoop.hive.metastore.PartitionDropOptions;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.utils.RetryUtilities;
-import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.stats.StatsUtils;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.util.StringUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -44,13 +37,11 @@
 import org.mockito.ArgumentCaptor;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.eq;
@@ -86,49 +77,14 @@ public static void setupClass() throws Exception {
 
   @Before
   public void before() throws Exception {
-    createPartitionedTable(catName, dbName, tableName);
+    TestMsckCreatePartitionsInBatches.createPartitionedTable(db, catName, dbName, tableName);
     table = db.getTable(catName, dbName, tableName);
     repairOutput = new ArrayList<String>();
   }
 
   @After
   public void after() throws Exception {
-    cleanUpTableQuietly(catName, dbName, tableName);
-  }
-
-  private Table createPartitionedTable(String catName, String dbName, String tableName) throws Exception {
-    try {
-      db.dropTable(catName, dbName, tableName);
-      Table table = new Table();
-      table.setCatName(catName);
-      table.setDbName(dbName);
-      table.setTableName(tableName);
-      FieldSchema col1 = new FieldSchema("key", "string", "");
-      FieldSchema col2 = new FieldSchema("value", "int", "");
-      FieldSchema col3 = new FieldSchema("city", "string", "");
-      StorageDescriptor sd = new StorageDescriptor();
-      sd.setSerdeInfo(new SerDeInfo());
-      sd.setInputFormat(TextInputFormat.class.getCanonicalName());
-      sd.setOutputFormat(HiveIgnoreKeyTextOutputFormat.class.getCanonicalName());
-      sd.setCols(Arrays.asList(col1, col2));
-      table.setPartitionKeys(Arrays.asList(col3));
-      table.setSd(sd);
-      db.createTable(table);
-      return db.getTable(catName, dbName, tableName);
-    } catch (Exception exception) {
-      fail("Unable to drop and create table " + StatsUtils
-          .getFullyQualifiedTableName(dbName, tableName) + " because " + StringUtils
-          .stringifyException(exception));
-      throw exception;
-    }
-  }
-
-  private void cleanUpTableQuietly(String catName, String dbName, String tableName) {
-    try {
-      db.dropTable(catName, dbName, tableName, true, true, true);
-    } catch (Exception exception) {
-      fail("Unexpected exception: " + StringUtils.stringifyException(exception));
-    }
+    TestMsckCreatePartitionsInBatches.cleanUpTableQuietly(db, catName, dbName, tableName);
   }
 
   private Set<PartitionResult> dropPartsNotInFs(int numOfParts) {
diff --git a/ql/src/test/results/clientpositive/llap/temp_table_add_part_multiple.q.out b/ql/src/test/results/clientpositive/llap/temp_table_add_part_multiple.q.out
index ac040e8509b1..657ba88e6ca0 100644
--- a/ql/src/test/results/clientpositive/llap/temp_table_add_part_multiple.q.out
+++ b/ql/src/test/results/clientpositive/llap/temp_table_add_part_multiple.q.out
@@ -115,5 +115,5 @@ POSTHOOK: Input: default@add_part_test_n1_temp@ds=2010-04-01
 #### A masked pattern was here ####
 100 100 2010-01-01
 200 200 2010-02-01
-500 400 2010-04-01
 400 300 2010-03-01
+500 400 2010-04-01
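
Reviewer note: the batch-size sequences asserted in the tests above (30, 15, 7, 3, 1 with unlimited retries, and 30, 15 with maxRetries = 2) follow directly from the integer-division decay. The sketch below is a minimal hand-written model of that schedule, not the actual RetryUtilities.ExponentiallyDecayingBatchWork implementation; the class and method names are made up for illustration. It shows which batch sizes would be attempted when every fetch fails.

import java.util.ArrayList;
import java.util.List;

public class DecayingBatchSketch {

  // Batch sizes attempted when every fetch fails: start at batchSize and divide
  // by decayingFactor after each failed attempt. maxRetries == 0 means
  // "keep retrying until the batch size decays to zero".
  static List<Integer> attemptedBatchSizes(int batchSize, int decayingFactor, int maxRetries) {
    List<Integer> attempts = new ArrayList<>();
    int size = batchSize;
    while (size > 0 && (maxRetries == 0 || attempts.size() < maxRetries)) {
      attempts.add(size);           // this attempt fails...
      size = size / decayingFactor; // ...so the next attempt uses a smaller batch
    }
    return attempts;
  }

  public static void main(String[] args) {
    System.out.println(attemptedBatchSizes(30, 2, 0)); // [30, 15, 7, 3, 1], as in testRetriesExhaustedBatchSize
    System.out.println(attemptedBatchSizes(30, 2, 2)); // [30, 15], as in testMaxRetriesReached
  }
}

Running the sketch prints the two sequences the tests assert, which makes it easy to sanity-check other batchSize/decayingFactor/maxRetries combinations by hand.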