From 3cbf521be71296162b2b52d0139c5986a178cde5 Mon Sep 17 00:00:00 2001 From: M Sazzadul Hoque <7600764+sazzad16@users.noreply.github.com> Date: Thu, 19 May 2022 16:53:22 +0600 Subject: [PATCH] Change xread/xreadGroup calls --- .../redis/AbstractRedisStreamConsumer.java | 74 +++++++------------ .../connectors/redis/RedisStreamConsumer.java | 27 ++++--- .../redis/RedisStreamGroupConsumer.java | 35 ++++----- .../redis/table/RedisStreamDynamicSource.java | 23 +++--- 4 files changed, 71 insertions(+), 88 deletions(-) diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java index caf4776..3b5240e 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/AbstractRedisStreamConsumer.java @@ -18,28 +18,27 @@ package org.apache.flink.streaming.connectors.redis; import org.apache.flink.streaming.connectors.redis.config.StartupMode; - import redis.clients.jedis.Jedis; import redis.clients.jedis.StreamEntry; import redis.clients.jedis.StreamEntryID; -import java.util.AbstractMap.SimpleEntry; -import java.util.Arrays; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; +import java.util.stream.Collectors; -/** @param */ +/** + * @param + */ public abstract class AbstractRedisStreamConsumer extends RedisConsumerBase { - protected final Entry[] streamEntryIds; - private final Map keyIndex = new HashMap<>(); + protected final Map streamEntryIds; public AbstractRedisStreamConsumer( - StartupMode startupMode, String[] streamKeys, Properties configProps) { - super(Arrays.asList(streamKeys), configProps); + StartupMode startupMode, List streamKeys, Properties configProps) { + super(streamKeys, configProps); final StreamEntryID streamEntryID; switch (startupMode) { case EARLIEST: @@ -60,24 +59,17 @@ public AbstractRedisStreamConsumer( throw new IllegalStateException(); } this.streamEntryIds = prepareStreamEntryIds(streamKeys, streamEntryID); - initializeKeyIndex(); - } - - public AbstractRedisStreamConsumer( - String[] streamKeys, Long[] timestamps, Properties configProps) { - this(streamKeys, streamEntryIds(timestamps), configProps); } public AbstractRedisStreamConsumer( - String[] streamKeys, StreamEntryID[] streamIds, Properties configProps) { + List streamKeys, List streamIds, Properties configProps) { this(prepareStreamEntryIds(streamKeys, streamIds), configProps); } private AbstractRedisStreamConsumer( - Entry[] streamIds, Properties configProps) { + Map streamIds, Properties configProps) { super(null, configProps); this.streamEntryIds = streamIds; - initializeKeyIndex(); } @Override @@ -104,44 +96,32 @@ protected abstract void collect( SourceContext sourceContext, String streamKey, StreamEntry streamEntry); protected void updateIdForKey(String streamKey, StreamEntryID streamEntryID) { - int index = keyIndex.get(streamKey); - if (this.streamEntryIds[index].getValue().toString().equals(">")) { + if (this.streamEntryIds.get(streamKey).toString().equals(">")) { // skip } else { - this.streamEntryIds[index].setValue(streamEntryID); + this.streamEntryIds.put(streamKey, streamEntryID); } } - private void initializeKeyIndex() { - int index = 0; - for (Entry streamEntryId : streamEntryIds) { - keyIndex.put(streamEntryId.getKey(), index++); - } + private static Map prepareStreamEntryIds( + List streamKeys, StreamEntryID streamId) { + Map streams = new LinkedHashMap<>(streamKeys.size()); + streamKeys.forEach(streamKey -> streams.put(streamKey, streamId)); + return streams; } - private static Entry[] prepareStreamEntryIds( - String[] streamKeys, StreamEntryID streamId) { - Entry[] streams = new Entry[streamKeys.length]; - for (int i = 0; i < streamKeys.length; i++) { - streams[i] = new SimpleEntry<>(streamKeys[i], streamId); + private static Map prepareStreamEntryIds( + List streamKeys, List streamIds) { + Map streams = new LinkedHashMap<>(streamKeys.size()); + for (int i = 0; i < streamKeys.size(); i++) { + streams.put(streamKeys.get(i), streamIds.get(i)); } - return (Entry[]) streams; + return streams; } - private static Entry[] prepareStreamEntryIds( - String[] streamKeys, StreamEntryID[] streamIds) { - Entry[] streams = new Entry[streamKeys.length]; - for (int i = 0; i < streamKeys.length; i++) { - streams[i] = new SimpleEntry<>(streamKeys[i], streamIds[i]); - } - return (Entry[]) streams; - } - - private static StreamEntryID[] streamEntryIds(Long[] timestamps) { - StreamEntryID[] entryIds = new StreamEntryID[timestamps.length]; - for (int i = 0; i < timestamps.length; i++) { - entryIds[i] = new StreamEntryID(timestamps[i], 0L); - } - return entryIds; + public static List convertToStreamEntryIDs(List timestamps) { + return timestamps.stream() + .map(ts -> new StreamEntryID(ts, 0L)) + .collect(Collectors.toList()); } } diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java index 5e6a7ea..f1ae6ab 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamConsumer.java @@ -18,16 +18,19 @@ package org.apache.flink.streaming.connectors.redis; import org.apache.flink.streaming.connectors.redis.config.StartupMode; - import redis.clients.jedis.Jedis; import redis.clients.jedis.StreamEntry; import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.params.XReadParams; +import java.util.Arrays; import java.util.List; import java.util.Map.Entry; import java.util.Properties; -/** @param */ +/** + * @param + */ public class RedisStreamConsumer extends AbstractRedisStreamConsumer { private final DataConverter dataConverter; @@ -36,24 +39,23 @@ public RedisStreamConsumer( Properties configProps, StartupMode startupMode, DataConverter dataConverter, - String... streamKeys) { - super(startupMode, streamKeys, configProps); - this.dataConverter = dataConverter; + String streamKey) { + this(configProps, startupMode, dataConverter, Arrays.asList(streamKey)); } public RedisStreamConsumer( + Properties configProps, + StartupMode startupMode, DataConverter dataConverter, - String[] streamKeys, - Long[] timestamps, - Properties configProps) { - super(streamKeys, timestamps, configProps); + List streamKeys) { + super(startupMode, streamKeys, configProps); this.dataConverter = dataConverter; } public RedisStreamConsumer( DataConverter dataConverter, - String[] streamKeys, - StreamEntryID[] streamIds, + List streamKeys, + List streamIds, Properties configProps) { super(streamKeys, streamIds, configProps); this.dataConverter = dataConverter; @@ -61,7 +63,8 @@ public RedisStreamConsumer( @Override protected List>> read(Jedis jedis) { - return jedis.xread(1, 0L, streamEntryIds); + // return jedis.xread(XReadParams.xReadParams().count(1).block(0), streamEntryIds); + return jedis.xread(XReadParams.xReadParams().count(1), streamEntryIds); } @Override diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java index bfaa1a3..8f35b77 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/RedisStreamGroupConsumer.java @@ -18,16 +18,19 @@ package org.apache.flink.streaming.connectors.redis; import org.apache.flink.streaming.connectors.redis.config.StartupMode; - import redis.clients.jedis.Jedis; import redis.clients.jedis.StreamEntry; import redis.clients.jedis.StreamEntryID; +import redis.clients.jedis.params.XReadGroupParams; +import java.util.Arrays; import java.util.List; import java.util.Map.Entry; import java.util.Properties; -/** @param */ +/** + * @param + */ public class RedisStreamGroupConsumer extends AbstractRedisStreamConsumer { private final String group; @@ -46,7 +49,7 @@ public RedisStreamGroupConsumer( consumerName, StartupMode.GROUP_OFFSETS, dataConverter, - new String[] {streamKey}, + Arrays.asList(streamKey), config); } @@ -55,7 +58,7 @@ public RedisStreamGroupConsumer( String consumerName, StartupMode startupMode, DataConverter dataConverter, - String[] streamKeys, + List streamKeys, Properties config) { super(startupMode, streamKeys, config); this.group = groupName; @@ -67,21 +70,8 @@ public RedisStreamGroupConsumer( String groupName, String consumerName, DataConverter dataConverter, - String[] streamKeys, - Long[] timestamps, - Properties config) { - super(streamKeys, timestamps, config); - this.group = groupName; - this.consumer = consumerName; - this.dataConverter = dataConverter; - } - - public RedisStreamGroupConsumer( - String groupName, - String consumerName, - DataConverter dataConverter, - String[] streamKeys, - StreamEntryID[] streamIds, + List streamKeys, + List streamIds, Properties config) { super(streamKeys, streamIds, config); this.group = groupName; @@ -91,7 +81,12 @@ public RedisStreamGroupConsumer( @Override protected List>> read(Jedis jedis) { - return jedis.xreadGroup(group, consumer, 1, 0L, true, streamEntryIds); + return jedis.xreadGroup( + group, + consumer, + // XReadGroupParams.xReadGroupParams().count(1).block(0).noAck(), + XReadGroupParams.xReadGroupParams().count(1).noAck(), + streamEntryIds); } @Override diff --git a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java index 6676d82..03f9237 100644 --- a/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java +++ b/src/main/java/org/apache/flink/streaming/connectors/redis/table/RedisStreamDynamicSource.java @@ -28,9 +28,9 @@ import org.apache.flink.table.connector.source.SourceFunctionProvider; import org.apache.flink.table.data.RowData; import org.apache.flink.util.Preconditions; - import redis.clients.jedis.StreamEntryID; +import java.util.Arrays; import java.util.Objects; import java.util.Optional; import java.util.Properties; @@ -291,16 +291,17 @@ protected AbstractRedisStreamConsumer createRedisConsumer() { groupName.get(), consumerName.get(), converter, - new String[] {streamKey}, - new Long[] {timestamp}, + Arrays.asList(streamKey), + RedisStreamGroupConsumer.convertToStreamEntryIDs( + Arrays.asList(timestamp)), config); case SPECIFIC_OFFSETS: return new RedisStreamGroupConsumer<>( groupName.get(), consumerName.get(), converter, - new String[] {streamKey}, - new StreamEntryID[] {streamEntryId}, + Arrays.asList(streamKey), + Arrays.asList(streamEntryId), config); default: return new RedisStreamGroupConsumer<>( @@ -308,19 +309,23 @@ protected AbstractRedisStreamConsumer createRedisConsumer() { consumerName.get(), startupMode, converter, - new String[] {streamKey}, + Arrays.asList(streamKey), config); } } else { switch (startupMode) { case TIMESTAMP: return new RedisStreamConsumer<>( - converter, new String[] {streamKey}, new Long[] {timestamp}, config); + converter, + Arrays.asList(streamKey), + AbstractRedisStreamConsumer.convertToStreamEntryIDs( + Arrays.asList(timestamp)), + config); case SPECIFIC_OFFSETS: return new RedisStreamConsumer<>( converter, - new String[] {streamKey}, - new StreamEntryID[] {streamEntryId}, + Arrays.asList(streamKey), + Arrays.asList(streamEntryId), config); default: return new RedisStreamConsumer<>(config, startupMode, converter, streamKey);