Skip to content

Commit

Permalink
GEODE-8574: ClusterManagementService should not throw ClassCastExcept… (
Browse files Browse the repository at this point in the history
  • Loading branch information
jinmeiliao committed Oct 7, 2020
1 parent 6c8bc5e commit 73f6783
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 17 deletions.
Expand Up @@ -29,7 +29,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -46,7 +45,6 @@
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService;
Expand Down Expand Up @@ -236,8 +234,7 @@ public <T extends AbstractConfiguration<?>> ClusterManagementRealizationResult c
ClusterManagementRealizationResult result = new ClusterManagementRealizationResult();

// execute function on all targeted members
List<RealizationResult> functionResults = executeAndGetFunctionResult(
new CacheRealizationFunction(),
List<RealizationResult> functionResults = executeCacheRealizationFunction(
config, CacheElementOperation.CREATE,
targetedMembers);
functionResults.forEach(result::addMemberStatus);
Expand Down Expand Up @@ -338,8 +335,7 @@ public <T extends AbstractConfiguration<?>> ClusterManagementRealizationResult d
// execute function on all members
ClusterManagementRealizationResult result = new ClusterManagementRealizationResult();

List<RealizationResult> functionResults = executeAndGetFunctionResult(
new CacheRealizationFunction(),
List<RealizationResult> functionResults = executeCacheRealizationFunction(
config, CacheElementOperation.DELETE,
memberValidator.findServers(groupsWithThisElement));
functionResults.forEach(result::addMemberStatus);
Expand Down Expand Up @@ -446,7 +442,7 @@ public <T extends AbstractConfiguration<R>, R extends RuntimeInfo> ClusterManage
members = Collections.singleton(members.iterator().next());
}

List<R> runtimeInfos = executeAndGetFunctionResult(new CacheRealizationFunction(),
List<R> runtimeInfos = executeCacheRealizationFunction(
element, CacheElementOperation.GET,
members);
response.setRuntimeInfo(runtimeInfos);
Expand Down Expand Up @@ -593,14 +589,15 @@ private <T extends AbstractConfiguration> ConfigurationManager<T> getConfigurati
}

@VisibleForTesting
<R> List<R> executeAndGetFunctionResult(Function function, AbstractConfiguration configuration,
<R> List<R> executeCacheRealizationFunction(AbstractConfiguration configuration,
CacheElementOperation operation,
Set<DistributedMember> targetMembers) {
if (targetMembers.size() == 0) {
return Collections.emptyList();
}

List<R> results = new ArrayList();
Function function = new CacheRealizationFunction();


File file = null;
if (configuration instanceof HasFile) {
Expand All @@ -611,9 +608,8 @@ <R> List<R> executeAndGetFunctionResult(Function function, AbstractConfiguration
Execution execution = FunctionService.onMembers(targetMembers)
.setArguments(Arrays.asList(configuration, operation, null));
((AbstractExecution) execution).setIgnoreDepartedMembers(true);
ResultCollector rc = execution.execute(function);
return ((List<R>) rc.getResult()).stream().filter(Objects::nonNull)
.collect(Collectors.toList());
List<?> functionResults = (List<?>) execution.execute(function).getResult();
return cleanResults(functionResults);
}

// if we have file arguments, we need to export the file input stream for each member
Expand All @@ -623,6 +619,7 @@ <R> List<R> executeAndGetFunctionResult(Function function, AbstractConfiguration
.getManagementAgent();
exporter = agent.getRemoteStreamExporter();

List<R> results = new ArrayList();
for (DistributedMember member : targetMembers) {
FileInputStream fileInputStream = null;
SimpleRemoteInputStream inputStream = null;
Expand All @@ -634,7 +631,8 @@ <R> List<R> executeAndGetFunctionResult(Function function, AbstractConfiguration
Execution execution = FunctionService.onMember(member)
.setArguments(Arrays.asList(configuration, operation, remoteInputStream));
((AbstractExecution) execution).setIgnoreDepartedMembers(true);
results.add(((List<R>) execution.execute(function).getResult()).get(0));
List<R> functionResults = cleanResults((List<?>) execution.execute(function).getResult());
results.addAll(functionResults);
} catch (IOException e) {
raise(StatusCode.ILLEGAL_ARGUMENT, "Invalid file: " + file.getAbsolutePath());
} finally {
Expand All @@ -653,7 +651,23 @@ <R> List<R> executeAndGetFunctionResult(Function function, AbstractConfiguration
}
}
}
return results;
}

@VisibleForTesting
<R> List<R> cleanResults(List<?> functionResults) {
List<R> results = new ArrayList<>();
for (Object functionResult : functionResults) {
if (functionResult == null) {
continue;
}
if (functionResult instanceof Throwable) {
// log the exception and continue
logger.warn("Error executing CacheRealizationFunction.", (Throwable) functionResult);
continue;
}
results.add((R) functionResult);
}
return results;
}

Expand Down
Expand Up @@ -50,6 +50,7 @@
import org.apache.geode.cache.configuration.CacheConfig;
import org.apache.geode.cache.configuration.GatewayReceiverConfig;
import org.apache.geode.cache.configuration.RegionConfig;
import org.apache.geode.cache.execute.FunctionInvocationTargetException;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.DistributedSystem;
Expand Down Expand Up @@ -216,7 +217,7 @@ public void create_partialFailureOnMembers() {
functionResults.add(new RealizationResult().setMemberName("member1"));
functionResults.add(
new RealizationResult().setMemberName("member2").setSuccess(false).setMessage("failed"));
doReturn(functionResults).when(service).executeAndGetFunctionResult(any(), any(), any(), any());
doReturn(functionResults).when(service).executeCacheRealizationFunction(any(), any(), any());

doReturn(Collections.singleton(mock(DistributedMember.class))).when(memberValidator)
.findServers();
Expand All @@ -232,7 +233,7 @@ public void create_succeedsOnAllMembers() {
List<RealizationResult> functionResults = new ArrayList<>();
functionResults.add(new RealizationResult().setMemberName("member1"));
functionResults.add(new RealizationResult().setMemberName("member2"));
doReturn(functionResults).when(service).executeAndGetFunctionResult(any(), any(), any(), any());
doReturn(functionResults).when(service).executeCacheRealizationFunction(any(), any(), any());

doReturn(Collections.singleton(mock(DistributedMember.class))).when(memberValidator)
.findServers();
Expand Down Expand Up @@ -337,7 +338,7 @@ public void delete_partialFailureOnMembers() {
functionResults.add(new RealizationResult().setMemberName("member1"));
functionResults.add(
new RealizationResult().setMemberName("member2").setSuccess(false).setMessage("failed"));
doReturn(functionResults).when(service).executeAndGetFunctionResult(any(), any(), any(), any());
doReturn(functionResults).when(service).executeCacheRealizationFunction(any(), any(), any());

doReturn(new String[] {"cluster"}).when(memberValidator).findGroupsWithThisElement(any(),
any());
Expand Down Expand Up @@ -365,7 +366,7 @@ public void delete_succeedsOnAllMembers() {
List<RealizationResult> functionResults = new ArrayList<>();
functionResults.add(new RealizationResult().setMemberName("member1"));
functionResults.add(new RealizationResult().setMemberName("member2"));
doReturn(functionResults).when(service).executeAndGetFunctionResult(any(), any(), any(), any());
doReturn(functionResults).when(service).executeCacheRealizationFunction(any(), any(), any());

doReturn(new String[] {"cluster"}).when(memberValidator).findGroupsWithThisElement(any(),
any());
Expand Down Expand Up @@ -600,4 +601,19 @@ public void setResultStatus() {
assertThat(result.getStatusMessage()).isEqualTo(
"Successfully updated configuration for group1. Failed to update configuration for group2.");
}

@Test
public void cleanResultsShouldCleanOutExceptionsAndNull() throws Exception {
List functionResults = new ArrayList<>();
MemberInformation memberInfo = new MemberInformation();
memberInfo.setId("server-1");
functionResults.add(memberInfo);
functionResults.add(new FunctionInvocationTargetException("Not available"));
functionResults.add(null);

List<MemberInformation> results = service.cleanResults(functionResults);
assertThat(results).hasSize(1)
.extracting(MemberInformation::getId)
.containsExactly("server-1");
}
}

0 comments on commit 73f6783

Please sign in to comment.