Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-3446] Fixes RedisIO non-prefix read operations #4656

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ public PCollection<KV<String, String>> expand(PBegin input) {

return input
.apply(Create.of(keyPattern()))
.apply(ParDo.of(new ReadKeywsWithPattern(connectionConfiguration())))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/ReadKeywsWithPattern/ReadKeysWithPattern

.apply(RedisIO.readAll().withConnectionConfiguration(connectionConfiguration()));
}

Expand Down Expand Up @@ -260,16 +261,12 @@ public PCollection<KV<String, String>> expand(PCollection<String> input) {

}

/**
* A {@link DoFn} requesting Redis server to get key/value pairs.
*/
private static class ReadFn extends DoFn<String, KV<String, String>> {

private final RedisConnectionConfiguration connectionConfiguration;
private abstract static class BaseReadFn<T> extends DoFn<String, T> {
protected final RedisConnectionConfiguration connectionConfiguration;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be package private


private transient Jedis jedis;
protected transient Jedis jedis;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be package private


public ReadFn(RedisConnectionConfiguration connectionConfiguration) {
public BaseReadFn(RedisConnectionConfiguration connectionConfiguration) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove public, in general it is a common Beam practice to restrict access as much as possible. You can use IntelliJ's analyze code to do this.

this.connectionConfiguration = connectionConfiguration;
}

Expand All @@ -278,6 +275,18 @@ public void setup() {
jedis = connectionConfiguration.connect();
}

@Teardown
public void teardown() {
jedis.close();
}
}

private static class ReadKeywsWithPattern extends BaseReadFn<String> {

ReadKeywsWithPattern(RedisConnectionConfiguration connectionConfiguration) {
super(connectionConfiguration);
}

@ProcessElement
public void processElement(ProcessContext processContext) throws Exception {
ScanParams scanParams = new ScanParams();
Expand All @@ -288,28 +297,33 @@ public void processElement(ProcessContext processContext) throws Exception {
while (!finished) {
ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
List<String> keys = scanResult.getResult();

Pipeline pipeline = jedis.pipelined();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: What is the reason to remove pipelining in general, seems like if the approach of this PR is more composable, it would perform worse, won't it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number of keys that need to be looked up in a given window or batch can vary. Ideally, we should have a configurable batch size and use MGET https://redis.io/commands/mget if wanted to optimize further.
Pipelining an entire window or batch can cause memory spikes in Redis depending on the number of keys being looked up, for the time being, to simplify things I removed pipeline.

if (keys != null) {
for (String key : keys) {
pipeline.get(key);
}
List<Object> values = pipeline.syncAndReturnAll();
for (int i = 0; i < values.size(); i++) {
processContext.output(KV.of(keys.get(i), (String) values.get(i)));
}
for (String k : keys) {
processContext.output(k);
}

cursor = scanResult.getStringCursor();
if (cursor.equals("0")) {
finished = true;
}
}
}
}
/**
* A {@link DoFn} requesting Redis server to get key/value pairs.
*/
private static class ReadFn extends BaseReadFn<KV<String, String>> {

@Teardown
public void teardown() {
jedis.close();
ReadFn(RedisConnectionConfiguration connectionConfiguration) {
super(connectionConfiguration);
}

@ProcessElement
public void processElement(ProcessContext processContext) throws Exception {
String key = processContext.element();

String value = jedis.get(key);
if (value != null) {
processContext.output(KV.of(key, value));
}
}

}
Expand Down