diff --git a/CHANGELOG.md b/CHANGELOG.md
index a9d38cb..1da9fff 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,5 +2,11 @@
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
+## 1.1.0 (07/24/2020)
+- Add Jedis implementation. Spout defaults to using the Lettuce redis library, but you can configure
+ to use the Jedis library instead via the `RedisStreamSpoutConfig.withJedisClientLibrary()` method.
+- Bugfix on Spout deploy, consumer thread started during `open()` lifecycle call instead of `activate()`.
+- Bugfix on Spout restart, resume consuming first from the consumer's personal pending list.
+
## 1.0.0 (07/20/2020)
- Initial release!
diff --git a/pom.xml b/pom.xml
index 55ef901..8a7e20a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
org.sourcelab.storm.spout
redis-stream-spout
-    <version>1.0.0</version>
+    <version>1.1.0</version>
Redis Streams Spout for Apache Storm.
@@ -48,6 +48,7 @@
5.3.1.RELEASE
+        <jedisVersion>3.2.0</jedisVersion>
5.6.2
@@ -78,6 +79,11 @@
lettuce-core
${lettuceVersion}
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+            <version>${jedisVersion}</version>
+        </dependency>
@@ -120,6 +126,14 @@
${testContainersVersion}
test
+
+
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <version>4.0.3</version>
+            <scope>test</scope>
+        </dependency>
diff --git a/src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpout.java b/src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpout.java
index e76f02c..ebc3566 100644
--- a/src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpout.java
+++ b/src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpout.java
@@ -8,8 +8,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sourcelab.storm.spout.redis.client.Client;
+import org.sourcelab.storm.spout.redis.client.ClientFactory;
import org.sourcelab.storm.spout.redis.client.Consumer;
-import org.sourcelab.storm.spout.redis.client.LettuceClient;
+import org.sourcelab.storm.spout.redis.client.lettuce.LettuceClient;
import org.sourcelab.storm.spout.redis.funnel.ConsumerFunnel;
import org.sourcelab.storm.spout.redis.funnel.MemoryFunnel;
import org.sourcelab.storm.spout.redis.funnel.SpoutFunnel;
@@ -83,16 +84,8 @@ public void open(
// Create funnel instance.
this.funnel = new MemoryFunnel(config, spoutConfig, topologyContext);
- // Create consumer and client
- final int taskIndex = topologyContext.getThisTaskIndex();
- final Client client = new LettuceClient(config, taskIndex);
- final Consumer consumer = new Consumer(config, client, (ConsumerFunnel) funnel);
-
- // Create background consuming thread.
- consumerThread = new Thread(
- consumer,
- "RedisStreamSpout-ConsumerThread[" + taskIndex + "]"
- );
+ // Create and start consumer thread.
+ createAndStartConsumerThread();
}
@Override
@@ -103,12 +96,15 @@ public void close() {
@Override
public void activate() {
- if (consumerThread.isAlive()) {
+ // If the thread is already running and alive
+ if (consumerThread != null && consumerThread.isAlive()) {
// No-op. It's already running, and deactivate() is a no-op for us.
return;
}
- // Start thread, this should return immediately, but start a background processing thread.
- consumerThread.start();
+
+ // If we haven't created the consumer thread yet, or it has previously died.
+ // Create and start it
+ createAndStartConsumerThread();
}
@Override
@@ -186,4 +182,21 @@ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
    public Map<String, Object> getComponentConfiguration() {
return new HashMap<>();
}
+
+ /**
+ * Create background consumer thread.
+ */
+ private void createAndStartConsumerThread() {
+ // Create consumer and client
+ final int taskIndex = topologyContext.getThisTaskIndex();
+ final Client client = new ClientFactory().createClient(config, taskIndex);
+ final Consumer consumer = new Consumer(config, client, (ConsumerFunnel) funnel);
+
+ // Create background consuming thread.
+ consumerThread = new Thread(
+ consumer,
+ "RedisStreamSpout-ConsumerThread[" + taskIndex + "]"
+ );
+ consumerThread.start();
+ }
}
diff --git a/src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutConfig.java b/src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutConfig.java
index 4f3a2a4..cd5aaac 100644
--- a/src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutConfig.java
+++ b/src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutConfig.java
@@ -1,5 +1,6 @@
package org.sourcelab.storm.spout.redis;
+import org.sourcelab.storm.spout.redis.client.ClientType;
import org.sourcelab.storm.spout.redis.failhandler.NoRetryHandler;
import java.io.Serializable;
@@ -70,6 +71,11 @@ public class RedisStreamSpoutConfig implements Serializable {
*/
private final boolean metricsEnabled;
+ /**
+ * Defines which underlying client library/implementation to use.
+ */
+ private final ClientType clientType;
+
/**
* Constructor.
* Use Builder instance.
@@ -85,7 +91,7 @@ private RedisStreamSpoutConfig(
// Other settings
final int maxConsumePerRead, final int maxTupleQueueSize, final int maxAckQueueSize, final long consumerDelayMillis,
- final boolean metricsEnabled
+ final boolean metricsEnabled, final ClientType clientType
) {
// Connection
if (redisCluster != null && redisServer != null) {
@@ -117,6 +123,9 @@ private RedisStreamSpoutConfig(
this.maxAckQueueSize = maxAckQueueSize;
this.consumerDelayMillis = consumerDelayMillis;
this.metricsEnabled = metricsEnabled;
+
+ // Client type implementation
+ this.clientType = Objects.requireNonNull(clientType);
}
public String getStreamKey() {
@@ -150,6 +159,17 @@ public String getConnectString() {
return redisCluster.getConnectString();
}
+ /**
+ * The URI for connecting to this Redis Server instance with the password masked.
+ * @return URI for the server.
+ */
+ public String getConnectStringMasked() {
+ if (!isConnectingToCluster()) {
+ return redisServer.getConnectStringMasked();
+ }
+ return redisCluster.getConnectStringMasked();
+ }
+
public int getMaxTupleQueueSize() {
return maxTupleQueueSize;
}
@@ -174,6 +194,10 @@ public boolean isMetricsEnabled() {
return metricsEnabled;
}
+ public ClientType getClientType() {
+ return clientType;
+ }
+
/**
* Create a new Builder instance.
* @return Builder for Configuration instance.
@@ -218,6 +242,12 @@ public static final class Builder {
private long consumerDelayMillis = 1000L;
private boolean metricsEnabled = true;
+ /**
+ * Underlying library to use.
+ * Defaults to using Lettuce.
+ */
+ private ClientType clientType = ClientType.LETTUCE;
+
private Builder() {
}
@@ -385,6 +415,27 @@ public Builder withMetricsEnabled(final boolean enabled) {
return this;
}
+ /**
+ * Configure the spout to use the Lettuce client library for communicating with redis.
+ * @return Builder instance.
+ */
+ public Builder withLettuceClientLibrary() {
+ return withClientType(ClientType.LETTUCE);
+ }
+
+ /**
+ * Configure the spout to use the Jedis client library for communicating with redis.
+ * @return Builder instance.
+ */
+ public Builder withJedisClientLibrary() {
+ return withClientType(ClientType.JEDIS);
+ }
+
+ public Builder withClientType(final ClientType clientType) {
+ this.clientType = Objects.requireNonNull(clientType);
+ return this;
+ }
+
/**
* Creates new Configuration instance.
* @return Configuration instance.
@@ -405,7 +456,10 @@ public RedisStreamSpoutConfig build() {
tupleConverter, failureHandler,
// Other settings
maxConsumePerRead, maxTupleQueueSize, maxAckQueueSize, consumerDelayMillis,
- metricsEnabled
+ metricsEnabled,
+
+ // Underlying client type
+ clientType
);
}
}
@@ -445,6 +499,16 @@ public String getConnectString() {
.map(RedisServer::getConnectString)
.collect(Collectors.joining(","));
}
+
+ /**
+ * The URI for connecting to this Redis Server instance with the password masked.
+ * @return URI for the server.
+ */
+ public String getConnectStringMasked() {
+ return getServers().stream()
+ .map(RedisServer::getConnectStringMasked)
+ .collect(Collectors.joining(","));
+ }
}
/**
@@ -503,6 +567,21 @@ public String getConnectString() {
return connectStr;
}
+ /**
+ * The URI for connecting to this Redis Server instance with the password masked.
+ * @return URI for the server.
+ */
+ public String getConnectStringMasked() {
+ String connectStr = "redis://";
+
+ if (getPassword() != null && !getPassword().trim().isEmpty()) {
+ connectStr += "XXXXXX@";
+ }
+ connectStr += getHost() + ":" + getPort();
+
+ return connectStr;
+ }
+
@Override
public String toString() {
return "RedisServer{"
diff --git a/src/main/java/org/sourcelab/storm/spout/redis/client/ClientFactory.java b/src/main/java/org/sourcelab/storm/spout/redis/client/ClientFactory.java
new file mode 100644
index 0000000..a8090cf
--- /dev/null
+++ b/src/main/java/org/sourcelab/storm/spout/redis/client/ClientFactory.java
@@ -0,0 +1,37 @@
+package org.sourcelab.storm.spout.redis.client;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sourcelab.storm.spout.redis.RedisStreamSpoutConfig;
+import org.sourcelab.storm.spout.redis.client.jedis.JedisClient;
+import org.sourcelab.storm.spout.redis.client.lettuce.LettuceClient;
+
+import java.util.Objects;
+
+/**
+ * Factory for creating the appropriate Client instance based on config.
+ */
+public class ClientFactory {
+ private static final Logger logger = LoggerFactory.getLogger(ClientFactory.class);
+
+ /**
+     * Create the appropriate client instance based on configuration.
+ * @param config Spout configuration.
+ * @param instanceId Instance id of spout.
+ * @return Client.
+ */
+ public Client createClient(final RedisStreamSpoutConfig config, final int instanceId) {
+ Objects.requireNonNull(config);
+
+ switch (config.getClientType()) {
+ case JEDIS:
+ logger.info("Using Jedis client library.");
+ return new JedisClient(config, instanceId);
+ case LETTUCE:
+ logger.info("Using Lettuce client library.");
+ return new LettuceClient(config, instanceId);
+ default:
+ throw new IllegalStateException("Unknown/Unhandled Client Type");
+ }
+ }
+}
diff --git a/src/main/java/org/sourcelab/storm/spout/redis/client/ClientType.java b/src/main/java/org/sourcelab/storm/spout/redis/client/ClientType.java
new file mode 100644
index 0000000..b57a461
--- /dev/null
+++ b/src/main/java/org/sourcelab/storm/spout/redis/client/ClientType.java
@@ -0,0 +1,9 @@
+package org.sourcelab.storm.spout.redis.client;
+
+/**
+ * Defines allowed implementations.
+ */
+public enum ClientType {
+ LETTUCE,
+ JEDIS;
+}
diff --git a/src/main/java/org/sourcelab/storm/spout/redis/client/jedis/JedisAdapter.java b/src/main/java/org/sourcelab/storm/spout/redis/client/jedis/JedisAdapter.java
new file mode 100644
index 0000000..48e6628
--- /dev/null
+++ b/src/main/java/org/sourcelab/storm/spout/redis/client/jedis/JedisAdapter.java
@@ -0,0 +1,44 @@
+package org.sourcelab.storm.spout.redis.client.jedis;
+
+import redis.clients.jedis.StreamEntry;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Adapter to allow usage of both Jedis and JedisCluster.
+ */
+public interface JedisAdapter {
+ /**
+ * Call connect.
+ */
+ void connect();
+
+ /**
+ * Consume next batch of messages.
+ * @return List of messages consumed.
+ */
+    List<Map.Entry<String, List<StreamEntry>>> consume();
+
+ /**
+ * Mark the provided messageId as acknowledged/completed.
+ * @param msgId Id of the message.
+ */
+ void commit(final String msgId);
+
+ /**
+ * Disconnect client.
+ */
+ void close();
+
+ /**
+ * Advance the last offset consumed from PPL.
+ * @param lastMsgId Id of the last msg consumed.
+ */
+ void advancePplOffset(final String lastMsgId);
+
+ /**
+ * Switch to consuming from latest messages.
+ */
+ void switchToConsumerGroupMessages();
+}
diff --git a/src/main/java/org/sourcelab/storm/spout/redis/client/jedis/JedisClient.java b/src/main/java/org/sourcelab/storm/spout/redis/client/jedis/JedisClient.java
new file mode 100644
index 0000000..d46eb57
--- /dev/null
+++ b/src/main/java/org/sourcelab/storm/spout/redis/client/jedis/JedisClient.java
@@ -0,0 +1,119 @@
+package org.sourcelab.storm.spout.redis.client.jedis;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sourcelab.storm.spout.redis.Message;
+import org.sourcelab.storm.spout.redis.RedisStreamSpoutConfig;
+import org.sourcelab.storm.spout.redis.client.Client;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisCluster;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Redis Stream Consumer using the Jedis Java library.
+ */
+public class JedisClient implements Client {
+ private static final Logger logger = LoggerFactory.getLogger(JedisClient.class);
+
+ /**
+ * The underlying Redis Client.
+ */
+ private final JedisAdapter adapter;
+
+ /**
+ * State for consuming first from consumer's personal pending list,
+ * then switching to reading from consumer group messages.
+ */
+ private boolean hasFinishedPpl = false;
+
+ /**
+ * Constructor.
+ * @param config Configuration.
+ * @param instanceId Which instance number is this running under.
+ */
+ public JedisClient(final RedisStreamSpoutConfig config, final int instanceId) {
+ this(
+ // Determine which adapter to use based on what type of redis instance we are
+ // communicating with.
+ JedisClient.createAdapter(config, instanceId)
+ );
+ }
+
+ /**
+ * Protected constructor for injecting a RedisClient instance, typically for tests.
+ * @param adapter JedisAdapter instance.
+ */
+ JedisClient(final JedisAdapter adapter) {
+ this.adapter = Objects.requireNonNull(adapter);
+ }
+
+ @Override
+ public void connect() {
+ // Connect
+ adapter.connect();
+
+ // Start consuming from PPL at first entry.
+ adapter.advancePplOffset("0-0");
+ }
+
+ @Override
+    public List<Message> nextMessages() {
+        final List<Message> messages = adapter.consume()
+ .stream()
+ .map(Map.Entry::getValue)
+ .flatMap(Collection::stream)
+ .map((entry) -> new Message(entry.getID().toString(), entry.getFields()))
+ .collect(Collectors.toList());
+
+ // If we haven't finished consuming from PPL, but we received no messages
+ if (!hasFinishedPpl) {
+ if (messages.isEmpty()) {
+ logger.info("Personal Pending List appears empty, switching to consuming from new messages.");
+
+ // Switch to reading from consumer group
+ hasFinishedPpl = true;
+ adapter.switchToConsumerGroupMessages();
+
+ // Re-attempt consuming
+ return nextMessages();
+ } else {
+ // Advance last index consumed from PPL so we don't continue to replay old messages.
+ final String lastId = messages.get(messages.size() - 1).getId();
+ adapter.advancePplOffset(lastId);
+ }
+ }
+ return messages;
+ }
+
+ @Override
+ public void commitMessage(final String msgId) {
+ adapter.commit(msgId);
+ }
+
+ @Override
+ public void disconnect() {
+ adapter.close();
+ }
+
+ /**
+ * Factory method for creating the appropriate adapter based on configuration.
+ * @param config Spout configuration.
+ * @return Appropriate Adapter.
+ */
+ private static JedisAdapter createAdapter(final RedisStreamSpoutConfig config, final int instanceId) {
+ final String connectStr = config.getConnectString().replaceAll("redis://", "");
+ if (config.isConnectingToCluster()) {
+ logger.info("Connecting to RedisCluster at {}", config.getConnectStringMasked());
+ return new JedisClusterAdapter(new JedisCluster(HostAndPort.parseString(connectStr)), config, instanceId);
+ } else {
+            logger.info("Connecting to Redis server at {}", config.getConnectStringMasked());
+ return new JedisRedisAdapter(new Jedis(HostAndPort.parseString(connectStr)), config, instanceId);
+ }
+ }
+}
diff --git a/src/main/java/org/sourcelab/storm/spout/redis/client/jedis/JedisClusterAdapter.java b/src/main/java/org/sourcelab/storm/spout/redis/client/jedis/JedisClusterAdapter.java
new file mode 100644
index 0000000..478d386
--- /dev/null
+++ b/src/main/java/org/sourcelab/storm/spout/redis/client/jedis/JedisClusterAdapter.java
@@ -0,0 +1,117 @@
+package org.sourcelab.storm.spout.redis.client.jedis;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sourcelab.storm.spout.redis.RedisStreamSpoutConfig;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.StreamEntry;
+import redis.clients.jedis.StreamEntryID;
+import redis.clients.jedis.exceptions.JedisDataException;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Adapter for talking to a RedisCluster.
+ * If you need to talk to a single Redis instance, use {@link JedisRedisAdapter}.
+ */
+public class JedisClusterAdapter implements JedisAdapter {
+ private static final Logger logger = LoggerFactory.getLogger(JedisClusterAdapter.class);
+
+ private final JedisCluster jedisCluster;
+
+ /**
+ * Configuration properties for the client.
+ */
+ private final RedisStreamSpoutConfig config;
+
+ /**
+ * Generated from config.getConsumerIdPrefix() along with the spout's instance
+ * id to come to a unique consumerId to support parallelism.
+ */
+ private final String consumerId;
+
+ /**
+ * Contains the position key to read from.
+ */
+    private Map.Entry<String, StreamEntryID> streamPositionKey;
+
+ /**
+ * Constructor.
+ * @param jedisCluster Underlying Jedis Cluster client instance.
+ * @param config Spout configuration.
+ * @param instanceId Spout instance Id.
+ */
+ public JedisClusterAdapter(final JedisCluster jedisCluster, final RedisStreamSpoutConfig config, final int instanceId) {
+ this.jedisCluster = Objects.requireNonNull(jedisCluster);
+ this.config = Objects.requireNonNull(config);
+ this.consumerId = config.getConsumerIdPrefix() + instanceId;
+ }
+
+ @Override
+ public void connect() {
+ // Attempt to create consumer group
+ try {
+ jedisCluster.xgroupCreate(config.getStreamKey(), config.getGroupName(), new StreamEntryID(), true);
+ } catch (final JedisDataException exception) {
+ // Consumer group already exists, that's ok. Just swallow this.
+ logger.debug(
+ "Group {} for key {} already exists? : {}", config.getGroupName(), config.getStreamKey(),
+ exception.getMessage(), exception
+ );
+ }
+
+ // Default to requesting entries from our personal pending queue.
+ advancePplOffset("0-0");
+ }
+
+ @Override
+    public List<Map.Entry<String, List<StreamEntry>>> consume() {
+        final List<Map.Entry<String, List<StreamEntry>>> entries = jedisCluster.xreadGroup(
+ config.getGroupName(),
+ consumerId,
+ config.getMaxConsumePerRead(),
+ 2000L,
+ false,
+ streamPositionKey
+ );
+
+ if (entries == null) {
+ return Collections.emptyList();
+ }
+ return entries;
+ }
+
+ @Override
+ public void commit(final String msgId) {
+ jedisCluster.xack(
+ config.getStreamKey(),
+ config.getGroupName(),
+ new StreamEntryID(msgId)
+ );
+ }
+
+ @Override
+ public void close() {
+ jedisCluster.close();
+ }
+
+ @Override
+ public void advancePplOffset(final String lastMsgId) {
+ streamPositionKey = new AbstractMap.SimpleEntry<>(
+ config.getStreamKey(),
+ new StreamEntryID(lastMsgId)
+ );
+ }
+
+ @Override
+ public void switchToConsumerGroupMessages() {
+ streamPositionKey = new AbstractMap.SimpleEntry<>(
+ config.getStreamKey(),
+ StreamEntryID.UNRECEIVED_ENTRY
+ );
+ }
+}
diff --git a/src/main/java/org/sourcelab/storm/spout/redis/client/jedis/JedisRedisAdapter.java b/src/main/java/org/sourcelab/storm/spout/redis/client/jedis/JedisRedisAdapter.java
new file mode 100644
index 0000000..8d6dab0
--- /dev/null
+++ b/src/main/java/org/sourcelab/storm/spout/redis/client/jedis/JedisRedisAdapter.java
@@ -0,0 +1,113 @@
+package org.sourcelab.storm.spout.redis.client.jedis;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sourcelab.storm.spout.redis.RedisStreamSpoutConfig;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.StreamEntry;
+import redis.clients.jedis.StreamEntryID;
+import redis.clients.jedis.exceptions.JedisDataException;
+
+import java.util.AbstractMap;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Adapter for talking to a single Redis instance.
+ * If you need to talk to a RedisCluster, use {@link JedisClusterAdapter}.
+ */
+public class JedisRedisAdapter implements JedisAdapter {
+ private static final Logger logger = LoggerFactory.getLogger(JedisRedisAdapter.class);
+
+ private final Jedis jedis;
+
+ /**
+ * Configuration properties for the client.
+ */
+ private final RedisStreamSpoutConfig config;
+
+ /**
+ * Generated from config.getConsumerIdPrefix() along with the spout's instance
+ * id to come to a unique consumerId to support parallelism.
+ */
+ private final String consumerId;
+
+ /**
+ * Contains the position key to read from.
+ */
+    private Map.Entry<String, StreamEntryID> streamPositionKey;
+
+ /**
+ * Constructor.
+ * @param jedis Underlying Jedis client instance.
+ * @param config Spout configuration.
+ * @param instanceId Spout instance Id.
+ */
+ public JedisRedisAdapter(final Jedis jedis, final RedisStreamSpoutConfig config, final int instanceId) {
+ this.jedis = Objects.requireNonNull(jedis);
+ this.config = Objects.requireNonNull(config);
+ this.consumerId = config.getConsumerIdPrefix() + instanceId;
+ }
+
+ @Override
+ public void connect() {
+ jedis.connect();
+
+ // Attempt to create consumer group
+ try {
+ jedis.xgroupCreate(config.getStreamKey(), config.getGroupName(), new StreamEntryID(), true);
+ } catch (final JedisDataException exception) {
+ // Consumer group already exists, that's ok. Just swallow this.
+ logger.debug(
+ "Group {} for key {} already exists? : {}", config.getGroupName(), config.getStreamKey(),
+ exception.getMessage(), exception
+ );
+ }
+
+ // Default to requesting entries from our personal pending queue.
+ advancePplOffset("0-0");
+ }
+
+ @Override
+    public List<Map.Entry<String, List<StreamEntry>>> consume() {
+        final List<Map.Entry<String, List<StreamEntry>>> entries = jedis.xreadGroup(
+ config.getGroupName(),
+ consumerId,
+ config.getMaxConsumePerRead(),
+ 2000L,
+ false,
+ streamPositionKey
+ );
+ if (entries == null) {
+ return Collections.emptyList();
+ }
+ return entries;
+ }
+
+ public void commit(final String msgId) {
+ jedis.xack(config.getStreamKey(), config.getGroupName(), new StreamEntryID(msgId));
+ }
+
+ @Override
+ public void close() {
+ jedis.quit();
+ }
+
+ @Override
+ public void advancePplOffset(final String lastMsgId) {
+ streamPositionKey = new AbstractMap.SimpleEntry<>(
+ config.getStreamKey(),
+ new StreamEntryID(lastMsgId)
+ );
+ }
+
+ @Override
+ public void switchToConsumerGroupMessages() {
+ streamPositionKey = new AbstractMap.SimpleEntry<>(
+ config.getStreamKey(),
+ StreamEntryID.UNRECEIVED_ENTRY
+ );
+ }
+}
diff --git a/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceAdapter.java b/src/main/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceAdapter.java
similarity index 91%
rename from src/main/java/org/sourcelab/storm/spout/redis/client/LettuceAdapter.java
rename to src/main/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceAdapter.java
index d08a0ae..cc43d8f 100644
--- a/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceAdapter.java
+++ b/src/main/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceAdapter.java
@@ -1,4 +1,4 @@
-package org.sourcelab.storm.spout.redis.client;
+package org.sourcelab.storm.spout.redis.client.lettuce;
import io.lettuce.core.api.sync.RedisStreamCommands;
diff --git a/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClient.java b/src/main/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceClient.java
similarity index 67%
rename from src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClient.java
rename to src/main/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceClient.java
index 44049d8..4a7c288 100644
--- a/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClient.java
+++ b/src/main/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceClient.java
@@ -1,4 +1,4 @@
-package org.sourcelab.storm.spout.redis.client;
+package org.sourcelab.storm.spout.redis.client.lettuce;
import io.lettuce.core.Consumer;
import io.lettuce.core.RedisBusyException;
@@ -12,6 +12,7 @@
import org.slf4j.LoggerFactory;
import org.sourcelab.storm.spout.redis.Message;
import org.sourcelab.storm.spout.redis.RedisStreamSpoutConfig;
+import org.sourcelab.storm.spout.redis.client.Client;
import java.util.List;
import java.util.Objects;
@@ -44,7 +45,13 @@ public class LettuceClient implements Client {
*/
private final XReadArgs xreadArgs;
private final Consumer consumerFrom;
-    private final XReadArgs.StreamOffset<String> lastConsumed;
+
+ /**
+ * State for consuming first from consumer's personal pending list,
+ * then switching to reading from consumer group messages.
+ */
+ private boolean hasFinishedPpl = false;
+    private XReadArgs.StreamOffset<String> lastConsumed;
/**
* Constructor.
@@ -55,11 +62,8 @@ public LettuceClient(final RedisStreamSpoutConfig config, final int instanceId)
this(
config,
instanceId,
- // Determine which adapter to use based on what type of redis instance we are
- // communicating with.
- config.isConnectingToCluster()
- ? new LettuceClusterAdapter(RedisClusterClient.create(config.getConnectString()))
- : new LettuceRedisAdapter(RedisClient.create(config.getConnectString()))
+ // Determine which adapter to use based on what type of redis instance we are communicating with.
+ createAdapter(config)
);
}
@@ -85,9 +89,6 @@ public LettuceClient(final RedisStreamSpoutConfig config, final int instanceId)
// Create re-usable ConsumerFrom instance.
consumerFrom = Consumer.from(config.getGroupName(), consumerId);
-
- // Create re-usable lastConsumed instance.
- lastConsumed = XReadArgs.StreamOffset.lastConsumed(config.getStreamKey());
}
@Override
@@ -98,6 +99,10 @@ public void connect() {
adapter.connect();
+ // Create re-usable lastConsumed instance, default to consuming from PPL list
+ lastConsumed = XReadArgs.StreamOffset.from(config.getStreamKey(), "0-0");
+ hasFinishedPpl = false;
+
try {
// Attempt to create consumer group
adapter.getSyncCommands().xgroupCreate(
@@ -128,17 +133,35 @@ public void connect() {
@Override
public List nextMessages() {
// Get next batch of messages.
-        final List<StreamMessage<String, String>> messages = adapter.getSyncCommands().xreadgroup(
+        final List<StreamMessage<String, String>> entries = adapter.getSyncCommands().xreadgroup(
consumerFrom,
xreadArgs,
lastConsumed
);
// Loop over each message
- return messages.stream()
+        final List<Message> messages = entries.stream()
// Map into Message Object
.map((streamMsg) -> new Message(streamMsg.getId(), streamMsg.getBody()))
.collect(Collectors.toList());
+
+ if (!hasFinishedPpl) {
+ if (messages.isEmpty()) {
+ logger.info("Personal Pending List appears empty, switching to consuming from new messages.");
+
+ hasFinishedPpl = true;
+ lastConsumed = XReadArgs.StreamOffset.lastConsumed(config.getStreamKey());
+
+ // Re-attempt consuming
+ return nextMessages();
+ } else {
+ // Advance last index consumed from PPL so we don't continue to replay old messages.
+ final String lastId = messages.get(messages.size() - 1).getId();
+ lastConsumed = XReadArgs.StreamOffset.from(config.getStreamKey(), lastId);
+ }
+ }
+
+ return messages;
}
@Override
@@ -155,4 +178,19 @@ public void commitMessage(final String msgId) {
public void disconnect() {
adapter.shutdown();
}
+
+ /**
+ * Factory method for creating the appropriate adapter based on configuration.
+ * @param config Spout configuration.
+ * @return Appropriate Adapter.
+ */
+ private static LettuceAdapter createAdapter(final RedisStreamSpoutConfig config) {
+ if (config.isConnectingToCluster()) {
+ logger.info("Connecting to RedisCluster at {}", config.getConnectStringMasked());
+ return new LettuceClusterAdapter(RedisClusterClient.create(config.getConnectString()));
+ } else {
+ logger.info("Connecting to Redis server at {}", config.getConnectStringMasked());
+ return new LettuceRedisAdapter(RedisClient.create(config.getConnectString()));
+ }
+ }
}
diff --git a/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClusterAdapter.java b/src/main/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceClusterAdapter.java
similarity index 96%
rename from src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClusterAdapter.java
rename to src/main/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceClusterAdapter.java
index 21e178d..d1d4f46 100644
--- a/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClusterAdapter.java
+++ b/src/main/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceClusterAdapter.java
@@ -1,4 +1,4 @@
-package org.sourcelab.storm.spout.redis.client;
+package org.sourcelab.storm.spout.redis.client.lettuce;
import io.lettuce.core.api.sync.RedisStreamCommands;
import io.lettuce.core.cluster.RedisClusterClient;
diff --git a/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceRedisAdapter.java b/src/main/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceRedisAdapter.java
similarity index 96%
rename from src/main/java/org/sourcelab/storm/spout/redis/client/LettuceRedisAdapter.java
rename to src/main/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceRedisAdapter.java
index 9fdbddf..a134db5 100644
--- a/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceRedisAdapter.java
+++ b/src/main/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceRedisAdapter.java
@@ -1,4 +1,4 @@
-package org.sourcelab.storm.spout.redis.client;
+package org.sourcelab.storm.spout.redis.client.lettuce;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
diff --git a/src/test/java/org/sourcelab/storm/spout/redis/AbstractRedisStreamSpoutIntegrationTest.java b/src/test/java/org/sourcelab/storm/spout/redis/AbstractRedisStreamSpoutIntegrationTest.java
index 13a371b..2d768a4 100644
--- a/src/test/java/org/sourcelab/storm/spout/redis/AbstractRedisStreamSpoutIntegrationTest.java
+++ b/src/test/java/org/sourcelab/storm/spout/redis/AbstractRedisStreamSpoutIntegrationTest.java
@@ -1,16 +1,17 @@
package org.sourcelab.storm.spout.redis;
import org.apache.storm.generated.StreamInfo;
-import org.apache.storm.spout.ISpout;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsGetter;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.sourcelab.storm.spout.redis.client.ClientType;
import org.sourcelab.storm.spout.redis.example.TestTupleConverter;
import org.sourcelab.storm.spout.redis.failhandler.RetryFailedTuples;
import org.sourcelab.storm.spout.redis.util.outputcollector.EmittedTuple;
@@ -19,6 +20,7 @@
import org.sourcelab.storm.spout.redis.util.test.RedisTestHelper;
import org.sourcelab.storm.spout.redis.util.test.StreamConsumerInfo;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -26,6 +28,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -103,8 +106,12 @@ void cleanup() {
/**
* Most basic lifecycle smoke test.
*/
- @Test
- void smokeTest_openAndClose() {
+ @ParameterizedTest
+ @EnumSource(ClientType.class)
+ void smokeTest_openAndClose(final ClientType clientType) {
+ // Inject client type into config
+ configBuilder.withClientType(clientType);
+
// Create spout
try (final RedisStreamSpout spout = new RedisStreamSpout(configBuilder.build())) {
@@ -112,6 +119,8 @@ void smokeTest_openAndClose() {
// Open spout
spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector));
+ spout.activate();
+ spout.deactivate();
// Close spout via autocloseable
}
@@ -122,9 +131,14 @@ void smokeTest_openAndClose() {
/**
* Basic lifecycle smoke test.
+ * Basically validating that nothing explodes.
*/
- @Test
- void smokeTest_openActivateDeactivateAndClose() throws InterruptedException {
+ @ParameterizedTest
+ @EnumSource(ClientType.class)
+ void smokeTest_openActivateDeactivateAndClose(final ClientType clientType) throws InterruptedException {
+ // Inject client type into config
+ configBuilder.withClientType(clientType);
+
// Create spout
try (final RedisStreamSpout spout = new RedisStreamSpout(configBuilder.build())) {
final StubSpoutCollector collector = new StubSpoutCollector();
@@ -138,7 +152,7 @@ void smokeTest_openActivateDeactivateAndClose() throws InterruptedException {
// Small sleep
Thread.sleep(3000L);
- // Deactivate and close via Autoclosable
+ // Deactivate and close via Autocloseable
spout.deactivate();
}
@@ -146,13 +160,53 @@ void smokeTest_openActivateDeactivateAndClose() throws InterruptedException {
verify(mockTopologyContext, times(1)).getThisTaskIndex();
}
+ /**
+ * Basic lifecycle smoke test.
+ * Cycle calling activate/deactivate a few times.
+ */
+ @ParameterizedTest
+ @EnumSource(ClientType.class)
+ void smokeTest_cycleActivateDeactivate(final ClientType clientType) throws InterruptedException {
+ // Inject client type into config
+ configBuilder.withClientType(clientType);
+
+ // Create spout
+ try (final RedisStreamSpout spout = new RedisStreamSpout(configBuilder.build())) {
+ final StubSpoutCollector collector = new StubSpoutCollector();
+
+ // Open spout
+ spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector));
+
+ // Cycle Activate and Deactivate a few times
+ spout.activate();
+ Thread.sleep(2000L);
+ spout.deactivate();
+
+ spout.activate();
+ Thread.sleep(2000L);
+ spout.deactivate();
+
+ spout.activate();
+ Thread.sleep(2000L);
+ spout.deactivate();
+
+ // Close via Autocloseable.
+ }
+
+ // Verify mocks
+ verify(mockTopologyContext, times(1)).getThisTaskIndex();
+ }
+
/**
* Verifies the behavior when you attempt to connect to a redis instance
* that does not exist. Looks like nothing. You get errors in the logs.
*
* Disabled for now.
*/
- void smokeTest_configureInvalidRedisHost() throws InterruptedException {
+ void smokeTest_configureInvalidRedisHost(final ClientType clientType) throws InterruptedException {
+ // Inject client type into config
+ configBuilder.withClientType(clientType);
+
// Lets override the redis host with something invalid
configBuilder
.withServer(getTestContainer().getHost(), 124);
@@ -182,14 +236,18 @@ void smokeTest_configureInvalidRedisHost() throws InterruptedException {
}
// Verify mocks
- verify(mockTopologyContext, times(1)).getThisTaskIndex();
+ verify(mockTopologyContext, times(2)).getThisTaskIndex();
}
/**
* Basic usage test.
*/
- @Test
- void smokeTest_consumeAndAckMessages() throws InterruptedException {
+ @ParameterizedTest
+ @EnumSource(ClientType.class)
+ void smokeTest_consumeAndAckMessages(final ClientType clientType) throws InterruptedException {
+ // Inject client type into config
+ configBuilder.withClientType(clientType);
+
// Create spout
try (final RedisStreamSpout spout = new RedisStreamSpout(configBuilder.build())) {
final StubSpoutCollector collector = new StubSpoutCollector();
@@ -203,13 +261,16 @@ void smokeTest_consumeAndAckMessages() throws InterruptedException {
// Lets publish 10 messages to the stream
final List producedMsgIds = redisTestHelper.produceMessages(streamKey, 10);
- // Now lets try to get those from the spout
- do {
- spout.nextTuple();
- Thread.sleep(100L);
- } while (collector.getEmittedTuples().size() < 10);
+ // Now lets try to get those from the spout.
+ // Call spout.nextTuple() until the spout has emitted 10 tuples.
+ await()
+ .atMost(Duration.ofSeconds(10))
+ .until(() -> {
+ spout.nextTuple();
+ return collector.getEmittedTuples().size() == 10;
+ });
- // Call next tuple a few more times, should be a no-op
+ // Call next tuple a few more times; it should be a no-op, and nothing further should be emitted.
for (int counter = 0; counter < 10; counter++) {
Thread.sleep(100L);
spout.nextTuple();
@@ -217,7 +278,6 @@ void smokeTest_consumeAndAckMessages() throws InterruptedException {
// Verify what got emitted.
assertEquals(10, collector.getEmittedTuples().size(), "Should have found 10 emitted tuples.");
-
final String expectedStreamId = Utils.DEFAULT_STREAM_ID;
for (int index = 0; index < producedMsgIds.size(); index++) {
final EmittedTuple emittedTuple = collector.getEmittedTuples().get(index);
@@ -243,27 +303,33 @@ void smokeTest_consumeAndAckMessages() throws InterruptedException {
assertTrue(foundValue, "Failed to find msgId tuple value");
}
- // See that we have 10 items pending
- StreamConsumerInfo consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID);
- assertNotNull(consumerInfo, "Failed to find consumer info!");
+ // See that we have 10 items pending in the stream's consumer list.
+ await()
+ .atMost(Duration.ofSeconds(10))
+ .until(() -> {
+ final StreamConsumerInfo consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID);
+ assertNotNull(consumerInfo, "Failed to find consumer info!");
- // Verify we have 10 items pending
- assertEquals(10L, consumerInfo.getPending(), "Found entries pending");
+ // Verify we have 10 items pending
+ return consumerInfo.getPending() == 10;
+ });
// Now Ack the messages
collector.getEmittedTuples().stream()
.map(EmittedTuple::getMessageId)
.forEach(spout::ack);
- // Small delay waiting for processing.
- Thread.sleep(1000L);
+ // We should see the number of pending messages for our consumer drop to 0
+ await()
+ .atMost(Duration.ofSeconds(10))
+ .until(() -> {
+ // Verify that our messages were acked in redis.
+ final StreamConsumerInfo consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID);
+ assertNotNull(consumerInfo, "Failed to find consumer info!");
- // Verify that our message were acked in redis.
- consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID);
- assertNotNull(consumerInfo, "Failed to find consumer info!");
-
- // Verify we have nothing pending
- assertEquals(0L, consumerInfo.getPending(), "Found entries pending?");
+ // Verify we have nothing pending
+ return consumerInfo.getPending() == 0;
+ });
// Deactivate and close via Autocloseable
spout.deactivate();
@@ -276,9 +342,13 @@ void smokeTest_consumeAndAckMessages() throws InterruptedException {
/**
* Basic usage with retry failure handler.
*/
- @Test
- void smokeTest_consumeFailAndAckMessages() throws InterruptedException {
- // Swap out failure handler
+ @ParameterizedTest
+ @EnumSource(ClientType.class)
+ void smokeTest_consumeFailAndAckMessages(final ClientType clientType) throws InterruptedException {
+ // Inject client type into config
+ configBuilder.withClientType(clientType);
+
+ // Swap out failure handler, each tuple should be retried a maximum of twice.
configBuilder.withFailureHandler(new RetryFailedTuples(2));
// Create spout
@@ -295,15 +365,16 @@ void smokeTest_consumeFailAndAckMessages() throws InterruptedException {
List producedMsgIds = redisTestHelper.produceMessages(streamKey, 10);
// Now lets try to get 5 of those those from the spout...
- do {
- spout.nextTuple();
- Thread.sleep(100L);
- } while (collector.getEmittedTuples().size() < 5);
-
+ // Call spout.nextTuple() until exactly 5 tuples have been emitted to the output collector.
+ await()
+ .atMost(Duration.ofSeconds(10))
+ .until(() -> {
+ spout.nextTuple();
+ return collector.getEmittedTuples().size() == 5;
+ });
// Verify what got emitted.
- assertEquals(5, collector.getEmittedTuples().size(), "Should have found 10 emitted tuples.");
-
+ assertEquals(5, collector.getEmittedTuples().size(), "Should have found 5 emitted tuples.");
final String expectedStreamId = Utils.DEFAULT_STREAM_ID;
for (int index = 0; index < 5; index++) {
final EmittedTuple emittedTuple = collector.getEmittedTuples().get(index);
@@ -329,13 +400,17 @@ void smokeTest_consumeFailAndAckMessages() throws InterruptedException {
assertTrue(foundValue, "Failed to find msgId tuple value");
}
- // See that we have 10 items pending
+ // Since we have NOT acked any tuples, we should have at least 5 tuples pending with a max of 10
+ // depending on how many have been consumed by the spout's consuming thread.
StreamConsumerInfo consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID);
assertNotNull(consumerInfo, "Failed to find consumer info!");
- assertEquals(10L, consumerInfo.getPending(), "Found entries pending");
+ assertTrue(consumerInfo.getPending() >= 5, "At least 5 entries pending");
+ assertTrue(consumerInfo.getPending() <= 10, "No more than 10 entries pending");
+ // We want to set up the following scenario using the 5 tuples the spout has emitted so far.
+ // ack the first 3 messages
+ // fail the last 2 messages.
final List messageIdsToFail = new ArrayList<>();
-
for (int index = 0; index < 5; index++) {
// Now ack the first 3 messages
if (index < 3) {
@@ -355,38 +430,48 @@ void smokeTest_consumeFailAndAckMessages() throws InterruptedException {
collector.reset();
// Small delay waiting for processing.
- Thread.sleep(1000L);
-
- // Verify that our message were acked in redis.
- consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID);
- assertNotNull(consumerInfo, "Failed to find consumer info!");
-
- // Verify we have 7 pending
- assertEquals(7L, consumerInfo.getPending(), "Found entries pending");
-
- // Ask for the next two tuples, we should get our failed tuples back out.
- do {
- spout.nextTuple();
- } while (collector.getEmittedTuples().size() < 2);
-
- // We should have emitted two tuples.
+ // Wait until we have exactly 7 pending, which is our 10 total messages, minus the 3 acked ones.
+ await()
+ .atMost(Duration.ofSeconds(10))
+ .until(() -> {
+ final StreamConsumerInfo consumerState = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID);
+
+ // Verify that our messages were acked in redis.
+ assertNotNull(consumerState, "Failed to find consumer info!");
+
+ return consumerState.getPending() == 7;
+ });
+
+ // Ask the spout for the next two tuples,
+ // The expectation here is that we should get our failed tuples back out.
+ await()
+ .atMost(Duration.ofSeconds(10))
+ .until(() -> {
+ spout.nextTuple();
+ return collector.getEmittedTuples().size() == 2;
+ });
+
+ // We should have emitted two tuples, and they should have been our failed tuples.
assertEquals(2, collector.getEmittedTuples().size());
assertEquals(messageIdsToFail.get(0), collector.getEmittedTuples().get(0).getMessageId());
assertEquals(messageIdsToFail.get(1), collector.getEmittedTuples().get(1).getMessageId());
- // Ack them
+ // Ack the previously failed tuples.
spout.ack(messageIdsToFail.get(0));
spout.ack(messageIdsToFail.get(1));
// Small delay waiting for processing.
- Thread.sleep(1000L);
-
- // Verify that our message were acked in redis.
- consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID);
- assertNotNull(consumerInfo, "Failed to find consumer info!");
-
- // Verify we have 5 pending
- assertEquals(5L, consumerInfo.getPending(), "Found entries pending");
+ // We're looking for the number of pending to drop to 5, since we've now acked 5 of the 10 original tuples.
+ await()
+ .atMost(Duration.ofSeconds(10))
+ .until(() -> {
+ // Verify that our messages were acked in redis.
+ final StreamConsumerInfo consumerState = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID);
+ assertNotNull(consumerState, "Failed to find consumer info!");
+
+ // Verify we have 5 pending
+ return consumerState.getPending() == 5;
+ });
// Deactivate and close via Autocloseable
spout.deactivate();
@@ -399,8 +484,12 @@ void smokeTest_consumeFailAndAckMessages() throws InterruptedException {
/**
* Verify declareOutputFields using TestTupleConverter.
*/
- @Test
- void test_declareOutputFields() {
+ @ParameterizedTest
+ @EnumSource(ClientType.class)
+ void test_declareOutputFields(final ClientType clientType) {
+ // Inject client type into config
+ configBuilder.withClientType(clientType);
+
// Create a test implementation
final TupleConverter converter = new DummyTupleConverter() ;
@@ -446,8 +535,12 @@ void test_declareOutputFields() {
/**
* Verify spout emits tuples down the correct stream.
*/
- @Test
- void test_EmitDownSeparateStreams() {
+ @ParameterizedTest
+ @EnumSource(ClientType.class)
+ void test_EmitDownSeparateStreams(final ClientType clientType) {
+ // Inject client type into config
+ configBuilder.withClientType(clientType);
+
// Create a test implementation
final TupleConverter converter = new DummyTupleConverter() ;
@@ -460,6 +553,7 @@ void test_EmitDownSeparateStreams() {
// Open spout
spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector));
+ spout.activate();
// Ask for stream names
final OutputFieldsGetter getter = new OutputFieldsGetter();
@@ -505,8 +599,12 @@ void test_EmitDownSeparateStreams() {
* Verify if tuple converter instance returns null, then the message
* is simply acked and nothing is emitted.
*/
- @Test
- void test_NullConversionJustGetsAckedNothingEmitted() {
+ @ParameterizedTest
+ @EnumSource(ClientType.class)
+ void test_NullConversionJustGetsAckedNothingEmitted(final ClientType clientType) {
+ // Inject client type into config
+ configBuilder.withClientType(clientType);
+
// Create a test implementation
final TupleConverter converter = new NullTupleConverter() ;
diff --git a/src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpout_ClusterIntegrationTest.java b/src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpout_ClusterIntegrationTest.java
index 6c36374..ce8e874 100644
--- a/src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpout_ClusterIntegrationTest.java
+++ b/src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpout_ClusterIntegrationTest.java
@@ -6,7 +6,7 @@
import org.testcontainers.junit.jupiter.Testcontainers;
/**
- *
+ * Runs Spout integration tests against a RedisCluster.
*/
@Testcontainers
@Tag("Integration")
diff --git a/src/test/java/org/sourcelab/storm/spout/redis/client/AbstractLettuceClientIntegrationTest.java b/src/test/java/org/sourcelab/storm/spout/redis/client/AbstractClientIntegrationTest.java
similarity index 81%
rename from src/test/java/org/sourcelab/storm/spout/redis/client/AbstractLettuceClientIntegrationTest.java
rename to src/test/java/org/sourcelab/storm/spout/redis/client/AbstractClientIntegrationTest.java
index e4b5160..2de058c 100644
--- a/src/test/java/org/sourcelab/storm/spout/redis/client/AbstractLettuceClientIntegrationTest.java
+++ b/src/test/java/org/sourcelab/storm/spout/redis/client/AbstractClientIntegrationTest.java
@@ -15,26 +15,28 @@
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
- * Abstract Integration test over LettuceClient.
+ * Abstract Integration test over Client implementations.
* Used as a base to test the client against both a single Redis instance, and against
* a RedisCluster instance.
*/
-abstract class AbstractLettuceClientIntegrationTest {
+public abstract class AbstractClientIntegrationTest {
private static final String CONSUMER_ID_PREFIX = "ConsumerId";
private static final int MAX_CONSUMED_PER_READ = 10;
private RedisTestHelper redisTestHelper;
private RedisStreamSpoutConfig config;
- private LettuceClient client;
+ private Client client;
private String streamKey;
- abstract RedisTestContainer getTestContainer();
+ public abstract RedisTestContainer getTestContainer();
+ public abstract Client createClient(final RedisStreamSpoutConfig config, final int instanceId);
@BeforeEach
void setUp(){
@@ -45,7 +47,7 @@ void setUp(){
config = createConfiguration(CONSUMER_ID_PREFIX + "1");
// Create client instance under test.
- client = new LettuceClient(config, 1);
+ client = createClient(config, 1);
// Create test helper instance.
redisTestHelper = getTestContainer().getRedisTestHelper();
@@ -131,7 +133,7 @@ void testSimpleConsumeMultipleMessages() {
void testConsumeMultipleConsumers() {
// Define 2nd client, but don't connect yet
final RedisStreamSpoutConfig config2 = createConfiguration(CONSUMER_ID_PREFIX + "2");
- final LettuceClient client2 = new LettuceClient(config2, 2);
+ final Client client2 = createClient(config2, 2);
try {
// Connect first client
@@ -185,7 +187,7 @@ void testConsumeMultipleConsumers() {
void testConsumeMultipleConsumers_scenario2() {
// Define 2nd client, but don't connect yet
final RedisStreamSpoutConfig config2 = createConfiguration(CONSUMER_ID_PREFIX + "2");
- final LettuceClient client2 = new LettuceClient(config2, 2);
+ final Client client2 = createClient(config2, 2);
try {
// Connect first client
@@ -283,8 +285,8 @@ void testSimpleConsumeMultipleMessages_withReconnect() {
// Write more messages to stream while client is disconnected.
expectedMessageIds = redisTestHelper.produceMessages(streamKey, MAX_CONSUMED_PER_READ);
- // Create new client using the same client
- final LettuceClient client2 = new LettuceClient(config, 1);
+ // Create new client using the same config
+ final Client client2 = createClient(config, 1);
client2.connect();
// Consume messages, should be the messages we got.
@@ -294,6 +296,69 @@ void testSimpleConsumeMultipleMessages_withReconnect() {
client2.disconnect();
}
+ /**
+ * 1. Connect, consume 10 messages, but only commit 5, then disconnect.
+ * 2. Create a new client using the same configuration and consume. We should receive the 5 uncommitted messages.
+ */
+ @Test
+ void testConsumeUncommittedMessages_withReconnect() {
+ // Connect
+ client.connect();
+
+ // Ask for messages.
+ List messages = client.nextMessages();
+ assertNotNull(messages, "Should be non-null");
+ assertTrue(messages.isEmpty(), "Should be empty");
+
+ // Now Submit more messages to the stream
+ List expectedMessageIds = redisTestHelper.produceMessages(streamKey, MAX_CONSUMED_PER_READ);
+
+ // Ask for the next messages
+ messages = client.nextMessages();
+
+ // Validate
+ verifyConsumedMessagesInOrder(expectedMessageIds, messages);
+
+ // Commit the first half
+ final List uncommittedMessageIds = new ArrayList<>();
+ for (int index = 0; index < messages.size(); index ++) {
+ final String msgId = messages.get(index).getId();
+ // Commit the first half
+ if (index < (MAX_CONSUMED_PER_READ / 2)) {
+ client.commitMessage(msgId);
+ } else {
+ // Remember the uncommitted messages.
+ uncommittedMessageIds.add(msgId);
+ }
+ }
+ // Sanity check
+ assertFalse(uncommittedMessageIds.isEmpty(), "[SANITY CHECK] Should have uncommitted messages");
+
+ // Ask for messages, should be empty
+ messages = client.nextMessages();
+ assertNotNull(messages, "Should be non-null");
+ assertEquals(0, messages.size(), "Should be empty");
+
+ // Disconnect client.
+ client.disconnect();
+
+ // Create new client using the same config
+ final Client client2 = createClient(config, 1);
+ client2.connect();
+
+ // Consume messages, should be the uncommitted messages
+ messages = client2.nextMessages();
+
+ // We should have the uncommitted messages.
+ verifyConsumedMessagesInOrder(uncommittedMessageIds, messages);
+
+ // Consume again should be empty
+ messages = client2.nextMessages();
+ assertTrue(messages.isEmpty(), "Should be empty list of messages");
+
+ client2.disconnect();
+ }
+
private void verifyConsumedMessagesInOrder(final List expectedMessageIds, final List foundMessages) {
// Validate
assertNotNull(foundMessages, "Should never be null");
diff --git a/src/test/java/org/sourcelab/storm/spout/redis/client/jedis/JedisClient_RedisClusterIntegrationTest.java b/src/test/java/org/sourcelab/storm/spout/redis/client/jedis/JedisClient_RedisClusterIntegrationTest.java
new file mode 100644
index 0000000..d83b7b3
--- /dev/null
+++ b/src/test/java/org/sourcelab/storm/spout/redis/client/jedis/JedisClient_RedisClusterIntegrationTest.java
@@ -0,0 +1,37 @@
+package org.sourcelab.storm.spout.redis.client.jedis;
+
+import org.junit.jupiter.api.Tag;
+import org.sourcelab.storm.spout.redis.RedisStreamSpoutConfig;
+import org.sourcelab.storm.spout.redis.client.AbstractClientIntegrationTest;
+import org.sourcelab.storm.spout.redis.client.Client;
+import org.sourcelab.storm.spout.redis.util.test.RedisTestContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+/**
+ * NOTE: This Integration test requires Docker to run.
+ *
+ * This integration test verifies JedisClient against a RedisCluster instance to verify
+ * things work as expected when consuming from a RedisCluster.
+ *
+ * Test cases are defined in {@link AbstractClientIntegrationTest}.
+ */
+@Testcontainers
+@Tag("Integration")
+public class JedisClient_RedisClusterIntegrationTest extends AbstractClientIntegrationTest {
+ /**
+ * This test depends on the following Redis Container.
+ */
+ @Container
+ public RedisTestContainer redisContainer = RedisTestContainer.newRedisClusterContainer();
+
+ @Override
+ public RedisTestContainer getTestContainer() {
+ return redisContainer;
+ }
+
+ @Override
+ public Client createClient(final RedisStreamSpoutConfig config, final int instanceId) {
+ return new JedisClient(config, instanceId);
+ }
+}
diff --git a/src/test/java/org/sourcelab/storm/spout/redis/client/jedis/JedisClient_RedisIntegrationTest.java b/src/test/java/org/sourcelab/storm/spout/redis/client/jedis/JedisClient_RedisIntegrationTest.java
new file mode 100644
index 0000000..e58c84f
--- /dev/null
+++ b/src/test/java/org/sourcelab/storm/spout/redis/client/jedis/JedisClient_RedisIntegrationTest.java
@@ -0,0 +1,37 @@
+package org.sourcelab.storm.spout.redis.client.jedis;
+
+import org.junit.jupiter.api.Tag;
+import org.sourcelab.storm.spout.redis.RedisStreamSpoutConfig;
+import org.sourcelab.storm.spout.redis.client.AbstractClientIntegrationTest;
+import org.sourcelab.storm.spout.redis.client.Client;
+import org.sourcelab.storm.spout.redis.util.test.RedisTestContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+/**
+ * NOTE: This Integration test requires Docker to run.
+ *
+ * This integration test verifies JedisClient against a Redis instance to verify
+ * things work as expected when consuming from a Redis instance.
+ *
+ * Test cases are defined in {@link AbstractClientIntegrationTest}.
+ */
+@Testcontainers
+@Tag("Integration")
+public class JedisClient_RedisIntegrationTest extends AbstractClientIntegrationTest {
+ /**
+ * This test depends on the following Redis Container.
+ */
+ @Container
+ public RedisTestContainer redisContainer = RedisTestContainer.newRedisContainer();
+
+ @Override
+ public RedisTestContainer getTestContainer() {
+ return redisContainer;
+ }
+
+ @Override
+ public Client createClient(final RedisStreamSpoutConfig config, final int instanceId) {
+ return new JedisClient(config, instanceId);
+ }
+}
diff --git a/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClient_RedisClusterIntegrationTest.java b/src/test/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceClient_RedisClusterIntegrationTest.java
similarity index 57%
rename from src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClient_RedisClusterIntegrationTest.java
rename to src/test/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceClient_RedisClusterIntegrationTest.java
index 338a35c..0c60760 100644
--- a/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClient_RedisClusterIntegrationTest.java
+++ b/src/test/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceClient_RedisClusterIntegrationTest.java
@@ -1,6 +1,9 @@
-package org.sourcelab.storm.spout.redis.client;
+package org.sourcelab.storm.spout.redis.client.lettuce;
import org.junit.jupiter.api.Tag;
+import org.sourcelab.storm.spout.redis.RedisStreamSpoutConfig;
+import org.sourcelab.storm.spout.redis.client.AbstractClientIntegrationTest;
+import org.sourcelab.storm.spout.redis.client.Client;
import org.sourcelab.storm.spout.redis.util.test.RedisTestContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
@@ -11,11 +14,11 @@
* This integration test verifies LettuceClient against a RedisCluster instance to verify
* things work as expected when consuming from a RedisCluster.
*
- * Test cases are defined in {@link AbstractLettuceClientIntegrationTest}.
+ * Test cases are defined in {@link AbstractClientIntegrationTest}.
*/
@Testcontainers
@Tag("Integration")
-public class LettuceClient_RedisClusterIntegrationTest extends AbstractLettuceClientIntegrationTest {
+public class LettuceClient_RedisClusterIntegrationTest extends AbstractClientIntegrationTest {
/**
* This test depends on the following Redis Container.
*/
@@ -23,7 +26,12 @@ public class LettuceClient_RedisClusterIntegrationTest extends AbstractLettuceCl
public RedisTestContainer redisContainer = RedisTestContainer.newRedisClusterContainer();
@Override
- RedisTestContainer getTestContainer() {
+ public RedisTestContainer getTestContainer() {
return redisContainer;
}
+
+ @Override
+ public Client createClient(final RedisStreamSpoutConfig config, final int instanceId) {
+ return new LettuceClient(config, instanceId);
+ }
}
diff --git a/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClient_RedisIntegrationTest.java b/src/test/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceClient_RedisIntegrationTest.java
similarity index 57%
rename from src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClient_RedisIntegrationTest.java
rename to src/test/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceClient_RedisIntegrationTest.java
index b28c976..0b5ae1f 100644
--- a/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClient_RedisIntegrationTest.java
+++ b/src/test/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceClient_RedisIntegrationTest.java
@@ -1,6 +1,9 @@
-package org.sourcelab.storm.spout.redis.client;
+package org.sourcelab.storm.spout.redis.client.lettuce;
import org.junit.jupiter.api.Tag;
+import org.sourcelab.storm.spout.redis.RedisStreamSpoutConfig;
+import org.sourcelab.storm.spout.redis.client.AbstractClientIntegrationTest;
+import org.sourcelab.storm.spout.redis.client.Client;
import org.sourcelab.storm.spout.redis.util.test.RedisTestContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
@@ -11,11 +14,11 @@
* This integration test verifies LettuceClient against a Redis instance to verify
* things work as expected when consuming from a Redis instance.
*
- * Test cases are defined in {@link AbstractLettuceClientIntegrationTest}.
+ * Test cases are defined in {@link AbstractClientIntegrationTest}.
*/
@Testcontainers
@Tag("Integration")
-public class LettuceClient_RedisIntegrationTest extends AbstractLettuceClientIntegrationTest {
+public class LettuceClient_RedisIntegrationTest extends AbstractClientIntegrationTest {
/**
* This test depends on the following Redis Container.
*/
@@ -23,7 +26,12 @@ public class LettuceClient_RedisIntegrationTest extends AbstractLettuceClientInt
public RedisTestContainer redisContainer = RedisTestContainer.newRedisContainer();
@Override
- RedisTestContainer getTestContainer() {
+ public RedisTestContainer getTestContainer() {
return redisContainer;
}
+
+ @Override
+ public Client createClient(final RedisStreamSpoutConfig config, final int instanceId) {
+ return new LettuceClient(config, instanceId);
+ }
}
diff --git a/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClusterAdapterTest.java b/src/test/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceClusterAdapterTest.java
similarity index 97%
rename from src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClusterAdapterTest.java
rename to src/test/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceClusterAdapterTest.java
index d5db7cd..e8eb057 100644
--- a/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClusterAdapterTest.java
+++ b/src/test/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceClusterAdapterTest.java
@@ -1,4 +1,4 @@
-package org.sourcelab.storm.spout.redis.client;
+package org.sourcelab.storm.spout.redis.client.lettuce;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
diff --git a/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceRedisAdapterTest.java b/src/test/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceRedisAdapterTest.java
similarity index 97%
rename from src/test/java/org/sourcelab/storm/spout/redis/client/LettuceRedisAdapterTest.java
rename to src/test/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceRedisAdapterTest.java
index f1caecc..bfdfaf9 100644
--- a/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceRedisAdapterTest.java
+++ b/src/test/java/org/sourcelab/storm/spout/redis/client/lettuce/LettuceRedisAdapterTest.java
@@ -1,4 +1,4 @@
-package org.sourcelab.storm.spout.redis.client;
+package org.sourcelab.storm.spout.redis.client.lettuce;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
diff --git a/src/test/java/org/sourcelab/storm/spout/redis/util/test/RedisTestHelper.java b/src/test/java/org/sourcelab/storm/spout/redis/util/test/RedisTestHelper.java
index 637c320..b8572b4 100644
--- a/src/test/java/org/sourcelab/storm/spout/redis/util/test/RedisTestHelper.java
+++ b/src/test/java/org/sourcelab/storm/spout/redis/util/test/RedisTestHelper.java
@@ -3,9 +3,9 @@
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.sync.RedisStreamCommands;
import io.lettuce.core.cluster.RedisClusterClient;
-import org.sourcelab.storm.spout.redis.client.LettuceAdapter;
-import org.sourcelab.storm.spout.redis.client.LettuceClusterAdapter;
-import org.sourcelab.storm.spout.redis.client.LettuceRedisAdapter;
+import org.sourcelab.storm.spout.redis.client.lettuce.LettuceAdapter;
+import org.sourcelab.storm.spout.redis.client.lettuce.LettuceClusterAdapter;
+import org.sourcelab.storm.spout.redis.client.lettuce.LettuceRedisAdapter;
import java.util.ArrayList;
import java.util.HashMap;