Skip to content

Commit

Permalink
JBTM-3762 redis store: PoC is good
Browse files Browse the repository at this point in the history
  • Loading branch information
mmusgrov committed Mar 24, 2023
1 parent 6368235 commit a633e0a
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 0 deletions.
@@ -0,0 +1,63 @@
/*
* Copyright The Narayana Authors
*
* SPDX-License-Identifier: LGPL-2.1-only
*/

package com.arjuna.ats.internal.arjuna.objectstore.slot;

import com.arjuna.ats.arjuna.common.CoreEnvironmentBean;
import com.arjuna.common.internal.util.propertyservice.BeanPopulator;
import redis.clients.jedis.Jedis;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
* Redis backed implementation of the SlotStore backend.
*/
public class RedisSlots implements BackingSlots {
private String namespace;
private byte[][] slots = null;
private Jedis jedis; // Java API for Redis

@Override
public void init(SlotStoreEnvironmentBean slotStoreConfig) throws IOException {
if (slots != null) {
throw new IllegalStateException("already initialized");
}

RedisStoreEnvironmentBean env = BeanPopulator.getDefaultInstance(RedisStoreEnvironmentBean.class);


jedis = new Jedis(env.getRedisHost(), env.getRedisPort()); // Connect to a Redis server
// TODO where is the API for fini so that we can call jedis.disconnect();
slots = new byte[slotStoreConfig.getNumberOfSlots()][];

namespace = BeanPopulator.getDefaultInstance(CoreEnvironmentBean.class).getNodeIdentifier() + ':';

for (int i = 0; i < slots.length; i++) {
slots[i] = (namespace + i).getBytes(StandardCharsets.UTF_8);
}

}

@Override
public void write(int slot, byte[] data, boolean sync) {
String ok = jedis.set(slots[slot], data);

if (ok == null) { // ok == "OK" means success
throw new RuntimeException("failed to write " + slot);
}
}

@Override
public byte[] read(int slot) {
return jedis.get(slots[slot]);
}

@Override
public void clear(int slot, boolean sync) {
write(slot, new byte[0], sync);
}
}
@@ -0,0 +1,28 @@
/*
* Copyright The Narayana Authors
*
* SPDX-License-Identifier: LGPL-2.1-only
*/

package com.arjuna.ats.internal.arjuna.objectstore.slot;

