GEODE-9369: Command to copy region entries from a WAN site to another #6833
geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
 * An abstract function implementation to be extended by cli functions. Any cli function extending
 * this class has to return a CliFunctionResult.
 */
public abstract class CliFunction<T> implements InternalFunction<T> {
Instead of copying CliFunction to geode-core, I would copy some build.gradle changes from geode-connectors to geode-wan.
The geode-connectors module is a good one to use as an example for how it adds GFSH commands modularly without changes to geode-gfsh or geode-core.
Basically, you would just add a geode-gfsh dependency to geode-wan's build.gradle just like it has in geode-connectors:
implementation(project(':geode-gfsh'))
And you probably need this also because gfsh extends spring-shell:
implementation('org.springframework.shell:spring-shell') {
    exclude module: 'aopalliance'
    exclude module: 'asm'
    exclude module: 'cglib'
    exclude module: 'guava'
    exclude module: 'spring-aop'
    exclude module: 'spring-context-support'
    exclude module: 'spring-core'
    ext.optional = true
}
If you get any weird compilation failures with those added, study the geode-gfsh and geode-connectors build.gradle files to see if there are more dependencies that are needed.
geode-gfsh/build.gradle
Outdated
api(project(':geode-core'))
api(project(':geode-common'))
api(project(':geode-wan'))
If the dependency is flipped so that geode-wan depends on geode-gfsh, then you can remove this line.
final Object[] args = {regionName, senderId, isCancel, maxRate, batchSize};
ResultCollector<?, ?> resultCollector =
    executeFunction(wanCopyRegionFunction, args, getAllNormalMembers());
@SuppressWarnings("unchecked")
Looks like you've added types, do you still need this suppression?
Not anymore.
  fact.setOffHeap(offHeap);
- Region r = fact.create(regionName);
+ Region<Object, Object> r = fact.create(regionName);
Region<?, ?> usually works in places like this, too. Either typing should be ok.
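To make the difference between the two typings concrete, here is a minimal plain-Java sketch. It uses `java.util.Map` as a stand-in for `Region` (an assumption made so the example runs without Geode on the classpath), and the class and method names are hypothetical. A wildcard-typed reference is enough for calls that only read, while inserting entries needs concrete type parameters.

```java
import java.util.HashMap;
import java.util.Map;

class WildcardTypingSketch {
  // Read-only access compiles against a wildcard-typed reference.
  static int countEntries(Map<?, ?> region) {
    return region.size();
  }

  // Adding entries needs concrete type parameters; region.put(key, value)
  // would not compile if the parameter were declared as Map<?, ?>.
  static void putEntry(Map<Object, Object> region, Object key, Object value) {
    region.put(key, value);
  }

  public static void main(String[] args) {
    Map<Object, Object> region = new HashMap<>();
    putEntry(region, 1L, "one");
    System.out.println(countEntries(region));
  }
}
```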
    .submit(wanCopyCommandFuture);

// Wait for the wan-copy command to start
wanCopyCommandStartLatch.await();
Add the GeodeAwaitility timeout.
Not applicable as there is no CountDownLatch anymore.
asyncOps1.await();
asyncOps2.await();
asyncOps3.await();
Note: These are ok because AsyncInvocation is our code and await() actually uses the GeodeAwaitility timeout under the covers.
    .submit(wanCopyCommandFuture);

// Wait for the wan-copy command to start
wanCopyCommandStartLatch.await();
Add the GeodeAwaitility timeout.
Not applicable as there is no CountDownLatch anymore.
}

public static void removeEntry(String regionName, long key) {
  Region<?, ?> r = ClientCacheFactory.getAnyInstance().getRegion(SEPARATOR + regionName);
Singleton call.
}

public void sendRandomOpsFromClient(String regionName, Set<Long> keySet, int iterations) {
  Region<Long, Integer> r = ClientCacheFactory.getAnyInstance().getRegion(SEPARATOR + regionName);
Singleton call.
Thanks for the suggestions. They are very good.
ghost
left a comment
Build changes seem good.
@albertogpz Quick note to let you know I'm reviewing this.
  command.hasTableSection(ResultModel.MEMBER_STATUS_SECTION).hasColumn("Message")
      .asList().haveExactly(1, exceptionError).haveExactly(2, senderNotPrimary);
}
[comment] Not a request for changes, but I generally try to format these chained APIs with linefeeds like:
command
    .hasTableSection(ResultModel.MEMBER_STATUS_SECTION)
    .hasColumn("Message")
    .asList()
    .haveExactly(1, exceptionError)
    .haveExactly(2, senderNotPrimary);
I'm not sure if it's easier to read for everyone, but it is for me.
AsyncInvocation<Object> asyncOps1 =
    client.invokeAsync(() -> sendRandomOpsFromClient(regionName, keySet, 10));
AsyncInvocation<Object> asyncOps2 =
    client.invokeAsync(() -> sendRandomOpsFromClient(regionName, keySet, 10));
AsyncInvocation<Object> asyncOps3 =
    client.invokeAsync(() -> sendRandomOpsFromClient(regionName, keySet, 10));
Since you don't actually need a return from these (you just call await() down below), you should just use the Void type: AsyncInvocation<Void>.
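The same idea can be sketched in plain Java with `CompletableFuture<Void>`, used here only as a JDK stand-in for `AsyncInvocation<Void>` (an assumption, so that the example needs no Geode test framework). The class name, the `sendOps` helper and the counter are hypothetical. Typing the handle as `Void` documents that the task is run for its side effects and that callers only wait for completion.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class VoidAsyncSketch {
  static final AtomicInteger opsSent = new AtomicInteger();

  // Hypothetical stand-in for sendRandomOpsFromClient: it has side effects only.
  static void sendOps(int iterations) {
    for (int i = 0; i < iterations; i++) {
      opsSent.incrementAndGet();
    }
  }

  // Starts three tasks, waits for all of them and returns the number of ops sent.
  static int runThreeAsyncTasks() {
    opsSent.set(0);
    // runAsync returns CompletableFuture<Void>: there is no result to read later.
    CompletableFuture<Void> ops1 = CompletableFuture.runAsync(() -> sendOps(10));
    CompletableFuture<Void> ops2 = CompletableFuture.runAsync(() -> sendOps(10));
    CompletableFuture<Void> ops3 = CompletableFuture.runAsync(() -> sendOps(10));
    CompletableFuture.allOf(ops1, ops2, ops3).join();
    return opsSent.get();
  }

  public static void main(String[] args) {
    System.out.println(runThreeAsyncTasks());
  }
}
```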
}

public static void removeEntry(String regionName, long key) {
  Region<?, ?> r = cache.getRegion(SEPARATOR + regionName);
You should go ahead and name any variables with a full word like region.
try {
  return service.execute(firstExecution, regionName, senderId);
} catch (Exception e) {
  return null;
[comment] Returning null is probably ok. Or you could use an ErrorCollector JUnit rule here as well.
private EventCreator eventCreatorMock;

@Before
public void setUp() throws InterruptedException {
[comment] Some of these big methods are easier to read if you group all of the mocks together at the top, and then all of the stubbings (when statements) down lower.
Object[] options = new Object[] {"myRegion", "mySender", false, 1L, 2};
WanCopyRegionFunctionDelegate.ThreadSleeper sleeperMock =
    mock(WanCopyRegionFunctionDelegate.ThreadSleeper.class);
doNothing().when(sleeperMock).sleep(anyLong());

executeWanCopyRegionFunction(options, sleeperMock);
ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
verify(sleeperMock, times(1)).sleep(captor.capture());
assertThat(captor.getValue()).isGreaterThan(0).isLessThanOrEqualTo(2000);
[comment] Another way to organize these tests for (hopefully) easier reading is to group it in arrange/act/assert sections:
// arrange
Object[] options = new Object[] {"myRegion", "mySender", false, 1L, 2};
WanCopyRegionFunctionDelegate.ThreadSleeper sleeperMock =
mock(WanCopyRegionFunctionDelegate.ThreadSleeper.class);
doNothing().when(sleeperMock).sleep(anyLong());
// act
executeWanCopyRegionFunction(options, sleeperMock);
// assert
ArgumentCaptor<Long> captor = ArgumentCaptor.forClass(Long.class);
verify(sleeperMock, times(1)).sleep(captor.capture());
assertThat(captor.getValue()).isGreaterThan(0).isLessThanOrEqualTo(2000);
assertThatThrownBy(
    () -> function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1,
        10))
            .isInstanceOf(NoAvailableServersException.class);
[comment] If you decide you like arrange/act/assert then you can use this syntax if you want:
// arrange
...
// act
Throwable thrown = catchThrowable(() -> {
function.wanCopyRegion(internalCacheMock, "member1", regionMock, gatewaySenderMock, 1, 10);
});
// assert
assertThat(thrown).isInstanceOf(NoAvailableServersException.class);
public class WanCopyRegionFunctionServiceTest {

  private WanCopyRegionFunctionService service;
  InternalCache cache = mock(InternalCache.class);
private InternalCache cache = mock(InternalCache.class);
…apache#6601)
* GEODE-9369: Command to copy region entries from a WAN site to another
  The command has been implemented as proposed in https://cwiki.apache.org/confluence/display/GEODE/Geode+Command+to+replicate+region+data+from+one+site+to+another+connected+via+WAN with some modifications with respect to the initial proposal. The command will get the entries of a region in a WAN site and will put them in batches that will be sent by a gateway sender to a remote WAN site.
* GEODE-9369: Remove comments added to geode-gfsh/src/test/resources/expected-pom.xml
* GEODE-9369: Changes after review
* GEODE-9369: Update with Kirk's review comments
* GEODE-9369: Update with boglesby's review comments
* GEODE-9369: Changes after boglesby's review
* GEODE-9369: Updated with davebarnes97's review comments.
* GEODE-9369: More changes after davebarnes97's review
* GEODE-9369: Updated with DonalEvan's comments
* GEODE-9369: Small refactoring: Use CompletableFuture instead of Callable + FutureTask
* GEODE-9369: Fix race condition added in previous commit
* GEODE-9369: Small change of re-review from DonalEvans
* GEODE-9369: Fix sporadic test failures due to error log.
* GEODE-9369: Changes required after rebasing to develop
- Remove sleeps in unit tests that made some of them flaky, especially in windows
- Remove use of DEEP_STUBS in mocks
- Move new function to geode-wan
- Handle the lifecycle of the executor used by the function in a new CacheService class in wan
- Remove partial mocking in unit tests
In order to be able to use CliFunction in WanCopyRegion without adding a dependency to geode-gfsh, I created a CliFunction class inside geode-core which is a copy of that same class in geode-gfsh. At first I thought about moving the class from geode-gfsh to geode-core which seemed more logical but I realized it would break the compatibility of a public API.
kirklund
left a comment
You should change any unit test that is validating that Thread.sleep throws InterruptedException to only test your class' interaction with ThreadSleeper. I inlined an example for one of the tests.
That Thread.sleep(1) is very prone to flakiness.
public void testCreation_WithAfterUpdateWithGenerateCallbacks(boolean isGenerateCallbacks,
    boolean isCallbackArgumentNull)
    throws IOException {
  LocalRegion region = mock(LocalRegion.class);
Please use InternalRegion instead of LocalRegion.
  await().until(
      () -> getNumberOfCurrentExecutionsInServers(servers) == executionsExpected);
}
Anytime you're awaiting something with more than a binary value, I recommend using an assertion so that if it fails, you'll see the exact value:
await().untilAsserted(
() -> assertThat(getNumberOfCurrentExecutionsInServers(servers)).isEqualTo(executionsExpected));
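Why the assertion form reports more than the boolean form can be seen in a minimal plain-Java sketch. The `untilAsserted` and `assertEquals` helpers below are hypothetical and hand-rolled for illustration; they are not Awaitility's or AssertJ's implementation. The helper keeps retrying and, on timeout, rethrows the last `AssertionError`, so the failure message carries the value that was actually observed.

```java
import java.time.Duration;

class UntilAssertedSketch {
  // Hypothetical helper: retries the assertion until it passes or the timeout
  // elapses, then rethrows the last AssertionError with its descriptive message.
  static void untilAsserted(Runnable assertion, Duration timeout) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      try {
        assertion.run();
        return;
      } catch (AssertionError e) {
        if (System.nanoTime() - deadline >= 0) {
          throw e;
        }
      }
      try {
        Thread.sleep(10);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new AssertionError("interrupted while waiting", ie);
      }
    }
  }

  // Hypothetical assertion that names both values when it fails.
  static void assertEquals(int expected, int actual) {
    if (expected != actual) {
      throw new AssertionError("expected " + expected + " but was " + actual);
    }
  }

  public static void main(String[] args) {
    try {
      untilAsserted(() -> assertEquals(3, 2), Duration.ofMillis(50));
    } catch (AssertionError e) {
      System.out.println(e.getMessage());
    }
  }
}
```

A boolean condition that times out can only report that it never became true; the assertion variant also reports the count that was seen on the last attempt.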
}

// Wait for the functions to start execution
await().until(() -> service.getNumberOfCurrentExecutions() == executions);
Another one where I'd use untilAsserted with an assertion so you see the exact number if it fails.
CountDownLatch latch = new CountDownLatch(executions);
for (int i = 0; i < executions; i++) {
  Callable<CliFunctionResult> firstExecution = () -> {
    latch.await();
Since CountDownLatch await waits forever by default, you should change it to:
latch.await(GeodeAwaitility.getTimeout().getSeconds(), TimeUnit.SECONDS);
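A bounded wait can be sketched in plain Java as follows. The `TIMEOUT` constant is a hypothetical stand-in for `GeodeAwaitility.getTimeout()`, and the class and method names are made up for illustration. One detail worth noting: the timed `CountDownLatch.await` returns `false` when the timeout elapses instead of throwing, so the caller should check the result.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class BoundedLatchAwaitSketch {
  // Hypothetical stand-in for GeodeAwaitility.getTimeout().
  static final Duration TIMEOUT = Duration.ofSeconds(5);

  // Returns true if the latch reached zero within the timeout, false if the
  // wait timed out or the thread was interrupted.
  static boolean awaitBounded(CountDownLatch latch, Duration timeout) {
    try {
      return latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    CountDownLatch latch = new CountDownLatch(1);
    new Thread(latch::countDown).start();
    System.out.println(awaitBounded(latch, TIMEOUT));
  }
}
```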
AbstractGatewaySenderEventProcessor eventProcessorMock =
    mock(AbstractGatewaySenderEventProcessor.class);
RegionAttributes<?, ?> attributesMock = mock(RegionAttributes.class);
Set<?> idsMock = mock(Set.class);
[comment] This is ok, but Collections.emptySet() might be better to use since the general recommendation is to not mock classes from other libraries.
when(((AbstractGatewaySender) gatewaySenderMock).getEventProcessor())
    .thenReturn(eventProcessorMock);
I think the interface InternalGatewaySender should work here instead of AbstractGatewaySender.
doAnswer(invocation -> {
  Thread.sleep(1);
  return null;
}).when(threadSleeperMock).sleep(anyLong());
This test is actually validating that Thread.sleep throws InterruptedException when interrupted, but you don't need (or want) to test the Java library, just your own classes. So this would be a place where it's better to just stub threadSleeperMock to throw InterruptedException:
@Test
public void doPostSendBatchActions_ThrowInterruptedIfInterruptedTimeToSleepNotZero()
throws InterruptedException {
// arrange
long maxRate = 1;
long elapsedTime = 20L;
InterruptedException thrownByThreadSleeper = new InterruptedException("thrownByThreadSleeper");
when(clockMock.millis()).thenReturn(startTime + elapsedTime);
doThrow(thrownByThreadSleeper).when(threadSleeperMock).sleep(anyLong());
// act
Throwable thrown =
catchThrowable(() -> function.doPostSendBatchActions(startTime, entries, maxRate));
// assert
assertThat(thrown).isInstanceOf(InterruptedException.class);
}
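The same approach can be sketched without Mockito or AssertJ. Everything below is a hypothetical, self-contained illustration: the `ThreadSleeper` interface, the `RateLimiter` class and the `catchThrowable` helper are hand-written stand-ins, not the classes from this PR. The point is that the collaborator is replaced by a fake that throws immediately, so the test exercises only the class's interaction with the sleeper and never calls `Thread.sleep`.

```java
class InterruptibleSleeperSketch {
  // Hypothetical counterpart of the ThreadSleeper collaborator.
  interface ThreadSleeper {
    void sleep(long millis) throws InterruptedException;
  }

  interface ThrowingRunnable {
    void run() throws Throwable;
  }

  // Hypothetical class under test: delegates sleeping and lets the
  // InterruptedException propagate to its caller.
  static class RateLimiter {
    private final ThreadSleeper sleeper;

    RateLimiter(ThreadSleeper sleeper) {
      this.sleeper = sleeper;
    }

    void pause(long millis) throws InterruptedException {
      if (millis > 0) {
        sleeper.sleep(millis);
      }
    }
  }

  // Hand-rolled equivalent of a catchThrowable helper: returns what was thrown, or null.
  static Throwable catchThrowable(ThrowingRunnable action) {
    try {
      action.run();
      return null;
    } catch (Throwable t) {
      return t;
    }
  }

  static Throwable pauseWithInterruptingSleeper() {
    // arrange: a fake sleeper that throws instead of sleeping
    ThreadSleeper interrupting = millis -> {
      throw new InterruptedException("thrownByThreadSleeper");
    };
    RateLimiter limiter = new RateLimiter(interrupting);
    // act
    return catchThrowable(() -> limiter.pause(20L));
  }

  public static void main(String[] args) {
    // assert
    System.out.println(pauseWithInterruptingSleeper() instanceof InterruptedException);
  }
}
```

Because nothing actually sleeps, the test has no timing dependency and cannot become flaky on slow machines.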
kirklund
left a comment
Looking really solid Alberto!
Thanks to your thorough review and your suggestions ;-)
Looks like these new tests are having some trouble on Windows: http://files.apachegeode-ci.info/builds/apache-develop-main/1.15.0-build.0583/test-results/distributedTest/1634209508/ Tip: running Windows PR checks on a PR is as simple as adding the |
… another (apache#6833)" This reverts commit c106e01.
This PR contains the original commit of PR #6601 plus some improvements, mainly to the tests, as suggested by @kirklund.
Thank you for submitting a contribution to Apache Geode.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
- Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- Has your PR been rebased against the latest commit within the target branch (typically develop)?
- Is your initial contribution a single, squashed commit?
- Does gradlew build run cleanly?
- Have you written or updated unit tests to verify your changes?
- If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
Note:
Once the PR is submitted, please check Concourse for build issues and
submit an update to your PR as soon as possible. If you need help, please send an
email to dev@geode.apache.org.