Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.knox.gateway.topology.discovery.ServiceDiscoveryConfig;
import org.apache.knox.gateway.topology.discovery.cm.monitor.ClouderaManagerClusterConfigurationMonitor;

import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -48,6 +49,7 @@
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;


/**
Expand Down Expand Up @@ -90,6 +92,10 @@ public class ClouderaManagerServiceDiscovery implements ServiceDiscovery, Cluste

private final ClouderaManagerServiceDiscoveryRepository repository = ClouderaManagerServiceDiscoveryRepository.getInstance();

private final AtomicInteger retryAttempts = new AtomicInteger(0);
private final int retrySleepSeconds = 3; // It's been agreed that we not expose this config
private int maxRetryAttempts = -1;

ClouderaManagerServiceDiscovery(GatewayConfig gatewayConfig) {
this(false, gatewayConfig);
}
Expand All @@ -106,6 +112,23 @@ public class ClouderaManagerServiceDiscovery implements ServiceDiscovery, Cluste
if (gatewayConfig != null) {
repository.setCacheEntryTTL(gatewayConfig.getClouderaManagerServiceDiscoveryRepositoryEntryTTL());
}

configureRetryParams(gatewayConfig);
}

private void configureRetryParams(GatewayConfig gatewayConfig) {
final int configuredMaxRetryAttempts = gatewayConfig.getClouderaManagerServiceDiscoveryMaximumRetryAttempts();
if (configuredMaxRetryAttempts > 0) {
final int configuredRetryDurationSeconds = configuredMaxRetryAttempts * retrySleepSeconds;
final int pollingInterval = gatewayConfig.getClusterMonitorPollingInterval(ClouderaManagerClusterConfigurationMonitor.getType());
final int retryDurationLimit = pollingInterval / 2;
if (retryDurationLimit > configuredRetryDurationSeconds) {
this.maxRetryAttempts = configuredMaxRetryAttempts;
} else {
this.maxRetryAttempts = retryDurationLimit / retrySleepSeconds;
log.updateMaxRetryAttempts(configuredMaxRetryAttempts, maxRetryAttempts);
}
}
}

@Override
Expand Down Expand Up @@ -183,13 +206,41 @@ protected ClouderaManagerCluster discover(GatewayConfig gatewayConfig,
// Notify the cluster config monitor about these cluster configuration details
configChangeMonitor.addServiceConfiguration(cluster, discoveryConfig);
}
resetRetryAttempts();
} catch (ApiException e) {
log.clusterDiscoveryError(clusterName, e);
if (shouldRetryServiceDiscovery(e)) {
log.retryDiscovery(retrySleepSeconds, retryAttempts.get());
try {
Thread.sleep(retrySleepSeconds * 1000L);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
cluster = discover(gatewayConfig, discoveryConfig, clusterName, includedServices, client);
} else {
resetRetryAttempts();
}
}

return cluster;
}

private void resetRetryAttempts() {
retryAttempts.set(0);
}

private boolean shouldRetryServiceDiscovery(ApiException e) {
if (maxRetryAttempts > 0 && maxRetryAttempts > retryAttempts.getAndIncrement()) {
final Throwable cause = e.getCause();
if (cause != null) {
if (ConnectException.class.isAssignableFrom(cause.getClass())) {
return true;
}
}
}
return false;
}

private ClouderaManagerCluster discoverCluster(DiscoveryApiClient client, String clusterName, Collection<String> includedServices)
throws ApiException {
ServicesResourceApi servicesResourceApi = new ServicesResourceApi(client);
Expand Down Expand Up @@ -281,7 +332,7 @@ private boolean shouldSkipServiceDiscovery(List<ServiceModelGenerator> modelGene
return true;
}

private List<ApiService> getClusterServices(ServiceDiscoveryConfig serviceDiscoveryConfig, ServicesResourceApi servicesResourceApi) {
private List<ApiService> getClusterServices(ServiceDiscoveryConfig serviceDiscoveryConfig, ServicesResourceApi servicesResourceApi) throws ApiException {
log.lookupClusterServicesFromRepository();
List<ApiService> services = repository.getServices(serviceDiscoveryConfig);
if (services == null || services.isEmpty()) {
Expand All @@ -294,12 +345,13 @@ private List<ApiService> getClusterServices(ServiceDiscoveryConfig serviceDiscov
services.forEach(service -> repository.addService(serviceDiscoveryConfig, service));
} catch (ApiException e) {
log.failedToAccessServiceConfigs(serviceDiscoveryConfig.getCluster(), e);
throw e;
}
}
return services;
}

private ApiServiceConfig getServiceConfig(ServiceDiscoveryConfig serviceDiscoveryConfig, ServicesResourceApi servicesResourceApi, ApiService service) {
private ApiServiceConfig getServiceConfig(ServiceDiscoveryConfig serviceDiscoveryConfig, ServicesResourceApi servicesResourceApi, ApiService service) throws ApiException {
log.lookupServiceConfigsFromRepository();
// first, try in the service discovery repository
ApiServiceConfig serviceConfig = repository.getServiceConfig(serviceDiscoveryConfig, service);
Expand All @@ -312,14 +364,15 @@ private ApiServiceConfig getServiceConfig(ServiceDiscoveryConfig serviceDiscover

// make sure that service config is populated in the service discovery repository to avoid subsequent CM calls
repository.addServiceConfig(serviceDiscoveryConfig, service, serviceConfig);
} catch (Exception e) {
} catch (ApiException e) {
log.failedToAccessServiceConfigs(serviceDiscoveryConfig.getCluster(), e);
throw e;
}
}
return serviceConfig;
}

private ApiRoleList getRoles(ServiceDiscoveryConfig serviceDiscoveryConfig, RolesResourceApi rolesResourceApi, String clusterName, ApiService service) {
private ApiRoleList getRoles(ServiceDiscoveryConfig serviceDiscoveryConfig, RolesResourceApi rolesResourceApi, String clusterName, ApiService service) throws ApiException {
log.lookupRolesFromRepository();
//first, try in the service discovery repository
ApiRoleList roles = repository.getRoles(serviceDiscoveryConfig, service);
Expand All @@ -341,15 +394,16 @@ private ApiRoleList getRoles(ServiceDiscoveryConfig serviceDiscoveryConfig, Role

// make sure that role is populated in the service discovery repository to avoid subsequent CM calls
repository.addRoles(serviceDiscoveryConfig, service, roles);
} catch (Exception e) {
} catch (ApiException e) {
log.failedToAccessServiceRoleConfigs(serviceName, "N/A", clusterName, e);
throw e;
}
}

return roles;
}

private ApiConfigList getRoleConfig(ServiceDiscoveryConfig serviceDiscoveryConfig, RolesResourceApi rolesResourceApi, ApiService service, ApiRole role) {
private ApiConfigList getRoleConfig(ServiceDiscoveryConfig serviceDiscoveryConfig, RolesResourceApi rolesResourceApi, ApiService service, ApiRole role) throws ApiException {
log.lookupRoleConfigsFromRepository();
// first, try in the service discovery repository
ApiConfigList configList = repository.getRoleConfigs(serviceDiscoveryConfig, service, role);
Expand All @@ -361,8 +415,9 @@ private ApiConfigList getRoleConfig(ServiceDiscoveryConfig serviceDiscoveryConfi

// make sure that role config is populated in the service discovery repository to avoid subsequent CM calls
repository.addRoleConfigs(serviceDiscoveryConfig, service, role, configList);
} catch (Exception e) {
} catch (ApiException e) {
log.failedToAccessServiceRoleConfigs(service.getName(), role.getName(), serviceDiscoveryConfig.getCluster(), e);
throw e;
}
}
return configList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ void failedToInstantiateJAASConfigurationFileImplementation(String implementatio
text = "Encountered an error during cluster ({0}) discovery: {1}")
void clusterDiscoveryError(String clusterName, @StackTrace(level = MessageLevel.DEBUG) Exception e);

@Message(level = MessageLevel.INFO, text = "Sleeping {0} second(s) before retrying Cloudera Manager service discovery for the {1}. time")
void retryDiscovery(long retrySleep, int retryAttempt);

@Message(level = MessageLevel.ERROR,
text = "Failed to access the service configurations for cluster ({0}) discovery: {1}")
void failedToAccessServiceConfigs(String clusterName, @StackTrace(level = MessageLevel.DEBUG) Exception e);
Expand Down Expand Up @@ -250,4 +253,6 @@ void roleConfigurationPropertyHasChanged(String propertyName,
@Message(level = MessageLevel.DEBUG, text = "Clearing service discovery repository...")
void clearServiceDiscoveryRepository();

@Message(level = MessageLevel.WARN, text = "The configured maximum retry attempts of {0} may overlap with the configured polling interval settings; using {1} retry attempts")
void updateMaxRetryAttempts(int configured, int actual);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.knox.gateway.topology.discovery.cm;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import com.cloudera.api.swagger.client.ApiException;
import com.cloudera.api.swagger.client.ApiResponse;
import com.cloudera.api.swagger.model.ApiClusterRef;
Expand Down Expand Up @@ -55,25 +58,31 @@
import org.apache.knox.gateway.topology.discovery.cm.model.spark.Spark3HistoryUIServiceModelGenerator;
import org.apache.knox.gateway.topology.discovery.cm.model.spark.SparkHistoryUIServiceModelGenerator;
import org.apache.knox.gateway.topology.discovery.cm.model.zeppelin.ZeppelinServiceModelGenerator;
import org.apache.knox.gateway.topology.discovery.cm.monitor.ClouderaManagerClusterConfigurationMonitor;
import org.easymock.EasyMock;
import org.junit.Test;


import java.lang.reflect.Type;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.concurrent.atomic.AtomicInteger;


public class ClouderaManagerServiceDiscoveryTest {

private static final String DISCOVERY_URL = "http://localhost:1234";
private static final String ATLAS_SERVICE_NAME = "ATLAS-1";

@Test
public void testServiceDiscoveryRetry() throws Exception {
//re-using an already existing test with 'true' retry flag
doTestAtlasDiscovery(true, true);
}

@Test
public void testJobTrackerServiceDiscovery() {
Expand Down Expand Up @@ -130,10 +139,14 @@ public void testAtlasAPIDiscoverySSL() {
}

private void doTestAtlasDiscovery(final boolean isSSL) {
doTestAtlasDiscovery(isSSL, false);
}

private void doTestAtlasDiscovery(final boolean isSSL, boolean testRetry) {
final String hostName = "atlas-host-1";
final String port = "21000";
final String sslPort = "21003";
ServiceDiscovery.Cluster cluster = doTestAtlasDiscovery(hostName, port, sslPort, isSSL);
ServiceDiscovery.Cluster cluster = doTestAtlasDiscovery(hostName, port, sslPort, isSSL, testRetry);
List<String> atlastURLs = cluster.getServiceURLs(AtlasServiceModelGenerator.SERVICE);
assertEquals(1, atlastURLs.size());
assertEquals((isSSL ? "https" : "http") + "://" + hostName + ":" + (isSSL ? sslPort : port),
Expand Down Expand Up @@ -934,23 +947,32 @@ private Map<String, String> sparkHistoryUIRoleProperties(final String port,
return roleProperties;
}

private ServiceDiscovery.Cluster doTestAtlasDiscovery(final String atlasHost,
final String port,
final String sslPort,
final boolean isSSL) {
return doTestAtlasDiscovery(atlasHost, port, sslPort, isSSL, false);
}

private ServiceDiscovery.Cluster doTestAtlasDiscovery(final String atlasHost,
final String port,
final String sslPort,
final boolean isSSL) {
final boolean isSSL,
final boolean testRetry) {
// Configure the role
Map<String, String> roleProperties = new HashMap<>();
roleProperties.put("atlas_server_http_port", port);
roleProperties.put("atlas_server_https_port", sslPort);
roleProperties.put("ssl_enabled", String.valueOf(isSSL));

return doTestDiscovery(atlasHost,
"ATLAS-1",
ATLAS_SERVICE_NAME,
AtlasServiceModelGenerator.SERVICE_TYPE,
"ATLAS-ATLAS_SERVER-1",
AtlasServiceModelGenerator.ROLE_TYPE,
Collections.emptyMap(),
roleProperties);
roleProperties,
testRetry);
}


Expand Down Expand Up @@ -1137,22 +1159,37 @@ private ServiceDiscovery.Cluster doTestPhoenixDiscovery(final String hostName,
}


private ServiceDiscovery.Cluster doTestDiscovery(final String hostName,
final String serviceName,
final String serviceType,
final String roleName,
final String roleType,
final Map<String, String> serviceProperties,
final Map<String, String> roleProperties) {
return doTestDiscovery(hostName, serviceName, serviceType, roleName, roleType, serviceProperties, roleProperties, false);
}

private ServiceDiscovery.Cluster doTestDiscovery(final String hostName,
final String serviceName,
final String serviceType,
final String roleName,
final String roleType,
final Map<String, String> serviceProperties,
final Map<String, String> roleProperties) {
final Map<String, String> roleProperties,
boolean testRetry) {
final String clusterName = "cluster-1";

GatewayConfig gwConf = EasyMock.createNiceMock(GatewayConfig.class);
if (testRetry) {
EasyMock.expect(gwConf.getClouderaManagerServiceDiscoveryMaximumRetryAttempts()).andReturn(GatewayConfig.DEFAULT_CM_SERVICE_DISCOVERY_MAX_RETRY_ATTEMPTS).anyTimes();
EasyMock.expect(gwConf.getClusterMonitorPollingInterval(ClouderaManagerClusterConfigurationMonitor.getType())).andReturn(10).anyTimes();
}
EasyMock.replay(gwConf);

ServiceDiscoveryConfig sdConfig = createMockDiscoveryConfig(clusterName);

// Create the test client for providing test response content
TestDiscoveryApiClient mockClient = new TestDiscoveryApiClient(sdConfig, null, null);
TestDiscoveryApiClient mockClient = testRetry ? new TestFaultyDiscoveryApiClient(sdConfig, null, null) : new TestDiscoveryApiClient(sdConfig, null, null);

// Prepare the service list response for the cluster
ApiServiceList serviceList = EasyMock.createNiceMock(ApiServiceList.class);
Expand Down Expand Up @@ -1185,6 +1222,9 @@ private ServiceDiscovery.Cluster doTestDiscovery(final String hostName,
ServiceDiscovery.Cluster cluster = cmsd.discover(gwConf, sdConfig, clusterName, Collections.emptySet(), mockClient);
assertNotNull(cluster);
assertEquals(clusterName, cluster.getName());
if (serviceName.equals(ATLAS_SERVICE_NAME)) {
assertEquals(testRetry ? 9 : 4, mockClient.getExecuteCount());
}
return cluster;
}

Expand Down Expand Up @@ -1276,6 +1316,8 @@ private static class TestDiscoveryApiClient extends DiscoveryApiClient {

private Map<Type, ApiResponse<?>> responseMap = new HashMap<>();

protected AtomicInteger executeCount = new AtomicInteger(0);

TestDiscoveryApiClient(ServiceDiscoveryConfig sdConfig, AliasService aliasService,
KeystoreService keystoreService) {
super(sdConfig, aliasService, keystoreService);
Expand All @@ -1292,8 +1334,29 @@ boolean isKerberos() {

@Override
public <T> ApiResponse<T> execute(Call call, Type returnType) throws ApiException {
executeCount.incrementAndGet();
return (ApiResponse<T>) responseMap.get(returnType);
}

int getExecuteCount() {
return executeCount.get();
}
}

private static class TestFaultyDiscoveryApiClient extends TestDiscoveryApiClient {

TestFaultyDiscoveryApiClient(ServiceDiscoveryConfig sdConfig, AliasService aliasService,
KeystoreService keystoreService) {
super(sdConfig, aliasService, keystoreService);
}

@Override
public <T> ApiResponse<T> execute(Call call, Type returnType) throws ApiException {
if (executeCount.getAndIncrement() < GatewayConfig.DEFAULT_CM_SERVICE_DISCOVERY_MAX_RETRY_ATTEMPTS - 2) {
throw new ApiException(new ConnectException("Failed to connect to CM HOST"));
}
return super.execute(call, returnType);
}
}

private static class TestResponseBase<T> extends ApiResponse<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ public class GatewayConfigImpl extends Configuration implements GatewayConfig {
private static final String CLOUDERA_MANAGER_DESCRIPTORS_MONITOR_INTERVAL = GATEWAY_CONFIG_FILE_PREFIX + ".cloudera.manager.descriptors.monitor.interval";
private static final String CLOUDERA_MANAGER_ADVANCED_SERVICE_DISCOVERY_CONF_MONITOR_INTERVAL = GATEWAY_CONFIG_FILE_PREFIX + ".cloudera.manager.advanced.service.discovery.config.monitor.interval";
private static final String CLOUDERA_MANAGER_SERVICE_DISCOVERY_REPOSITORY_CACHE_ENTRY_TTL = GATEWAY_CONFIG_FILE_PREFIX + ".cloudera.manager.service.discovery.repository.cache.entry.ttl";
private static final String CLOUDERA_MANAGER_SERVICE_DISCOVERY_MAX_RETRY_ATTEMPS = GATEWAY_CONFIG_FILE_PREFIX + ".cloudera.manager.service.discovery.maximum.retry.attemps";

private static final String KNOX_TOKEN_EVICTION_INTERVAL = GATEWAY_CONFIG_FILE_PREFIX + ".knox.token.eviction.interval";
private static final String KNOX_TOKEN_EVICTION_GRACE_PERIOD = GATEWAY_CONFIG_FILE_PREFIX + ".knox.token.eviction.grace.period";
Expand Down Expand Up @@ -1177,6 +1178,11 @@ public long getClouderaManagerServiceDiscoveryRepositoryEntryTTL() {
return getLong(CLOUDERA_MANAGER_SERVICE_DISCOVERY_REPOSITORY_CACHE_ENTRY_TTL, DEFAULT_CM_SERVICE_DISCOVERY_CACHE_ENTRY_TTL);
}

@Override
public int getClouderaManagerServiceDiscoveryMaximumRetryAttempts() {
return getInt(CLOUDERA_MANAGER_SERVICE_DISCOVERY_MAX_RETRY_ATTEMPS, DEFAULT_CM_SERVICE_DISCOVERY_MAX_RETRY_ATTEMPTS);
}

@Override
public boolean isServerManagedTokenStateEnabled() {
return getBoolean(TOKEN_STATE_SERVER_MANAGED, false);
Expand Down
Loading