Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,11 @@
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## 1.1.0 (07/24/2020)
- Add Jedis implementation. Spout defaults to using the Lettuce redis library, but you can configure
to use the Jedis library instead via the `RedisStreamSpoutConfig.withJedisClientLibrary()` method.
- Bugfix on Spout deploy, consumer thread started during `open()` lifecycle call instead of `activate()`.
- Bugfix on Spout restart, resume consuming first from consumers personal pending list.

## 1.0.0 (07/20/2020)
- Initial release!
16 changes: 15 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>org.sourcelab.storm.spout</groupId>
<artifactId>redis-stream-spout</artifactId>
<version>1.0.0</version>
<version>1.1.0</version>

<!-- Module Description and Ownership -->
<name>Redis Streams Spout for Apache Storm.</name>
Expand Down Expand Up @@ -48,6 +48,7 @@

<!-- Redis Client -->
<lettuceVersion>5.3.1.RELEASE</lettuceVersion>
<jedisVersion>3.2.0</jedisVersion>

<!-- Define which version of JUnit 5 to -->
<junit5Version>5.6.2</junit5Version>
Expand Down Expand Up @@ -78,6 +79,11 @@
<artifactId>lettuce-core</artifactId>
<version>${lettuceVersion}</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedisVersion}</version>
</dependency>

<!-- Testing Dependencies -->
<dependency>
Expand Down Expand Up @@ -120,6 +126,14 @@
<version>${testContainersVersion}</version>
<scope>test</scope>
</dependency>

<!-- Async Testing Helper -->
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.3</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
41 changes: 27 additions & 14 deletions src/main/java/org/sourcelab/storm/spout/redis/RedisStreamSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sourcelab.storm.spout.redis.client.Client;
import org.sourcelab.storm.spout.redis.client.ClientFactory;
import org.sourcelab.storm.spout.redis.client.Consumer;
import org.sourcelab.storm.spout.redis.client.LettuceClient;
import org.sourcelab.storm.spout.redis.client.lettuce.LettuceClient;
import org.sourcelab.storm.spout.redis.funnel.ConsumerFunnel;
import org.sourcelab.storm.spout.redis.funnel.MemoryFunnel;
import org.sourcelab.storm.spout.redis.funnel.SpoutFunnel;
Expand Down Expand Up @@ -83,16 +84,8 @@ public void open(
// Create funnel instance.
this.funnel = new MemoryFunnel(config, spoutConfig, topologyContext);

// Create consumer and client
final int taskIndex = topologyContext.getThisTaskIndex();
final Client client = new LettuceClient(config, taskIndex);
final Consumer consumer = new Consumer(config, client, (ConsumerFunnel) funnel);

// Create background consuming thread.
consumerThread = new Thread(
consumer,
"RedisStreamSpout-ConsumerThread[" + taskIndex + "]"
);
// Create and start consumer thread.
createAndStartConsumerThread();
}

