HIVE-23897: Create a common Retry Interface for replication
aasha committed Jul 29, 2020
1 parent f0c6e92 commit c181c2c
Showing 19 changed files with 868 additions and 199 deletions.
16 changes: 16 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -617,6 +617,22 @@ public static enum ConfVars {
         "Name of the source cluster for the replication."),
     REPL_TARGET_CLUSTER_NAME("hive.repl.target.cluster.name", null,
         "Name of the target cluster for the replication."),
+    REPL_RETRY_INTIAL_DELAY("hive.repl.retry.initial.delay", "60s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Initial delay before retry starts."),
+    REPL_RETRY_BACKOFF_COEFFICIENT("hive.repl.retry.backoff.coefficient", 1.2f,
+        "The backoff coefficient for exponential retry delay between retries. " +
+        "Previous delay * backoff coefficient determines the next retry interval."),
+    REPL_RETRY_JITTER("hive.repl.retry.jitter", "30s", new TimeValidator(TimeUnit.SECONDS),
+        "A random jitter to be applied to avoid all retries happening at the same time."),
+    REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES("hive.repl.retry.max.delay.between.retries", "60m",
+        new TimeValidator(TimeUnit.MINUTES),
+        "Maximum allowed retry delay after including exponential backoff. " +
+        "Once this limit is reached, retry will continue with this retry duration."),
+    REPL_RETRY_TOTAL_DURATION("hive.repl.retry.total.duration", "24h",
+        new TimeValidator(TimeUnit.HOURS),
+        "Total allowed retry duration inclusive of all retries. Once this is exhausted, " +
+        "the policy instance will be marked as failed and will need manual intervention to restart."),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
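Taken together, the new hive.repl.retry.* settings define a bounded exponential backoff: with the defaults above, the base delay grows as 60 s, then 60 s × 1.2 = 72 s, then 72 s × 1.2 = 86.4 s, and so on, with up to 30 s of random jitter added to each step; any single delay is capped at 60 min, and retrying stops once the 24 h total budget is used up. The delay computation itself lives in the new RetryingClientTimeBased class further down.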
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AtlasDumpTask.java
@@ -173,7 +173,7 @@ long dumpAtlasMetaData(AtlasRequestBuilder atlasRequestBuilder, AtlasReplInfo at
       inputStream = atlasRestClient.exportData(exportRequest);
       FileSystem fs = atlasReplInfo.getStagingDir().getFileSystem(atlasReplInfo.getConf());
       Path exportFilePath = new Path(atlasReplInfo.getStagingDir(), ReplUtils.REPL_ATLAS_EXPORT_FILE_NAME);
-      numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream);
+      numBytesWritten = Utils.writeFile(fs, exportFilePath, inputStream, conf);
     } catch (SemanticException ex) {
       throw ex;
     } catch (Exception ex) {
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerDumpTask.java
@@ -98,11 +98,11 @@ public int execute() {
       conf.addResource(url);
       String rangerHiveServiceName = conf.get(ReplUtils.RANGER_HIVE_SERVICE_NAME);
       String rangerEndpoint = conf.get(ReplUtils.RANGER_REST_URL);
-      if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint)) {
+      if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint, conf)) {
        throw new SemanticException("Ranger endpoint is not valid " + rangerEndpoint);
      }
      RangerExportPolicyList rangerExportPolicyList = rangerRestClient.exportRangerPolicies(rangerEndpoint,
-          work.getDbName(), rangerHiveServiceName);
+          work.getDbName(), rangerHiveServiceName, conf);
      List<RangerPolicy> rangerPolicies = rangerExportPolicyList.getPolicies();
      if (rangerPolicies.isEmpty()) {
        LOG.info("Ranger policy export request returned empty list or failed, Please refer Ranger admin logs.");
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/RangerLoadTask.java
@@ -95,7 +95,7 @@ public int execute() {
       conf.addResource(url);
       String rangerHiveServiceName = conf.get(ReplUtils.RANGER_HIVE_SERVICE_NAME);
       String rangerEndpoint = conf.get(ReplUtils.RANGER_REST_URL);
-      if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint)) {
+      if (StringUtils.isEmpty(rangerEndpoint) || !rangerRestClient.checkConnection(rangerEndpoint, conf)) {
        throw new SemanticException("Ranger endpoint is not valid " + rangerEndpoint);
      }
      if (work.getCurrentDumpPath() != null) {
@@ -134,7 +134,7 @@ public int execute() {
        }
        rangerExportPolicyList.setPolicies(updatedRangerPolicies);
        rangerRestClient.importRangerPolicies(rangerExportPolicyList, work.getTargetDbName(), rangerEndpoint,
-            rangerHiveServiceName);
+            rangerHiveServiceName, conf);
        LOG.info("Number of ranger policies imported {}", rangerExportPolicyList.getListSize());
        importCount = rangerExportPolicyList.getListSize();
        work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.POLICIES.name(), importCount);
19 changes: 10 additions & 9 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -54,6 +54,7 @@
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.exec.util.Retryable;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.DbLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
@@ -112,6 +113,7 @@
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;

 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_ABORT_WRITE_TXN_AFTER_TIMEOUT;
@@ -645,24 +647,23 @@ private int getMaxEventAllowed(int currentEventMaxLimit) {
     }
     return currentEventMaxLimit;
   }
-  private void cleanFailedEventDirIfExists(Path dumpDir, long resumeFrom) throws IOException {
+  private void cleanFailedEventDirIfExists(Path dumpDir, long resumeFrom) throws SemanticException {
     Path nextEventRoot = new Path(dumpDir, String.valueOf(resumeFrom + 1));
-    Retry<Void> retriable = new Retry<Void>(IOException.class) {
-      @Override
-      public Void execute() throws IOException {
+    Retryable retryable = Retryable.builder()
+        .withHiveConf(conf)
+        .withRetryOnException(IOException.class).build();
+    try {
+      retryable.executeCallable((Callable<Void>) () -> {
         FileSystem fs = FileSystem.get(nextEventRoot.toUri(), conf);
         try {
           fs.delete(nextEventRoot, true);
         } catch (FileNotFoundException e) {
           // no worries
         }
         return null;
-      }
-    };
-    try {
-      retriable.run();
+      });
     } catch (Exception e) {
-      throw new IOException(e);
+      throw new SemanticException(e);
     }
   }

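The builder-based Retryable used above replaces the old anonymous Retry subclass. For reference, a minimal sketch of how another caller might wrap transient filesystem work with it; only Retryable.builder(), withHiveConf, withRetryOnException and executeCallable are taken from this diff, while the helper class and method names are illustrative:

import java.io.IOException;
import java.util.concurrent.Callable;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.util.Retryable;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class RetryableUsageSketch {
  // Hypothetical helper: deletes a directory, retrying transient IO failures
  // according to the hive.repl.retry.* settings carried by the HiveConf.
  static void deleteWithRetry(Path dir, HiveConf conf) throws SemanticException {
    Retryable retryable = Retryable.builder()
        .withHiveConf(conf)                      // seeds delays from hive.repl.retry.*
        .withRetryOnException(IOException.class) // only IOExceptions trigger a retry
        .build();
    try {
      retryable.executeCallable((Callable<Void>) () -> {
        FileSystem fs = dir.getFileSystem(conf);
        fs.delete(dir, true); // idempotent, safe to re-run on retry
        return null;
      });
    } catch (Exception e) {
      throw new SemanticException(e);
    }
  }
}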
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientBuilder.java
@@ -44,6 +44,7 @@ public class AtlasRestClientBuilder {
   private UserGroupInformation userGroupInformation;
   protected String incomingUrl;
   protected String[] baseUrls;
+  private HiveConf hiveConf;

   public AtlasRestClientBuilder(String urls) {
     this.incomingUrl = urls;
@@ -69,7 +70,7 @@ private AtlasRestClient create() throws SemanticException {
     initializeAtlasApplicationProperties();
     AtlasClientV2 clientV2 = new AtlasClientV2(this.userGroupInformation,
         this.userGroupInformation.getShortUserName(), baseUrls);
-    return new AtlasRestClientImpl(clientV2);
+    return new AtlasRestClientImpl(clientV2, hiveConf);
   }

   private AtlasRestClientBuilder setUGInfo() throws SemanticException {
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/AtlasRestClientImpl.java
@@ -27,6 +27,7 @@
 import org.apache.atlas.model.instance.AtlasEntity;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -49,12 +50,18 @@
 /**
  * Implementation of RESTClient, encapsulates Atlas' REST APIs.
  */
-public class AtlasRestClientImpl extends RetryingClient implements AtlasRestClient {
+public class AtlasRestClientImpl extends RetryingClientTimeBased implements AtlasRestClient {
   private static final Logger LOG = LoggerFactory.getLogger(AtlasRestClientImpl.class);
   private final AtlasClientV2 clientV2;

-  public AtlasRestClientImpl(AtlasClientV2 clientV2) {
+  public AtlasRestClientImpl(AtlasClientV2 clientV2, HiveConf conf) {
     this.clientV2 = clientV2;
+    this.totalDurationInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS);
+    this.initialDelayInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS);
+    this.maxRetryDelayInSeconds = conf.getTimeVar(HiveConf.ConfVars
+        .REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, TimeUnit.SECONDS);
+    this.backOff = conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT);
+    this.maxJitterInSeconds = (int) conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_JITTER, TimeUnit.SECONDS);
   }

   private <T> T runWithTimeout(Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/atlas/RetryingClientTimeBased.java
@@ -0,0 +1,120 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hive.ql.exec.repl.atlas;

import com.sun.jersey.api.client.UniformInterfaceException;
import org.apache.atlas.AtlasServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Random;
import java.util.concurrent.Callable;

/**
 * Implement retry logic for service calls.
 */
public class RetryingClientTimeBased {
  private static final long MINIMUM_DELAY_IN_SEC = 1;
  private static final Logger LOG = LoggerFactory.getLogger(RetryingClientTimeBased.class);
  private static final String ERROR_MESSAGE_NO_ENTITIES = "no entities to create/update";
  private static final String ERROR_MESSAGE_IN_PROGRESS = "import or export is in progress";
  private static final String ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP = "empty ZIP file";
  protected long totalDurationInSeconds;
  protected long initialDelayInSeconds;
  protected long maxRetryDelayInSeconds;
  protected double backOff;
  protected int maxJitterInSeconds;

  protected <T> T invokeWithRetry(Callable<T> func, T defaultReturnValue) throws Exception {
    long startTime = System.currentTimeMillis();
    long delay = this.initialDelayInSeconds;
    // Keep attempting while the elapsed time plus the next delay still fits the total budget.
    while (elapsedTimeInSeconds(startTime) + delay < this.totalDurationInSeconds) {
      try {
        LOG.debug("Retrying method: {}", func.getClass().getName());
        return func.call();
      } catch (Exception e) {
        if (processImportExportLockException(e, delay)) {
          //retry case. compute next sleep time
          delay = getNextDelay(delay);
          continue;
        }
        if (processInvalidParameterException(e)) {
          return null;
        }
        LOG.error(func.getClass().getName(), e);
        throw new Exception(e);
      }
    }
    return defaultReturnValue;
  }

  private long getNextDelay(long currentDelay) {
    if (currentDelay <= 0) { // in case initial delay was set to 0.
      currentDelay = MINIMUM_DELAY_IN_SEC;
    }

    currentDelay *= this.backOff;
    if (this.maxJitterInSeconds > 0) {
      currentDelay += new Random().nextInt(this.maxJitterInSeconds);
    }

    if (currentDelay > this.maxRetryDelayInSeconds) {
      currentDelay = this.maxRetryDelayInSeconds;
    }

    return currentDelay;
  }

  private long elapsedTimeInSeconds(long fromTimeMillis) {
    return (System.currentTimeMillis() - fromTimeMillis) / 1000;
  }

  private boolean processInvalidParameterException(Exception e) {
    if (e instanceof UniformInterfaceException) {
      return true;
    }
    if (!(e instanceof AtlasServiceException)) {
      return false;
    }
    if (e.getMessage() == null) {
      return false;
    }
    return (e.getMessage().contains(ERROR_MESSAGE_NO_ENTITIES)
        || e.getMessage().contains(ATLAS_ERROR_CODE_IMPORT_EMPTY_ZIP));
  }

  private boolean processImportExportLockException(Exception e, long delay) throws Exception {
    if (!(e instanceof AtlasServiceException)) {
      return false;
    }
    String excMessage = e.getMessage() == null ? "" : e.getMessage();
    if (excMessage.contains(ERROR_MESSAGE_IN_PROGRESS)) {
      try {
        // The configured delays are in seconds; Thread.sleep expects millis.
        LOG.info("Atlas in-progress operation detected. Will pause for: {} seconds", delay);
        Thread.sleep(delay * 1000);
      } catch (InterruptedException intEx) {
        LOG.error("Pause wait interrupted!", intEx);
        throw new Exception(intEx);
      }
      return true;
    }
    return false;
  }
}
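RetryingClientTimeBased is designed to be subclassed by service clients that seed its protected retry fields from HiveConf, exactly as AtlasRestClientImpl does above. A minimal sketch under that assumption; MyServiceClient and callRemoteService are illustrative stand-ins, not part of this change:

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.repl.atlas.RetryingClientTimeBased;

public class MyServiceClient extends RetryingClientTimeBased {

  public MyServiceClient(HiveConf conf) {
    // Same HiveConf lookups as AtlasRestClientImpl above.
    this.totalDurationInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_TOTAL_DURATION, TimeUnit.SECONDS);
    this.initialDelayInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_INTIAL_DELAY, TimeUnit.SECONDS);
    this.maxRetryDelayInSeconds = conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_MAX_DELAY_BETWEEN_RETRIES, TimeUnit.SECONDS);
    this.backOff = conf.getFloatVar(HiveConf.ConfVars.REPL_RETRY_BACKOFF_COEFFICIENT);
    this.maxJitterInSeconds = (int) conf.getTimeVar(HiveConf.ConfVars.REPL_RETRY_JITTER, TimeUnit.SECONDS);
  }

  public String export() throws Exception {
    // Retries "import or export is in progress" responses with exponential
    // backoff until the total duration budget is exhausted; "no entities" /
    // "empty ZIP" errors short-circuit and yield null instead.
    return invokeWithRetry(() -> callRemoteService(), null);
  }

  private String callRemoteService() {
    return "ok"; // stand-in for a real REST call
  }
}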
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/NoOpRangerRestClient.java
@@ -31,14 +31,16 @@ public class NoOpRangerRestClient implements RangerRestClient {

   @Override
   public RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint,
-                                                     String dbName, String rangerHiveServiceName) {
+                                                     String dbName, String rangerHiveServiceName,
+                                                     HiveConf hiveConf) {
     return new RangerExportPolicyList();
   }

   @Override
   public RangerExportPolicyList importRangerPolicies(RangerExportPolicyList rangerExportPolicyList, String dbName,
                                                      String baseUrl,
-                                                     String rangerHiveServiceName) throws Exception {
+                                                     String rangerHiveServiceName,
+                                                     HiveConf hiveConf) throws Exception {
     return null;
   }

@@ -65,7 +67,7 @@ public RangerExportPolicyList readRangerPoliciesFromJsonFile(Path filePath, Hive
   }

   @Override
-  public boolean checkConnection(String url) throws Exception {
+  public boolean checkConnection(String url, HiveConf hiveConf) throws Exception {
     return true;
   }
ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ranger/RangerRestClient.java
@@ -29,11 +29,13 @@
  */
 public interface RangerRestClient {
   RangerExportPolicyList exportRangerPolicies(String sourceRangerEndpoint,
-                                              String dbName, String rangerHiveServiceName) throws Exception;
+                                              String dbName, String rangerHiveServiceName,
+                                              HiveConf hiveConf) throws Exception;

   RangerExportPolicyList importRangerPolicies(RangerExportPolicyList rangerExportPolicyList, String dbName,
                                               String baseUrl,
-                                              String rangerHiveServiceName) throws Exception;
+                                              String rangerHiveServiceName,
+                                              HiveConf hiveConf) throws Exception;

   List<RangerPolicy> removeMultiResourcePolicies(List<RangerPolicy> rangerPolicies);

@@ -46,7 +48,7 @@ Path saveRangerPoliciesToFile(RangerExportPolicyList rangerExportPolicyList, Pat
   RangerExportPolicyList readRangerPoliciesFromJsonFile(Path filePath,
                                                         HiveConf conf) throws SemanticException;

-  boolean checkConnection(String url) throws Exception;
+  boolean checkConnection(String url, HiveConf hiveConf) throws Exception;

   List<RangerPolicy> addDenyPolicies(List<RangerPolicy> rangerPolicies, String rangerServiceName,
                                      String sourceDb, String targetDb) throws SemanticException;