public class RedisStoreEnvironmentBean {
private String redisHost = "127.0.0.1";
private int redisPort = 6379;

public String getRedisHost() {
return redisHost;
}

public void setRedisHost(String redisHost) {
this.redisHost = redisHost;
}

public int getRedisPort() {
return redisPort;
}

public void setRedisPort(int redisPort) {
this.redisPort = redisPort;
}
}
Expand Up @@ -211,6 +211,7 @@ public void setBackingSlots(BackingSlots instance) {

if (instance == null) {
this.backingSlotsClassName = null;

} else if (instance != oldInstance) {
String name = ClassloadingUtility.getNameForClass(instance);
this.backingSlotsClassName = name;
Expand Down
5 changes: 5 additions & 0 deletions ArjunaCore/arjuna/pom.xml
Expand Up @@ -114,6 +114,11 @@
<artifactId>mashona-logwriting</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.4.0-m2</version>
</dependency>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
Expand Down
@@ -0,0 +1,152 @@
/*
* Copyright The Narayana Authors
*
* SPDX-License-Identifier: LGPL-2.1-only
*/

package com.hp.mwtests.ts.arjuna.objectstore;

import com.arjuna.ats.arjuna.common.CoreEnvironmentBean;
import com.arjuna.ats.arjuna.common.CoreEnvironmentBeanException;
import com.arjuna.ats.arjuna.common.ObjectStoreEnvironmentBean;
import com.arjuna.ats.arjuna.common.Uid;
import com.arjuna.ats.arjuna.exceptions.ObjectStoreException;
import com.arjuna.ats.arjuna.objectstore.ObjectStoreIterator;
import com.arjuna.ats.arjuna.objectstore.RecoveryStore;
import com.arjuna.ats.arjuna.objectstore.StoreManager;
import com.arjuna.ats.arjuna.state.InputObjectState;
import com.arjuna.ats.arjuna.state.OutputObjectState;
import com.arjuna.ats.internal.arjuna.objectstore.slot.RedisSlots;
import com.arjuna.ats.internal.arjuna.objectstore.slot.SlotStoreAdaptor;
import com.arjuna.ats.internal.arjuna.objectstore.slot.SlotStoreEnvironmentBean;
import com.arjuna.common.internal.util.propertyservice.BeanPopulator;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

import java.io.IOException;


public class RedisStoreTest {
private final static String typeName = "/StateManager/junit";

@BeforeAll
public static void before() throws CoreEnvironmentBeanException {
BeanPopulator.getDefaultInstance(ObjectStoreEnvironmentBean.class).
setObjectStoreType(SlotStoreAdaptor.class.getName());
BeanPopulator.getDefaultInstance(SlotStoreEnvironmentBean.class).
setBackingSlots(new RedisSlots());
BeanPopulator.getDefaultInstance(CoreEnvironmentBean.class).setNodeIdentifier("test-node");
// BeanPopulator.getDefaultInstance(SlotStoreEnvironmentBean.class).
// setBackingSlots(new RedisSlots());
}

@AfterAll
public static void after() {
}

@Test
public void inTxn() throws Exception {
Jedis jedis = new Jedis("127.0.0.1", 6379);
// Transaction t = jedis.multi();
String k = "k1";
String v = "d2";
jedis.set(k, v);

// t.sadd("data3", "value3");
// t.sadd("data4", "value4");
// t.exec();
// jedis.disconnect();
System.out.printf("done%n");
}

@Test
public void test () throws Exception {
RecoveryStore recoveryStore = StoreManager.getRecoveryStore();
Uid uid = new Uid();
OutputObjectState outputData = new OutputObjectState();
final String v = "hello2";

outputData.packString(v);

// add a record to the object store
boolean ok = recoveryStore.write_committed(uid, typeName, outputData);
Assertions.assertTrue(ok);

InputObjectState inputData = recoveryStore.read_committed(uid, typeName);
String value = inputData.unpackString();

Assertions.assertEquals(v, value);
}

@Test
public void test1 () throws Exception {
RecoveryStore recoveryStore = StoreManager.getRecoveryStore();
Uid[] uid = {new Uid(), new Uid()};
OutputObjectState[] outputData = {new OutputObjectState(), new OutputObjectState()};
final String[] DATA = {"hello","world"};

// store two values
for (int i = 0; i < DATA.length; i++) {
outputData[i].packString(DATA[i]);

// add a record to the object store
boolean ok = recoveryStore.write_committed(uid[i], typeName, outputData[i]);
Assertions.assertTrue(ok);

InputObjectState inputData = recoveryStore.read_committed(uid[i], typeName);
String value = inputData.unpackString();

Assertions.assertEquals(DATA[i], value);
}

// clean up
// recoveryStore.remove_committed(uid[0], typeName);
// recoveryStore.remove_committed(uid[1], typeName);
}

// @Test
public void test2 () throws Exception {
RecoveryStore rs = StoreManager.getRecoveryStore();
// ParticipantStore ps = StoreManager.getParticipantStore();

final int count = countUids(rs, typeName);

for (int i = 0; i < 2; i++) {
Uid u = new Uid();
OutputObjectState buff = new OutputObjectState();
InputObjectState iBuff;

buff.packInt(i);

rs.write_committed(u, typeName, buff);

iBuff = rs.read_committed(u, typeName);

Assertions.assertEquals(iBuff.unpackInt(), i);

Assertions.assertEquals(count + 1, countUids(rs, typeName));

rs.remove_committed(u, typeName);
}
}

private int countUids(RecoveryStore rs, String tn) throws ObjectStoreException, IOException {
int i = 0;
ObjectStoreIterator iter = new ObjectStoreIterator(rs, tn);

while (true) {
Uid u = iter.iterate();

if (Uid.nullUid().equals(u))
break;

i += 1;
}

return i;
}
}

0 comments on commit a633e0a

Please sign in to comment.