diff --git a/README.md b/README.md index 092953d..90bc614 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,8 @@ This project is an [Apache Storm](https://storm.apache.org/) Spout for consuming ### Features -- Ability to consume from Redis Streams while maintaing state. +- Ability to consume from Redis Streams while maintaining state. +- Ability to consume from a single Redis server or a RedisCluster. - Parallelism supported via unique Consumer Ids. ### Usage & Configuration @@ -30,9 +31,6 @@ The spout is configured using the [RedisStreamSpoutConfig](src/main/java/org/sou | Property | Required | Description | |----------|----------|-------------| -| `Host` | Required | The hostname to connect to Redis at. | -| `Port` | Required | The port to connect to Redis at. | -| `Password` | optional | Password to connect to Redis using. | | `Group Name` | Required | The Consumer group name the Spout should use. | | `Consumer Id Prefix` | Required | A prefix to use for generating unique Consumer Ids within the Consumer Group. To support multiple parallel consumers, the Spout instance will be appended to the end of this value. | | `Stream Key` | Required | The Redis key to consume messages from. | @@ -44,17 +42,24 @@ The spout is configured using the [RedisStreamSpoutConfig](src/main/java/org/sou ```java // Create config final RedisStreamSpoutConfig.Builder config = RedisStreamSpoutConfig.newBuilder() - // Set Connection Properties - .withHost("localhost") - .withPort(6179) + // If you want to connect to a single Redis instance: + .withServer("localhost", 6759) + + // OR if you want to talk to a RedisCluster: + .withClusterNode("node1.hostname.com", 6759) + .withClusterNode("node2.hostname.com", 6759) + ... + // Consumer Properties .withGroupName("StormConsumerGroup") .withConsumerIdPrefix("StormConsumer") .withStreamKey("RedisStreamKeyName") - // Tuple Handler Class + + // Tuple Converter instance (see note below) .withTupleConverter(..Your TupleConvertor implementation...) - // Failure Handler - .withFailureHandler(new RetryFailedTuples(10)); + + // Failure Handler instance (see note below) + .withFailureHandler(new ExponentialBackoffFailureHandler(...)); // Create Spout diff --git a/src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpout.java b/src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpout.java index 28bba33..e76f02c 100644 --- a/src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpout.java +++ b/src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpout.java @@ -21,7 +21,7 @@ /** * Redis Stream based Spout for Apache Storm 2.2.x. */ -public class RedisStreamSpout implements IRichSpout { +public class RedisStreamSpout implements IRichSpout, AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(RedisStreamSpout.class); /** diff --git a/src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutConfig.java b/src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutConfig.java index 37e5e81..4f3a2a4 100644 --- a/src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutConfig.java +++ b/src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutConfig.java @@ -3,7 +3,11 @@ import org.sourcelab.storm.spout.redis.failhandler.NoRetryHandler; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Objects; +import java.util.stream.Collectors; /** * Configuration properties for the spout. @@ -12,9 +16,8 @@ public class RedisStreamSpoutConfig implements Serializable { /** * Redis server details. */ - private final String host; - private final int port; - private final String password; + private final RedisServer redisServer; + private final RedisCluster redisCluster; /** * The Redis key to stream from. @@ -69,11 +72,12 @@ public class RedisStreamSpoutConfig implements Serializable { /** * Constructor. - * See Builder instance. + * Use Builder instance. */ - public RedisStreamSpoutConfig( + private RedisStreamSpoutConfig( // Redis Connection Properties - final String host, final int port, final String password, + final RedisServer redisServer, + final RedisCluster redisCluster, // Consumer properties final String streamKey, final String groupName, final String consumerIdPrefix, // Classes @@ -83,10 +87,20 @@ public RedisStreamSpoutConfig( final int maxConsumePerRead, final int maxTupleQueueSize, final int maxAckQueueSize, final long consumerDelayMillis, final boolean metricsEnabled ) { - // Connection Details. - this.host = Objects.requireNonNull(host); - this.port = port; - this.password = password; + // Connection + if (redisCluster != null && redisServer != null) { + throw new IllegalStateException( + "You cannot configure connection details for both a single Redis server and RedisCluster. " + + "Use either Builder.withServer() OR Builder.withClusterNode(), but NOT both. " + + "If talking to a single Redis instance use Builder.withServer(). " + + "If talking to a RedisCluster use Builder.withClusterNode() to configure one or more nodes in the cluster." + ); + } else if (redisCluster == null && redisServer == null) { + throw new IllegalStateException("You must configure connection details for either a single Redis server and RedisCluster."); + } + + this.redisCluster = redisCluster; + this.redisServer = redisServer; // Consumer Details this.groupName = Objects.requireNonNull(groupName); @@ -105,18 +119,6 @@ public RedisStreamSpoutConfig( this.metricsEnabled = metricsEnabled; } - public String getHost() { - return host; - } - - public int getPort() { - return port; - } - - public String getPassword() { - return password; - } - public String getStreamKey() { return streamKey; } @@ -133,18 +135,19 @@ public int getMaxConsumePerRead() { return maxConsumePerRead; } + public boolean isConnectingToCluster() { + return redisCluster != null; + } + /** * Build a Redis connection string based on configured properties. * @return Redis Connection string. */ public String getConnectString() { - String connectStr = "redis://"; - if (getPassword() != null && !getPassword().trim().isEmpty()) { - connectStr += getPassword() + "@"; + if (!isConnectingToCluster()) { + return redisServer.getConnectString(); } - connectStr += getHost() + ":" + getPort(); - - return connectStr; + return redisCluster.getConnectString(); } public int getMaxTupleQueueSize() { @@ -186,9 +189,8 @@ public static final class Builder { /** * Connection details. */ - private String host; - private int port; - private String password; + private final List clusterNodes = new ArrayList<>(); + private RedisServer redisServer = null; /** * Consumer details. @@ -219,36 +221,104 @@ public static final class Builder { private Builder() { } - public Builder withHost(final String host) { - this.host = host; - return this; + /** + * Define connection details for connecting to a single Redis server. + * + * NOTE: If you want to connect to a RedisCluster, {@link Builder#withClusterNode}. + * + * @param host Host of redis server to connect to. + * @param port Port of redis server to connect to. + * @return Builder. + */ + public Builder withServer(final String host, final int port) { + return withServer(host, port, null); } /** - * Set the port parameter. Attempts to handle input in both - * Number or String input. + * Define connection details for connecting to a single Redis server. * - * @param port Port value. - * @return Builder instance. - * @throws IllegalArgumentException if passed a non-number representation value. + * NOTE: If you want to connect to a RedisCluster, {@link Builder#withClusterNode}. + * + * @param host Host of redis server to connect to. + * @param port Port of redis server to connect to. + * @param password (optional) Password for redis server, or NULL if no password required. + * @return Builder. */ - public Builder withPort(final Object port) { - Objects.requireNonNull(port); - if (port instanceof Number) { - return withPort(((Number) port).intValue()); - } else if (port instanceof String) { - return withPort(Integer.parseInt((String) port)); - } - throw new IllegalArgumentException("Port must be a Number!"); + public Builder withServer(final String host, final int port, final String password) { + return withServer(new RedisServer(host, port, password)); } - public Builder withPort(final int port) { - this.port = port; + /** + * Define connection details for connecting to a single Redis server. + * + * NOTE: If you want to connect to a RedisCluster, {@link Builder#withClusterNode}. + * + * @param redisServer Defines a redis server to connect to. + * @return Builder. + */ + private Builder withServer(final RedisServer redisServer) { + if (!clusterNodes.isEmpty()) { + // Cannot define both cluster servers and redis server instances. + throw new IllegalStateException( + "You cannot configure connection details for both a single Redis server and RedisCluster. " + + "Use either Builder.withServer() OR Builder.withClusterNode(), but NOT both. " + + "If talking to a single Redis instance use Builder.withServer(). " + + "If talking to a RedisCluster use Builder.withClusterNode() to configure one or more nodes in the cluster." + ); + } + this.redisServer = Objects.requireNonNull(redisServer); return this; } - public Builder withPassword(final String password) { - this.password = password; + /** + * Define connection details for connecting to a RedisCluster. + * Call this method as many times as needed to add nodes in your cluster. + * + * NOTE: If you want to connect to a single redis instance, {@link Builder#withServer}. + * + * @param host Host of redis node. + * @param port Port of redis node. + * @return Builder. + */ + public Builder withClusterNode(final String host, final int port) { + return withClusterNode(host, port, null); + } + + /** + * Define connection details for connecting to a RedisCluster. + * Call this method as many times as needed to add nodes in your cluster. + * + * NOTE: If you want to connect to a single redis instance, {@link Builder#withServer}. + * + * @param host Host of redis node. + * @param port Port of redis node. + * @param password (optional) Password for redis node, or NULL if no password required. + * @return Builder. + */ + public Builder withClusterNode(final String host, final int port, final String password) { + return withClusterNode(new RedisServer(host, port, password)); + } + + /** + * Define connection details for connecting to a RedisCluster. + * Call this method as many times as needed to add nodes in your cluster. + * + * NOTE: If you want to connect to a single redis instance, {@link Builder#withServer}. + * + * @param node Defines a node in the RedisCluster. + * @return Builder. + */ + private Builder withClusterNode(final RedisServer node) { + if (redisServer != null) { + // Cannot define both cluster servers and redis server instances. + throw new IllegalStateException( + "You cannot configure connection details for both a single Redis server and RedisCluster. " + + "Use either Builder.withServer() OR Builder.withClusterNode(), but NOT both. " + + "If talking to a single Redis instance use Builder.withServer(). " + + "If talking to a RedisCluster use Builder.withClusterNode() to configure one or more nodes in the cluster." + ); + } + clusterNodes.add(Objects.requireNonNull(node)); return this; } @@ -320,9 +390,15 @@ public Builder withMetricsEnabled(final boolean enabled) { * @return Configuration instance. */ public RedisStreamSpoutConfig build() { + RedisCluster redisCluster = null; + if (!clusterNodes.isEmpty()) { + redisCluster = new RedisCluster(clusterNodes); + } + return new RedisStreamSpoutConfig( // Redis connection properties - host, port, password, + redisServer, redisCluster, + // Consumer Properties streamKey, groupName, consumerIdPrefix, // Classes @@ -333,4 +409,106 @@ public RedisStreamSpoutConfig build() { ); } } + + /** + * Defines a RedisCluster connection details. + */ + public static class RedisCluster { + private final List servers; + + /** + * Constructor. + * @param servers One or more Nodes in the RedisCluster. + */ + public RedisCluster(final List servers) { + Objects.requireNonNull(servers); + this.servers = Collections.unmodifiableList(new ArrayList<>(servers)); + } + + public List getServers() { + return servers; + } + + @Override + public String toString() { + return "RedisCluster{" + + "servers=" + servers + + '}'; + } + + /** + * The URI for connecting to this RedisCluster. + * @return URI for the cluster. + */ + public String getConnectString() { + return getServers().stream() + .map(RedisServer::getConnectString) + .collect(Collectors.joining(",")); + } + } + + /** + * Defines a Single RedisServer instance connection details. + */ + public static class RedisServer { + private final String host; + private final int port; + private final String password; + + /** + * Constructor. + * @param host hostname of redis server. + * @param port port of redis server. + */ + public RedisServer(final String host, final int port) { + this(host, port, null); + } + + /** + * Constructor. + * @param host hostname of redis server. + * @param port port of redis server. + * @param password (optional) password for server, or NULL if not required. + */ + public RedisServer(final String host, final int port, final String password) { + this.host = host; + this.port = port; + this.password = password; + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + + public String getPassword() { + return password; + } + + /** + * The URI for connecting to this Redis Server instance. + * @return URI for the server. + */ + public String getConnectString() { + String connectStr = "redis://"; + + if (getPassword() != null && !getPassword().trim().isEmpty()) { + connectStr += getPassword() + "@"; + } + connectStr += getHost() + ":" + getPort(); + + return connectStr; + } + + @Override + public String toString() { + return "RedisServer{" + + "host='" + host + '\'' + + ", port=" + port + + '}'; + } + } } diff --git a/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceAdapter.java b/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceAdapter.java new file mode 100644 index 0000000..1ea7804 --- /dev/null +++ b/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceAdapter.java @@ -0,0 +1,30 @@ +package org.sourcelab.storm.spout.redis.client; + +import io.lettuce.core.api.sync.RedisStreamCommands; + +/** + * Adapter to allow usage of both RedisClient and RedisClusterClient. + */ +public interface LettuceAdapter { + + /** + * Is the underlying client connected? + * @return true if connected, false if not. + */ + boolean isConnected(); + + /** + * Call connect. + */ + void connect(); + + /** + * Get sync Redis Stream Commands instance. + */ + RedisStreamCommands getSyncCommands(); + + /** + * Call shutdown. + */ + void shutdown(); +} diff --git a/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClient.java b/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClient.java index 4cbc425..44049d8 100644 --- a/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClient.java +++ b/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClient.java @@ -7,8 +7,7 @@ import io.lettuce.core.StreamMessage; import io.lettuce.core.XGroupCreateArgs; import io.lettuce.core.XReadArgs; -import io.lettuce.core.api.StatefulRedisConnection; -import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.cluster.RedisClusterClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.sourcelab.storm.spout.redis.Message; @@ -38,13 +37,7 @@ public class LettuceClient implements Client { /** * The underlying Redis Client. */ - private final RedisClient redisClient; - - /** - * Underlying connection objects. - */ - private StatefulRedisConnection connection; - private RedisCommands syncCommands; + private final LettuceAdapter adapter; /** * Re-usable instance to prevent unnecessary garbage creation. @@ -62,7 +55,11 @@ public LettuceClient(final RedisStreamSpoutConfig config, final int instanceId) this( config, instanceId, - RedisClient.create(config.getConnectString()) + // Determine which adapter to use based on what type of redis instance we are + // communicating with. + config.isConnectingToCluster() + ? new LettuceClusterAdapter(RedisClusterClient.create(config.getConnectString())) + : new LettuceRedisAdapter(RedisClient.create(config.getConnectString())) ); } @@ -70,11 +67,11 @@ public LettuceClient(final RedisStreamSpoutConfig config, final int instanceId) * Protected constructor for injecting a RedisClient instance, typically for tests. * @param config Configuration. * @param instanceId Which instance number is this running under. - * @param redisClient RedisClient instance. + * @param adapter RedisClient instance. */ - LettuceClient(final RedisStreamSpoutConfig config, final int instanceId, final RedisClient redisClient) { + LettuceClient(final RedisStreamSpoutConfig config, final int instanceId, final LettuceAdapter adapter) { this.config = Objects.requireNonNull(config); - this.redisClient = Objects.requireNonNull(redisClient); + this.adapter = Objects.requireNonNull(adapter); // Calculate consumerId this.consumerId = config.getConsumerIdPrefix() + instanceId; @@ -95,17 +92,15 @@ public LettuceClient(final RedisStreamSpoutConfig config, final int instanceId) @Override public void connect() { - if (connection != null) { + if (adapter.isConnected()) { throw new IllegalStateException("Cannot call connect more than once!"); } - // Connect - connection = redisClient.connect(); - syncCommands = connection.sync(); + adapter.connect(); try { // Attempt to create consumer group - syncCommands.xgroupCreate( + adapter.getSyncCommands().xgroupCreate( // Start the group at first offset for our key. XReadArgs.StreamOffset.from(config.getStreamKey(), "0-0"), // Define the group name @@ -133,7 +128,7 @@ public void connect() { @Override public List nextMessages() { // Get next batch of messages. - final List> messages = syncCommands.xreadgroup( + final List> messages = adapter.getSyncCommands().xreadgroup( consumerFrom, xreadArgs, lastConsumed @@ -149,7 +144,7 @@ public List nextMessages() { @Override public void commitMessage(final String msgId) { // Confirm that the message has been processed using XACK - syncCommands.xack( + adapter.getSyncCommands().xack( config.getStreamKey(), config.getGroupName(), msgId @@ -158,11 +153,6 @@ public void commitMessage(final String msgId) { @Override public void disconnect() { - // Close our connection and shutdown. - if (connection != null) { - connection.close(); - connection = null; - } - redisClient.shutdown(); + adapter.shutdown(); } } diff --git a/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClusterAdapter.java b/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClusterAdapter.java new file mode 100644 index 0000000..21e178d --- /dev/null +++ b/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceClusterAdapter.java @@ -0,0 +1,60 @@ +package org.sourcelab.storm.spout.redis.client; + +import io.lettuce.core.api.sync.RedisStreamCommands; +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; + +import java.util.Objects; + +/** + * Adapter for talking to a RedisCluster. + * If you need to talk to a single Redis instance {@link LettuceRedisAdapter}. + */ +public class LettuceClusterAdapter implements LettuceAdapter { + /** + * The underlying Redis Client. + */ + private final RedisClusterClient redisClient; + + /** + * Underlying connection objects. + */ + private StatefulRedisClusterConnection connection; + private RedisStreamCommands syncCommands; + + public LettuceClusterAdapter(final RedisClusterClient redisClient) { + this.redisClient = Objects.requireNonNull(redisClient); + } + + @Override + public boolean isConnected() { + return connection != null; + } + + @Override + public void connect() { + if (isConnected()) { + throw new IllegalStateException("Cannot call connect more than once!"); + } + connection = redisClient.connect(); + } + + @Override + public RedisStreamCommands getSyncCommands() { + if (syncCommands == null) { + syncCommands = connection.sync(); + } + return syncCommands; + } + + @Override + public void shutdown() { + // Close our connection and shutdown. + if (connection != null) { + syncCommands = null; + connection.close(); + connection = null; + } + redisClient.shutdown(); + } +} diff --git a/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceRedisAdapter.java b/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceRedisAdapter.java new file mode 100644 index 0000000..9fdbddf --- /dev/null +++ b/src/main/java/org/sourcelab/storm/spout/redis/client/LettuceRedisAdapter.java @@ -0,0 +1,61 @@ +package org.sourcelab.storm.spout.redis.client; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisStreamCommands; + +import java.util.Objects; + +/** + * Adapter for talking to a single Redis instance. + * If you need to talk to a RedisCluster {@link LettuceClusterAdapter}. + */ +public class LettuceRedisAdapter implements LettuceAdapter { + + /** + * The underlying Redis Client. + */ + private final RedisClient redisClient; + + /** + * Underlying connection objects. + */ + private StatefulRedisConnection connection; + private RedisStreamCommands syncCommands; + + public LettuceRedisAdapter(final RedisClient redisClient) { + this.redisClient = Objects.requireNonNull(redisClient); + } + + @Override + public boolean isConnected() { + return connection != null; + } + + @Override + public void connect() { + if (isConnected()) { + throw new IllegalStateException("Cannot call connect more than once!"); + } + connection = redisClient.connect(); + } + + @Override + public RedisStreamCommands getSyncCommands() { + if (syncCommands == null) { + syncCommands = connection.sync(); + } + return syncCommands; + } + + @Override + public void shutdown() { + // Close our connection and shutdown. + if (connection != null) { + syncCommands = null; + connection.close(); + connection = null; + } + redisClient.shutdown(); + } +} diff --git a/src/test/java/org/sourcelab/storm/spout/redis/AbstractRedisStreamSpoutIntegrationTest.java b/src/test/java/org/sourcelab/storm/spout/redis/AbstractRedisStreamSpoutIntegrationTest.java new file mode 100644 index 0000000..13a371b --- /dev/null +++ b/src/test/java/org/sourcelab/storm/spout/redis/AbstractRedisStreamSpoutIntegrationTest.java @@ -0,0 +1,619 @@ +package org.sourcelab.storm.spout.redis; + +import org.apache.storm.generated.StreamInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsGetter; +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Utils; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.sourcelab.storm.spout.redis.example.TestTupleConverter; +import org.sourcelab.storm.spout.redis.failhandler.RetryFailedTuples; +import org.sourcelab.storm.spout.redis.util.outputcollector.EmittedTuple; +import org.sourcelab.storm.spout.redis.util.outputcollector.StubSpoutCollector; +import org.sourcelab.storm.spout.redis.util.test.RedisTestContainer; +import org.sourcelab.storm.spout.redis.util.test.RedisTestHelper; +import org.sourcelab.storm.spout.redis.util.test.StreamConsumerInfo; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +/** + * Abstract Integration test. Meant to allow for defining shared test cases that can be validated + * against both a Redis instance as well as RedisCluster instance. + */ +abstract class AbstractRedisStreamSpoutIntegrationTest { + + /** + * @return The Appropriate RedisTestContainer instance. + */ + abstract RedisTestContainer getTestContainer(); + + // Configuration values + private static final String GROUP_NAME = "MyGroupName"; + private static final String CONSUMER_ID_PREFIX = "ConsumerIdPrefix"; + private static final String CONSUMER_ID = CONSUMER_ID_PREFIX + "2"; + + private final Map stormConfig = Collections.emptyMap(); + + private RedisTestHelper redisTestHelper; + private RedisStreamSpoutConfig.Builder configBuilder; + private String streamKey; + + // Mocks + private TopologyContext mockTopologyContext; + + @BeforeEach + void setup() { + // Generate a random stream key + streamKey = "MyStreamKey" + System.currentTimeMillis(); + + // Create config + configBuilder = RedisStreamSpoutConfig.newBuilder() + // Consumer Properties + .withGroupName(GROUP_NAME) + .withConsumerIdPrefix(CONSUMER_ID_PREFIX) + .withStreamKey(streamKey) + // Failure Handler + .withNoRetryFailureHandler() + // Tuple Handler Class + .withTupleConverter(new TestTupleConverter("timestamp", "value")); + + // Set Connection Properties + getTestContainer().addConnectionDetailsToConfig(configBuilder); + + // Setup mock + mockTopologyContext = mock(TopologyContext.class); + when(mockTopologyContext.getThisTaskIndex()) + .thenReturn(2); + + // Create test helper + redisTestHelper = getTestContainer().getRedisTestHelper(); + } + + @AfterEach + void cleanup() { + // Verify standard metric interactions + verify(mockTopologyContext, times(3)) + .registerGauge(anyString(), any()); + + // Verify all mock interactions accounted for + verifyNoMoreInteractions(mockTopologyContext); + } + + /** + * Most basic lifecycle smoke test. + */ + @Test + void smokeTest_openAndClose() { + // Create spout + try (final RedisStreamSpout spout = new RedisStreamSpout(configBuilder.build())) { + + final StubSpoutCollector collector = new StubSpoutCollector(); + + // Open spout + spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector)); + + // Close spout via autocloseable + } + + // Verify mocks + verify(mockTopologyContext, times(1)).getThisTaskIndex(); + } + + /** + * Basic lifecycle smoke test. + */ + @Test + void smokeTest_openActivateDeactivateAndClose() throws InterruptedException { + // Create spout + try (final RedisStreamSpout spout = new RedisStreamSpout(configBuilder.build())) { + final StubSpoutCollector collector = new StubSpoutCollector(); + + // Open spout + spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector)); + + // activate spout + spout.activate(); + + // Small sleep + Thread.sleep(3000L); + + // Deactivate and close via Autoclosable + spout.deactivate(); + } + + // Verify mocks + verify(mockTopologyContext, times(1)).getThisTaskIndex(); + } + + /** + * Verifies the behavior when you attempt to connect to a redis instance + * that does not exist. Looks like nothing. You get errors in the logs. + * + * Disabled for now. + */ + void smokeTest_configureInvalidRedisHost() throws InterruptedException { + // Lets override the redis host with something invalid + configBuilder + .withServer(getTestContainer().getHost(), 124); + + // Create spout + try (final RedisStreamSpout spout = new RedisStreamSpout(configBuilder.build())) { + final StubSpoutCollector collector = new StubSpoutCollector(); + + // Open spout + spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector)); + + // activate spout + spout.activate(); + + // Small sleep + Thread.sleep(3000L); + + // Deactivate (noop) + spout.deactivate(); + + // Lets try calling activate one more time + spout.activate(); + spout.deactivate(); + + // Deactivate and close via Autocloseable + spout.deactivate(); + } + + // Verify mocks + verify(mockTopologyContext, times(1)).getThisTaskIndex(); + } + + /** + * Basic usage test. + */ + @Test + void smokeTest_consumeAndAckMessages() throws InterruptedException { + // Create spout + try (final RedisStreamSpout spout = new RedisStreamSpout(configBuilder.build())) { + final StubSpoutCollector collector = new StubSpoutCollector(); + + // Open spout + spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector)); + + // activate spout + spout.activate(); + + // Lets publish 10 messages to the stream + final List producedMsgIds = redisTestHelper.produceMessages(streamKey, 10); + + // Now lets try to get those from the spout + do { + spout.nextTuple(); + Thread.sleep(100L); + } while (collector.getEmittedTuples().size() < 10); + + // Call next tuple a few more times, should be a no-op + for (int counter = 0; counter < 10; counter++) { + Thread.sleep(100L); + spout.nextTuple(); + } + + // Verify what got emitted. + assertEquals(10, collector.getEmittedTuples().size(), "Should have found 10 emitted tuples."); + + final String expectedStreamId = Utils.DEFAULT_STREAM_ID; + for (int index = 0; index < producedMsgIds.size(); index++) { + final EmittedTuple emittedTuple = collector.getEmittedTuples().get(index); + + // Verify message Id. + assertEquals(producedMsgIds.get(index), emittedTuple.getMessageId()); + + // Verify Stream Id + assertEquals(expectedStreamId, emittedTuple.getStreamId()); + + // Verify tuple value + assertEquals(3, emittedTuple.getTuple().size(), "Should have 3 values"); + + // Look for value + final String expectedValue = "value" + index; + boolean foundValue = emittedTuple.getTuple().stream() + .anyMatch((entry) -> entry.equals(expectedValue)); + assertTrue(foundValue, "Failed to find key tuple value"); + + final String expectedMsgIdValue = producedMsgIds.get(index); + foundValue = emittedTuple.getTuple().stream() + .anyMatch((entry) -> entry.equals(expectedMsgIdValue)); + assertTrue(foundValue, "Failed to find msgId tuple value"); + } + + // See that we have 10 items pending + StreamConsumerInfo consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID); + assertNotNull(consumerInfo, "Failed to find consumer info!"); + + // Verify we have 10 items pending + assertEquals(10L, consumerInfo.getPending(), "Found entries pending"); + + // Now Ack the messages + collector.getEmittedTuples().stream() + .map(EmittedTuple::getMessageId) + .forEach(spout::ack); + + // Small delay waiting for processing. + Thread.sleep(1000L); + + // Verify that our message were acked in redis. + consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID); + assertNotNull(consumerInfo, "Failed to find consumer info!"); + + // Verify we have nothing pending + assertEquals(0L, consumerInfo.getPending(), "Found entries pending?"); + + // Deactivate and close via Autocloseable + spout.deactivate(); + } + + // Verify mocks + verify(mockTopologyContext, times(1)).getThisTaskIndex(); + } + + /** + * Basic usage with retry failure handler. + */ + @Test + void smokeTest_consumeFailAndAckMessages() throws InterruptedException { + // Swap out failure handler + configBuilder.withFailureHandler(new RetryFailedTuples(2)); + + // Create spout + try (final RedisStreamSpout spout = new RedisStreamSpout(configBuilder.build())) { + final StubSpoutCollector collector = new StubSpoutCollector(); + + // Open spout + spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector)); + + // activate spout + spout.activate(); + + // Lets publish 10 messages to the stream + List producedMsgIds = redisTestHelper.produceMessages(streamKey, 10); + + // Now lets try to get 5 of those those from the spout... + do { + spout.nextTuple(); + Thread.sleep(100L); + } while (collector.getEmittedTuples().size() < 5); + + + // Verify what got emitted. + assertEquals(5, collector.getEmittedTuples().size(), "Should have found 10 emitted tuples."); + + final String expectedStreamId = Utils.DEFAULT_STREAM_ID; + for (int index = 0; index < 5; index++) { + final EmittedTuple emittedTuple = collector.getEmittedTuples().get(index); + + // Verify message Id. + assertEquals(producedMsgIds.get(index), emittedTuple.getMessageId()); + + // Verify Stream Id + assertEquals(expectedStreamId, emittedTuple.getStreamId()); + + // Verify tuple value + assertEquals(3, emittedTuple.getTuple().size(), "Should have 3 values"); + + // Look for value + final String expectedValue = "value" + index; + boolean foundValue = emittedTuple.getTuple().stream() + .anyMatch((entry) -> entry.equals(expectedValue)); + assertTrue(foundValue, "Failed to find key tuple value"); + + final String expectedMsgIdValue = producedMsgIds.get(index); + foundValue = emittedTuple.getTuple().stream() + .anyMatch((entry) -> entry.equals(expectedMsgIdValue)); + assertTrue(foundValue, "Failed to find msgId tuple value"); + } + + // See that we have 10 items pending + StreamConsumerInfo consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID); + assertNotNull(consumerInfo, "Failed to find consumer info!"); + assertEquals(10L, consumerInfo.getPending(), "Found entries pending"); + + final List messageIdsToFail = new ArrayList<>(); + + for (int index = 0; index < 5; index++) { + // Now ack the first 3 messages + if (index < 3) { + spout.ack( + collector.getEmittedTuples().get(index).getMessageId() + ); + } else { + // Fail the remaining two + messageIdsToFail.add((String) collector.getEmittedTuples().get(index).getMessageId()); + spout.fail( + collector.getEmittedTuples().get(index).getMessageId() + ); + } + } + + // And reset our collector + collector.reset(); + + // Small delay waiting for processing. + Thread.sleep(1000L); + + // Verify that our message were acked in redis. + consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID); + assertNotNull(consumerInfo, "Failed to find consumer info!"); + + // Verify we have 7 pending + assertEquals(7L, consumerInfo.getPending(), "Found entries pending"); + + // Ask for the next two tuples, we should get our failed tuples back out. + do { + spout.nextTuple(); + } while (collector.getEmittedTuples().size() < 2); + + // We should have emitted two tuples. + assertEquals(2, collector.getEmittedTuples().size()); + assertEquals(messageIdsToFail.get(0), collector.getEmittedTuples().get(0).getMessageId()); + assertEquals(messageIdsToFail.get(1), collector.getEmittedTuples().get(1).getMessageId()); + + // Ack them + spout.ack(messageIdsToFail.get(0)); + spout.ack(messageIdsToFail.get(1)); + + // Small delay waiting for processing. + Thread.sleep(1000L); + + // Verify that our message were acked in redis. + consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID); + assertNotNull(consumerInfo, "Failed to find consumer info!"); + + // Verify we have 5 pending + assertEquals(5L, consumerInfo.getPending(), "Found entries pending"); + + // Deactivate and close via Autocloseable + spout.deactivate(); + } + + // Verify mocks + verify(mockTopologyContext, times(1)).getThisTaskIndex(); + } + + /** + * Verify declareOutputFields using TestTupleConverter. + */ + @Test + void test_declareOutputFields() { + // Create a test implementation + final TupleConverter converter = new DummyTupleConverter() ; + + // Update config + configBuilder.withTupleConverter(converter); + + // Create spout + try (final RedisStreamSpout spout = new RedisStreamSpout(configBuilder.build())) { + final StubSpoutCollector collector = new StubSpoutCollector(); + + // Open spout and activate. + spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector)); + spout.activate(); + + // Publish 9 records to redis. + redisTestHelper.produceMessages(streamKey, 9); + + // Pull via spout + do { + spout.nextTuple(); + } while (collector.getEmittedTuples().size() < 9); + + // We should have emitted 9 tuples. + assertEquals(9, collector.getEmittedTuples().size()); + + // Make sure each tuple went out on the correct stream + for (int index = 0; index < 9; index++) { + final String expectedStream = "stream" + ((index % 3) + 1); + final EmittedTuple emittedTuple = collector.getEmittedTuples().get(index); + + // Verify stream + assertEquals(expectedStream, emittedTuple.getStreamId()); + } + + // Deactivate and close via Autocloseable + spout.deactivate(); + } + + // Verify mocks + verify(mockTopologyContext, times(1)).getThisTaskIndex(); + } + + /** + * Verify spout emits tuples down the correct stream. + */ + @Test + void test_EmitDownSeparateStreams() { + // Create a test implementation + final TupleConverter converter = new DummyTupleConverter() ; + + // Update config + configBuilder.withTupleConverter(converter); + + // Create spout + try (final RedisStreamSpout spout = new RedisStreamSpout(configBuilder.build())) { + final StubSpoutCollector collector = new StubSpoutCollector(); + + // Open spout + spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector)); + + // Ask for stream names + final OutputFieldsGetter getter = new OutputFieldsGetter(); + spout.declareOutputFields(getter); + + // Validate + final Map entries = getter.getFieldsDeclaration(); + assertEquals(3, entries.size(), "Should have 3 entries"); + + // Verify Stream1 + assertTrue(entries.containsKey("stream1"), "should have entry for 'stream1'"); + StreamInfo info = entries.get("stream1"); + assertEquals(3, info.get_output_fields().size(), "Should have 3 fields"); + assertEquals("field_a", info.get_output_fields().get(0)); + assertEquals("field_b", info.get_output_fields().get(1)); + assertEquals("field_c", info.get_output_fields().get(2)); + + // Verify Stream2 + assertTrue(entries.containsKey("stream2"), "should have entry for 'stream2'"); + info = entries.get("stream2"); + assertEquals(3, info.get_output_fields().size(), "Should have 3 fields"); + assertEquals("field_d", info.get_output_fields().get(0)); + assertEquals("field_e", info.get_output_fields().get(1)); + assertEquals("field_f", info.get_output_fields().get(2)); + + // Verify Stream3 + assertTrue(entries.containsKey("stream3"), "should have entry for 'stream3'"); + info = entries.get("stream3"); + assertEquals(3, info.get_output_fields().size(), "Should have 3 fields"); + assertEquals("field_g", info.get_output_fields().get(0)); + assertEquals("field_h", info.get_output_fields().get(1)); + assertEquals("field_i", info.get_output_fields().get(2)); + + // Deactivate and close via Autocloseable + spout.deactivate(); + } + + // Verify mocks + verify(mockTopologyContext, times(1)).getThisTaskIndex(); + } + + /** + * Verify if tuple converter instance returns null, then the message + * is simply acked and nothing is emitted. + */ + @Test + void test_NullConversionJustGetsAckedNothingEmitted() { + // Create a test implementation + final TupleConverter converter = new NullTupleConverter() ; + + // Update config + configBuilder.withTupleConverter(converter); + + // Create spout + try (final RedisStreamSpout spout = new RedisStreamSpout(configBuilder.build())) { + final StubSpoutCollector collector = new StubSpoutCollector(); + + // Open spout and activate + spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector)); + spout.activate(); + + // Publish 10 records to redis. + redisTestHelper.produceMessages(streamKey, 10); + + // Attempt to pull via spout. + // We expect to get nothing. + final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10); + do { + spout.nextTuple(); + } while (System.currentTimeMillis() < endTime); + + // We should have emitted 0 tuples. + assertEquals(0, collector.getEmittedTuples().size()); + + // Verify that all are showing as acked in redis. + // See that we have 10 items pending + StreamConsumerInfo consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID); + assertNotNull(consumerInfo, "Failed to find consumer info!"); + + // Verify we have 0 items pending + assertEquals(0L, consumerInfo.getPending(), "Found entries pending"); + + // Deactivate and close via Autocloseable + spout.deactivate(); + } + + // Verify mocks + verify(mockTopologyContext, times(1)).getThisTaskIndex(); + } + + /** + * Dummy Implementation for tests. + */ + private static class DummyTupleConverter implements TupleConverter { + private final String[] streams = new String[]{"stream1", "stream2", "stream3"}; + + private int counter = 0; + + @Override + public TupleValue createTuple(final Message message) { + final String streamName; + switch (counter) { + case 0: + streamName = "stream1"; + break; + case 1: + streamName = "stream2"; + break; + default: + streamName = "stream3"; + break; + } + // Increment counter + counter = (counter + 1) % 3; + + final List values = new ArrayList<>(); + values.add("value1"); + values.add("value2"); + values.add("value3"); + + return new TupleValue(values, streamName); + } + + @Override + public Fields getFieldsFor(final String stream) { + if ("stream1".equals(stream)) { + return new Fields("field_a", "field_b", "field_c"); + } else if ("stream2".equals(stream)) { + return new Fields("field_d", "field_e", "field_f"); + } else if ("stream3".equals(stream)) { + return new Fields("field_g", "field_h", "field_i"); + } + throw new IllegalArgumentException("Unknow stream " + stream); + } + + @Override + public List streams() { + return Arrays.asList(streams); + } + } + + /** + * Implementation that always returns null. + */ + private static class NullTupleConverter implements TupleConverter { + + @Override + public TupleValue createTuple(final Message message) { + return null; + } + + @Override + public Fields getFieldsFor(final String stream) { + return new Fields("value"); + } + } +} diff --git a/src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutConfigTest.java b/src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutConfigTest.java new file mode 100644 index 0000000..b38f516 --- /dev/null +++ b/src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutConfigTest.java @@ -0,0 +1,31 @@ +package org.sourcelab.storm.spout.redis; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +class RedisStreamSpoutConfigTest { + + /** + * Verifies if you add both Redis servers and RedisCluster nodes it + * will throw an exception. + */ + @Test + void verify_cannotAddBothClusterAndServerEntries() { + final RedisStreamSpoutConfig.Builder builder = RedisStreamSpoutConfig.newBuilder() + .withServer("hostname", 123); + + assertThrows(IllegalStateException.class, () -> builder.withClusterNode("clusterhost", 323)); + } + + /** + * Verifies if you add both Redis servers and RedisCluster nodes it + * will throw an exception. + */ + @Test + void verify_cannotAddBothServerEntriesAndClusterNodes() { + final RedisStreamSpoutConfig.Builder builder = RedisStreamSpoutConfig.newBuilder() + .withClusterNode("clusterhost", 323); + assertThrows(IllegalStateException.class, () -> builder.withServer("host", 123)); + } +} \ No newline at end of file diff --git a/src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutIntegrationTest.java b/src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutIntegrationTest.java deleted file mode 100644 index 62cd6c5..0000000 --- a/src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpoutIntegrationTest.java +++ /dev/null @@ -1,623 +0,0 @@ -package org.sourcelab.storm.spout.redis; - -import org.apache.storm.generated.StreamInfo; -import org.apache.storm.spout.ISpout; -import org.apache.storm.spout.SpoutOutputCollector; -import org.apache.storm.task.TopologyContext; -import org.apache.storm.topology.IRichSpout; -import org.apache.storm.topology.OutputFieldsGetter; -import org.apache.storm.tuple.Fields; -import org.apache.storm.utils.Utils; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.sourcelab.storm.spout.redis.example.TestTupleConverter; -import org.sourcelab.storm.spout.redis.failhandler.RetryFailedTuples; -import org.sourcelab.storm.spout.redis.util.outputcollector.EmittedTuple; -import org.sourcelab.storm.spout.redis.util.outputcollector.StubSpoutCollector; -import org.sourcelab.storm.spout.redis.util.test.RedisTestHelper; -import org.sourcelab.storm.spout.redis.util.test.StreamConsumerInfo; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; - -/** - * Integration Test over Spout. - */ -@Testcontainers -@Tag("Integration") -class RedisStreamSpoutIntegrationTest { - /** - * This test depends on the following Redis Container. - */ - @Container - public GenericContainer redis = new GenericContainer<>("redis:5.0.3-alpine") - .withExposedPorts(6379); - - // Configuration values - private static final String GROUP_NAME = "MyGroupName"; - private static final String CONSUMER_ID_PREFIX = "ConsumerIdPrefix"; - private static final String CONSUMER_ID = CONSUMER_ID_PREFIX + "2"; - - private final Map stormConfig = Collections.emptyMap(); - - private RedisTestHelper redisTestHelper; - private RedisStreamSpoutConfig.Builder configBuilder; - private String streamKey; - - // Mocks - private TopologyContext mockTopologyContext; - - @BeforeEach - void setup() { - // Generate a random stream key - streamKey = "MyStreamKey" + System.currentTimeMillis(); - - // Create config - configBuilder = RedisStreamSpoutConfig.newBuilder() - // Set Connection Properties - .withHost(redis.getHost()) - .withPort(redis.getFirstMappedPort()) - // Consumer Properties - .withGroupName(GROUP_NAME) - .withConsumerIdPrefix(CONSUMER_ID_PREFIX) - .withStreamKey(streamKey) - // Failure Handler - .withNoRetryFailureHandler() - // Tuple Handler Class - .withTupleConverter(new TestTupleConverter("timestamp", "value")); - - // Setup mock - mockTopologyContext = mock(TopologyContext.class); - when(mockTopologyContext.getThisTaskIndex()) - .thenReturn(2); - - // Create test helper - redisTestHelper = new RedisTestHelper("redis://" + redis.getHost() + ":" + redis.getFirstMappedPort()); - } - - @AfterEach - void cleanup() { - // Verify standard metric interactions - verify(mockTopologyContext, times(3)) - .registerGauge(anyString(), any()); - - // Verify all mock interactions accounted for - verifyNoMoreInteractions(mockTopologyContext); - } - - /** - * Most basic lifecycle smoke test. - */ - @Test - void smokeTest_openAndClose() { - // Create spout - final ISpout spout = new RedisStreamSpout(configBuilder.build()); - final StubSpoutCollector collector = new StubSpoutCollector(); - - // Open spout - spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector)); - - // Close spout - spout.close(); - - // Verify mocks - verify(mockTopologyContext, times(1)).getThisTaskIndex(); - } - - /** - * Basic lifecycle smoke test. - */ - @Test - void smokeTest_openActivateDeactivateAndClose() throws InterruptedException { - // Create spout - final ISpout spout = new RedisStreamSpout(configBuilder.build()); - final StubSpoutCollector collector = new StubSpoutCollector(); - - // Open spout - spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector)); - - // activate spout - spout.activate(); - - // Small sleep - Thread.sleep(3000L); - - // Deactivate (noop) - spout.deactivate(); - - // Close spout - spout.close(); - - // Verify mocks - verify(mockTopologyContext, times(1)).getThisTaskIndex(); - } - - /** - * Verifies the behavior when you attempt to connect to a redis instance - * that does not exist. Looks like nothing. You get errors in the logs. - * - * Disabled for now. - */ - void smokeTest_configureInvalidRedisHost() throws InterruptedException { - // Lets override the redis host with something invalid - configBuilder.withPort(1234); - - // Create spout - final ISpout spout = new RedisStreamSpout(configBuilder.build()); - final StubSpoutCollector collector = new StubSpoutCollector(); - - // Open spout - spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector)); - - // activate spout - spout.activate(); - - // Small sleep - Thread.sleep(3000L); - - // Deactivate (noop) - spout.deactivate(); - - // Lets try calling activate one more time - spout.activate(); - spout.deactivate(); - - // Close spout - spout.close(); - - // Verify mocks - verify(mockTopologyContext, times(1)).getThisTaskIndex(); - } - - /** - * Basic usage test. - */ - @Test - void smokeTest_consumeAndAckMessages() throws InterruptedException { - // Create spout - final ISpout spout = new RedisStreamSpout(configBuilder.build()); - final StubSpoutCollector collector = new StubSpoutCollector(); - - // Open spout - spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector)); - - // activate spout - spout.activate(); - - // Lets publish 10 messages to the stream - final List producedMsgIds = redisTestHelper.produceMessages(streamKey, 10); - - // Now lets try to get those from the spout - do { - spout.nextTuple(); - Thread.sleep(100L); - } while (collector.getEmittedTuples().size() < 10); - - // Call next tuple a few more times, should be a no-op - for (int counter = 0; counter < 10; counter++) { - Thread.sleep(100L); - spout.nextTuple(); - } - - // Verify what got emitted. - assertEquals(10, collector.getEmittedTuples().size(), "Should have found 10 emitted tuples."); - - final String expectedStreamId = Utils.DEFAULT_STREAM_ID; - for (int index = 0; index < producedMsgIds.size(); index++) { - final EmittedTuple emittedTuple = collector.getEmittedTuples().get(index); - - // Verify message Id. - assertEquals(producedMsgIds.get(index), emittedTuple.getMessageId()); - - // Verify Stream Id - assertEquals(expectedStreamId, emittedTuple.getStreamId()); - - // Verify tuple value - assertEquals(3, emittedTuple.getTuple().size(), "Should have 3 values"); - - // Look for value - final String expectedValue = "value" + index; - boolean foundValue = emittedTuple.getTuple().stream() - .anyMatch((entry) -> entry.equals(expectedValue)); - assertTrue(foundValue, "Failed to find key tuple value"); - - final String expectedMsgIdValue = producedMsgIds.get(index); - foundValue = emittedTuple.getTuple().stream() - .anyMatch((entry) -> entry.equals(expectedMsgIdValue)); - assertTrue(foundValue, "Failed to find msgId tuple value"); - } - - // See that we have 10 items pending - StreamConsumerInfo consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID); - assertNotNull(consumerInfo, "Failed to find consumer info!"); - - // Verify we have 10 items pending - assertEquals(10L, consumerInfo.getPending(), "Found entries pending"); - - // Now Ack the messages - collector.getEmittedTuples().stream() - .map(EmittedTuple::getMessageId) - .forEach(spout::ack); - - // Small delay waiting for processing. - Thread.sleep(1000L); - - // Verify that our message were acked in redis. - consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID); - assertNotNull(consumerInfo, "Failed to find consumer info!"); - - // Verify we have nothing pending - assertEquals(0L, consumerInfo.getPending(), "Found entries pending?"); - - // Deactivate and close - spout.deactivate(); - spout.close(); - - // Verify mocks - verify(mockTopologyContext, times(1)).getThisTaskIndex(); - } - - /** - * Basic usage with retry failure handler. - */ - @Test - void smokeTest_consumeFailAndAckMessages() throws InterruptedException { - // Swap out failure handler - configBuilder.withFailureHandler(new RetryFailedTuples(2)); - - // Create spout - final ISpout spout = new RedisStreamSpout(configBuilder.build()); - final StubSpoutCollector collector = new StubSpoutCollector(); - - // Open spout - spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector)); - - // activate spout - spout.activate(); - - // Lets publish 10 messages to the stream - List producedMsgIds = redisTestHelper.produceMessages(streamKey, 10); - - // Now lets try to get 5 of those those from the spout... - do { - spout.nextTuple(); - Thread.sleep(100L); - } while (collector.getEmittedTuples().size() < 5); - - - // Verify what got emitted. - assertEquals(5, collector.getEmittedTuples().size(), "Should have found 10 emitted tuples."); - - final String expectedStreamId = Utils.DEFAULT_STREAM_ID; - for (int index = 0; index < 5; index++) { - final EmittedTuple emittedTuple = collector.getEmittedTuples().get(index); - - // Verify message Id. - assertEquals(producedMsgIds.get(index), emittedTuple.getMessageId()); - - // Verify Stream Id - assertEquals(expectedStreamId, emittedTuple.getStreamId()); - - // Verify tuple value - assertEquals(3, emittedTuple.getTuple().size(), "Should have 3 values"); - - // Look for value - final String expectedValue = "value" + index; - boolean foundValue = emittedTuple.getTuple().stream() - .anyMatch((entry) -> entry.equals(expectedValue)); - assertTrue(foundValue, "Failed to find key tuple value"); - - final String expectedMsgIdValue = producedMsgIds.get(index); - foundValue = emittedTuple.getTuple().stream() - .anyMatch((entry) -> entry.equals(expectedMsgIdValue)); - assertTrue(foundValue, "Failed to find msgId tuple value"); - } - - // See that we have 10 items pending - StreamConsumerInfo consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID); - assertNotNull(consumerInfo, "Failed to find consumer info!"); - assertEquals(10L, consumerInfo.getPending(), "Found entries pending"); - - final List messageIdsToFail = new ArrayList<>(); - - for (int index = 0; index < 5; index++) { - // Now ack the first 3 messages - if (index < 3) { - spout.ack( - collector.getEmittedTuples().get(index).getMessageId() - ); - } else { - // Fail the remaining two - messageIdsToFail.add((String) collector.getEmittedTuples().get(index).getMessageId()); - spout.fail( - collector.getEmittedTuples().get(index).getMessageId() - ); - } - } - - // And reset our collector - collector.reset(); - - // Small delay waiting for processing. - Thread.sleep(1000L); - - // Verify that our message were acked in redis. - consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID); - assertNotNull(consumerInfo, "Failed to find consumer info!"); - - // Verify we have 7 pending - assertEquals(7L, consumerInfo.getPending(), "Found entries pending"); - - // Ask for the next two tuples, we should get our failed tuples back out. - do { - spout.nextTuple(); - } while (collector.getEmittedTuples().size() < 2); - - // We should have emitted two tuples. - assertEquals(2, collector.getEmittedTuples().size()); - assertEquals(messageIdsToFail.get(0), collector.getEmittedTuples().get(0).getMessageId()); - assertEquals(messageIdsToFail.get(1), collector.getEmittedTuples().get(1).getMessageId()); - - // Ack them - spout.ack(messageIdsToFail.get(0)); - spout.ack(messageIdsToFail.get(1)); - - // Small delay waiting for processing. - Thread.sleep(1000L); - - // Verify that our message were acked in redis. - consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID); - assertNotNull(consumerInfo, "Failed to find consumer info!"); - - // Verify we have 5 pending - assertEquals(5L, consumerInfo.getPending(), "Found entries pending"); - - // Deactivate and close - spout.deactivate(); - spout.close(); - - // Verify mocks - verify(mockTopologyContext, times(1)).getThisTaskIndex(); - } - - /** - * Verify declareOutputFields using TestTupleConverter. - */ - @Test - void test_declareOutputFields() { - // Create a test implementation - final TupleConverter converter = new DummyTupleConverter() ; - - // Update config - configBuilder.withTupleConverter(converter); - - // Create spout - final IRichSpout spout = new RedisStreamSpout(configBuilder.build()); - final StubSpoutCollector collector = new StubSpoutCollector(); - - // Open spout and activate. - spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector)); - spout.activate(); - - // Publish 9 records to redis. - redisTestHelper.produceMessages(streamKey, 9); - - // Pull via spout - do { - spout.nextTuple(); - } while (collector.getEmittedTuples().size() < 9); - - // We should have emitted 9 tuples. - assertEquals(9, collector.getEmittedTuples().size()); - - // Make sure each tuple went out on the correct stream - for (int index = 0; index < 9; index++) { - final String expectedStream = "stream" + ((index % 3) + 1); - final EmittedTuple emittedTuple = collector.getEmittedTuples().get(index); - - // Verify stream - assertEquals(expectedStream, emittedTuple.getStreamId()); - } - - // Deactivate and close - spout.deactivate(); - spout.close(); - - // Verify mocks - verify(mockTopologyContext, times(1)).getThisTaskIndex(); - } - - /** - * Verify spout emits tuples down the correct stream. - */ - @Test - void test_EmitDownSeparateStreams() { - // Create a test implementation - final TupleConverter converter = new DummyTupleConverter() ; - - // Update config - configBuilder.withTupleConverter(converter); - - // Create spout - // Create spout - final IRichSpout spout = new RedisStreamSpout(configBuilder.build()); - final StubSpoutCollector collector = new StubSpoutCollector(); - - // Open spout - spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector)); - - // Ask for stream names - final OutputFieldsGetter getter = new OutputFieldsGetter(); - spout.declareOutputFields(getter); - - // Validate - final Map entries = getter.getFieldsDeclaration(); - assertEquals(3, entries.size(), "Should have 3 entries"); - - // Verify Stream1 - assertTrue(entries.containsKey("stream1"), "should have entry for 'stream1'"); - StreamInfo info = entries.get("stream1"); - assertEquals(3, info.get_output_fields().size(), "Should have 3 fields"); - assertEquals("field_a", info.get_output_fields().get(0)); - assertEquals("field_b", info.get_output_fields().get(1)); - assertEquals("field_c", info.get_output_fields().get(2)); - - // Verify Stream2 - assertTrue(entries.containsKey("stream2"), "should have entry for 'stream2'"); - info = entries.get("stream2"); - assertEquals(3, info.get_output_fields().size(), "Should have 3 fields"); - assertEquals("field_d", info.get_output_fields().get(0)); - assertEquals("field_e", info.get_output_fields().get(1)); - assertEquals("field_f", info.get_output_fields().get(2)); - - // Verify Stream3 - assertTrue(entries.containsKey("stream3"), "should have entry for 'stream3'"); - info = entries.get("stream3"); - assertEquals(3, info.get_output_fields().size(), "Should have 3 fields"); - assertEquals("field_g", info.get_output_fields().get(0)); - assertEquals("field_h", info.get_output_fields().get(1)); - assertEquals("field_i", info.get_output_fields().get(2)); - - // Deactivate and close - spout.close(); - - // Verify mocks - verify(mockTopologyContext, times(1)).getThisTaskIndex(); - } - - /** - * Verify if tuple converter instance returns null, then the message - * is simply acked and nothing is emitted. - */ - @Test - void test_NullConversionJustGetsAckedNothingEmitted() { - // Create a test implementation - final TupleConverter converter = new NullTupleConverter() ; - - // Update config - configBuilder.withTupleConverter(converter); - - // Create spout - final IRichSpout spout = new RedisStreamSpout(configBuilder.build()); - final StubSpoutCollector collector = new StubSpoutCollector(); - - // Open spout and activate - spout.open(stormConfig, mockTopologyContext, new SpoutOutputCollector(collector)); - spout.activate(); - - // Publish 10 records to redis. - redisTestHelper.produceMessages(streamKey, 10); - - // Attempt to pull via spout. - // We expect to get nothing. - final long endTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10); - do { - spout.nextTuple(); - } while (System.currentTimeMillis() < endTime); - - // We should have emitted 0 tuples. - assertEquals(0, collector.getEmittedTuples().size()); - - // Verify that all are showing as acked in redis. - // See that we have 10 items pending - StreamConsumerInfo consumerInfo = redisTestHelper.getConsumerInfo(streamKey, GROUP_NAME, CONSUMER_ID); - assertNotNull(consumerInfo, "Failed to find consumer info!"); - - // Verify we have 0 items pending - assertEquals(0L, consumerInfo.getPending(), "Found entries pending"); - - // Deactivate and close - spout.deactivate(); - spout.close(); - - // Verify mocks - verify(mockTopologyContext, times(1)).getThisTaskIndex(); - } - - /** - * Dummy Implementation for tests. - */ - private static class DummyTupleConverter implements TupleConverter { - private final String[] streams = new String[]{"stream1", "stream2", "stream3"}; - - private int counter = 0; - - @Override - public TupleValue createTuple(final Message message) { - final String streamName; - switch (counter) { - case 0: - streamName = "stream1"; - break; - case 1: - streamName = "stream2"; - break; - default: - streamName = "stream3"; - break; - } - // Increment counter - counter = (counter + 1) % 3; - - final List values = new ArrayList<>(); - values.add("value1"); - values.add("value2"); - values.add("value3"); - - return new TupleValue(values, streamName); - } - - @Override - public Fields getFieldsFor(final String stream) { - if ("stream1".equals(stream)) { - return new Fields("field_a", "field_b", "field_c"); - } else if ("stream2".equals(stream)) { - return new Fields("field_d", "field_e", "field_f"); - } else if ("stream3".equals(stream)) { - return new Fields("field_g", "field_h", "field_i"); - } - throw new IllegalArgumentException("Unknow stream " + stream); - } - - @Override - public List streams() { - return Arrays.asList(streams); - } - } - - /** - * Implementation that always returns null. - */ - private static class NullTupleConverter implements TupleConverter { - - @Override - public TupleValue createTuple(final Message message) { - return null; - } - - @Override - public Fields getFieldsFor(final String stream) { - return new Fields("value"); - } - } -} \ No newline at end of file diff --git a/src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpout_ClusterIntegrationTest.java b/src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpout_ClusterIntegrationTest.java new file mode 100644 index 0000000..6c36374 --- /dev/null +++ b/src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpout_ClusterIntegrationTest.java @@ -0,0 +1,24 @@ +package org.sourcelab.storm.spout.redis; + +import org.junit.jupiter.api.Tag; +import org.sourcelab.storm.spout.redis.util.test.RedisTestContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +/** + * + */ +@Testcontainers +@Tag("Integration") +public class RedisStreamSpout_ClusterIntegrationTest extends AbstractRedisStreamSpoutIntegrationTest { + /** + * This test depends on the following Redis Container. + */ + @Container + public RedisTestContainer testContainer = RedisTestContainer.newRedisClusterContainer(); + + @Override + RedisTestContainer getTestContainer() { + return testContainer; + } +} diff --git a/src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpout_RedisIntegrationTest.java b/src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpout_RedisIntegrationTest.java new file mode 100644 index 0000000..7de99c2 --- /dev/null +++ b/src/test/java/org/sourcelab/storm/spout/redis/RedisStreamSpout_RedisIntegrationTest.java @@ -0,0 +1,21 @@ +package org.sourcelab.storm.spout.redis; + +import org.junit.jupiter.api.Tag; +import org.sourcelab.storm.spout.redis.util.test.RedisTestContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +@Tag("Integration") +public class RedisStreamSpout_RedisIntegrationTest extends AbstractRedisStreamSpoutIntegrationTest { + /** + * This test depends on the following Redis Container. + */ + @Container + public RedisTestContainer testContainer = RedisTestContainer.newRedisContainer(); + + @Override + RedisTestContainer getTestContainer() { + return testContainer; + } +} diff --git a/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClientIntegrationTest.java b/src/test/java/org/sourcelab/storm/spout/redis/client/AbstractLettuceClientIntegrationTest.java similarity index 93% rename from src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClientIntegrationTest.java rename to src/test/java/org/sourcelab/storm/spout/redis/client/AbstractLettuceClientIntegrationTest.java index 4c358b9..e4b5160 100644 --- a/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClientIntegrationTest.java +++ b/src/test/java/org/sourcelab/storm/spout/redis/client/AbstractLettuceClientIntegrationTest.java @@ -2,15 +2,12 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.sourcelab.storm.spout.redis.Message; import org.sourcelab.storm.spout.redis.RedisStreamSpoutConfig; import org.sourcelab.storm.spout.redis.example.TestTupleConverter; +import org.sourcelab.storm.spout.redis.util.test.RedisTestContainer; import org.sourcelab.storm.spout.redis.util.test.RedisTestHelper; -import org.testcontainers.containers.GenericContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; import java.util.ArrayList; import java.util.List; @@ -22,16 +19,12 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -@Testcontainers -@Tag("Integration") -class LettuceClientIntegrationTest { - /** - * This test depends ont he following Redis Container. - */ - @Container - public GenericContainer redis = new GenericContainer<>("redis:5.0.3-alpine") - .withExposedPorts(6379); - +/** + * Abstract Integration test over LettuceClient. + * Used as a base to test the client against both a single Redis instance, and against + * a RedisCluster instance. + */ +abstract class AbstractLettuceClientIntegrationTest { private static final String CONSUMER_ID_PREFIX = "ConsumerId"; private static final int MAX_CONSUMED_PER_READ = 10; @@ -41,8 +34,10 @@ class LettuceClientIntegrationTest { private LettuceClient client; private String streamKey; + abstract RedisTestContainer getTestContainer(); + @BeforeEach - void setUp() { + void setUp(){ // Generate a random stream key streamKey = "MyStreamKey" + System.currentTimeMillis(); @@ -52,8 +47,8 @@ void setUp() { // Create client instance under test. client = new LettuceClient(config, 1); - // Ensure that the key exists! - redisTestHelper = new RedisTestHelper(config.getConnectString()); + // Create test helper instance. + redisTestHelper = getTestContainer().getRedisTestHelper(); } @AfterEach @@ -77,7 +72,7 @@ void testConnectAndDisconnect_smokeTest() { * Simple connect, consume, and disconnect smoke test for a single consumer. */ @Test - void testSimpleConsume() { + void testSimpleConsume() throws InterruptedException { // Connect client.connect(); @@ -351,15 +346,16 @@ private void verifyConsumedMessagesExistWithNoDuplicates( } private RedisStreamSpoutConfig createConfiguration(final String consumerId) { - return RedisStreamSpoutConfig.newBuilder() - .withHost(redis.getHost()) - .withPort(redis.getFirstMappedPort()) + final RedisStreamSpoutConfig.Builder builder = RedisStreamSpoutConfig.newBuilder() .withGroupName("DefaultGroupName") .withStreamKey(streamKey) .withConsumerIdPrefix(consumerId) .withMaxConsumePerRead(MAX_CONSUMED_PER_READ) .withNoRetryFailureHandler() - .withTupleConverter(new TestTupleConverter()) + .withTupleConverter(new TestTupleConverter()); + + return getTestContainer() + .addConnectionDetailsToConfig(builder) .build(); } -} \ No newline at end of file +} diff --git a/src/test/java/org/sourcelab/storm/spout/redis/client/ConsumerTest.java b/src/test/java/org/sourcelab/storm/spout/redis/client/ConsumerTest.java index b7d908d..e2e3771 100644 --- a/src/test/java/org/sourcelab/storm/spout/redis/client/ConsumerTest.java +++ b/src/test/java/org/sourcelab/storm/spout/redis/client/ConsumerTest.java @@ -39,8 +39,7 @@ class ConsumerTest { // Create config private final RedisStreamSpoutConfig config = RedisStreamSpoutConfig.newBuilder() - .withHost(HOSTNAME) - .withPort(PORT) + .withServer(HOSTNAME, PORT) .withStreamKey(STREAM_KEY) .withGroupName("GroupName") .withConsumerIdPrefix("ConsumerId") diff --git a/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClient_RedisClusterIntegrationTest.java b/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClient_RedisClusterIntegrationTest.java new file mode 100644 index 0000000..338a35c --- /dev/null +++ b/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClient_RedisClusterIntegrationTest.java @@ -0,0 +1,29 @@ +package org.sourcelab.storm.spout.redis.client; + +import org.junit.jupiter.api.Tag; +import org.sourcelab.storm.spout.redis.util.test.RedisTestContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +/** + * NOTE: This Integration test requires Docker to run. + * + * This integration test verifies LettuceClient against a RedisCluster instance to verify + * things work as expected when consuming from a RedisCluster. + * + * Test cases are defined in {@link AbstractLettuceClientIntegrationTest}. + */ +@Testcontainers +@Tag("Integration") +public class LettuceClient_RedisClusterIntegrationTest extends AbstractLettuceClientIntegrationTest { + /** + * This test depends on the following Redis Container. + */ + @Container + public RedisTestContainer redisContainer = RedisTestContainer.newRedisClusterContainer(); + + @Override + RedisTestContainer getTestContainer() { + return redisContainer; + } +} diff --git a/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClient_RedisIntegrationTest.java b/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClient_RedisIntegrationTest.java new file mode 100644 index 0000000..b28c976 --- /dev/null +++ b/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClient_RedisIntegrationTest.java @@ -0,0 +1,29 @@ +package org.sourcelab.storm.spout.redis.client; + +import org.junit.jupiter.api.Tag; +import org.sourcelab.storm.spout.redis.util.test.RedisTestContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +/** + * NOTE: This Integration test requires Docker to run. + * + * This integration test verifies LettuceClient against a Redis instance to verify + * things work as expected when consuming from a Redis instance. + * + * Test cases are defined in {@link AbstractLettuceClientIntegrationTest}. + */ +@Testcontainers +@Tag("Integration") +public class LettuceClient_RedisIntegrationTest extends AbstractLettuceClientIntegrationTest { + /** + * This test depends on the following Redis Container. + */ + @Container + public RedisTestContainer redisContainer = RedisTestContainer.newRedisContainer(); + + @Override + RedisTestContainer getTestContainer() { + return redisContainer; + } +} diff --git a/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClusterAdapterTest.java b/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClusterAdapterTest.java new file mode 100644 index 0000000..d5db7cd --- /dev/null +++ b/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceClusterAdapterTest.java @@ -0,0 +1,77 @@ +package org.sourcelab.storm.spout.redis.client; + +import io.lettuce.core.cluster.RedisClusterClient; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +class LettuceClusterAdapterTest { + private RedisClusterClient mockClusterClient; + private StatefulRedisClusterConnection mockConnection; + + @BeforeEach + void setup() { + mockClusterClient = mock(RedisClusterClient.class); + mockConnection = mock(StatefulRedisClusterConnection.class); + } + + @AfterEach + void cleanup() { + verifyNoMoreInteractions(mockClusterClient); + verifyNoMoreInteractions(mockConnection); + } + + @Test + void testAdapter() { + final RedisAdvancedClusterCommands mockCommands = mock(RedisAdvancedClusterCommands.class); + + // Setup mocks + when(mockClusterClient.connect()) + .thenReturn(mockConnection); + + when(mockConnection.sync()) + .thenReturn(mockCommands); + + // Create instance + final LettuceClusterAdapter adapter = new LettuceClusterAdapter(mockClusterClient); + + // Verify "not connected" + assertFalse(adapter.isConnected(), "Should return false"); + + // Call connect. + adapter.connect(); + + // Verify "connected" + assertTrue(adapter.isConnected(), "Should return true"); + + // Verify interactions + verify(mockClusterClient, times(1)) + .connect(); + + // Call sync multiple times + assertNotNull(adapter.getSyncCommands()); + assertNotNull(adapter.getSyncCommands()); + assertNotNull(adapter.getSyncCommands()); + + // Only interacts with mock once. + verify(mockConnection, times(1)) + .sync(); + + // Call shutdown + adapter.shutdown(); + + verify(mockConnection, times(1)).close(); + verify(mockClusterClient, times(1)).shutdown(); + } +} \ No newline at end of file diff --git a/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceRedisAdapterTest.java b/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceRedisAdapterTest.java new file mode 100644 index 0000000..f1caecc --- /dev/null +++ b/src/test/java/org/sourcelab/storm/spout/redis/client/LettuceRedisAdapterTest.java @@ -0,0 +1,78 @@ +package org.sourcelab.storm.spout.redis.client; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisCommands; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +class LettuceRedisAdapterTest { + + private RedisClient mockRedisClient; + private StatefulRedisConnection mockConnection; + + @BeforeEach + void setup() { + mockRedisClient = mock(RedisClient.class); + mockConnection = mock(StatefulRedisConnection.class); + } + + @AfterEach + void cleanup() { + verifyNoMoreInteractions(mockRedisClient); + verifyNoMoreInteractions(mockConnection); + } + + @Test + void testAdapter() { + final RedisCommands mockCommands = mock(RedisCommands.class); + + // Setup mocks + when(mockRedisClient.connect()) + .thenReturn(mockConnection); + + when(mockConnection.sync()) + .thenReturn(mockCommands); + + // Create instance + final LettuceRedisAdapter adapter = new LettuceRedisAdapter(mockRedisClient); + + // Verify "not connected" + assertFalse(adapter.isConnected(), "Should return false"); + + // Call connect. + adapter.connect(); + + // Verify "connected" + assertTrue(adapter.isConnected(), "Should return true"); + + // Verify interactions + verify(mockRedisClient, times(1)) + .connect(); + + // Call sync multiple times + assertNotNull(adapter.getSyncCommands()); + assertNotNull(adapter.getSyncCommands()); + assertNotNull(adapter.getSyncCommands()); + + // Only interacts with mock once. + verify(mockConnection, times(1)) + .sync(); + + // Call shutdown + adapter.shutdown(); + + verify(mockConnection, times(1)).close(); + verify(mockRedisClient, times(1)).shutdown(); + } +} \ No newline at end of file diff --git a/src/test/java/org/sourcelab/storm/spout/redis/example/ExampleLocalTopology.java b/src/test/java/org/sourcelab/storm/spout/redis/example/ExampleLocalTopology.java index a37cd65..08deaf0 100644 --- a/src/test/java/org/sourcelab/storm/spout/redis/example/ExampleLocalTopology.java +++ b/src/test/java/org/sourcelab/storm/spout/redis/example/ExampleLocalTopology.java @@ -9,8 +9,8 @@ import org.sourcelab.storm.spout.redis.RedisStreamSpoutConfig; import org.sourcelab.storm.spout.redis.failhandler.ExponentialBackoffConfig; import org.sourcelab.storm.spout.redis.failhandler.ExponentialBackoffFailureHandler; +import org.sourcelab.storm.spout.redis.util.test.RedisTestContainer; import org.sourcelab.storm.spout.redis.util.test.RedisTestHelper; -import org.testcontainers.containers.GenericContainer; import java.io.BufferedReader; import java.io.IOException; @@ -20,9 +20,11 @@ /** * Example topology using the RedisStreamSpout deployed against a LocalTopology cluster. + * + * NOTE: This required Docker to run. */ public class ExampleLocalTopology { - private final GenericContainer redis; + private final RedisTestContainer redis; private Thread producerThread; /** @@ -44,8 +46,7 @@ public static void main(final String[] args) throws Exception { */ public ExampleLocalTopology() { // Setup REDIS Container. - redis = new GenericContainer<>("redis:5.0.3-alpine") - .withExposedPorts(6379); + redis = RedisTestContainer.newRedisContainer(); } /** @@ -65,8 +66,7 @@ public void runExample(final boolean enableDebug) throws Exception { // Create config final RedisStreamSpoutConfig.Builder configBuilder = RedisStreamSpoutConfig.newBuilder() // Set Connection Properties - .withHost(redis.getHost()) - .withPort(redis.getFirstMappedPort()) + .withServer(redis.getHost(), redis.getFirstMappedPort()) // Consumer Properties .withGroupName(groupName) .withConsumerIdPrefix(consumerPrefix) @@ -121,7 +121,7 @@ public void runExample(final boolean enableDebug) throws Exception { private void startProducerThread(final String streamKey) { final Runnable runnable = () -> { // Create helper - final RedisTestHelper testHelper = new RedisTestHelper("redis://" + redis.getHost() + ":" + redis.getFirstMappedPort()); + final RedisTestHelper testHelper = redis.getRedisTestHelper(); long tupleCounter = 0L; do { diff --git a/src/test/java/org/sourcelab/storm/spout/redis/funnel/MemoryFunnelTest.java b/src/test/java/org/sourcelab/storm/spout/redis/funnel/MemoryFunnelTest.java index c3ca511..9b6958d 100644 --- a/src/test/java/org/sourcelab/storm/spout/redis/funnel/MemoryFunnelTest.java +++ b/src/test/java/org/sourcelab/storm/spout/redis/funnel/MemoryFunnelTest.java @@ -47,8 +47,7 @@ public void cleanup() { void testPassingMessages() { // Create config final RedisStreamSpoutConfig config = RedisStreamSpoutConfig.newBuilder() - .withHost("host") - .withPort(123) + .withServer("host", 123) .withGroupName("GroupName") .withStreamKey("Key") .withConsumerIdPrefix("ConsumerId") @@ -108,8 +107,7 @@ void testPassingMessages() { void testPassingAcks() { // Create config final RedisStreamSpoutConfig config = RedisStreamSpoutConfig.newBuilder() - .withHost("host") - .withPort(123) + .withServer("host", 123) .withGroupName("GroupName") .withStreamKey("Key") .withConsumerIdPrefix("ConsumerId") @@ -164,8 +162,7 @@ void testPassingAcks() { void test_failureHandler() { // Create config final RedisStreamSpoutConfig config = RedisStreamSpoutConfig.newBuilder() - .withHost("host") - .withPort(123) + .withServer("host", 123) .withGroupName("GroupName") .withStreamKey("Key") .withConsumerIdPrefix("ConsumerId") @@ -261,8 +258,7 @@ void test_failureHandler() { void test_disablingMetricsDoesNotRegisterMetrics() { // Create config final RedisStreamSpoutConfig config = RedisStreamSpoutConfig.newBuilder() - .withHost("host") - .withPort(123) + .withServer("host", 123) .withGroupName("GroupName") .withStreamKey("Key") .withConsumerIdPrefix("ConsumerId") diff --git a/src/test/java/org/sourcelab/storm/spout/redis/util/test/RedisTestContainer.java b/src/test/java/org/sourcelab/storm/spout/redis/util/test/RedisTestContainer.java new file mode 100644 index 0000000..3f9029b --- /dev/null +++ b/src/test/java/org/sourcelab/storm/spout/redis/util/test/RedisTestContainer.java @@ -0,0 +1,134 @@ +package org.sourcelab.storm.spout.redis.util.test; + +import org.sourcelab.storm.spout.redis.RedisStreamSpoutConfig; +import org.testcontainers.containers.FixedHostPortGenericContainer; +import org.testcontainers.containers.startupcheck.MinimumDurationRunningStartupCheckStrategy; + +import java.time.Duration; + +/** + * Wrapper on top of TestContainer which attempts to hide the differences between running an Integration + * test against a Redis instance vs a RedisCluster instance. + */ +public class RedisTestContainer extends FixedHostPortGenericContainer { + /** + * The name of the Docker image to run Redis integration tests against. + */ + public static final String REDIS_DOCKER_CONTAINER_IMAGE = "redis:6.0.5-alpine"; + + /** + * The name of the Docker image to run RedisCluster integration tests against. + */ + public static final String REDIS_CLUSTER_DOCKER_CONTAINER_IMAGE = "grokzen/redis-cluster:latest"; + + /** + * This gets set to true if we're running the RedisCluster container. + */ + private final boolean isCluster; + + /** + * TestHelper instance. + */ + private RedisTestHelper redisTestHelper = null; + + /** + * Constructor. + * @param dockerImageName Name of the image to launch. + */ + private RedisTestContainer(final String dockerImageName) { + super(dockerImageName); + + if (getDockerImageName().equalsIgnoreCase(REDIS_DOCKER_CONTAINER_IMAGE)) { + this.isCluster = false; + } else { + this.isCluster = true; + } + } + + /** + * Factory method for a new TestContainer running a Redis server. + */ + public static RedisTestContainer newRedisContainer() { + return new RedisTestContainer(REDIS_DOCKER_CONTAINER_IMAGE) + .withExposedPorts(6379); + } + + /** + * Factory method for a new TestContainer running a RedisCluster server. + */ + public static RedisTestContainer newRedisClusterContainer() { + return new RedisTestContainer(REDIS_CLUSTER_DOCKER_CONTAINER_IMAGE) + // Because of how RedisCluster service discovery works, we MUST use fixed ports. + .withFixedExposedPort(7000, 7000) + .withFixedExposedPort(7001, 7001) + .withFixedExposedPort(7002, 7002) + .withFixedExposedPort(7003, 7003) + .withFixedExposedPort(7004, 7004) + .withFixedExposedPort(7005, 7005) + + // Override IP so Discovery works. + .withEnv("IP", "127.0.0.1") + // TODO need to come up with a better solution to determining if the cluster is online and ready. + .withStartupCheckStrategy( + new MinimumDurationRunningStartupCheckStrategy(Duration.ofSeconds(10)) + ); + } + + /** + * Getter for the RedisTestHelper utility class configured + * to talk to the Redis instance running in the TestContainer. + */ + public RedisTestHelper getRedisTestHelper() { + if (redisTestHelper == null) { + if (!isCluster) { + redisTestHelper = RedisTestHelper.createRedisHelper(getConnectStr()); + } else { + redisTestHelper = RedisTestHelper.createClusterHelper(getConnectStr()); + } + } + return redisTestHelper; + } + + /** + * Get the port Redis instance is listening on. + */ + public int getPort() { + if (isCluster) { + return 7000; + } else { + return getFirstMappedPort(); + } + } + + /** + * Get the connection URI to talk to the Redis instance running in the Container. + */ + public String getConnectStr() { + return "redis://" + getHost() + ":" + getPort(); + } + + /** + * Utility method to update a RedisStreamSpoutConfig with the appropriate connection + * details to talk to the Redis instance running in the Container. + */ + public RedisStreamSpoutConfig.Builder addConnectionDetailsToConfig(final RedisStreamSpoutConfig.Builder builder) { + if (isCluster) { + builder.withClusterNode(getHost(), getPort()); + } else { + builder.withServer(getHost(), getPort()); + } + return builder; + } + + /** + * Override the default stop() method to also shutdown the RedisTestHelper instance. + */ + @Override + public void stop() { + if (redisTestHelper != null) { + redisTestHelper.close(); + redisTestHelper = null; + } + super.stop(); + } +} diff --git a/src/test/java/org/sourcelab/storm/spout/redis/util/test/RedisTestHelper.java b/src/test/java/org/sourcelab/storm/spout/redis/util/test/RedisTestHelper.java index a5cafc8..637c320 100644 --- a/src/test/java/org/sourcelab/storm/spout/redis/util/test/RedisTestHelper.java +++ b/src/test/java/org/sourcelab/storm/spout/redis/util/test/RedisTestHelper.java @@ -1,10 +1,11 @@ package org.sourcelab.storm.spout.redis.util.test; import io.lettuce.core.RedisClient; -import io.lettuce.core.api.StatefulRedisConnection; -import io.lettuce.core.api.sync.RedisCommands; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import io.lettuce.core.api.sync.RedisStreamCommands; +import io.lettuce.core.cluster.RedisClusterClient; +import org.sourcelab.storm.spout.redis.client.LettuceAdapter; +import org.sourcelab.storm.spout.redis.client.LettuceClusterAdapter; +import org.sourcelab.storm.spout.redis.client.LettuceRedisAdapter; import java.util.ArrayList; import java.util.HashMap; @@ -16,38 +17,47 @@ * Test Helper for interacting with a live Redis server. */ public class RedisTestHelper implements AutoCloseable { - private static final Logger logger = LoggerFactory.getLogger(RedisTestHelper.class); - - private final RedisClient redisClient; - private final StatefulRedisConnection connection; + private final LettuceAdapter redisClient; /** * Constructor. - * @param connectStr Redis Connect string. + * See Factory methods. */ - public RedisTestHelper(final String connectStr) { - this.redisClient = RedisClient.create(connectStr); - this.connection = redisClient.connect(); + private RedisTestHelper(final LettuceAdapter adapter) { + this.redisClient = Objects.requireNonNull(adapter); + this.redisClient.connect(); } - public void createStreamKey(final String key) { - Objects.requireNonNull(key); - final RedisCommands syncCommands = connection.sync(); - - final Map messageBody = new HashMap<>(); - messageBody.put("key", "0"); + /** + * Factory method for creating an instance configured to talk to a single Redis instance. + * @param connectStr URI for Redis instance. + */ + public static RedisTestHelper createRedisHelper(final String connectStr) { + return new RedisTestHelper( + new LettuceRedisAdapter(RedisClient.create(connectStr)) + ); + } - // Write initial value. - final String messageId = syncCommands.xadd( - key, - messageBody + /** + * Factory method for creating an instance configured to talk to a RedisCluster instance. + * @param connectStr URI for RedisCluster instance. + */ + public static RedisTestHelper createClusterHelper(final String connectStr) { + return new RedisTestHelper( + new LettuceClusterAdapter(RedisClusterClient.create(connectStr)) ); } + /** + * Produce generic messages into the supplied stream key. + * @param stream StreamKey to produce messages into. + * @param numberOfMessages How many messages to produce. + * @return List of MessageIds produced. + */ public List produceMessages(final String stream, final int numberOfMessages) { final List messageIds = new ArrayList<>(); - final RedisCommands commands = connection.sync(); + final RedisStreamCommands commands = redisClient.getSyncCommands(); for (int index = 0; index < numberOfMessages; index++) { final Map messageBody = new HashMap<>(); @@ -71,7 +81,7 @@ public List produceMessages(final String stream, final int numberOfMessa * @return messageId produced. */ public String produceMessage(final String stream, final Map values) { - final RedisCommands commands = connection.sync(); + final RedisStreamCommands commands = redisClient.getSyncCommands(); return commands.xadd( stream, @@ -97,7 +107,7 @@ public StreamConsumerInfo getConsumerInfo(final String streamKey, final String g * @return Map of ConsumerId => Details about that consumer. */ public Map getStreamInfo(final String streamKey, final String groupName) { - final RedisCommands commands = connection.sync(); + final RedisStreamCommands commands = redisClient.getSyncCommands(); final List result = commands.xinfoConsumers(streamKey, groupName); final Map consumerInfos = new HashMap<>(); @@ -143,9 +153,10 @@ public Map getStreamInfo(final String streamKey, fin return consumerInfos; } - @Override + /** + * Shutdown lifecycle method. + */ public void close() { - connection.close(); redisClient.shutdown(); } }