[BEAM-3446] Fixes RedisIO non-prefix read operations #4656
BaseReadFn to abstract general Jedis operations. Separated key fetching by prefix and get-by-key into separate DoFns.
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.
The fix is in place. @jbonofre, please help review.
We have turned on autoformatting of the codebase, which causes small conflicts across the board. You can probably safely rebase and just keep your changes. Like this:
Please ping me if you run into any difficulty.
@@ -200,6 +200,7 @@ public void populateDisplayData(DisplayData.Builder builder) {

return input
.apply(Create.of(keyPattern()))
.apply(ParDo.of(new ReadKeywsWithPattern(connectionConfiguration())))
s/ReadKeywsWithPattern/ReadKeysWithPattern
private final RedisConnectionConfiguration connectionConfiguration;
private abstract static class BaseReadFn<T> extends DoFn<String, T> {
protected final RedisConnectionConfiguration connectionConfiguration;
can be package private
private transient Jedis jedis;
protected transient Jedis jedis;
can be package private
public ReadFn(RedisConnectionConfiguration connectionConfiguration) {
public BaseReadFn(RedisConnectionConfiguration connectionConfiguration) {
Remove public. In general, it is common Beam practice to restrict access as much as possible. You can use IntelliJ's code analysis to do this.
@@ -288,28 +297,33 @@ public void processElement(ProcessContext processContext) throws Exception {
while (!finished) {
ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
List<String> keys = scanResult.getResult();

Pipeline pipeline = jedis.pipelined();
Question: what is the reason for removing pipelining in general? The approach in this PR seems more composable, but it would perform worse, wouldn't it?
The number of keys that need to be looked up in a given window or batch can vary. Ideally, we should have a configurable batch size and use MGET (https://redis.io/commands/mget) if we want to optimize further.
Pipelining an entire window or batch can cause memory spikes in Redis, depending on the number of keys being looked up. For the time being, I removed the pipeline to simplify things.
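For illustration, here is a minimal sketch of the configurable-batch-size idea mentioned above. It is not code from this PR: the class `BatchedLookup`, its methods, and the `multiGet` function parameter are hypothetical names, and the in-memory map only stands in for Redis so the example runs without a server. With Jedis, the function passed in would presumably be something like `batch -> jedis.mget(batch.toArray(new String[0]))`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch: look up keys in bounded batches, one MGET-style call per batch. */
public class BatchedLookup {

  /** Splits keys into consecutive chunks of at most batchSize elements. */
  static List<List<String>> partition(List<String> keys, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<List<String>> batches = new ArrayList<>();
    for (int i = 0; i < keys.size(); i += batchSize) {
      batches.add(new ArrayList<>(keys.subList(i, Math.min(i + batchSize, keys.size()))));
    }
    return batches;
  }

  /**
   * Calls multiGet once per batch. multiGet is assumed to behave like MGET:
   * it returns one value per requested key, in order, with null for a missing key.
   */
  static Map<String, String> lookup(
      List<String> keys, int batchSize, Function<List<String>, List<String>> multiGet) {
    Map<String, String> found = new LinkedHashMap<>();
    for (List<String> batch : partition(keys, batchSize)) {
      List<String> values = multiGet.apply(batch);
      for (int i = 0; i < batch.size(); i++) {
        String value = values.get(i);
        if (value != null) {
          found.put(batch.get(i), value); // skip keys that do not exist
        }
      }
    }
    return found;
  }

  public static void main(String[] args) {
    // In-memory stand-in for Redis (an assumption for this sketch only).
    Map<String, String> store = Map.of("a", "1", "b", "2", "c", "3");
    Function<List<String>, List<String>> fakeMget = batch -> {
      List<String> values = new ArrayList<>();
      for (String key : batch) {
        values.add(store.get(key)); // null when the key is absent
      }
      return values;
    };
    System.out.println(lookup(List.of("a", "b", "missing", "c"), 2, fakeMget));
  }
}
```

Bounding the batch size caps how many keys a single round trip asks Redis to resolve, which is the memory concern raised above, while still avoiding one GET per key.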
Closing this PR. I opened a new one with fixes, rebased: #5841
URL: https://issues.apache.org/jira/browse/BEAM-3446