Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.apache.geode.redis.internal.executor.pubsub;

import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.ArrayList;
Expand All @@ -33,7 +32,8 @@
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.config.Configurator;
import org.junit.AfterClass;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
Expand Down Expand Up @@ -70,27 +70,29 @@ public class PubSubDUnitTest {
public static ExecutorServiceRule executor = new ExecutorServiceRule();

private static final String LOCAL_HOST = "127.0.0.1";
private static Jedis subscriber1;
private static Jedis subscriber2;
private static Jedis publisher1;
private static Jedis publisher2;
private Jedis subscriber1;
private Jedis subscriber2;
private Jedis publisher1;
private Jedis publisher2;

private static MemberVM locator;
private static MemberVM server1;
private static MemberVM server2;
private static MemberVM server3;
private static MemberVM server4;
private static MemberVM server5;
private MemberVM server1;
private MemberVM server2;
private MemberVM server3;
private MemberVM server4;
private MemberVM server5;

private static int redisServerPort1;
private static int redisServerPort2;
private static int redisServerPort3;
private static int redisServerPort4;
private int redisServerPort1;
private int redisServerPort2;
private int redisServerPort3;

@BeforeClass
public static void beforeClass() throws Exception {
public static void beforeClass() {
locator = cluster.startLocatorVM(0);
}

@Before
public void before() throws Exception {
int locatorPort = locator.getPort();
SerializableFunction<ServerStarterRule> operator = x -> x
.withSystemProperty("io.netty.eventLoopThreads", "10")
Expand All @@ -113,7 +115,7 @@ public static void beforeClass() throws Exception {
redisServerPort1 = cluster.getRedisPort(1);
redisServerPort2 = cluster.getRedisPort(2);
redisServerPort3 = cluster.getRedisPort(3);
redisServerPort4 = cluster.getRedisPort(4);
int redisServerPort4 = cluster.getRedisPort(4);

subscriber1 = new Jedis(LOCAL_HOST, redisServerPort1, 120000);
subscriber2 = new Jedis(LOCAL_HOST, redisServerPort2, 120000);
Expand All @@ -123,8 +125,8 @@ public static void beforeClass() throws Exception {
gfsh.connectAndVerify(locator);
}

@AfterClass
public static void tearDown() {
@After
public void tearDown() {
subscriber1.disconnect();
subscriber2.disconnect();
publisher1.disconnect();
Expand Down Expand Up @@ -158,9 +160,7 @@ public void shouldNotHang_givenPublishingAndSubscribingSimultaneously() {
Jedis client = getConnection(random);
clients.add(client);

Future<Void> f = executor.submit(() -> {
client.subscribe(mockSubscriber, channelName);
});
Future<Void> f = executor.submit(() -> client.subscribe(mockSubscriber, channelName));
subscribeFutures.add(f);
}

Expand Down Expand Up @@ -227,9 +227,6 @@ public void shouldContinueToFunction_whenOneServerShutsDownGracefully_givenTwoSu

mockSubscriber2.unsubscribe(CHANNEL_NAME);
GeodeAwaitility.await().untilAsserted(subscriber2Future::get);

restartServerVM1();
reconnectSubscriber1();
}

@Test
Expand Down Expand Up @@ -275,9 +272,6 @@ public void shouldContinueToFunction_whenOneServerShutsDownAbruptly_givenTwoSubs
} catch (ExecutionException e) {
// exception expected since we killed server 2
}

restartServerVM2();
reconnectSubscriber2();
}

@Test
Expand Down Expand Up @@ -309,9 +303,6 @@ public void shouldContinueToFunction_whenOneServerShutsDownGracefully_givenTwoSu
mockSubscriber1.unsubscribe(CHANNEL_NAME);

GeodeAwaitility.await().untilAsserted(subscriber1Future::get);

restartServerVM2();
reconnectSubscriber2();
}

@Test
Expand Down Expand Up @@ -477,39 +468,11 @@ public void testPubSubWithManyClientsDisconnecting() throws Exception {
clients.set(candy, client);
}

GeodeAwaitility.await().untilAsserted(() -> future.get());
GeodeAwaitility.await().untilAsserted(future::get);

clients.forEach(Jedis::close);
}

private void restartServerVM1() {
cluster.startRedisVM(1, locator.getPort());
waitForRestart();
redisServerPort1 = cluster.getRedisPort(1);
}

private void restartServerVM2() {
cluster.startRedisVM(2, locator.getPort());
waitForRestart();
redisServerPort2 = cluster.getRedisPort(2);
}

private void waitForRestart() {
await().untilAsserted(
() -> gfsh.executeAndAssertThat("list members").statusIsSuccess().hasTableSection()
.hasColumn("Name")
.containsOnly("locator-0", "server-1", "server-2", "server-3", "server-4",
"server-5"));
}

private void reconnectSubscriber1() {
subscriber1 = new Jedis(LOCAL_HOST, redisServerPort1);
}

private void reconnectSubscriber2() {
subscriber2 = new Jedis(LOCAL_HOST, redisServerPort2);
}

private Jedis getConnection(Random random) {
Jedis client = null;

Expand All @@ -524,7 +487,7 @@ private Jedis getConnection(Random random) {
if (client != null) {
client.close();
}
} catch (Exception exception) {
} catch (Exception ignored) {
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ org/apache/geode/redis/internal/data/RedisSet$MemberSet
org/apache/geode/redis/internal/data/RedisSetCommandsFunctionExecutor$SetOp
org/apache/geode/redis/internal/data/RedisSortedSet$MemberMap
org/apache/geode/redis/internal/data/RedisStringCommandsFunctionExecutor$BitOp
org/apache/geode/redis/internal/pubsub/PubSubImpl$PublishFunction
org/apache/geode/redis/internal/services/StripedExecutorService$State
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.apache.geode.redis.internal.data.RedisDataType.REDIS_STRING;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -36,9 +35,7 @@
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.partition.PartitionMemberInfo;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.cache.partition.PartitionRegionInfo;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionFactory;
Expand Down Expand Up @@ -154,7 +151,7 @@ public <T> T execute(RedisKey key, Callable<T> callable) {
return partitionedRegion.computeWithPrimaryLocked(key,
() -> stripedCoordinator.execute(key, callable));
} catch (PrimaryBucketLockException | BucketMovedException | RegionDestroyedException ex) {
throw createRedisDataMovedException((RedisKey) key);
throw createRedisDataMovedException(key);
} catch (RedisException bex) {
throw bex;
} catch (Exception ex) {
Expand All @@ -167,7 +164,7 @@ public <T> T execute(RedisKey key, List<RedisKey> keysToLock, Callable<T> callab
return partitionedRegion.computeWithPrimaryLocked(key,
() -> stripedCoordinator.execute(keysToLock, callable));
} catch (PrimaryBucketLockException | BucketMovedException | RegionDestroyedException ex) {
throw createRedisDataMovedException((RedisKey) key);
throw createRedisDataMovedException(key);
} catch (RedisException bex) {
throw bex;
} catch (Exception ex) {
Expand Down Expand Up @@ -300,14 +297,9 @@ public RedisKeyCommands getKeyCommands() {
return keyCommands;
}

public Set<DistributedMember> getRegionMembers() {
PartitionRegionInfo info = PartitionRegionHelper.getPartitionRegionInfo(dataRegion);
Set<DistributedMember> membersWithDataRegion = new HashSet<>();
for (PartitionMemberInfo memberInfo : info.getPartitionMemberInfo()) {
membersWithDataRegion.add(memberInfo.getDistributedMember());
}

return membersWithDataRegion;
@SuppressWarnings("unchecked")
public Set<DistributedMember> getRemoteRegionMembers() {
return (Set<DistributedMember>) (Set<?>) partitionedRegion.getRegionAdvisor().adviseDataStore();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@
* expecting to receive published messages.
*/
public class PubSubImpl implements PubSub {
public static final String REDIS_PUB_SUB_FUNCTION_ID = "redisPubSubFunctionID";

private static final int MAX_PUBLISH_THREAD_COUNT =
Integer.getInteger("redis.max-publish-thread-count", 10);

Expand Down Expand Up @@ -122,14 +120,25 @@ public long publish(RegionProvider regionProvider, byte[] channel, byte[] messag

@SuppressWarnings("unchecked")
private void internalPublish(RegionProvider regionProvider, byte[] channel, byte[] message) {
Set<DistributedMember> membersWithDataRegion = regionProvider.getRegionMembers();
Set<DistributedMember> remoteMembers = regionProvider.getRemoteRegionMembers();
try {
ResultCollector<?, ?> resultCollector = FunctionService
.onMembers(membersWithDataRegion)
.setArguments(new Object[] {channel, message})
.execute(REDIS_PUB_SUB_FUNCTION_ID);
// block until execute completes
resultCollector.getResult();
ResultCollector<?, ?> resultCollector = null;
try {
if (!remoteMembers.isEmpty()) {
// send function to remotes
resultCollector = FunctionService
.onMembers(remoteMembers)
.setArguments(new byte[][] {channel, message})
.execute(PublishFunction.ID);
}
} finally {
// execute it locally
publishMessageToLocalSubscribers(channel, message);
if (resultCollector != null) {
// block until remote execute completes
resultCollector.getResult();
}
}
} catch (Exception e) {
// the onMembers contract is for execute to throw an exception
// if one of the members goes down.
Expand All @@ -152,35 +161,7 @@ public SubscribeResult psubscribe(byte[] pattern, Client client) {
}

private void registerPublishFunction() {
FunctionService.registerFunction(new InternalFunction<Object[]>() {
@Override
public String getId() {
return REDIS_PUB_SUB_FUNCTION_ID;
}

@Override
public void execute(FunctionContext<Object[]> context) {
Object[] publishMessage = context.getArguments();
publishMessageToLocalSubscribers((byte[]) publishMessage[0], (byte[]) publishMessage[1]);
context.getResultSender().lastResult(true);
}

/**
* Since the publish process uses an onMembers function call, we don't want to re-publish
* to members if one fails.
* TODO: Revisit this in the event that we instead use an onMember call against individual
* members.
*/
@Override
public boolean isHA() {
return false;
}

@Override
public boolean hasResult() {
return true; // this is needed to preserve ordering
}
});
FunctionService.registerFunction(new PublishFunction(this));
}

@Override
Expand Down Expand Up @@ -231,4 +212,46 @@ void publishMessageToLocalSubscribers(byte[] channel, byte[] message) {
client, channel, message));
}

private static class PublishFunction implements InternalFunction<byte[][]> {
public static final String ID = "redisPubSubFunctionID";
/**
* this class is never serialized (since it implemented getId)
* but make tools happy by setting its serialVersionUID.
*/
private static final long serialVersionUID = -1L;

private final PubSubImpl pubSub;

public PublishFunction(PubSubImpl pubSub) {
this.pubSub = pubSub;
}

@Override
public String getId() {
return ID;
}

@Override
public void execute(FunctionContext<byte[][]> context) {
byte[][] publishMessage = context.getArguments();
pubSub.publishMessageToLocalSubscribers(publishMessage[0], publishMessage[1]);
context.getResultSender().lastResult(true);
}

/**
* Since the publish process uses an onMembers function call, we don't want to re-publish
* to members if one fails.
* TODO: Revisit this in the event that we instead use an onMember call against individual
* members.
*/
@Override
public boolean isHA() {
return false;
}

@Override
public boolean hasResult() {
return true; // this is needed to preserve ordering
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,3 @@ org/apache/geode/redis/internal/executor/BaseSetOptions$Exists,false
org/apache/geode/redis/internal/netty/CoderException,true,4707944288714910949
org/apache/geode/redis/internal/netty/RedisCommandParserException,true,4707944288714910949
org/apache/geode/redis/internal/parameters/RedisParametersMismatchException,true,-643700717871858072
org/apache/geode/redis/internal/pubsub/PubSubImpl$1,false,this$0:org/apache/geode/redis/internal/pubsub/PubSubImpl