Skip to content

Commit

Permalink
- FORK now handles setAddress() correctly (https://issues.redhat.com/…
Browse files Browse the repository at this point in the history
…browse/JGRP-2612)

- Test case for the above issue
  • Loading branch information
belaban committed Apr 7, 2022
1 parent 9c6d773 commit 41eaa9c
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 20 deletions.
31 changes: 31 additions & 0 deletions conf/fork.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<config xmlns="urn:org:jgroups"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups.xsd"
>
<UDP
thread_pool.min_threads="0"
thread_pool.max_threads="20"
thread_pool.keep_alive_time="30000"/>
<PING />
<MERGE3 />
<FD_SOCK/>
<FD_ALL/>
<VERIFY_SUSPECT timeout="1500" />
<pbcast.NAKACK2 xmit_interval="500"/>
<UNICAST3 xmit_interval="500"/>
<pbcast.STABLE />
<pbcast.GMS print_local_addr="true" join_timeout="1000"/>
<UFC />
<MFC />
<FRAG2 frag_size="60K" />
<FORK>
<fork-stacks xmlns="fork" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="fork fork-stacks-4.2.xsd">
<fork-stack id="lock">
<config>
<CENTRAL_LOCK2/>
</config>
</fork-stack>
</fork-stacks>
</FORK>
</config>
8 changes: 8 additions & 0 deletions src/org/jgroups/protocols/FORK.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ public Object down(Event evt) {
switch(evt.getType()) {
case Event.SET_LOCAL_ADDRESS:
local_addr=evt.getArg();
for(Protocol prot: fork_stacks.values()) {
if(prot instanceof ForkProtocol) {
ForkProtocol fp=(ForkProtocol)prot;
ProtocolStack st=fp.getProtocolStack();
for(Protocol p: st.getProtocols())
p.down(evt);
}
}
break;
}
return down_prot.down(evt);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package org.jgroups.blocks;

import org.jgroups.Global;
import org.jgroups.JChannel;
import org.jgroups.blocks.locking.LockService;
import org.jgroups.fork.ForkChannel;
import org.jgroups.protocols.CENTRAL_LOCK2;
import org.jgroups.protocols.FORK;
import org.jgroups.stack.Protocol;
import org.jgroups.util.Util;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

@Test(groups= Global.FUNCTIONAL,singleThreaded=true)
public class LockServiceWithCentralLock2Test {
protected JChannel c;
protected ForkChannel fc;
protected static final String PROPS="fork.xml";
protected static final String CLUSTER=LockServiceWithCentralLock2Test.class.getSimpleName();

@AfterMethod protected void destroy() {
Util.close(fc,c);
}

public void shouldLockServiceGetLockUsingForkChannel() throws Exception {
c = create(PROPS, "A", true).connect(CLUSTER);
_shouldLockServiceGetLockUsingForkChannel();
}

public void shouldLockServiceGetLockUsingForkChannelProgrammaticCreation() throws Exception {
c = create(null, "A", true).connect(CLUSTER);
_shouldLockServiceGetLockUsingForkChannel();
}

public void shouldRequestHandlerRunningUsingForkChannel() throws Exception {
c=create(PROPS, "A", true);
fc = new ForkChannel(c, "lock", "lock-channel");
_shouldRequestHandlerRunningUsingForkChannel();
}

public void shouldRequestHandlerRunningUsingForkChannelProgrammaticCreation() throws Exception {
c=create(null, "A", true);
fc = new ForkChannel(c, "lock", "lock-channel", new CENTRAL_LOCK2());
_shouldRequestHandlerRunningUsingForkChannel();
}

public void shouldLockServiceGetLockNotUsingForkChannel() throws Exception {
c = create(null, "A", false).connect(CLUSTER);
LockService lockService = new LockService(c);
Lock lock=lockService.getLock("myLock");
try {
boolean success=lock.tryLock(5, TimeUnit.SECONDS);
assert success;
}
finally {
lock.unlock();
}
}

public void shouldRequestHandlerRunningNotUsingForkChannel() throws Exception{
c = create(null, "A", false).connect(CLUSTER);
CENTRAL_LOCK2 centralLock2 = c.getProtocolStack().findProtocol(CENTRAL_LOCK2.class);
assert centralLock2.isCoord() && centralLock2.isRequestHandlerRunning();
}


protected void _shouldLockServiceGetLockUsingForkChannel() throws Exception {
fc = new ForkChannel(c, "lock", "lock-channel", new CENTRAL_LOCK2());
fc.connect("bla");
LockService lockService = new LockService(fc);
boolean isLocked = lockService.getLock("myLock").tryLock(5, TimeUnit.SECONDS);
assert isLocked == true;
}

protected void _shouldRequestHandlerRunningUsingForkChannel() throws Exception{
c.connect(CLUSTER);
fc.connect("bla");
CENTRAL_LOCK2 centralLock2 = fc.getProtocolStack().findProtocol(CENTRAL_LOCK2.class);
assert centralLock2.isCoord() && centralLock2.isRequestHandlerRunning();
}

protected static JChannel create(String props, String name, boolean use_fork) throws Exception {
if(props != null)
return new JChannel(props).name(name);
Protocol p=use_fork? new FORK() : new CENTRAL_LOCK2();
return new JChannel(Util.getTestStack(p)).name(name);
}
}
26 changes: 6 additions & 20 deletions tests/junit-functional/org/jgroups/tests/ForkChannelTest.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package org.jgroups.tests;

import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;

import org.jgroups.*;
import org.jgroups.blocks.ReplicatedHashMap;
import org.jgroups.blocks.atomic.Counter;
Expand Down Expand Up @@ -31,6 +27,8 @@
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

import static org.testng.AssertJUnit.*;

/**
* Tests {@link org.jgroups.fork.ForkChannel}
* @author Bela Ban
Expand Down Expand Up @@ -84,11 +82,7 @@ public void testSimpleSend() throws Exception {

fc1.send(null, "hello");
List l3=r3.list(), l4=r4.list();
for(int i=0; i < 10; i++) {
if(!l3.isEmpty() || !l4.isEmpty())
break;
Util.sleep(1000);
}
Util.waitUntilTrue(10000, 200, () -> !l3.isEmpty() || !l4.isEmpty());
assert !l3.isEmpty();
assert l4.isEmpty();

Expand All @@ -97,11 +91,7 @@ public void testSimpleSend() throws Exception {
Address dest=fc3.getAddress();

fc1.send(dest, "hello2");
for(int i=0; i < 10; i++) {
if(!l3.isEmpty() || !l4.isEmpty())
break;
Util.sleep(1000);
}
Util.waitUntilTrue(10000, 200, () -> !l3.isEmpty() || !l4.isEmpty());
assert !l3.isEmpty();
assert l4.isEmpty();
l3.clear();
Expand All @@ -111,15 +101,11 @@ public void testSimpleSend() throws Exception {
ucast.setValue("conn_close_timeout", 10000);

Util.close(fc3,fc4,b);
Util.sleep(1000);
Util.sleep(300);

System.out.printf("---- sending message to non-existing member %s\n", dest);
fc1.send(dest, "hello3");
for(int i=0; i < 10; i++) {
if(!l3.isEmpty() || !l4.isEmpty())
break;
Util.sleep(500);
}
Util.waitUntilTrue(2000, 100, () -> !l3.isEmpty() || !l4.isEmpty());
assert l3.isEmpty();
assert l4.isEmpty();
}
Expand Down

0 comments on commit 41eaa9c

Please sign in to comment.