Change xread/xreadGroup calls
sazzad16 committed May 19, 2022
1 parent d4e3caa commit 3cbf521
Showing 4 changed files with 71 additions and 88 deletions.
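At a glance, the commit swaps the connector's Jedis stream reads from the older varargs overloads, xread(count, block, Entry<String, StreamEntryID>...) and xreadGroup(group, consumer, count, block, noAck, Entry...), to the params-object overloads that take a Map<String, StreamEntryID>, and the per-stream offsets move from an Entry[] plus a separate key-index map into a LinkedHashMap. The sketch below shows the new call shape only; it is not code from the commit. Host, stream, group, and consumer names are illustrative, and it assumes a Jedis release that ships XReadParams/XReadGroupParams (4.x).

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;

public class XReadMigrationSketch {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("localhost", 6379)) { // connection details are illustrative
            // Plain (non-group) read: offsets travel in a Map instead of Entry varargs.
            Map<String, StreamEntryID> streams = new LinkedHashMap<>();
            streams.put("my-stream", new StreamEntryID(0, 0)); // start from the beginning

            List<Map.Entry<String, List<StreamEntry>>> entries =
                    jedis.xread(XReadParams.xReadParams().count(1), streams);
            System.out.println(entries);

            // Consumer-group read: ">" (UNRECEIVED_ENTRY) requests only never-delivered entries.
            Map<String, StreamEntryID> groupStreams = new LinkedHashMap<>();
            groupStreams.put("my-stream", StreamEntryID.UNRECEIVED_ENTRY);

            List<Map.Entry<String, List<StreamEntry>>> groupEntries =
                    jedis.xreadGroup(
                            "my-group",
                            "my-consumer",
                            XReadGroupParams.xReadGroupParams().count(1).noAck(),
                            groupStreams);
            System.out.println(groupEntries);
        }
    }
}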
@@ -18,28 +18,27 @@
package org.apache.flink.streaming.connectors.redis;

import org.apache.flink.streaming.connectors.redis.config.StartupMode;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;

import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.stream.Collectors;

/** @param <T> */
/**
* @param <T>
*/
public abstract class AbstractRedisStreamConsumer<T> extends RedisConsumerBase<T> {

protected final Entry<String, StreamEntryID>[] streamEntryIds;
private final Map<String, Integer> keyIndex = new HashMap<>();
protected final Map<String, StreamEntryID> streamEntryIds;

public AbstractRedisStreamConsumer(
StartupMode startupMode, String[] streamKeys, Properties configProps) {
super(Arrays.asList(streamKeys), configProps);
StartupMode startupMode, List<String> streamKeys, Properties configProps) {
super(streamKeys, configProps);
final StreamEntryID streamEntryID;
switch (startupMode) {
case EARLIEST:
@@ -60,24 +59,17 @@ public AbstractRedisStreamConsumer(
throw new IllegalStateException();
}
this.streamEntryIds = prepareStreamEntryIds(streamKeys, streamEntryID);
initializeKeyIndex();
}

public AbstractRedisStreamConsumer(
String[] streamKeys, Long[] timestamps, Properties configProps) {
this(streamKeys, streamEntryIds(timestamps), configProps);
}

public AbstractRedisStreamConsumer(
String[] streamKeys, StreamEntryID[] streamIds, Properties configProps) {
List<String> streamKeys, List<StreamEntryID> streamIds, Properties configProps) {
this(prepareStreamEntryIds(streamKeys, streamIds), configProps);
}

private AbstractRedisStreamConsumer(
Entry<String, StreamEntryID>[] streamIds, Properties configProps) {
Map<String, StreamEntryID> streamIds, Properties configProps) {
super(null, configProps);
this.streamEntryIds = streamIds;
initializeKeyIndex();
}

@Override
@@ -104,44 +96,32 @@ protected abstract void collect(
SourceContext<T> sourceContext, String streamKey, StreamEntry streamEntry);

protected void updateIdForKey(String streamKey, StreamEntryID streamEntryID) {
int index = keyIndex.get(streamKey);
if (this.streamEntryIds[index].getValue().toString().equals(">")) {
if (this.streamEntryIds.get(streamKey).toString().equals(">")) {
// skip
} else {
this.streamEntryIds[index].setValue(streamEntryID);
this.streamEntryIds.put(streamKey, streamEntryID);
}
}

private void initializeKeyIndex() {
int index = 0;
for (Entry<String, StreamEntryID> streamEntryId : streamEntryIds) {
keyIndex.put(streamEntryId.getKey(), index++);
}
private static Map<String, StreamEntryID> prepareStreamEntryIds(
List<String> streamKeys, StreamEntryID streamId) {
Map<String, StreamEntryID> streams = new LinkedHashMap<>(streamKeys.size());
streamKeys.forEach(streamKey -> streams.put(streamKey, streamId));
return streams;
}

private static Entry<String, StreamEntryID>[] prepareStreamEntryIds(
String[] streamKeys, StreamEntryID streamId) {
Entry<?, ?>[] streams = new Entry<?, ?>[streamKeys.length];
for (int i = 0; i < streamKeys.length; i++) {
streams[i] = new SimpleEntry<>(streamKeys[i], streamId);
private static Map<String, StreamEntryID> prepareStreamEntryIds(
List<String> streamKeys, List<StreamEntryID> streamIds) {
Map<String, StreamEntryID> streams = new LinkedHashMap<>(streamKeys.size());
for (int i = 0; i < streamKeys.size(); i++) {
streams.put(streamKeys.get(i), streamIds.get(i));
}
return (Entry<String, StreamEntryID>[]) streams;
return streams;
}

private static Entry<String, StreamEntryID>[] prepareStreamEntryIds(
String[] streamKeys, StreamEntryID[] streamIds) {
Entry<?, ?>[] streams = new Entry<?, ?>[streamKeys.length];
for (int i = 0; i < streamKeys.length; i++) {
streams[i] = new SimpleEntry<>(streamKeys[i], streamIds[i]);
}
return (Entry<String, StreamEntryID>[]) streams;
}

private static StreamEntryID[] streamEntryIds(Long[] timestamps) {
StreamEntryID[] entryIds = new StreamEntryID[timestamps.length];
for (int i = 0; i < timestamps.length; i++) {
entryIds[i] = new StreamEntryID(timestamps[i], 0L);
}
return entryIds;
public static List<StreamEntryID> convertToStreamEntryIDs(List<Long> timestamps) {
return timestamps.stream()
.map(ts -> new StreamEntryID(ts, 0L))
.collect(Collectors.toList());
}
}
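For reference, the private array-returning helper streamEntryIds(Long[]) is replaced by the public convertToStreamEntryIDs(List<Long>), which the table factory further down now calls to turn timestamp startup offsets into stream IDs. A small usage sketch under that assumption (timestamp values are illustrative):

import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.connectors.redis.AbstractRedisStreamConsumer;

import redis.clients.jedis.StreamEntryID;

public class TimestampOffsetSketch {
    public static void main(String[] args) {
        // Epoch-millisecond timestamps to start reading from (values are illustrative).
        List<Long> timestamps = Arrays.asList(1652918400000L, 1652918460000L);

        // Each timestamp becomes the "<millis>-0" stream offset, e.g. 1652918400000-0.
        List<StreamEntryID> ids = AbstractRedisStreamConsumer.convertToStreamEntryIDs(timestamps);
        System.out.println(ids);
    }
}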
@@ -18,16 +18,19 @@
package org.apache.flink.streaming.connectors.redis;

import org.apache.flink.streaming.connectors.redis.config.StartupMode;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XReadParams;

import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;

/** @param <T> */
/**
* @param <T>
*/
public class RedisStreamConsumer<T> extends AbstractRedisStreamConsumer<T> {

private final DataConverter<T> dataConverter;
@@ -36,32 +39,32 @@ public RedisStreamConsumer(
Properties configProps,
StartupMode startupMode,
DataConverter<T> dataConverter,
String... streamKeys) {
super(startupMode, streamKeys, configProps);
this.dataConverter = dataConverter;
String streamKey) {
this(configProps, startupMode, dataConverter, Arrays.asList(streamKey));
}

public RedisStreamConsumer(
Properties configProps,
StartupMode startupMode,
DataConverter<T> dataConverter,
String[] streamKeys,
Long[] timestamps,
Properties configProps) {
super(streamKeys, timestamps, configProps);
List<String> streamKeys) {
super(startupMode, streamKeys, configProps);
this.dataConverter = dataConverter;
}

public RedisStreamConsumer(
DataConverter<T> dataConverter,
String[] streamKeys,
StreamEntryID[] streamIds,
List<String> streamKeys,
List<StreamEntryID> streamIds,
Properties configProps) {
super(streamKeys, streamIds, configProps);
this.dataConverter = dataConverter;
}

@Override
protected List<Entry<String, List<StreamEntry>>> read(Jedis jedis) {
return jedis.xread(1, 0L, streamEntryIds);
// return jedis.xread(XReadParams.xReadParams().count(1).block(0), streamEntryIds);
return jedis.xread(XReadParams.xReadParams().count(1), streamEntryIds);
}

@Override
@@ -18,16 +18,19 @@
package org.apache.flink.streaming.connectors.redis;

import org.apache.flink.streaming.connectors.redis.config.StartupMode;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.params.XReadGroupParams;

import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;

/** @param <T> */
/**
* @param <T>
*/
public class RedisStreamGroupConsumer<T> extends AbstractRedisStreamConsumer<T> {

private final String group;
@@ -46,7 +49,7 @@ public RedisStreamGroupConsumer(
consumerName,
StartupMode.GROUP_OFFSETS,
dataConverter,
new String[] {streamKey},
Arrays.asList(streamKey),
config);
}

@@ -55,7 +58,7 @@ public RedisStreamGroupConsumer(
String consumerName,
StartupMode startupMode,
DataConverter<T> dataConverter,
String[] streamKeys,
List<String> streamKeys,
Properties config) {
super(startupMode, streamKeys, config);
this.group = groupName;
@@ -67,21 +70,8 @@ public RedisStreamGroupConsumer(
String groupName,
String consumerName,
DataConverter<T> dataConverter,
String[] streamKeys,
Long[] timestamps,
Properties config) {
super(streamKeys, timestamps, config);
this.group = groupName;
this.consumer = consumerName;
this.dataConverter = dataConverter;
}

public RedisStreamGroupConsumer(
String groupName,
String consumerName,
DataConverter<T> dataConverter,
String[] streamKeys,
StreamEntryID[] streamIds,
List<String> streamKeys,
List<StreamEntryID> streamIds,
Properties config) {
super(streamKeys, streamIds, config);
this.group = groupName;
@@ -91,7 +81,12 @@ public RedisStreamGroupConsumer(

@Override
protected List<Entry<String, List<StreamEntry>>> read(Jedis jedis) {
return jedis.xreadGroup(group, consumer, 1, 0L, true, streamEntryIds);
return jedis.xreadGroup(
group,
consumer,
// XReadGroupParams.xReadGroupParams().count(1).block(0).noAck(),
XReadGroupParams.xReadGroupParams().count(1).noAck(),
streamEntryIds);
}
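One nuance worth noting as an observation on the diff, not part of the commit message: the merged xread/xreadGroup calls only set count(1), while the blocking variants are left commented out, so as committed each read returns right away when no entries are pending (in Redis, BLOCK 0 means wait indefinitely for new entries). A hypothetical sketch of how blocking could be re-enabled through the params objects, mirroring the commented lines above:

import redis.clients.jedis.params.XReadGroupParams;
import redis.clients.jedis.params.XReadParams;

public class BlockingParamsSketch {
    public static void main(String[] args) {
        // Hypothetical: blocking variants of the params used above (NOT what the commit merges).
        XReadParams nonGroupRead = XReadParams.xReadParams().count(1).block(0);
        XReadGroupParams groupRead = XReadGroupParams.xReadGroupParams().count(1).block(0).noAck();
        // These would be passed to jedis.xread(...) / jedis.xreadGroup(...) as in the read() methods above.
    }
}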

@Override
@@ -28,9 +28,9 @@
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;

import redis.clients.jedis.StreamEntryID;

import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
@@ -291,36 +291,41 @@ protected AbstractRedisStreamConsumer<RowData> createRedisConsumer() {
groupName.get(),
consumerName.get(),
converter,
new String[] {streamKey},
new Long[] {timestamp},
Arrays.asList(streamKey),
RedisStreamGroupConsumer.convertToStreamEntryIDs(
Arrays.asList(timestamp)),
config);
case SPECIFIC_OFFSETS:
return new RedisStreamGroupConsumer<>(
groupName.get(),
consumerName.get(),
converter,
new String[] {streamKey},
new StreamEntryID[] {streamEntryId},
Arrays.asList(streamKey),
Arrays.asList(streamEntryId),
config);
default:
return new RedisStreamGroupConsumer<>(
groupName.get(),
consumerName.get(),
startupMode,
converter,
new String[] {streamKey},
Arrays.asList(streamKey),
config);
}
} else {
switch (startupMode) {
case TIMESTAMP:
return new RedisStreamConsumer<>(
converter, new String[] {streamKey}, new Long[] {timestamp}, config);
converter,
Arrays.asList(streamKey),
AbstractRedisStreamConsumer.convertToStreamEntryIDs(
Arrays.asList(timestamp)),
config);
case SPECIFIC_OFFSETS:
return new RedisStreamConsumer<>(
converter,
new String[] {streamKey},
new StreamEntryID[] {streamEntryId},
Arrays.asList(streamKey),
Arrays.asList(streamEntryId),
config);
default:
return new RedisStreamConsumer<>(config, startupMode, converter, streamKey);
