diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index f6649f55a3a6..f195a046b599 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -617,6 +617,22 @@ public static enum ConfVars { "Name of the source cluster for the replication."), REPL_TARGET_CLUSTER_NAME("hive.repl.target.cluster.name", null, "Name of the target cluster for the replication."), + REPL_RETRY_INTIAL_DELAY("hive.repl.retry.initial.delay", "60s", + new TimeValidator(TimeUnit.SECONDS), + "Initial Delay before retry starts."), + REPL_RETRY_BACKOFF_COEFFICIENT("hive.repl.retry.backoff.coefficient", 1.2f, + "The backoff coefficient for exponential retry delay between retries. " + + "Previous Delay * Backoff Coefficient will determine the next retry interval"), + REPL_RETRY_JITTER("hive.repl.retry.jitter", "30s", new TimeValidator(TimeUnit.SECONDS), + "A random jitter to be applied to avoid all retries happening at the same time."), + REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES("hive.repl.retry.max.delay.between.retries", "60m", + new TimeValidator(TimeUnit.MINUTES), + "Maximum allowed retry delay in minutes after including exponential backoff. " + + "If this limit is reached, retry will continue with this retry duration."), + REPL_RETRY_TOTAL_DURATION("hive.repl.retry.total.duration", "24h", + new TimeValidator(TimeUnit.HOURS), + "Total allowed retry duration in hours inclusive of all retries. Once this is exhausted, " + + "the policy instance will be marked as failed and will need manual intervention to restart."), LOCALSCRATCHDIR("hive.exec.local.scratchdir", "${system:java.io.tmpdir}" + File.separator + "${system:user.name}", "Local scratch space for Hive jobs"), diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java index 94ea3c2d9d13..e6384af5dd06 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java @@ -173,7 +173,7 @@ long dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasReplInfo at inputStream = atlasRestClient.exportData(exportRequest); FileSystem fs = atlasReplInfo.getStagingDir().getFileSystem(atlasReplInfo.getConf()); Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME); - numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream); + numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream, conf); } catch (SemanticException ex) { throw ex; } catch (Exception ex) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java index 7a0259fc0f3c..4c1a1708a793 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java @@ -98,11 +98,11 @@ public int execute() { conf.addResource(url); String rangerHiveServiceName = conf.get(ReplUtils.RANGER_HIVE_SERVICE_NAME); String rangerEndpoint = conf.get(ReplUtils.RANGER_REST_URL); - if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint)) { + if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint, conf)) { throw new SemanticException("Ranger endpoint is not valid " + rangerEndpoint); } 
RangerExportPolicyList rangerExportPolicyList = rangerRestClient.exportRangerPolicies(rangerEndpoint, - work.getDbName(), rangerHiveServiceName); + work.getDbName(), rangerHiveServiceName, conf); List<RangerPolicy> rangerPolicies = rangerExportPolicyList.getPolicies(); if (rangerPolicies.isEmpty()) { LOG.info("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs."); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java index 20f84012c649..3c7e9e2463b8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java @@ -95,7 +95,7 @@ public int execute() { conf.addResource(url); String rangerHiveServiceName = conf.get(ReplUtils.RANGER_HIVE_SERVICE_NAME); String rangerEndpoint = conf.get(ReplUtils.RANGER_REST_URL); - if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint)) { + if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint, conf)) { throw new SemanticException("Ranger endpoint is not valid " + rangerEndpoint); } if (work.getCurrentDumpPath() != null) { @@ -134,7 +134,7 @@ public int execute() { } rangerExportPolicyList.setPolicies(updatedRangerPolicies); rangerRestClient.importRangerPolicies(rangerExportPolicyList, work.getTargetDbName(), rangerEndpoint, - rangerHiveServiceName); + rangerHiveServiceName, conf); LOG.info("Number of ranger policies imported {}", rangerExportPolicyList.getListSize()); importCount = rangerExportPolicyList.getListSize(); work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.POLICIES.name(), importCount); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java index 402c87e74193..dc8d7903fc74 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java @@ -54,6 +54,7 @@ import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker; import org.apache.hadoop.hive.ql.exec.util.DAGTraversal; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lockmgr.DbLockManager; import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager; @@ -112,6 +113,7 @@ import java.util.ArrayList; import java.util.Map; import java.util.HashMap; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT; @@ -645,11 +647,13 @@ private int getMaxEventAllowed(int currentEventMaxLimit) { } return currentEventMaxLimit; } - private void cleanFailedEventDirIfExists(Path dumpDir, long resumeFrom) throws IOException { + private void cleanFailedEventDirIfExists(Path dumpDir, long resumeFrom) throws SemanticException { Path nextEventRoot = new Path(dumpDir, String.valueOf(resumeFrom + 1)); - Retry<Void> retriable = new Retry<Void>(IOException.class) { - @Override - public Void execute() throws IOException { + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(IOException.class).build(); + try { + retryable.executeCallable((Callable<Void>) () -> { FileSystem fs = FileSystem.get(nextEventRoot.toUri(), conf); try { fs.delete(nextEventRoot,
true); @@ -657,12 +661,9 @@ public Void execute() throws IOException { // no worries } return null; - } - }; - try { - retriable.run(); + }); } catch (Exception e) { - throw new IOException(e); + throw new SemanticException(e); } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java index 37b623f05067..6f24f5720913 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java @@ -44,6 +44,7 @@ public class AtlasRestClientBuilder { private UserGroupInformation userGroupInformation; protected String incomingUrl; protected String[] baseUrls; + private HiveConf hiveConf; public AtlasRestClientBuilder(String urls) { this.incomingUrl = urls; @@ -69,7 +70,7 @@ private AtlasRestClient create() throws SemanticException { initializeAtlasApplicationProperties(); AtlasClientV2 clientV2 = new AtlasClientV2(this.userGroupInformation, this.userGroupInformation.getShortUserName(), baseUrls); - return new AtlasRestClientImpl(clientV2); + return new AtlasRestClientImpl(clientV2, hiveConf); } private AtlasRestClientBuilder setUGInfo() throws SemanticException { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java index 7a3bf612686c..71e51fb5cc6c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java @@ -27,6 +27,7 @@ import org.apache.atlas.model.instance.AtlasEntity; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.utils.SecurityUtils; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; import org.apache.hadoop.hive.ql.parse.SemanticException; @@ -49,12 +50,18 @@ /** * Implementation of RESTClient, encapsulates Atlas' REST APIs. 
*/ -public class AtlasRestClientImpl extends RetryingClient implements AtlasRestClient { +public class AtlasRestClientImpl extends RetryingClientTimeBased implements AtlasRestClient { private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientImpl.class); private final AtlasClientV2 clientV2; - public AtlasRestClientImpl(AtlasClientV2 clientV2) { + public AtlasRestClientImpl(AtlasClientV2 clientV2, HiveConf conf) { this.clientV2 = clientV2; + this.totalDurationInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS); + this.initialDelayInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS); + this.maxRetryDelayInSeconds = conf.getTimeVar(HiveConf.ConfVars + .REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, TimeUnit.SECONDS); + this.backOff = conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT); + this.maxJitterInSeconds = (int) conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_JITTER, TimeUnit.SECONDS); } private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java similarity index 68% rename from ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java rename to ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java index dbc065ab3c25..25471a445f82 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java @@ -23,28 +23,35 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Random; import java.util.concurrent.Callable; /** * Implement retry logic for service calls. */ -public class RetryingClient { - private static final Logger LOG = LoggerFactory.getLogger(RetryingClient.class); - private static final int PAUSE_DURATION_INCREMENT_IN_MINUTES_DEFAULT = (30 * 1000); - private static final int RETRY_COUNT_DEFAULT = 5; +public class RetryingClientTimeBased { + private static final long MINIMUM_DELAY_IN_SEC = 60; + private static final Logger LOG = LoggerFactory.getLogger(RetryingClientTimeBased.class); private static final String ERROR_MESSAGE_NO_ENTITIES = "no entities to create/update"; private static final String ERROR_MESSAGE_IN_PROGRESS = "import or export is in progress"; private static final String ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP = "empty ZIP file"; - private static final int MAX_RETY_COUNT = RETRY_COUNT_DEFAULT; - private static final int PAUSE_DURATION_INCREMENT_IN_MS = PAUSE_DURATION_INCREMENT_IN_MINUTES_DEFAULT; + protected long totalDurationInSeconds; + protected long initialDelayInSeconds; + protected long maxRetryDelayInSeconds; + protected double backOff; + protected int maxJitterInSeconds; protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws Exception { - for (int currentRetryCount = 1; currentRetryCount <= MAX_RETY_COUNT; currentRetryCount++) { + long startTime = System.currentTimeMillis(); + long delay = this.initialDelayInSeconds; + while (elapsedTimeInSeconds(startTime) + delay < this.totalDurationInSeconds) { try { LOG.debug("Retrying method: {}", func.getClass().getName(), null); return func.call(); } catch (Exception e) { - if (processImportExportLockException(e, currentRetryCount)) { + if (processImportExportLockException(e, delay)) { + //retry case. compute next sleep time + delay = getNextDelay(delay); continue; } if (processInvalidParameterException(e)) { @@ -57,6 +64,28 @@ protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws E return defaultReturnValue; } + private long getNextDelay(long currentDelay) { + if (currentDelay <= 0) { // in case initial delay was set to 0. + currentDelay = MINIMUM_DELAY_IN_SEC; + } + + currentDelay *= this.backOff; + if (this.maxJitterInSeconds > 0) { + currentDelay += new Random().nextInt(this.maxJitterInSeconds); + } + + if (currentDelay > this.maxRetryDelayInSeconds) { + currentDelay = this.maxRetryDelayInSeconds; + } + + return currentDelay; + } + + + private long elapsedTimeInSeconds(long fromTimeMillis) { + return (System.currentTimeMillis() - fromTimeMillis) / 1000; + } + private boolean processInvalidParameterException(Exception e) { if (e instanceof UniformInterfaceException) { return true; @@ -71,16 +100,15 @@ private boolean processInvalidParameterException(Exception e) { || e.getMessage().contains(ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP)); } - private boolean processImportExportLockException(Exception e, int currentRetryCount) throws Exception { + private boolean processImportExportLockException(Exception e, long delay) throws Exception { if (!(e instanceof AtlasServiceException)) { return false; } String excMessage = e.getMessage() == null ? "" : e.getMessage(); if (excMessage.contains(ERROR_MESSAGE_IN_PROGRESS)) { try { - int pauseDuration = PAUSE_DURATION_INCREMENT_IN_MS * currentRetryCount; - LOG.info("Atlas in-progress operation detected. Will pause for: {} ms", pauseDuration); - Thread.sleep(pauseDuration); + LOG.info("Atlas in-progress operation detected. Will pause for: {} seconds", delay); + Thread.sleep(delay * 1000); } catch (InterruptedException intEx) { LOG.error("Pause wait interrupted!", intEx); throw new Exception(intEx); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/NoOpRangerRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/NoOpRangerRestClient.java index 5a114305445f..838b61efc6ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/NoOpRangerRestClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/NoOpRangerRestClient.java @@ -31,14 +31,16 @@ public class NoOpRangerRestClient implements RangerRestClient { @Override public RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint, - String dbName, String rangerHiveServiceName) { + String dbName, String rangerHiveServiceName, + HiveConf hiveConf) { return new RangerExportPolicyList(); } @Override public RangerExportPolicyList importRangerPolicies(RangerExportPolicyList rangerExportPolicyList, String dbName, String baseUrl, - String rangerHiveServiceName) throws Exception { + String rangerHiveServiceName, + HiveConf hiveConf) throws Exception { return null; } @@ -65,7 +67,7 @@ public RangerExportPolicyList readRangerPoliciesFromJsonFile(Path filePath, Hive } @Override - public boolean checkConnection(String url) throws Exception { + public boolean checkConnection(String url, HiveConf hiveConf) throws Exception { return true; } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClient.java index f85c3afbd310..1d2865c81c21 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClient.java @@ -29,11 +29,13 @@ */ public
interface RangerRestClient { RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint, - String dbName, String rangerHiveServiceName) throws Exception; + String dbName, String rangerHiveServiceName, + HiveConf hiveConf) throws Exception; RangerExportPolicyList importRangerPolicies(RangerExportPolicyList rangerExportPolicyList, String dbName, String baseUrl, - String rangerHiveServiceName) throws Exception; + String rangerHiveServiceName, + HiveConf hiveConf) throws Exception; List<RangerPolicy> removeMultiResourcePolicies(List<RangerPolicy> rangerPolicies); @@ -46,7 +48,7 @@ Path saveRangerPoliciesToFile(RangerExportPolicyList rangerExportPolicyList, Pat RangerExportPolicyList readRangerPoliciesFromJsonFile(Path filePath, HiveConf conf) throws SemanticException; - boolean checkConnection(String url) throws Exception; + boolean checkConnection(String url, HiveConf hiveConf) throws Exception; List<RangerPolicy> addDenyPolicies(List<RangerPolicy> rangerPolicies, String rangerServiceName, String sourceDb, String targetDb) throws SemanticException; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java index 1b167231fc12..adaaa02af21d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClientImpl.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.utils.Retry; import org.apache.hadoop.hive.metastore.utils.SecurityUtils; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.http.client.utils.URIBuilder; @@ -65,6 +66,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Arrays; +import java.util.concurrent.Callable; /** * RangerRestClientImpl to connect to Ranger and export policies.
@@ -77,25 +79,18 @@ public class RangerRestClientImpl implements RangerRestClient { public RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint, String dbName, - String rangerHiveServiceName)throws SemanticException { + String rangerHiveServiceName, + HiveConf hiveConf)throws SemanticException { LOG.info("Ranger endpoint for cluster " + sourceRangerEndpoint); if (StringUtils.isEmpty(rangerHiveServiceName)) { throw new SemanticException("Ranger Service Name cannot be empty"); } - Retry retriable = new Retry(Exception.class) { - @Override - public RangerExportPolicyList execute() throws Exception { - if (UserGroupInformation.isSecurityEnabled()) { - SecurityUtils.reloginExpiringKeytabUser(); - return UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction) () -> - exportRangerPoliciesPlain(sourceRangerEndpoint, rangerHiveServiceName, dbName)); - } else { - return exportRangerPoliciesPlain(sourceRangerEndpoint, rangerHiveServiceName, dbName); - } - } - }; + Retryable retryable = Retryable.builder() + .withHiveConf(hiveConf) + .withRetryOnException(Exception.class).build(); try { - return retriable.runWithDelay(); + return retryable.executeCallable(() -> exportRangerPoliciesPlain(sourceRangerEndpoint, rangerHiveServiceName, + dbName)); } catch (Exception e) { throw new SemanticException(e); } @@ -175,7 +170,8 @@ public List removeMultiResourcePolicies(List rangerP @Override public RangerExportPolicyList importRangerPolicies(RangerExportPolicyList rangerExportPolicyList, String dbName, String baseUrl, - String rangerHiveServiceName) + String rangerHiveServiceName, + HiveConf hiveConf) throws Exception { String sourceClusterServiceName = null; String serviceMapJsonFileName = "hive_servicemap.json"; @@ -200,25 +196,12 @@ public RangerExportPolicyList importRangerPolicies(RangerExportPolicyList ranger String jsonRangerExportPolicyList = gson.toJson(rangerExportPolicyList); String finalUrl = getRangerImportUrl(baseUrl, dbName); LOG.debug("URL to import policies on target Ranger: {}", finalUrl); - Retry retriable = new Retry(Exception.class) { - @Override - public RangerExportPolicyList execute() throws Exception { - if (UserGroupInformation.isSecurityEnabled()) { - SecurityUtils.reloginExpiringKeytabUser(); - return UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction) () -> - importRangerPoliciesPlain(jsonRangerExportPolicyList, rangerPoliciesJsonFileName, - serviceMapJsonFileName, jsonServiceMap, finalUrl, rangerExportPolicyList)); - } else { - return importRangerPoliciesPlain(jsonRangerExportPolicyList, rangerPoliciesJsonFileName, - serviceMapJsonFileName, jsonServiceMap, finalUrl, rangerExportPolicyList); - } - } - }; - try { - return retriable.runWithDelay(); - } catch (Exception e) { - throw new SemanticException(e); - } + Retryable retryable = Retryable.builder() + .withHiveConf(hiveConf) + .withRetryOnException(Exception.class).build(); + return retryable.executeCallable(() -> importRangerPoliciesPlain(jsonRangerExportPolicyList, + rangerPoliciesJsonFileName, + serviceMapJsonFileName, jsonServiceMap, finalUrl, rangerExportPolicyList)); } private RangerExportPolicyList importRangerPoliciesPlain(String jsonRangerExportPolicyList, @@ -368,18 +351,15 @@ private Path writeExportedRangerPoliciesToJsonFile(String jsonString, String fil @Override public Path saveRangerPoliciesToFile(RangerExportPolicyList rangerExportPolicyList, Path stagingDirPath, - String fileName, HiveConf conf) throws Exception { + String fileName, HiveConf conf) throws 
SemanticException { Gson gson = new GsonBuilder().create(); String jsonRangerExportPolicyList = gson.toJson(rangerExportPolicyList); - Retry retriable = new Retry(IOException.class) { - @Override - public Path execute() throws IOException { - return writeExportedRangerPoliciesToJsonFile(jsonRangerExportPolicyList, fileName, - stagingDirPath, conf); - } - }; + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(IOException.class).build(); try { - return retriable.run(); + return retryable.executeCallable(() -> writeExportedRangerPoliciesToJsonFile(jsonRangerExportPolicyList, fileName, + stagingDirPath, conf)); } catch (Exception e) { throw new SemanticException(e); } @@ -405,20 +385,12 @@ public RangerExportPolicyList readRangerPoliciesFromJsonFile(Path filePath, } @Override - public boolean checkConnection(String url) throws SemanticException { - Retry retriable = new Retry(Exception.class) { - @Override - public Boolean execute() throws Exception { - if (UserGroupInformation.isSecurityEnabled()) { - SecurityUtils.reloginExpiringKeytabUser(); - return UserGroupInformation.getLoginUser().doAs((PrivilegedAction) () -> checkConnectionPlain(url)); - } else { - return checkConnectionPlain(url); - } - } - }; + public boolean checkConnection(String url, HiveConf hiveConf) throws SemanticException { + Retryable retryable = Retryable.builder() + .withHiveConf(hiveConf) + .withRetryOnException(Exception.class).build(); try { - return retriable.runWithDelay(); + return retryable.executeCallable(() -> checkConnectionPlain(url)); } catch (Exception e) { throw new SemanticException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java new file mode 100644 index 000000000000..a31b96baec17 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/util/Retryable.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.util; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.utils.SecurityUtils; +import org.apache.hadoop.security.UserGroupInformation; + +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Class to implement any retry logic in case of exceptions. 
+ */ +public class Retryable { + private static final long MINIMUM_DELAY_IN_SEC = 60; + + private long totalDurationInSeconds; + private List<Class<? extends Exception>> retryOn; + private List<Class<? extends Exception>> failOn; + private long initialDelayInSeconds; + private long maxRetryDelayInSeconds; + private double backOff; + private int maxJitterInSeconds; + + private Retryable() { + this.retryOn = new ArrayList<>(); + this.failOn = new ArrayList<>(); + this.initialDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.defaultStrVal, + HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY), TimeUnit.SECONDS); + this.maxRetryDelayInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES.defaultStrVal, + HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES), TimeUnit.SECONDS); + this.backOff = HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT.defaultFloatVal; + this.maxJitterInSeconds = (int) HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_JITTER.defaultStrVal, + HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_JITTER), TimeUnit.SECONDS); + this.totalDurationInSeconds = HiveConf.toTime(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION.defaultStrVal, + HiveConf.getDefaultTimeUnit(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION), TimeUnit.SECONDS); + } + + public static Builder builder() { + return new Builder(); + } + + public <T> T executeCallable(Callable<T> callable) throws Exception { + long startTime = System.currentTimeMillis(); + long delay = this.initialDelayInSeconds; + Exception previousException = null; + while (true) { + try { + if (UserGroupInformation.isSecurityEnabled()) { + SecurityUtils.reloginExpiringKeytabUser(); + return UserGroupInformation.getLoginUser().doAs((PrivilegedExceptionAction<T>) () -> callable.call()); + } else { + return callable.call(); + } + } catch (Exception e) { + if (this.failOn.stream().noneMatch(k -> e.getClass().equals(k)) + && this.retryOn.stream().anyMatch(k -> k.isAssignableFrom(e.getClass()))) { + if (elapsedTimeInSeconds(startTime) + delay > this.totalDurationInSeconds) { + // case where waiting would go beyond max duration. So throw exception and return + throw e; + } + sleep(delay); + //retry case. compute next sleep time + delay = getNextDelay(delay, previousException, e); + // reset current captured exception. + previousException = e; + } else { + // Exception cannot be retried on. Throw exception and return + throw e; + } + } + } + } + + private void sleep(long seconds) { + try { + Thread.sleep(seconds * 1000); + } catch (InterruptedException e) { + // no-op.. just proceed + } + } + + private long getNextDelay(long currentDelay, final Exception previousException, final Exception currentException) { + if (previousException != null && !previousException.getClass().equals(currentException.getClass())) { + // New exception encountered. Returning initial delay for next retry. + return this.initialDelayInSeconds; + } + if (currentDelay <= 0) { // in case initial delay was set to 0. + currentDelay = MINIMUM_DELAY_IN_SEC; + } + currentDelay *= this.backOff; + if (this.maxJitterInSeconds > 0) { + currentDelay += new Random().nextInt(this.maxJitterInSeconds); + } + if (currentDelay > this.maxRetryDelayInSeconds) { + currentDelay = this.maxRetryDelayInSeconds; + } + return currentDelay; + } + + private long elapsedTimeInSeconds(long fromTimeMillis) { + return (System.currentTimeMillis() - fromTimeMillis) / 1000; + } + + public static class Builder { + private final Retryable runnable = new Retryable(); + public Builder() { + } + + public Builder withHiveConf(HiveConf conf) { + runnable.totalDurationInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS); + runnable.initialDelayInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS); + runnable.maxRetryDelayInSeconds = conf.getTimeVar(HiveConf.ConfVars + .REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, TimeUnit.SECONDS); + runnable.backOff = conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT); + runnable.maxJitterInSeconds = (int) conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_JITTER, TimeUnit.SECONDS); + return this; + } + + public Retryable build() { + return runnable; + } + + public Builder withTotalDuration(long maxDuration) { + runnable.totalDurationInSeconds = maxDuration; + return this; + } + + // making this thread safe as it appends to list + public synchronized Builder withRetryOnException(final Class<? extends Exception> exceptionClass) { + if (exceptionClass != null && + runnable.retryOn.stream().noneMatch(k -> exceptionClass.equals(k))) { + runnable.retryOn.add(exceptionClass); + } + return this; + } + + public synchronized Builder withRetryOnExceptionList(final List<Class<? extends Exception>> exceptionClassList) { + for (final Class<? extends Exception> exceptionClass : exceptionClassList) { + if (exceptionClass != null && + runnable.retryOn.stream().noneMatch(k -> exceptionClass.equals(k))) { + runnable.retryOn.add(exceptionClass); + } + } + return this; + } + + public synchronized Builder withFailOnException(final Class<? extends Exception> exceptionClass) { + if (exceptionClass != null && + runnable.failOn.stream().noneMatch(k -> exceptionClass.equals(k))) { + runnable.failOn.add(exceptionClass); + } + return this; + } + + public synchronized Builder withFailOnExceptionList(final List<Class<? extends Exception>> exceptionClassList) { + for (final Class<? extends Exception> exceptionClass : exceptionClassList) { + if (exceptionClass != null && + runnable.failOn.stream().noneMatch(k -> exceptionClass.equals(k))) { + runnable.failOn.add(exceptionClass); + } + } + return this; + } + + public Builder withInitialDelay(long initialDelayInSeconds) { + runnable.initialDelayInSeconds = initialDelayInSeconds; + return this; + } + + public Builder withMaxRetryDelay(long maxRetryDelayInSeconds) { + runnable.maxRetryDelayInSeconds = maxRetryDelayInSeconds; + return this; + } + + public Builder withBackoff(double backoff) { + runnable.backOff = backoff; + return this; + } + + public Builder withMaxJitterValue(int maxJitterInSeconds) { + runnable.maxJitterInSeconds = maxJitterInSeconds; + return this; + } + } +} diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java index 154f02809e64..3ad5b2d59534 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; import org.apache.hadoop.hive.ql.exec.Utilities; import
org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -55,6 +56,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Callable; public class Utils { private static Logger LOG = LoggerFactory.getLogger(Utils.class); @@ -72,9 +74,11 @@ public static void writeOutput(List<List<String>> listValues, Path outputFile, H public static void writeOutput(List<List<String>> listValues, Path outputFile, HiveConf hiveConf, boolean update) throws SemanticException { - Retry<Void> retriable = new Retry<Void>(IOException.class) { - @Override - public Void execute() throws IOException { + Retryable retryable = Retryable.builder() + .withHiveConf(hiveConf) + .withRetryOnException(IOException.class).build(); + try { + retryable.executeCallable((Callable<Void>) () -> { DataOutputStream outStream = null; try { FileSystem fs = outputFile.getFileSystem(hiveConf); @@ -91,19 +95,19 @@ public Void execute() throws IOException { IOUtils.closeStream(outStream); } return null; - } - }; - try { - retriable.run(); + }); } catch (Exception e) { throw new SemanticException(e); } } - public static long writeFile(FileSystem fs, Path exportFilePath, InputStream is) throws SemanticException { - Retry<Long> retriable = new Retry<Long>(IOException.class) { - @Override - public Long execute() throws IOException { + public static long writeFile(FileSystem fs, Path exportFilePath, InputStream is, + HiveConf conf) throws SemanticException { + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(IOException.class).build(); + try { + return retryable.executeCallable(() -> { FSDataOutputStream fos = null; try { long bytesWritten; @@ -120,9 +124,7 @@ public Long execute() throws IOException { fos.close(); } } - }}; - try { - return retriable.run(); + }); } catch (Exception e) { throw new SemanticException(e); } @@ -130,9 +132,11 @@ public Long execute() throws IOException { public static void writeOutput(String content, Path outputFile, HiveConf hiveConf) throws SemanticException { - Retry<Void> retriable = new Retry<Void>(IOException.class) { - @Override - public Void execute() throws IOException { + Retryable retryable = Retryable.builder() + .withHiveConf(hiveConf) + .withRetryOnException(IOException.class).build(); + try { + retryable.executeCallable((Callable<Void>) () -> { DataOutputStream outStream = null; try { FileSystem fs = outputFile.getFileSystem(hiveConf); @@ -143,10 +147,7 @@ public Void execute() throws IOException { IOUtils.closeStream(outStream); } return null; - } - }; - try { - retriable.run(); + }); } catch (Exception e) { throw new SemanticException(e); } @@ -154,16 +155,15 @@ public Void execute() throws IOException { public static void create(Path outputFile, HiveConf hiveConf) throws SemanticException { - Retry<Void> retriable = new Retry<Void>(IOException.class) { - @Override - public Void execute() throws IOException { + Retryable retryable = Retryable.builder() + .withHiveConf(hiveConf) + .withRetryOnException(IOException.class).build(); + try { + retryable.executeCallable((Callable<Void>) () -> { FileSystem fs = outputFile.getFileSystem(hiveConf); fs.create(outputFile).close(); return null; - } - }; - try { - retriable.run(); + }); } catch (Exception e) { throw new SemanticException(e); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java index 80bfdfb3db30..caa089f11549 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java @@ -25,6 +25,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; import javax.security.auth.login.LoginException; @@ -38,6 +39,7 @@ import org.apache.hadoop.hive.metastore.utils.Retry; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.parse.EximUtil; @@ -188,36 +190,35 @@ void exportFilesAsList() throws SemanticException, IOException, LoginException { if (dataPathList.isEmpty()) { return; } - Retry<Void> retryable = new Retry<Void>(IOException.class) { - @Override - public Void execute() throws Exception { - try (BufferedWriter writer = writer()) { - for (Path dataPath : dataPathList) { - writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); - } - } catch (IOException e) { - if (e instanceof FileNotFoundException) { - logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed"); - throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage())); - } - // in case of io error, reset the file system object - FileSystem.closeAllForUGI(Utils.getUGI()); - dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf); - exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); - Path exportPath = new Path(exportRootDataDir, EximUtil.FILES_NAME); - if (exportFileSystem.exists(exportPath)) { - exportFileSystem.delete(exportPath, true); - } - throw e; - } - return null; - } - }; - try { - retryable.run(); - } catch (Exception e) { - throw new SemanticException(e); - } + Retryable retryable = Retryable.builder() + .withHiveConf(hiveConf) + .withRetryOnException(IOException.class).build(); + try { + retryable.executeCallable((Callable<Void>) () -> { + try (BufferedWriter writer = writer()) { + for (Path dataPath : dataPathList) { + writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath)); + } + } catch (IOException e) { + if (e instanceof FileNotFoundException) { + logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed"); + throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage())); + } + // in case of io error, reset the file system object + FileSystem.closeAllForUGI(Utils.getUGI()); + dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf); + exportFileSystem = exportRootDataDir.getFileSystem(hiveConf); + Path exportPath = new Path(exportRootDataDir, EximUtil.FILES_NAME); + if (exportFileSystem.exists(exportPath)) { + exportFileSystem.delete(exportPath, true); + } + throw e; + } + return null; + }); + } catch (Exception e) { + throw new SemanticException(e); + } } private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java index 1eedf1946926..d0f8b5f9af9a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java +++
b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/MetricSink.java @@ -23,14 +23,17 @@ import org.apache.hadoop.hive.metastore.api.ReplicationMetrics; import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.utils.Retry; +import org.apache.hadoop.hive.ql.exec.util.Retryable; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.parse.repl.metric.event.ReplicationMetric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -126,18 +129,16 @@ public void run() { } metricList.setReplicationMetricList(replicationMetricsList); // write metrics and retry if fails - Retry<Void> retriable = new Retry<Void>(Exception.class) { - @Override - public Void execute() throws Exception { - //write - if (metricList.getReplicationMetricListSize() > 0) { - LOG.debug("Persisting metrics to DB {} ", metricList.getReplicationMetricListSize()); - Hive.get(conf).getMSC().addReplicationMetrics(metricList); - } - return null; + Retryable retryable = Retryable.builder() + .withHiveConf(conf) + .withRetryOnException(Exception.class).build(); + retryable.executeCallable((Callable<Void>) () -> { + if (metricList.getReplicationMetricListSize() > 0) { + LOG.debug("Persisting metrics to DB {} ", metricList.getReplicationMetricListSize()); + Hive.get(conf).getMSC().addReplicationMetrics(metricList); } - }; - retriable.run(); + return null; + }); } else { LOG.debug("No Metrics to Update "); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerDumpTask.java index 12e074107cfb..d2222e7cae24 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerDumpTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerDumpTask.java @@ -71,7 +71,7 @@ public class TestRangerDumpTask { public void setup() throws Exception { task = new RangerDumpTask(mockClient, conf, work); Mockito.when(mockClient.removeMultiResourcePolicies(Mockito.anyList())).thenCallRealMethod(); - Mockito.when(mockClient.checkConnection(Mockito.anyString())).thenReturn(true); + Mockito.when(mockClient.checkConnection(Mockito.anyString(), Mockito.any())).thenReturn(true); Mockito.when(work.getMetricCollector()).thenReturn(metricCollector); } @@ -92,7 +92,8 @@ public void testFailureInvalidRangerConfig() throws Exception { public void testSuccessValidAuthProviderEndpoint() throws Exception { RangerExportPolicyList rangerPolicyList = new RangerExportPolicyList(); rangerPolicyList.setPolicies(new ArrayList<RangerPolicy>()); - Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), + Mockito.any())) .thenReturn(rangerPolicyList); Mockito.when(conf.get(RANGER_REST_URL)).thenReturn("rangerEndpoint"); Mockito.when(conf.get(RANGER_HIVE_SERVICE_NAME)).thenReturn("hive"); @@ -117,7 +118,8 @@ public void testSuccessNonEmptyRangerPolicies() throws Exception { + "\"dataMaskPolicyItems\":[],\"rowFilterPolicyItems\":[],\"id\":40,\"guid\":" + "\"4e2b3406-7b9a-4004-8cdf-7a239c8e2cae\",\"isEnabled\":true,\"version\":1}]}"; RangerExportPolicyList rangerPolicyList = new
Gson().fromJson(rangerResponse, RangerExportPolicyList.class); - Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), + Mockito.any())) .thenReturn(rangerPolicyList); Mockito.when(conf.get(RANGER_REST_URL)).thenReturn("rangerEndpoint"); Mockito.when(conf.get(RANGER_HIVE_SERVICE_NAME)).thenReturn("hive"); @@ -138,7 +140,8 @@ public void testSuccessRangerDumpMetrics() throws Exception { Whitebox.setInternalState(ReplState.class, logger); RangerExportPolicyList rangerPolicyList = new RangerExportPolicyList(); rangerPolicyList.setPolicies(new ArrayList()); - Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), + Mockito.any())) .thenReturn(rangerPolicyList); Mockito.when(conf.get(RANGER_REST_URL)).thenReturn("rangerEndpoint"); Mockito.when(conf.get(RANGER_HIVE_SERVICE_NAME)).thenReturn("hive"); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java index 73d506927225..dbbbf2cc3690 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestRangerLoadTask.java @@ -72,7 +72,7 @@ public void setup() throws Exception { .thenCallRealMethod(); Mockito.when(mockClient.addDenyPolicies(Mockito.anyList(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString())).thenCallRealMethod(); - Mockito.when(mockClient.checkConnection(Mockito.anyString())).thenReturn(true); + Mockito.when(mockClient.checkConnection(Mockito.anyString(), Mockito.any())).thenReturn(true); Mockito.when(work.getMetricCollector()).thenReturn(metricCollector); } @@ -188,9 +188,10 @@ public void testSuccessAddDenyRangerPolicies() throws Exception { ArgumentCaptor rangerEndpoint = ArgumentCaptor.forClass(String.class); ArgumentCaptor serviceName = ArgumentCaptor.forClass(String.class); ArgumentCaptor targetDb = ArgumentCaptor.forClass(String.class); + ArgumentCaptor confCaptor = ArgumentCaptor.forClass(HiveConf.class); Mockito.verify(mockClient, Mockito.times(1)).importRangerPolicies(rangerPolicyCapture.capture(), - targetDb.capture(), rangerEndpoint.capture(), serviceName.capture()); + targetDb.capture(), rangerEndpoint.capture(), serviceName.capture(), confCaptor.capture()); Assert.assertEquals("tgtdb", targetDb.getAllValues().get(0)); Assert.assertEquals("rangerEndpoint", rangerEndpoint.getAllValues().get(0)); Assert.assertEquals("hive", serviceName.getAllValues().get(0)); @@ -252,9 +253,10 @@ public void testSuccessDisableDenyRangerPolicies() throws Exception { ArgumentCaptor rangerEndpoint = ArgumentCaptor.forClass(String.class); ArgumentCaptor serviceName = ArgumentCaptor.forClass(String.class); ArgumentCaptor targetDb = ArgumentCaptor.forClass(String.class); + ArgumentCaptor confCaptor = ArgumentCaptor.forClass(HiveConf.class); Mockito.verify(mockClient, Mockito.times(1)).importRangerPolicies(rangerPolicyCapture.capture(), - targetDb.capture(), rangerEndpoint.capture(), serviceName.capture()); + targetDb.capture(), rangerEndpoint.capture(), serviceName.capture(), confCaptor.capture()); Assert.assertEquals("tgtdb", targetDb.getAllValues().get(0)); Assert.assertEquals("rangerEndpoint", 
rangerEndpoint.getAllValues().get(0)); Assert.assertEquals("hive", serviceName.getAllValues().get(0)); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java index 12afd8e7b5d3..5f41488ef8fa 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ranger/TestRangerRestClient.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.exec.repl.ranger; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.security.UserGroupInformation; import org.junit.Assert; import org.junit.Before; @@ -32,6 +33,8 @@ import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; +import java.util.concurrent.TimeUnit; + /** * Unit test class for testing Ranger Dump. @@ -46,23 +49,34 @@ public class TestRangerRestClient { @Mock private UserGroupInformation userGroupInformation; + @Mock + private HiveConf conf; + @Before public void setup() throws Exception { PowerMockito.mockStatic(UserGroupInformation.class); Mockito.when(UserGroupInformation.getLoginUser()).thenReturn(userGroupInformation); Mockito.when(userGroupInformation.doAs((PrivilegedAction) Mockito.any())).thenCallRealMethod(); + Mockito.when(userGroupInformation.doAs((PrivilegedExceptionAction) Mockito.any())).thenCallRealMethod(); Mockito.when(mockClient.getRangerExportUrl(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) .thenCallRealMethod(); Mockito.when(mockClient.getRangerImportUrl(Mockito.anyString(), Mockito.anyString())) .thenCallRealMethod(); + Mockito.when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS)).thenReturn(1L); + Mockito.when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS)).thenReturn(20L); + Mockito.when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_JITTER, TimeUnit.SECONDS)).thenReturn(1L); + Mockito.when(conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, TimeUnit.SECONDS)) + .thenReturn(10L); + Mockito.when(conf.getFloat(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT.varname, 1.0f)) + .thenReturn(1.0f); } @Test public void testSuccessSimpleAuthCheckConnection() throws Exception { Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(false); Mockito.when(mockClient.checkConnectionPlain(Mockito.anyString())).thenReturn(true); - Mockito.when(mockClient.checkConnection(Mockito.anyString())).thenCallRealMethod(); - mockClient.checkConnection("http://localhost:6080/ranger"); + Mockito.when(mockClient.checkConnection(Mockito.anyString(), Mockito.any())).thenCallRealMethod(); + mockClient.checkConnection("http://localhost:6080/ranger", conf); ArgumentCaptor urlCaptor = ArgumentCaptor.forClass(String.class); Mockito.verify(mockClient, Mockito.times(1)).checkConnectionPlain(urlCaptor.capture()); @@ -72,36 +86,23 @@ public void testSuccessSimpleAuthCheckConnection() throws Exception { Mockito.times(0)).doAs(privilegedActionArgumentCaptor.capture()); } - @Test - public void testSuccessKerberosAuthCheckConnection() throws Exception { - Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(true); - Mockito.when(mockClient.checkConnectionPlain(Mockito.anyString())).thenReturn(true); - Mockito.when(mockClient.checkConnection(Mockito.anyString())).thenCallRealMethod(); - mockClient.checkConnection("http://localhost:6080/ranger"); - ArgumentCaptor urlCaptor = 
ArgumentCaptor.forClass(String.class); - Mockito.verify(mockClient, - Mockito.times(1)).checkConnectionPlain(urlCaptor.capture()); - Assert.assertEquals("http://localhost:6080/ranger", urlCaptor.getValue()); - ArgumentCaptor privilegedActionArgumentCaptor = ArgumentCaptor.forClass(PrivilegedAction.class); - Mockito.verify(userGroupInformation, - Mockito.times(3)).doAs(privilegedActionArgumentCaptor.capture()); - } - @Test public void testSuccessSimpleAuthRangerExport() throws Exception { Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(false); Mockito.when(mockClient.exportRangerPoliciesPlain(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) .thenReturn(new RangerExportPolicyList()); - Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), + Mockito.any())) .thenCallRealMethod(); mockClient.exportRangerPolicies("http://localhost:6080/ranger", "db", - "hive"); + "hive", conf); ArgumentCaptor urlCaptor = ArgumentCaptor.forClass(String.class); ArgumentCaptor dbCaptor = ArgumentCaptor.forClass(String.class); ArgumentCaptor serviceCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor confCaptor = ArgumentCaptor.forClass(HiveConf.class); Mockito.verify(mockClient, Mockito.times(1)).exportRangerPolicies(urlCaptor.capture(), dbCaptor.capture(), - serviceCaptor.capture()); + serviceCaptor.capture(), confCaptor.capture()); Assert.assertEquals("http://localhost:6080/ranger", urlCaptor.getValue()); Assert.assertEquals("db", dbCaptor.getValue()); Assert.assertEquals("hive", serviceCaptor.getValue()); @@ -109,28 +110,4 @@ public void testSuccessSimpleAuthRangerExport() throws Exception { Mockito.verify(userGroupInformation, Mockito.times(0)).doAs(privilegedActionArgumentCaptor.capture()); } - - @Test - public void testSuccessKerberosAuthRangerExport() throws Exception { - Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(true); - Mockito.when(mockClient.exportRangerPoliciesPlain(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) - .thenReturn(new RangerExportPolicyList()); - Mockito.when(mockClient.exportRangerPolicies(Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) - .thenCallRealMethod(); - mockClient.exportRangerPolicies("http://localhost:6080/ranger", "db", - "hive"); - ArgumentCaptor urlCaptor = ArgumentCaptor.forClass(String.class); - ArgumentCaptor dbCaptor = ArgumentCaptor.forClass(String.class); - ArgumentCaptor serviceCaptor = ArgumentCaptor.forClass(String.class); - Mockito.verify(mockClient, - Mockito.times(1)).exportRangerPolicies(urlCaptor.capture(), dbCaptor.capture(), - serviceCaptor.capture()); - Assert.assertEquals("http://localhost:6080/ranger", urlCaptor.getValue()); - Assert.assertEquals("db", dbCaptor.getValue()); - Assert.assertEquals("hive", serviceCaptor.getValue()); - ArgumentCaptor privilegedActionArgumentCaptor = ArgumentCaptor - .forClass(PrivilegedExceptionAction.class); - Mockito.verify(userGroupInformation, - Mockito.times(1)).doAs(privilegedActionArgumentCaptor.capture()); - } } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/util/TestRetryable.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/util/TestRetryable.java new file mode 100644 index 000000000000..986e5085357c --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/util/TestRetryable.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.exec.util; + +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; +import java.util.Arrays; +import java.util.concurrent.Callable; + +/** + * Tests for retriable interface. + */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({UserGroupInformation.class}) +public class TestRetryable { + + @Mock + UserGroupInformation userGroupInformation; + + @Before + public void setup() throws IOException { + PowerMockito.mockStatic(UserGroupInformation.class); + Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(false); + Mockito.when(UserGroupInformation.getLoginUser()).thenReturn(userGroupInformation); + Mockito.when(UserGroupInformation.getCurrentUser()).thenReturn(userGroupInformation); + } + + @Test + public void testRetrySuccessValidException() throws Throwable { + Retryable retryable = Retryable.builder() + .withTotalDuration(10) + .withInitialDelay(1) + .withBackoff(1.0) + .withRetryOnException(NullPointerException.class).build(); + try { + retryable.executeCallable(new Callable() { + int count = 0; + @Override + public Void call() throws Exception { + if (count < 1) { + count++; + throw new NullPointerException(); + } else { + return null; + } + } + }); + } catch (Exception e) { + Assert.fail(); + } + } + + @Test + public void testRetrySuccessValidExceptionList() throws Throwable { + Retryable retryable = Retryable.builder() + .withTotalDuration(30) + .withInitialDelay(1) + .withBackoff(1.0) + .withRetryOnExceptionList(Arrays.asList(NullPointerException.class, IOException.class)).build(); + try { + retryable.executeCallable(new Callable() { + int count = 0; + @Override + public Void call() throws Exception { + if (count == 0) { + count++; + throw new NullPointerException(); + } else if (count == 1) { + count++; + throw new IOException(); + } else { + return null; + } + } + }); + } catch (Exception e) { + Assert.fail(); + } + } + + @Test + public void testRetryFailureWithMaxDuration() throws Throwable { + Retryable retryable = Retryable.builder() + .withTotalDuration(10) + .withBackoff(10.0) + .withInitialDelay(1) + .withRetryOnException(NullPointerException.class).build(); + try { + retryable.executeCallable(new 
Callable() { + int count = 0; + @Override + public Void call() throws Exception { + if (count < 2) { + count++; + throw new NullPointerException(); + } else { + return null; + } + } + }); + } catch (Exception e) { + Assert.assertEquals(NullPointerException.class, e.getClass()); + } + } + + @Test + public void testRetryFailureWithInitialDelay() throws Throwable { + Retryable retryable = Retryable.builder() + .withTotalDuration(20) + .withBackoff(10.0) + .withInitialDelay(10) + .withMaxJitterValue(1) + .withRetryOnExceptionList(Arrays.asList(NullPointerException.class, IOException.class)).build(); + try { + retryable.executeCallable(new Callable() { + int count = 0; + @Override + public Void call() throws Exception { + if (count == 0) { + count++; + throw new NullPointerException(); + } else if (count == 1) { + count++; + throw new IOException(); + } else { + return null; + } + } + }); + } catch (Exception e) { + Assert.assertEquals(IOException.class, e.getClass()); + } + } + + @Test + public void testRetryFailureWithMaxRetryDelay() throws Throwable { + Retryable retryable = Retryable.builder() + .withTotalDuration(20) + .withBackoff(10.0) + .withInitialDelay(1) + .withMaxJitterValue(1) + .withMaxRetryDelay(1) + .withRetryOnExceptionList(Arrays.asList(NullPointerException.class, IOException.class)).build(); + try { + retryable.executeCallable(new Callable() { + int count = 0; + @Override + public Void call() throws Exception { + if (count == 0) { + count++; + throw new NullPointerException(); + } else if (count == 1) { + count++; + throw new IOException(); + } else { + return null; + } + } + }); + } catch (Exception e) { + Assert.fail(); + } + } + + @Test + public void testRetryFailureWithBackoff() throws Throwable { + Retryable retryable = Retryable.builder() + .withTotalDuration(20) + .withBackoff(100.0) + .withInitialDelay(1) + .withMaxJitterValue(1) + .withRetryOnException(NullPointerException.class).build(); + try { + retryable.executeCallable(new Callable() { + int count = 0; + @Override + public Void call() throws Exception { + if (count < 2) { + count++; + throw new NullPointerException(); + } else { + return null; + } + } + }); + } catch (Exception e) { + Assert.assertEquals(NullPointerException.class, e.getClass()); + } + } + + @Test + public void testRetrySuccessWithMaxDurationDifferentException() throws Throwable { + Retryable retryable = Retryable.builder() + .withTotalDuration(30) + .withBackoff(10.0) + .withInitialDelay(1) + .withMaxJitterValue(1) + .withRetryOnExceptionList(Arrays.asList(NullPointerException.class, IOException.class)).build(); + try { + retryable.executeCallable(new Callable() { + int count = 0; + @Override + public Void call() throws Exception { + if (count == 0) { + count++; + throw new NullPointerException(); + } else if (count == 1) { + count++; + throw new IOException(); + } else { + return null; + } + } + }); + } catch (Exception e) { + Assert.fail(); + } + } + + @Test + public void testRetrySuccessWithNonRetriableException() throws Throwable { + Retryable retryable = Retryable.builder() + .withTotalDuration(30) + .withBackoff(10.0) + .withInitialDelay(1) + .withMaxJitterValue(1) + .withRetryOnException(IOException.class) + .withFailOnException(FileNotFoundException.class).build(); + try { + retryable.executeCallable(new Callable() { + int count = 0; + @Override + public Void call() throws Exception { + if (count == 0) { + count++; + throw new FileNotFoundException(); + } else if (count == 1) { + count++; + throw new IOException(); + } else { + return 
null; + } + } + }); + } catch (Exception e) { + Assert.assertEquals(FileNotFoundException.class, e.getClass()); + } + } + + @Test + public void testRetryFailureInValidException() throws Throwable { + Retryable retryable = Retryable.builder() + .withTotalDuration(10) + .withInitialDelay(1) + .withBackoff(1.0) + .withRetryOnException(NullPointerException.class).build(); + try { + retryable.executeCallable(new Callable() { + @Override + public Void call() throws Exception { + throw new IOException(); + } + }); + } catch (Exception e) { + Assert.assertEquals(IOException.class, e.getClass()); + } + } + + @Test + public void testRetryFailureWithHiveConf() throws Throwable { + HiveConf conf = new HiveConf(TestRetryable.class); + conf.set(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY.varname, "1s"); + conf.setFloat(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT.varname, 1.0f); + conf.set(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION.varname, "60s"); + conf.set(HiveConf.ConfVars.REPL_RETRY_JITTER.varname, "1s"); + conf.set(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES.varname, "30s"); + long startTime = System.currentTimeMillis(); + long totalTime = 60; + Retryable retryable = Retryable.builder().withHiveConf(conf) + .withRetryOnException(NullPointerException.class).build(); + try { + retryable.executeCallable(new Callable() { + @Override + public Void call() throws Exception { + executeWithDelay(startTime, totalTime); + return null; + } + }); + } catch (Exception e) { + Assert.assertEquals(NullPointerException.class, e.getClass()); + } + } + + @Test + public void testRetrySuccessSecureCallable() throws Throwable { + Mockito.when(userGroupInformation.doAs((PrivilegedAction) Mockito.any())).thenCallRealMethod(); + Mockito.when(userGroupInformation.doAs((PrivilegedExceptionAction) Mockito.any())).thenCallRealMethod(); + Mockito.when(UserGroupInformation.isSecurityEnabled()).thenReturn(true); + Retryable retryable = Retryable.builder() + .withTotalDuration(10) + .withInitialDelay(1) + .withBackoff(1.0) + .withRetryOnExceptionList(Arrays.asList(InterruptedException.class, IOException.class)).build(); + try { + retryable.executeCallable(() -> true); + } catch (Exception e) { + Assert.fail(); + } + ArgumentCaptor privilegedActionArgumentCaptor + = ArgumentCaptor.forClass(PrivilegedExceptionAction.class); + Mockito.verify(userGroupInformation, + Mockito.times(3)).doAs(privilegedActionArgumentCaptor.capture()); + } + + private void executeWithDelay(long startTime, long totalTime) { + long currentTime = System.currentTimeMillis(); + if (currentTime - startTime < totalTime * 1000) { + throw new NullPointerException(); + } + } +}
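Editor's note for reviewers: the hunks above show the new Retryable only at its call sites, so here is a minimal usage sketch of the builder API this patch introduces. The enclosing class and the doFlakyCall() helper are hypothetical, for illustration only; the builder methods, executeCallable(), and the wrap-in-SemanticException pattern are taken from the patch itself.

import java.io.FileNotFoundException;
import java.io.IOException;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class RetryableUsageSketch {
  // Hypothetical call site, mirroring how RangerDumpTask/RangerLoadTask consume Retryable.
  public static String fetchWithRetry(HiveConf conf) throws SemanticException {
    Retryable retryable = Retryable.builder()
        .withHiveConf(conf)                               // reads the hive.repl.retry.* settings added above
        .withRetryOnException(IOException.class)          // transient errors: retry with backoff + jitter
        .withFailOnException(FileNotFoundException.class) // exact-match fatal subtype: thrown immediately
        .build();
    try {
      return retryable.executeCallable(() -> doFlakyCall());
    } catch (Exception e) {
      // Call sites in this patch surface exhausted or non-retryable failures as SemanticException.
      throw new SemanticException(e);
    }
  }

  private static String doFlakyCall() throws IOException {
    return "ok"; // placeholder for a Ranger/Atlas REST call or an HDFS write
  }
}

With the defaults from the HiveConf hunk, the first retry waits hive.repl.retry.initial.delay (60s); each subsequent wait is multiplied by the 1.2 backoff coefficient plus up to 30s of random jitter, capped by hive.repl.retry.max.delay.between.retries (60m); and executeCallable() rethrows once waiting again would exceed hive.repl.retry.total.duration (24h).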