HIVE-23897 : Create a common Retry Interface for replication #1329

Merged 1 commit on Jul 31, 2020.
16 changes: 16 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -617,6 +617,22 @@ public static enum ConfVars {
         "Name of the source cluster for the replication."),
     REPL_TARGET_CLUSTER_NAME("hive.repl.target.cluster.name", null,
         "Name of the target cluster for the replication."),
+    REPL_RETRY_INTIAL_DELAY("hive.repl.retry.initial.delay", "60s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Initial Delay before retry starts."),
+    REPL_RETRY_BACKOFF_COEFFICIENT("hive.repl.retry.backoff.coefficient", 1.2f,
+        "The backoff coefficient for exponential retry delay between retries. " +
+        "Previous Delay * Backoff Coefficient will determine the next retry interval"),
+    REPL_RETRY_JITTER("hive.repl.retry.jitter", "30s", new TimeValidator(TimeUnit.SECONDS),
+        "A random jitter to be applied to avoid all retries happening at the same time."),
+    REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES("hive.repl.retry.max.delay.between.retries", "60m",
+        new TimeValidator(TimeUnit.MINUTES),
+        "Maximum allowed retry delay in minutes after including exponential backoff. " +
+        "If this limit is reached, retry will continue with this retry duration."),
+    REPL_RETRY_TOTAL_DURATION("hive.repl.retry.total.duration", "24h",
+        new TimeValidator(TimeUnit.HOURS),
+        "Total allowed retry duration in hours inclusive of all retries. Once this is exhausted, " +
+        "the policy instance will be marked as failed and will need manual intervention to restart."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
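Taken together, the five new hive.repl.retry.* settings define a time-bounded exponential backoff policy: the delay starts at hive.repl.retry.initial.delay, is multiplied by the backoff coefficient after every failed attempt, gets up to hive.repl.retry.jitter of random noise added, is capped at hive.repl.retry.max.delay.between.retries, and the whole sequence stops once hive.repl.retry.total.duration is exhausted. A minimal sketch of the resulting schedule under the default values; the class below is illustrative only and not part of the PR:

import java.util.concurrent.ThreadLocalRandom;

// Illustrative only: prints the retry schedule implied by the default
// hive.repl.retry.* values above. Not part of the PR.
public class ReplRetryScheduleSketch {
  public static void main(String[] args) {
    long delaySec = 60;                     // hive.repl.retry.initial.delay = 60s
    final double backoff = 1.2;             // hive.repl.retry.backoff.coefficient
    final int maxJitterSec = 30;            // hive.repl.retry.jitter = 30s
    final long maxDelaySec = 60 * 60;       // hive.repl.retry.max.delay.between.retries = 60m
    final long totalBudgetSec = 24 * 3600;  // hive.repl.retry.total.duration = 24h

    long elapsedSec = 0;
    for (int attempt = 1; elapsedSec + delaySec < totalBudgetSec; attempt++) {
      System.out.printf("retry %d waits %d s (elapsed %d s)%n", attempt, delaySec, elapsedSec);
      elapsedSec += delaySec;
      // next delay = previous * coefficient + random jitter, capped at the max
      delaySec = Math.min((long) (delaySec * backoff)
          + ThreadLocalRandom.current().nextInt(maxJitterSec), maxDelaySec);
    }
  }
}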
@@ -173,7 +173,7 @@ long dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasReplInfo at
       inputStream = atlasRestClient.exportData(exportRequest);
       FileSystem fs = atlasReplInfo.getStagingDir().getFileSystem(atlasReplInfo.getConf());
       Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
-      numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream);
+      numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream, conf);
     } catch (SemanticException ex) {
       throw ex;
     } catch (Exception ex) {
@@ -98,11 +98,11 @@ public int execute() {
       conf.addResource(url);
       String rangerHiveServiceName = conf.get(ReplUtils.RANGER_HIVE_SERVICE_NAME);
       String rangerEndpoint = conf.get(ReplUtils.RANGER_REST_URL);
-      if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint)) {
+      if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint, conf)) {
         throw new SemanticException("Ranger endpoint is not valid " + rangerEndpoint);
       }
       RangerExportPolicyList rangerExportPolicyList = rangerRestClient.exportRangerPolicies(rangerEndpoint,
-          work.getDbName(), rangerHiveServiceName);
+          work.getDbName(), rangerHiveServiceName, conf);
       List<RangerPolicy> rangerPolicies = rangerExportPolicyList.getPolicies();
       if (rangerPolicies.isEmpty()) {
         LOG.info("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs.");
@@ -95,7 +95,7 @@ public int execute() {
       conf.addResource(url);
       String rangerHiveServiceName = conf.get(ReplUtils.RANGER_HIVE_SERVICE_NAME);
       String rangerEndpoint = conf.get(ReplUtils.RANGER_REST_URL);
-      if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint)) {
+      if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint, conf)) {
        throw new SemanticException("Ranger endpoint is not valid " + rangerEndpoint);
      }
      if (work.getCurrentDumpPath() != null) {
@@ -134,7 +134,7 @@ public int execute() {
         }
         rangerExportPolicyList.setPolicies(updatedRangerPolicies);
         rangerRestClient.importRangerPolicies(rangerExportPolicyList, work.getTargetDbName(), rangerEndpoint,
-            rangerHiveServiceName);
+            rangerHiveServiceName, conf);
         LOG.info("Number of ranger policies imported {}", rangerExportPolicyList.getListSize());
         importCount = rangerExportPolicyList.getListSize();
         work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.POLICIES.name(), importCount);
19 changes: 10 additions & 9 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
@@ -112,6 +113,7 @@
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;

 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT;
@@ -645,24 +647,23 @@ private int getMaxEventAllowed(int currentEventMaxLimit) {
     }
     return currentEventMaxLimit;
   }
-  private void cleanFailedEventDirIfExists(Path dumpDir, long resumeFrom) throws IOException {
+  private void cleanFailedEventDirIfExists(Path dumpDir, long resumeFrom) throws SemanticException {
     Path nextEventRoot = new Path(dumpDir, String.valueOf(resumeFrom + 1));
-    Retry<Void> retriable = new Retry<Void>(IOException.class) {
-      @Override
-      public Void execute() throws IOException {
+    Retryable retryable = Retryable.builder()
+        .withHiveConf(conf)
+        .withRetryOnException(IOException.class).build();
+    try {
+      retryable.executeCallable((Callable<Void>) () -> {
         FileSystem fs = FileSystem.get(nextEventRoot.toUri(), conf);
         try {
           fs.delete(nextEventRoot, true);
         } catch (FileNotFoundException e) {
           // no worries
         }
         return null;
-      }
-    };
-    try {
-      retriable.run();
+      });
     } catch (Exception e) {
-      throw new IOException(e);
+      throw new SemanticException(e);
     }
   }

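cleanFailedEventDirIfExists above is the first call site migrated to the new common Retryable interface: the anonymous Retry subclass with a fixed policy is replaced by a builder that reads the hive.repl.retry.* settings from HiveConf. A minimal sketch of the pattern, using only the builder methods visible in this diff and assuming executeCallable returns once the callable succeeds or the retry budget is exhausted; the callable body is a placeholder:

// Sketch of the Retryable pattern introduced by this PR.
Retryable retryable = Retryable.builder()
    .withHiveConf(conf)                        // retry policy comes from hive.repl.retry.*
    .withRetryOnException(IOException.class)   // only IOExceptions trigger a retry
    .build();
try {
  retryable.executeCallable((Callable<Void>) () -> {
    // placeholder: any idempotent operation that may fail transiently
    return null;
  });
} catch (Exception e) {
  throw new SemanticException(e);              // budget exhausted or non-retryable error
}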
@@ -44,6 +44,7 @@ public class AtlasRestClientBuilder {
   private UserGroupInformation userGroupInformation;
   protected String incomingUrl;
   protected String[] baseUrls;
+  private HiveConf hiveConf;

   public AtlasRestClientBuilder(String urls) {
     this.incomingUrl = urls;
@@ -69,7 +70,7 @@ private AtlasRestClient create() throws SemanticException {
     initializeAtlasApplicationProperties();
     AtlasClientV2 clientV2 = new AtlasClientV2(this.userGroupInformation,
         this.userGroupInformation.getShortUserName(), baseUrls);
-    return new AtlasRestClientImpl(clientV2);
+    return new AtlasRestClientImpl(clientV2, hiveConf);
   }

   private AtlasRestClientBuilder setUGInfo() throws SemanticException {
@@ -27,6 +27,7 @@
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -49,12 +50,18 @@
 /**
  * Implementation of RESTClient, encapsulates Atlas' REST APIs.
  */
-public class AtlasRestClientImpl extends RetryingClient implements AtlasRestClient {
+public class AtlasRestClientImpl extends RetryingClientTimeBased implements AtlasRestClient {
   private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientImpl.class);
   private final AtlasClientV2 clientV2;

-  public AtlasRestClientImpl(AtlasClientV2 clientV2) {
+  public AtlasRestClientImpl(AtlasClientV2 clientV2, HiveConf conf) {
     this.clientV2 = clientV2;
+    this.totalDurationInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS);
+    this.initialDelayInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS);
+    this.maxRetryDelayInSeconds = conf.getTimeVar(HiveConf.ConfVars
+        .REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, TimeUnit.SECONDS);
+    this.backOff = conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT);
+    this.maxJitterInSeconds = (int) conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_JITTER, TimeUnit.SECONDS);
   }

   private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
@@ -23,28 +23,35 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.util.Random;
 import java.util.concurrent.Callable;

 /**
  * Implement retry logic for service calls.
  */
-public class RetryingClient {
-  private static final Logger LOG = LoggerFactory.getLogger(RetryingClient.class);
-  private static final int PAUSE_DURATION_INCREMENT_IN_MINUTES_DEFAULT = (30 * 1000);
-  private static final int RETRY_COUNT_DEFAULT = 5;
+public class RetryingClientTimeBased {
+  private static final long MINIMUM_DELAY_IN_SEC = 60;
+  private static final Logger LOG = LoggerFactory.getLogger(RetryingClientTimeBased.class);
   private static final String ERROR_MESSAGE_NO_ENTITIES = "no entities to create/update";
   private static final String ERROR_MESSAGE_IN_PROGRESS = "import or export is in progress";
   private static final String ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP = "empty ZIP file";
-  private static final int MAX_RETY_COUNT = RETRY_COUNT_DEFAULT;
-  private static final int PAUSE_DURATION_INCREMENT_IN_MS = PAUSE_DURATION_INCREMENT_IN_MINUTES_DEFAULT;
+  protected long totalDurationInSeconds;
+  protected long initialDelayInSeconds;
+  protected long maxRetryDelayInSeconds;
+  protected double backOff;
+  protected int maxJitterInSeconds;

   protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws Exception {
-    for (int currentRetryCount = 1; currentRetryCount <= MAX_RETY_COUNT; currentRetryCount++) {
+    long startTime = System.currentTimeMillis();
+    long delay = this.initialDelayInSeconds;
+    while (elapsedTimeInSeconds(startTime) + delay < this.totalDurationInSeconds) {
       try {
         LOG.debug("Retrying method: {}", func.getClass().getName(), null);
         return func.call();
       } catch (Exception e) {
-        if (processImportExportLockException(e, currentRetryCount)) {
+        if (processImportExportLockException(e, delay)) {
+          //retry case. compute next sleep time
+          delay = getNextDelay(delay);
           continue;
         }
         if (processInvalidParameterException(e)) {
@@ -57,6 +64,28 @@ protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws E
     return defaultReturnValue;
   }

+  private long getNextDelay(long currentDelay) {
+    if (currentDelay <= 0) { // in case initial delay was set to 0.
+      currentDelay = MINIMUM_DELAY_IN_SEC;
+    }
+
+    currentDelay *= this.backOff;
+    if (this.maxJitterInSeconds > 0) {
+      currentDelay += new Random().nextInt(this.maxJitterInSeconds);
+    }
+
+    if (currentDelay > this.maxRetryDelayInSeconds) {
+      currentDelay = this.maxRetryDelayInSeconds;
+    }
+
+    return currentDelay;
+  }
+
+  private long elapsedTimeInSeconds(long fromTimeMillis) {
+    return (System.currentTimeMillis() - fromTimeMillis) / 1000;
+  }
+
   private boolean processInvalidParameterException(Exception e) {
     if (e instanceof UniformInterfaceException) {
       return true;
@@ -71,16 +100,15 @@ private boolean processInvalidParameterException(Exception e) {
         || e.getMessage().contains(ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP));
   }

-  private boolean processImportExportLockException(Exception e, int currentRetryCount) throws Exception {
+  private boolean processImportExportLockException(Exception e, long delay) throws Exception {
     if (!(e instanceof AtlasServiceException)) {
       return false;
     }
     String excMessage = e.getMessage() == null ? "" : e.getMessage();
     if (excMessage.contains(ERROR_MESSAGE_IN_PROGRESS)) {
       try {
-        int pauseDuration = PAUSE_DURATION_INCREMENT_IN_MS * currentRetryCount;
-        LOG.info("Atlas in-progress operation detected. Will pause for: {} ms", pauseDuration);
-        Thread.sleep(pauseDuration);
+        LOG.info("Atlas in-progress operation detected. Will pause for: {} ms", delay);
+        Thread.sleep(delay);
       } catch (InterruptedException intEx) {
         LOG.error("Pause wait interrupted!", intEx);
         throw new Exception(intEx);
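With the default settings, getNextDelay produces 60 s, then 60 × 1.2 = 72 s plus up to 30 s of jitter, and so on; since 1.2^23 ≈ 66, the delay reaches the 60-minute cap after roughly two dozen retries (slightly sooner once jitter accumulates), and every later wait stays at 60 minutes until the 24-hour budget in invokeWithRetry runs out. A hypothetical subclass, shown only to illustrate how the protected fields drive invokeWithRetry; SampleRetryingClient and callAtlasExport are not part of the PR:

// Hypothetical subclass: the protected fields configure the retry budget and
// invokeWithRetry wraps a service call. Values mirror the hive.repl.retry.* defaults.
public class SampleRetryingClient extends RetryingClientTimeBased {
  public SampleRetryingClient() {
    this.totalDurationInSeconds = 24 * 60 * 60;
    this.initialDelayInSeconds = 60;
    this.maxRetryDelayInSeconds = 60 * 60;
    this.backOff = 1.2;
    this.maxJitterInSeconds = 30;
  }

  public String exportWithRetry() throws Exception {
    // Retries only Atlas "import or export is in progress" errors, sleeping
    // between attempts; returns "empty" if the budget runs out without success.
    return invokeWithRetry(() -> callAtlasExport(), "empty");
  }

  private String callAtlasExport() {
    return "ok"; // placeholder for a real AtlasClientV2 call
  }
}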
@@ -31,14 +31,16 @@ public class NoOpRangerRestClient implements RangerRestClient {

   @Override
   public RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint,
-                                                     String dbName, String rangerHiveServiceName) {
+                                                     String dbName, String rangerHiveServiceName,
+                                                     HiveConf hiveConf) {
     return new RangerExportPolicyList();
   }

   @Override
   public RangerExportPolicyList importRangerPolicies(RangerExportPolicyList rangerExportPolicyList, String dbName,
                                                      String baseUrl,
-                                                     String rangerHiveServiceName) throws Exception {
+                                                     String rangerHiveServiceName,
+                                                     HiveConf hiveConf) throws Exception {
     return null;
   }

@@ -65,7 +67,7 @@ public RangerExportPolicyList readRangerPoliciesFromJsonFile(Path filePath, Hive
   }

   @Override
-  public boolean checkConnection(String url) throws Exception {
+  public boolean checkConnection(String url, HiveConf hiveConf) throws Exception {
     return true;
   }

@@ -29,11 +29,13 @@
  */
 public interface RangerRestClient {
   RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint,
-                                              String dbName, String rangerHiveServiceName) throws Exception;
+                                              String dbName, String rangerHiveServiceName,
+                                              HiveConf hiveConf) throws Exception;

   RangerExportPolicyList importRangerPolicies(RangerExportPolicyList rangerExportPolicyList, String dbName,
                                               String baseUrl,
-                                              String rangerHiveServiceName) throws Exception;
+                                              String rangerHiveServiceName,
+                                              HiveConf hiveConf) throws Exception;

   List<RangerPolicy> removeMultiResourcePolicies(List<RangerPolicy> rangerPolicies);

@@ -46,7 +48,7 @@ Path saveRangerPoliciesToFile(RangerExportPolicyList rangerExportPolicyList, Pat
   RangerExportPolicyList readRangerPoliciesFromJsonFile(Path filePath,
                                                         HiveConf conf) throws SemanticException;

-  boolean checkConnection(String url) throws Exception;
+  boolean checkConnection(String url, HiveConf hiveConf) throws Exception;

   List<RangerPolicy> addDenyPolicies(List<RangerPolicy> rangerPolicies, String rangerServiceName,
                                      String sourceDb, String targetDb) throws SemanticException;
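The net effect of the interface change is that HiveConf is now threaded through every RangerRestClient method, so implementations can derive their retry policy from the hive.repl.retry.* settings instead of hard-coded counts. A hypothetical implementation fragment, assuming executeCallable propagates the callable's result; pingRangerEndpoint is a placeholder and the real RangerRestClientImpl is not shown in this diff:

// Hypothetical fragment: a concrete client can now build its retry
// policy from the HiveConf passed in by the caller.
@Override
public boolean checkConnection(String url, HiveConf hiveConf) throws Exception {
  Retryable retryable = Retryable.builder()
      .withHiveConf(hiveConf)
      .withRetryOnException(Exception.class).build();
  // pingRangerEndpoint is a placeholder for an HTTP probe of the Ranger REST URL
  return retryable.executeCallable(() -> pingRangerEndpoint(url));
}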