@Override
Expand All @@ -103,12 +96,15 @@ public void close() {

@Override
public void activate() {
if (consumerThread.isAlive()) {
// If the thread is already running and alive
if (consumerThread != null && consumerThread.isAlive()) {
// No-op. It's already running, and deactivate() is a no-op for us.
return;
}
// Start thread, this should return immediately, but start a background processing thread.
consumerThread.start();

// If we haven't created the consumer thread yet, or it has previously died.
// Create and start it
createAndStartConsumerThread();
}

@Override
Expand Down Expand Up @@ -186,4 +182,21 @@ public void declareOutputFields(final OutputFieldsDeclarer declarer) {
public Map<String, Object> getComponentConfiguration() {
return new HashMap<>();
}

/**
* Create background consumer thread.
*/
private void createAndStartConsumerThread() {
// Create consumer and client
final int taskIndex = topologyContext.getThisTaskIndex();
final Client client = new ClientFactory().createClient(config, taskIndex);
final Consumer consumer = new Consumer(config, client, (ConsumerFunnel) funnel);

// Create background consuming thread.
consumerThread = new Thread(
consumer,
"RedisStreamSpout-ConsumerThread[" + taskIndex + "]"
);
consumerThread.start();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.sourcelab.storm.spout.redis;

import org.sourcelab.storm.spout.redis.client.ClientType;
import org.sourcelab.storm.spout.redis.failhandler.NoRetryHandler;

import java.io.Serializable;
Expand Down Expand Up @@ -70,6 +71,11 @@ public class RedisStreamSpoutConfig implements Serializable {
*/
private final boolean metricsEnabled;

/**
* Defines which underlying client library/implementation to use.
*/
private final ClientType clientType;

/**
* Constructor.
* Use Builder instance.
Expand All @@ -85,7 +91,7 @@ private RedisStreamSpoutConfig(

// Other settings
final int maxConsumePerRead, final int maxTupleQueueSize, final int maxAckQueueSize, final long consumerDelayMillis,
final boolean metricsEnabled
final boolean metricsEnabled, final ClientType clientType
) {
// Connection
if (redisCluster != null && redisServer != null) {
Expand Down Expand Up @@ -117,6 +123,9 @@ private RedisStreamSpoutConfig(
this.maxAckQueueSize = maxAckQueueSize;
this.consumerDelayMillis = consumerDelayMillis;
this.metricsEnabled = metricsEnabled;

// Client type implementation
this.clientType = Objects.requireNonNull(clientType);
}

public String getStreamKey() {
Expand Down Expand Up @@ -150,6 +159,17 @@ public String getConnectString() {
return redisCluster.getConnectString();
}

/**
* The URI for connecting to this Redis Server instance with the password masked.
* @return URI for the server.
*/
public String getConnectStringMasked() {
if (!isConnectingToCluster()) {
return redisServer.getConnectStringMasked();
}
return redisCluster.getConnectStringMasked();
}

public int getMaxTupleQueueSize() {
return maxTupleQueueSize;
}
Expand All @@ -174,6 +194,10 @@ public boolean isMetricsEnabled() {
return metricsEnabled;
}

public ClientType getClientType() {
return clientType;
}

/**
* Create a new Builder instance.
* @return Builder for Configuration instance.
Expand Down Expand Up @@ -218,6 +242,12 @@ public static final class Builder {
private long consumerDelayMillis = 1000L;
private boolean metricsEnabled = true;

/**
* Underlying library to use.
* Defaults to using Lettuce.
*/
private ClientType clientType = ClientType.LETTUCE;

private Builder() {
}

Expand Down Expand Up @@ -385,6 +415,27 @@ public Builder withMetricsEnabled(final boolean enabled) {
return this;
}

/**
* Configure the spout to use the Lettuce client library for communicating with redis.
* @return Builder instance.
*/
public Builder withLettuceClientLibrary() {
return withClientType(ClientType.LETTUCE);
}

/**
* Configure the spout to use the Jedis client library for communicating with redis.
* @return Builder instance.
*/
public Builder withJedisClientLibrary() {
return withClientType(ClientType.JEDIS);
}

public Builder withClientType(final ClientType clientType) {
this.clientType = Objects.requireNonNull(clientType);
return this;
}

/**
* Creates new Configuration instance.
* @return Configuration instance.
Expand All @@ -405,7 +456,10 @@ public RedisStreamSpoutConfig build() {
tupleConverter, failureHandler,
// Other settings
maxConsumePerRead, maxTupleQueueSize, maxAckQueueSize, consumerDelayMillis,
metricsEnabled
metricsEnabled,

// Underlying client type
clientType
);
}
}
Expand Down Expand Up @@ -445,6 +499,16 @@ public String getConnectString() {
.map(RedisServer::getConnectString)
.collect(Collectors.joining(","));
}

/**
* The URI for connecting to this Redis Server instance with the password masked.
* @return URI for the server.
*/
public String getConnectStringMasked() {
return getServers().stream()
.map(RedisServer::getConnectStringMasked)
.collect(Collectors.joining(","));
}
}

/**
Expand Down Expand Up @@ -503,6 +567,21 @@ public String getConnectString() {
return connectStr;
}

/**
* The URI for connecting to this Redis Server instance with the password masked.
* @return URI for the server.
*/
public String getConnectStringMasked() {
String connectStr = "redis://";

if (getPassword() != null && !getPassword().trim().isEmpty()) {
connectStr += "XXXXXX@";
}
connectStr += getHost() + ":" + getPort();

return connectStr;
}

@Override
public String toString() {
return "RedisServer{"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.sourcelab.storm.spout.redis.client;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sourcelab.storm.spout.redis.RedisStreamSpoutConfig;
import org.sourcelab.storm.spout.redis.client.jedis.JedisClient;
import org.sourcelab.storm.spout.redis.client.lettuce.LettuceClient;

import java.util.Objects;

/**
* Factory for creating the appropriate Client instance based on config.
*/
public class ClientFactory {
private static final Logger logger = LoggerFactory.getLogger(ClientFactory.class);

/**
* Create the appropriate client intance based on configuration.
* @param config Spout configuration.
* @param instanceId Instance id of spout.
* @return Client.
*/
public Client createClient(final RedisStreamSpoutConfig config, final int instanceId) {
Objects.requireNonNull(config);

switch (config.getClientType()) {
case JEDIS:
logger.info("Using Jedis client library.");
return new JedisClient(config, instanceId);
case LETTUCE:
logger.info("Using Lettuce client library.");
return new LettuceClient(config, instanceId);
default:
throw new IllegalStateException("Unknown/Unhandled Client Type");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.sourcelab.storm.spout.redis.client;

/**
* Defines allowed implementations.
*/
public enum ClientType {
LETTUCE,
JEDIS;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.sourcelab.storm.spout.redis.client.jedis;

import redis.clients.jedis.StreamEntry;

import java.util.List;
import java.util.Map;

/**
* Adapter to allow usage of both Jedis and JedisCluster.
*/
public interface JedisAdapter {
/**
* Call connect.
*/
void connect();

/**
* Consume next batch of messages.
* @return List of messages consumed.
*/
List<Map.Entry<String, List<StreamEntry>>> consume();

/**
* Mark the provided messageId as acknowledged/completed.
* @param msgId Id of the message.
*/
void commit(final String msgId);

/**
* Disconnect client.
*/
void close();

/**
* Advance the last offset consumed from PPL.
* @param lastMsgId Id of the last msg consumed.
*/
void advancePplOffset(final String lastMsgId);

/**
* Switch to consuming from latest messages.
*/
void switchToConsumerGroupMessages();
}
Loading