From 73f6783b07f1151c1617978fb57822ade5b71414 Mon Sep 17 00:00:00 2001 From: Jinmei Liao Date: Wed, 7 Oct 2020 11:05:28 -0700 Subject: [PATCH] =?UTF-8?q?GEODE-8574:=20ClusterManagementService=20should?= =?UTF-8?q?=20not=20throw=20ClassCastExcept=E2=80=A6=20(#5596)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/LocatorClusterManagementService.java | 40 +++++++++++++------ .../LocatorClusterManagementServiceTest.java | 24 +++++++++-- 2 files changed, 47 insertions(+), 17 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java b/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java index f67ae830c165..f8d9f52210f6 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/api/LocatorClusterManagementService.java @@ -29,7 +29,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -46,7 +45,6 @@ import org.apache.geode.cache.execute.Execution; import org.apache.geode.cache.execute.Function; import org.apache.geode.cache.execute.FunctionService; -import org.apache.geode.cache.execute.ResultCollector; import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService; @@ -236,8 +234,7 @@ public > ClusterManagementRealizationResult c ClusterManagementRealizationResult result = new ClusterManagementRealizationResult(); // execute function on all targeted members - List functionResults = executeAndGetFunctionResult( - new CacheRealizationFunction(), + List functionResults = executeCacheRealizationFunction( config, CacheElementOperation.CREATE, targetedMembers); functionResults.forEach(result::addMemberStatus); @@ -338,8 +335,7 @@ public > ClusterManagementRealizationResult d // execute function on all members ClusterManagementRealizationResult result = new ClusterManagementRealizationResult(); - List functionResults = executeAndGetFunctionResult( - new CacheRealizationFunction(), + List functionResults = executeCacheRealizationFunction( config, CacheElementOperation.DELETE, memberValidator.findServers(groupsWithThisElement)); functionResults.forEach(result::addMemberStatus); @@ -446,7 +442,7 @@ public , R extends RuntimeInfo> ClusterManage members = Collections.singleton(members.iterator().next()); } - List runtimeInfos = executeAndGetFunctionResult(new CacheRealizationFunction(), + List runtimeInfos = executeCacheRealizationFunction( element, CacheElementOperation.GET, members); response.setRuntimeInfo(runtimeInfos); @@ -593,14 +589,15 @@ private ConfigurationManager getConfigurati } @VisibleForTesting - List executeAndGetFunctionResult(Function function, AbstractConfiguration configuration, + List executeCacheRealizationFunction(AbstractConfiguration configuration, CacheElementOperation operation, Set targetMembers) { if (targetMembers.size() == 0) { return Collections.emptyList(); } - List results = new ArrayList(); + Function function = new CacheRealizationFunction(); + File file = null; if (configuration instanceof HasFile) { @@ -611,9 +608,8 @@ List executeAndGetFunctionResult(Function function, AbstractConfiguration Execution execution = FunctionService.onMembers(targetMembers) .setArguments(Arrays.asList(configuration, operation, null)); ((AbstractExecution) execution).setIgnoreDepartedMembers(true); - ResultCollector rc = execution.execute(function); - return ((List) rc.getResult()).stream().filter(Objects::nonNull) - .collect(Collectors.toList()); + List functionResults = (List) execution.execute(function).getResult(); + return cleanResults(functionResults); } // if we have file arguments, we need to export the file input stream for each member @@ -623,6 +619,7 @@ List executeAndGetFunctionResult(Function function, AbstractConfiguration .getManagementAgent(); exporter = agent.getRemoteStreamExporter(); + List results = new ArrayList(); for (DistributedMember member : targetMembers) { FileInputStream fileInputStream = null; SimpleRemoteInputStream inputStream = null; @@ -634,7 +631,8 @@ List executeAndGetFunctionResult(Function function, AbstractConfiguration Execution execution = FunctionService.onMember(member) .setArguments(Arrays.asList(configuration, operation, remoteInputStream)); ((AbstractExecution) execution).setIgnoreDepartedMembers(true); - results.add(((List) execution.execute(function).getResult()).get(0)); + List functionResults = cleanResults((List) execution.execute(function).getResult()); + results.addAll(functionResults); } catch (IOException e) { raise(StatusCode.ILLEGAL_ARGUMENT, "Invalid file: " + file.getAbsolutePath()); } finally { @@ -653,7 +651,23 @@ List executeAndGetFunctionResult(Function function, AbstractConfiguration } } } + return results; + } + @VisibleForTesting + List cleanResults(List functionResults) { + List results = new ArrayList<>(); + for (Object functionResult : functionResults) { + if (functionResult == null) { + continue; + } + if (functionResult instanceof Throwable) { + // log the exception and continue + logger.warn("Error executing CacheRealizationFunction.", (Throwable) functionResult); + continue; + } + results.add((R) functionResult); + } return results; } diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java index 9506e8d02e49..c50e66928f14 100644 --- a/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/internal/api/LocatorClusterManagementServiceTest.java @@ -50,6 +50,7 @@ import org.apache.geode.cache.configuration.CacheConfig; import org.apache.geode.cache.configuration.GatewayReceiverConfig; import org.apache.geode.cache.configuration.RegionConfig; +import org.apache.geode.cache.execute.FunctionInvocationTargetException; import org.apache.geode.distributed.DistributedLockService; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.DistributedSystem; @@ -216,7 +217,7 @@ public void create_partialFailureOnMembers() { functionResults.add(new RealizationResult().setMemberName("member1")); functionResults.add( new RealizationResult().setMemberName("member2").setSuccess(false).setMessage("failed")); - doReturn(functionResults).when(service).executeAndGetFunctionResult(any(), any(), any(), any()); + doReturn(functionResults).when(service).executeCacheRealizationFunction(any(), any(), any()); doReturn(Collections.singleton(mock(DistributedMember.class))).when(memberValidator) .findServers(); @@ -232,7 +233,7 @@ public void create_succeedsOnAllMembers() { List functionResults = new ArrayList<>(); functionResults.add(new RealizationResult().setMemberName("member1")); functionResults.add(new RealizationResult().setMemberName("member2")); - doReturn(functionResults).when(service).executeAndGetFunctionResult(any(), any(), any(), any()); + doReturn(functionResults).when(service).executeCacheRealizationFunction(any(), any(), any()); doReturn(Collections.singleton(mock(DistributedMember.class))).when(memberValidator) .findServers(); @@ -337,7 +338,7 @@ public void delete_partialFailureOnMembers() { functionResults.add(new RealizationResult().setMemberName("member1")); functionResults.add( new RealizationResult().setMemberName("member2").setSuccess(false).setMessage("failed")); - doReturn(functionResults).when(service).executeAndGetFunctionResult(any(), any(), any(), any()); + doReturn(functionResults).when(service).executeCacheRealizationFunction(any(), any(), any()); doReturn(new String[] {"cluster"}).when(memberValidator).findGroupsWithThisElement(any(), any()); @@ -365,7 +366,7 @@ public void delete_succeedsOnAllMembers() { List functionResults = new ArrayList<>(); functionResults.add(new RealizationResult().setMemberName("member1")); functionResults.add(new RealizationResult().setMemberName("member2")); - doReturn(functionResults).when(service).executeAndGetFunctionResult(any(), any(), any(), any()); + doReturn(functionResults).when(service).executeCacheRealizationFunction(any(), any(), any()); doReturn(new String[] {"cluster"}).when(memberValidator).findGroupsWithThisElement(any(), any()); @@ -600,4 +601,19 @@ public void setResultStatus() { assertThat(result.getStatusMessage()).isEqualTo( "Successfully updated configuration for group1. Failed to update configuration for group2."); } + + @Test + public void cleanResultsShouldCleanOutExceptionsAndNull() throws Exception { + List functionResults = new ArrayList<>(); + MemberInformation memberInfo = new MemberInformation(); + memberInfo.setId("server-1"); + functionResults.add(memberInfo); + functionResults.add(new FunctionInvocationTargetException("Not available")); + functionResults.add(null); + + List results = service.cleanResults(functionResults); + assertThat(results).hasSize(1) + .extracting(MemberInformation::getId) + .containsExactly("server-1"); + } }