Skip to content

Commit

Permalink
GEODE-8099: add dlock around cms create/delete operations. (#5188)
Browse files Browse the repository at this point in the history
  • Loading branch information
jinmeiliao committed Jun 15, 2020
1 parent e5426f7 commit 0f763ea
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 157 deletions.
Expand Up @@ -66,7 +66,6 @@
import org.apache.geode.distributed.ConfigurationPersistenceService;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.internal.JarDeployer;
import org.apache.geode.internal.cache.ClusterConfigurationLoader;
Expand Down Expand Up @@ -129,7 +128,8 @@ public class InternalConfigurationPersistenceService implements ConfigurationPer
public InternalConfigurationPersistenceService(InternalCache cache, Path workingDirectory,
JAXBService jaxbService) {
this(cache,
sharedConfigLockService(cache.getDistributedSystem()),
DLockService.getOrCreateService(SHARED_CONFIG_LOCK_SERVICE_NAME,
cache.getInternalDistributedSystem()),
jaxbService,
workingDirectory.resolve(CLUSTER_CONFIG_ARTIFACTS_DIR_NAME),
workingDirectory
Expand All @@ -153,24 +153,6 @@ public InternalConfigurationPersistenceService(JAXBService jaxbService) {
this.jaxbService = jaxbService;
}


/**
* Gets or creates (if not created) shared configuration lock service
*/
private static DistributedLockService sharedConfigLockService(DistributedSystem ds) {
DistributedLockService sharedConfigDls =
DLockService.getServiceNamed(SHARED_CONFIG_LOCK_SERVICE_NAME);
try {
if (sharedConfigDls == null) {
sharedConfigDls = DLockService.create(SHARED_CONFIG_LOCK_SERVICE_NAME,
(InternalDistributedSystem) ds, true, true);
}
} catch (IllegalArgumentException ignore) {
return DLockService.getServiceNamed(SHARED_CONFIG_LOCK_SERVICE_NAME);
}
return sharedConfigDls;
}

public JAXBService getJaxbService() {
return jaxbService;
}
Expand Down
Expand Up @@ -2663,6 +2663,18 @@ public int getSerialNumber() {
// -------------------------------------------------------------------------
// External API methods
// -------------------------------------------------------------------------
public static DistributedLockService getOrCreateService(String serviceName,
InternalDistributedSystem ds) {
DistributedLockService cmsLockService = DLockService.getServiceNamed(serviceName);
try {
if (cmsLockService == null) {
cmsLockService = DLockService.create(serviceName, ds, true, true);
}
} catch (IllegalArgumentException ignore) {
return DLockService.getServiceNamed(serviceName);
}
return cmsLockService;
}

/**
* @see org.apache.geode.distributed.DistributedLockService#getServiceNamed(String)
Expand Down
Expand Up @@ -94,6 +94,13 @@ public static void setManagementService(InternalCacheForClientAccess cache,
}
}

@VisibleForTesting
public static void clearManagementService(InternalCacheForClientAccess cache) {
synchronized (instances) {
instances.remove(cache);
}
}

public static ManagementService getExistingManagementService(InternalCache cache) {
synchronized (instances) {
BaseManagementService service = instances.get(cache.getCacheForProcessingClientRequests());
Expand Down
Expand Up @@ -47,8 +47,10 @@
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.execute.AbstractExecution;
import org.apache.geode.logging.internal.log4j.api.LogService;
Expand Down Expand Up @@ -105,7 +107,13 @@
import org.apache.geode.management.runtime.OperationResult;
import org.apache.geode.management.runtime.RuntimeInfo;

/**
* each locator will have one instance of this running if enabled
*/
public class LocatorClusterManagementService implements ClusterManagementService {
@VisibleForTesting
// the dlock service name used by the CMS
static final String CMS_DLOCK_SERVICE_NAME = "CMS_DLOCK_SERVICE";
private static final Logger logger = LogService.getLogger();
private final InternalConfigurationPersistenceService persistenceService;
private final Map<Class, ConfigurationManager> managers;
Expand All @@ -114,6 +122,7 @@ public class LocatorClusterManagementService implements ClusterManagementService
private final MemberValidator memberValidator;
private final CommonConfigurationValidator commonValidator;
private final InternalCache cache;
private DistributedLockService cmsDlockService;

public LocatorClusterManagementService(InternalCache cache,
InternalConfigurationPersistenceService persistenceService) {
Expand Down Expand Up @@ -154,6 +163,25 @@ public LocatorClusterManagementService(
this.operationManager = operationManager;
}

@VisibleForTesting
// synchronized because cmsDlockService is lazily initialized
synchronized DistributedLockService getCmsDlockService() {
if (cmsDlockService == null) {
cmsDlockService =
DLockService.getOrCreateService(CMS_DLOCK_SERVICE_NAME,
cache.getInternalDistributedSystem());
}
return cmsDlockService;
}

private boolean lockCMS() {
return getCmsDlockService().lock(CMS_DLOCK_SERVICE_NAME, -1, -1);
}

private void unlockCMS() {
getCmsDlockService().unlock(CMS_DLOCK_SERVICE_NAME);
}

@Override
public <T extends AbstractConfiguration<?>> ClusterManagementRealizationResult create(T config) {
// validate that user used the correct config object type
Expand All @@ -164,77 +192,82 @@ public <T extends AbstractConfiguration<?>> ClusterManagementRealizationResult c
"Cluster configuration service needs to be enabled."));
}

lockCMS();
try {
// first validate common attributes of all configuration object
commonValidator.validate(CacheElementOperation.CREATE, config);
try {
// first validate common attributes of all configuration object
commonValidator.validate(CacheElementOperation.CREATE, config);

ConfigurationValidator validator = validators.get(config.getClass());
if (validator != null) {
validator.validate(CacheElementOperation.CREATE, config);
}
ConfigurationValidator validator = validators.get(config.getClass());
if (validator != null) {
validator.validate(CacheElementOperation.CREATE, config);
}

// check if this config already exists
if (configurationManager instanceof CacheConfigurationManager) {
memberValidator.validateCreate(config, (CacheConfigurationManager) configurationManager);
// check if this config already exists
if (configurationManager instanceof CacheConfigurationManager) {
memberValidator.validateCreate(config, (CacheConfigurationManager) configurationManager);
}
} catch (EntityExistsException e) {
raise(StatusCode.ENTITY_EXISTS, e);
} catch (IllegalArgumentException e) {
raise(StatusCode.ILLEGAL_ARGUMENT, e);
}
} catch (EntityExistsException e) {
raise(StatusCode.ENTITY_EXISTS, e);
} catch (IllegalArgumentException e) {
raise(StatusCode.ILLEGAL_ARGUMENT, e);
}

// find the targeted members
Set<String> groups = new HashSet<>();
Set<DistributedMember> targetedMembers;
if (config instanceof RegionScoped) {
String regionName = ((RegionScoped) config).getRegionName();
groups = memberValidator.findGroups(regionName);
if (groups.isEmpty()) {
raise(StatusCode.ENTITY_NOT_FOUND, "Region provided does not exist: " + regionName);
// find the targeted members
Set<String> groups = new HashSet<>();
Set<DistributedMember> targetedMembers;
if (config instanceof RegionScoped) {
String regionName = ((RegionScoped) config).getRegionName();
groups = memberValidator.findGroups(regionName);
if (groups.isEmpty()) {
raise(StatusCode.ENTITY_NOT_FOUND, "Region provided does not exist: " + regionName);
}
targetedMembers = memberValidator.findServers(groups.toArray(new String[0]));
} else {
final String groupName = AbstractConfiguration.getGroupName(config.getGroup());
groups.add(groupName);
targetedMembers = memberValidator.findServers(groupName);
}
targetedMembers = memberValidator.findServers(groups.toArray(new String[0]));
} else {
final String groupName = AbstractConfiguration.getGroupName(config.getGroup());
groups.add(groupName);
targetedMembers = memberValidator.findServers(groupName);
}

ClusterManagementRealizationResult result = new ClusterManagementRealizationResult();
ClusterManagementRealizationResult result = new ClusterManagementRealizationResult();

// execute function on all targeted members
List<RealizationResult> functionResults = executeAndGetFunctionResult(
new CacheRealizationFunction(),
config, CacheElementOperation.CREATE,
targetedMembers);
// execute function on all targeted members
List<RealizationResult> functionResults = executeAndGetFunctionResult(
new CacheRealizationFunction(),
config, CacheElementOperation.CREATE,
targetedMembers);

functionResults.forEach(result::addMemberStatus);
functionResults.forEach(result::addMemberStatus);

// if any false result is added to the member list
if (result.getStatusCode() != StatusCode.OK) {
result.setStatus(StatusCode.ERROR, "Failed to create on all members.");
return assertSuccessful(result);
}
// if any false result is added to the member list
if (result.getStatusCode() != StatusCode.OK) {
result.setStatus(StatusCode.ERROR, "Failed to create on all members.");
return assertSuccessful(result);
}

// persist configuration in cache config
List<String> updatedGroups = new ArrayList<>();
List<String> failedGroups = new ArrayList<>();
for (String groupName : groups) {
try {
configurationManager.add(config, groupName);
updatedGroups.add(groupName);
} catch (Exception e) {
logger.error(e.getMessage(), e);
failedGroups.add(groupName);
// persist configuration in cache config
List<String> updatedGroups = new ArrayList<>();
List<String> failedGroups = new ArrayList<>();
for (String groupName : groups) {
try {
configurationManager.add(config, groupName);
updatedGroups.add(groupName);
} catch (Exception e) {
logger.error(e.getMessage(), e);
failedGroups.add(groupName);
}
}
}

setResultStatus(result, updatedGroups, failedGroups);
setResultStatus(result, updatedGroups, failedGroups);

// add the config object which includes the HATEOAS information of the element created
if (result.isSuccessful()) {
result.setLinks(config.getLinks());
// add the config object which includes the HATEOAS information of the element created
if (result.isSuccessful()) {
result.setLinks(config.getLinks());
}
return assertSuccessful(result);
} finally {
unlockCMS();
}
return assertSuccessful(result);
}

@VisibleForTesting
Expand Down Expand Up @@ -278,56 +311,61 @@ public <T extends AbstractConfiguration<?>> ClusterManagementRealizationResult d
"Cluster configuration service needs to be enabled."));
}

lockCMS();
try {
// first validate common attributes of all configuration object
commonValidator.validate(CacheElementOperation.DELETE, config);
try {
// first validate common attributes of all configuration object
commonValidator.validate(CacheElementOperation.DELETE, config);

ConfigurationValidator validator = validators.get(config.getClass());
if (validator != null) {
validator.validate(CacheElementOperation.DELETE, config);
ConfigurationValidator validator = validators.get(config.getClass());
if (validator != null) {
validator.validate(CacheElementOperation.DELETE, config);
}
} catch (IllegalArgumentException e) {
raise(StatusCode.ILLEGAL_ARGUMENT, e);
}
} catch (IllegalArgumentException e) {
raise(StatusCode.ILLEGAL_ARGUMENT, e);
}

String[] groupsWithThisElement =
memberValidator.findGroupsWithThisElement(config, configurationManager);
if (groupsWithThisElement.length == 0) {
raise(StatusCode.ENTITY_NOT_FOUND,
config.getClass().getSimpleName() + " '" + config.getId() + "' does not exist.");
}
String[] groupsWithThisElement =
memberValidator.findGroupsWithThisElement(config, configurationManager);
if (groupsWithThisElement.length == 0) {
raise(StatusCode.ENTITY_NOT_FOUND,
config.getClass().getSimpleName() + " '" + config.getId() + "' does not exist.");
}

// execute function on all members
ClusterManagementRealizationResult result = new ClusterManagementRealizationResult();
// execute function on all members
ClusterManagementRealizationResult result = new ClusterManagementRealizationResult();

List<RealizationResult> functionResults = executeAndGetFunctionResult(
new CacheRealizationFunction(),
config, CacheElementOperation.DELETE,
memberValidator.findServers(groupsWithThisElement));
functionResults.forEach(result::addMemberStatus);
List<RealizationResult> functionResults = executeAndGetFunctionResult(
new CacheRealizationFunction(),
config, CacheElementOperation.DELETE,
memberValidator.findServers(groupsWithThisElement));
functionResults.forEach(result::addMemberStatus);

// if any false result is added to the member list
if (result.getStatusCode() != StatusCode.OK) {
result.setStatus(StatusCode.ERROR, "Failed to delete on all members.");
return result;
}
// if any false result is added to the member list
if (result.getStatusCode() != StatusCode.OK) {
result.setStatus(StatusCode.ERROR, "Failed to delete on all members.");
return result;
}

// persist configuration in cache config
List<String> updatedGroups = new ArrayList<>();
List<String> failedGroups = new ArrayList<>();
for (String finalGroup : groupsWithThisElement) {
try {
configurationManager.delete(config, finalGroup);
updatedGroups.add(finalGroup);
} catch (Exception e) {
logger.error(e.getMessage(), e);
failedGroups.add(finalGroup);
// persist configuration in cache config
List<String> updatedGroups = new ArrayList<>();
List<String> failedGroups = new ArrayList<>();
for (String finalGroup : groupsWithThisElement) {
try {
configurationManager.delete(config, finalGroup);
updatedGroups.add(finalGroup);
} catch (Exception e) {
logger.error(e.getMessage(), e);
failedGroups.add(finalGroup);
}
}
}

setResultStatus(result, updatedGroups, failedGroups);
setResultStatus(result, updatedGroups, failedGroups);

return assertSuccessful(result);
return assertSuccessful(result);
} finally {
unlockCMS();
}
}

@Override
Expand Down

0 comments on commit 0f763ea

Please sign in to comment.