Skip to content

Commit

Permalink
JBTM-3762 cluster support
Browse files Browse the repository at this point in the history
  • Loading branch information
mmusgrov committed Apr 17, 2023
1 parent 0bc6a6e commit 0dad492
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 21 deletions.
Expand Up @@ -21,11 +21,11 @@ public CloudId(String nodeId, String failoverGroupId) {
}

public CloudId(String nodeId, String failoverGroupId, String description) {
this.nodeId = nodeId;
this.failoverGroupId = failoverGroupId;
this.id = String.format("%s:%s", nodeId, failoverGroupId);
this.nodeId = nodeId;
this.id = String.format("%s:%s", failoverGroupId, nodeId);
this.description = description;
this.keyPattern = String.format("%s:*", id);
this.keyPattern = String.format("{%s}:%s:*", failoverGroupId, nodeId); // matches all keys if this failover group
}

public String allKeysPattern() {
Expand Down
Expand Up @@ -14,7 +14,7 @@
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.params.ScanParams;
import redis.clients.jedis.resps.ScanResult;

Expand All @@ -24,6 +24,8 @@
import java.util.List;
import java.util.Set;

import static redis.clients.jedis.params.ScanParams.SCAN_POINTER_START;

/**
* Redis backed implementation of the SlotStore backend.
* Ensure that your Redis installation is configured for
Expand Down Expand Up @@ -86,7 +88,13 @@ private Set<String> loadClustered() {

for (ConnectionPool node : jedisCluster.getClusterNodes().values()) {
try (Jedis j = new Jedis(node.getResource())) {
keys.addAll(j.keys(cloudId.keyPattern));
// load keys matching this recovery manager
// String pattern = String.format("{%s}:%s:*", cloudId.failoverGroupId, cloudId.nodeId);
Set<String> candidates = j.keys(cloudId.allKeysPattern()); //cloudId.keyPattern);
// filter out candidates that don't match this managers node id
//"{0}:migration-node:6"
// Collection actuals = candidates.stream().filter(s -> s.matches(pattern)).collect(Collectors.toList());
keys.addAll(candidates);
}
}

Expand Down Expand Up @@ -114,9 +122,12 @@ private void load(Set<String> keys) {

// initialise the remaining slots
while (i < slots.length) {
// prefix the slot key with the cloudId and force keys for nodeId into the same hash slot
// prefix the slot key with the cloudId and force keys for nodeId + failoverId into the same hash slot
// (using the curly brace notation) so that they will be stored on the same redis node
slots[i] = String.format("{%s}:%d", cloudId.id, i).getBytes(StandardCharsets.UTF_8);
// In this way we can perform multikey operations on a slot
// see https://redis.io/docs/reference/cluster-spec/ section "Key distribution model" for more info
// slots[i] = String.format("{%s}:%d", cloudId.id, i).getBytes(StandardCharsets.UTF_8);
slots[i] = String.format("{%s}:%s:%d", cloudId.failoverGroupId, cloudId.nodeId, i).getBytes(StandardCharsets.UTF_8);
i += 1;
}
}
Expand Down Expand Up @@ -163,7 +174,11 @@ public void write(int slot, byte[] data, boolean sync) throws IOException {
public byte[] read(int slot) throws IOException {
if (clustered) {
try (JedisCluster jedis = new JedisCluster(hostAndPort)) {
return jedis.get(slots[slot]);
try {
return jedis.get(slots[slot]);
} catch (Exception e) {
throw new IOException(e);
}
}
} else {
try (Jedis jedis = jedisPool.getResource()) {
Expand Down Expand Up @@ -207,13 +222,18 @@ public boolean migrate(CloudId from, CloudId to) {
}

String keyPattern = from.allKeysPattern();
int prefixLength = from.nodeId.length();

try (JedisCluster jedis = new JedisCluster(hostAndPort)) {
for (String key : getKeys(keyPattern)) {
String newKey = to.nodeId + key.substring(prefixLength);

jedis.rename(key, newKey);
String newKey = key.replace(from.nodeId, to.nodeId);

try {
String res = jedis.rename(key, newKey);
System.out.printf("%s%n", res);
} catch (JedisException e) {
System.out.printf("%s%n", e.getMessage());
return false;
}
}
}

Expand All @@ -222,14 +242,14 @@ public boolean migrate(CloudId from, CloudId to) {

private void getKeys(Jedis node, String keyPattern, Set<String> keySet) {
ScanParams scanParams = new ScanParams().count(100).match(keyPattern);
String cursor = ScanParams.SCAN_POINTER_START;
String cursor = SCAN_POINTER_START;

do {
ScanResult<String> scanResult = node.scan(cursor, scanParams);
List<String> keys = scanResult.getResult();
keySet.addAll(keys);
cursor = scanResult.getCursor();
} while (!cursor.equals(ScanParams.SCAN_POINTER_START));
} while (!cursor.equals(SCAN_POINTER_START));
}

private Set<String> getKeys(String keyPattern) {
Expand Down
Expand Up @@ -52,21 +52,40 @@ public class RedisStoreTest {
private static RedisSlots redisSlots;
private static RecoveryStore recoveryStore;
private static boolean clustered;

private static Boolean redisAvailable = null;

private static boolean isRedisRunning() {
if (redisAvailable != null) {
return redisAvailable;
}

String uri = redisConfig.getRedisURI();
if (clustered) {
try (JedisCluster jedisCluster = new JedisCluster(hostAndPort)) {
for (ConnectionPool node : jedisCluster.getClusterNodes().values()) {
try (Jedis ignore = new Jedis(node.getResource())) {
redisAvailable = true;
break;
}
}
} catch (Exception e) {
redisAvailable = false;
log.warnf("Skipping RedisStoreTests because Redis is not running on the configured endpoint %s:%d",
redisHost, redisPort);
}
} else {
try (JedisPool jedisPool = new JedisPool(redisHost, redisPort)) {
try (Jedis ignore = jedisPool.getResource()) {
redisAvailable = true;
}
}
}

try (Jedis ignored = new Jedis(uri)) {
try (Jedis ignored = new Jedis(redisConfig.getRedisHost(), redisConfig.getRedisPort())) {
redisAvailable = true;
} catch (Exception e) {
redisAvailable = false;
log.warnf("Skipping RedisStoreTests because Redis is not running on the configured endpoint %s", uri);
log.warnf("Skipping RedisStoreTests because Redis is not running on the configured endpoint %s:%d",
redisHost, redisPort);
}

return redisAvailable;
Expand All @@ -82,7 +101,9 @@ public static void before() throws CoreEnvironmentBeanException {
redisConfig.setBackingSlots(redisSlots);
redisConfig.setClustered(true);

clustered = true;
clustered = redisConfig.isClustered();
redisHost = redisConfig.getRedisHost();
redisPort = redisConfig.getRedisPort();

BeanPopulator.getDefaultInstance(ObjectStoreEnvironmentBean.class).
setObjectStoreType(SlotStoreAdaptor.class.getName());
Expand Down Expand Up @@ -292,8 +313,8 @@ public void testMove() throws ObjectStoreException, IOException {
// only loads keys corresponding to this nodeId, ignoring ones corresponding to toNodeId)
RecoveryStore finalRecoveryStore = recoveryStore;
Assertions.assertThrows(ObjectStoreException.class, () -> {
InputObjectState buff = finalRecoveryStore.read_committed(uid, typeName);
assertEquals(value, buff.unpackString());
InputObjectState buff = finalRecoveryStore.read_committed(uid, typeName); // should throw an exception
assertEquals(value, buff.unpackString()); // should not be reached
});

keysAfter = delKeys(toNodeId + ":*"); // clean up
Expand Down
@@ -0,0 +1,12 @@
<!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd">
<properties>
<entry key="CoreEnvironmentBean.nodeIdentifier">from-node</entry>
<entry key="RecoveryEnvironmentBean.recoveryModuleClassNames">io.narayana.lra.coordinator.internal.LRARecoveryModule</entry>

<entry key="ObjectStoreEnvironmentBean.objectStoreType">com.arjuna.ats.internal.arjuna.objectstore.slot.SlotStoreAdaptor</entry>
<entry key="ObjectStoreEnvironmentBean.stateStore.objectStoreType">com.arjuna.ats.internal.arjuna.objectstore.slot.SlotStoreAdaptor</entry>

<entry key="SlotStoreEnvironmentBean.backingSlotsClassName">com.arjuna.ats.internal.arjuna.objectstore.slot.RedisSlots</entry>

<entry key="CoordinatorEnvironmentBean.transactionStatusManagerEnable">false</entry>
</properties>

0 comments on commit 0dad492

Please sign in to comment.