supports for all streams operations #1040

Merged
merged 38 commits into from Nov 1, 2018

4 participants
@RoeyPrat
Collaborator

RoeyPrat commented Oct 17, 2018

@andymccurdy @nicois
this should give us support of all streams commands in redis 5.0
based on nicois's work in #920, and commits from @itamarhaber.
also fixed some issues with tests, and changed CI to run on newer redis version.

@andymccurdy

Owner

andymccurdy commented Oct 18, 2018

Great, thanks. I’m visiting family for a few days and will be back on Wednesday. I’ll plan on reviewing/merging this then.

One observation already: looks like redis 5.0 is fully released now. Can we update the Travis config to pull the full release version and update the skip test decorators back to 5.0?

@itamarhaber

Collaborator

itamarhaber commented Oct 18, 2018

@andymccurdy enjoy the visit :)

@RoeyPrat

Collaborator

RoeyPrat commented Oct 20, 2018

@andymccurdy updated travis config to redis5.
this PR is not running travis by automation, but you can see the run here

@andymccurdy

I've made a bunch of inline notes. Most are python 2/3 compatibility and Token-encoding literal string issues. I tried to provide suggested changes for all of those.

There are a few non-comment strings that use double quotes instead of single quotes. Let's be consistent with the rest of the code base and use single quotes for non-comment strings.

Also I didn't see a response callback or any tests for the XPENDING command.

tox.ini (outdated)
@@ -1675,6 +1749,360 @@ def sunionstore(self, dest, keys, *args):
        args = list_or_args(keys, args)
        return self.execute_command('SUNIONSTORE', dest, *args)

    # STREAMS COMMANDS
    def xadd(self, _name, id='*', maxlen=None, approximate=True, **kwargs):

@andymccurdy

andymccurdy Oct 25, 2018

Owner

I think the key/value pairs of the entry should be specified as a single dict arg rather than kwargs. This is one of the things I regret and intend to change about ZADD. I've seen a number of bug reports with ZADD where a user wanted to use an entry name that conflicted with one of the command arguments. This is also the reason redis-py hasn't included the newer ZADD args like nx/xx/etc. yet -- it would be backwards incompatible for anyone with a sorted set including an element named nx.
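The collision described above can be reproduced with a minimal sketch. Both builder functions below are hypothetical and heavily simplified (no approximate trimming, no token encoding); they are not the code from this PR, only an illustration of why a dedicated dict argument avoids the name clash.

```python
def build_xadd_kwargs(name, id='*', maxlen=None, **kwargs):
    """Hypothetical kwargs-style builder, loosely mirroring the signature under review."""
    pieces = ['XADD', name]
    if maxlen is not None:
        pieces.extend(['MAXLEN', str(maxlen)])
    pieces.append(id)
    for pair in kwargs.items():
        pieces.extend(pair)
    return pieces


def build_xadd_fields(name, fields, id='*', maxlen=None):
    """Hypothetical dict-style builder: entry data lives in its own argument."""
    if not isinstance(fields, dict) or not fields:
        raise ValueError('fields must be a non-empty dict')
    pieces = ['XADD', name]
    if maxlen is not None:
        pieces.extend(['MAXLEN', str(maxlen)])
    pieces.append(id)
    for pair in fields.items():
        pieces.extend(pair)
    return pieces


# With kwargs, an entry field called "maxlen" is swallowed by the option of the same name.
collided = build_xadd_kwargs('mystream', maxlen='10', sensor='a')
# With a dict, the same field name is just data.
safe = build_xadd_fields('mystream', {'maxlen': '10', 'sensor': 'a'})
```

In the kwargs version the user's `maxlen` field silently becomes a trimming option; in the dict version it is sent as an ordinary field.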

@RoeyPrat

RoeyPrat Oct 28, 2018

Collaborator

ok. changing it to "fields" and setting it as a must arg

"""
        pieces = []
        if maxlen is not None:
            if not isinstance(maxlen, int) or maxlen < 1:

@andymccurdy

andymccurdy Oct 25, 2018

Owner
Suggested change:
- if not isinstance(maxlen, int) or maxlen < 1:
+ if not isinstance(maxlen, (int, long)) or maxlen < 1:

for python 2/3 compat we need isinstance to check both int and long.
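For readers on Python 3, where `long` no longer exists, the suggested check only works if a compatibility shim defines the name. The following is a minimal sketch of that idea; the shim and the `validate_positive_int` helper are assumptions for illustration, not the project's actual compat module.

```python
try:
    long  # Python 2: a distinct arbitrary-precision integer type
except NameError:
    long = int  # Python 3: plain int already covers it


def validate_positive_int(value, label):
    """Hypothetical helper: reject anything that is not an integer >= 1."""
    if not isinstance(value, (int, long)) or value < 1:
        raise ValueError('%s must be a positive integer' % label)
    return value
```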

@RoeyPrat

RoeyPrat Oct 28, 2018

Collaborator

ok

            pieces.append(str(maxlen))
        pieces.append(id)
        for pair in iteritems(kwargs):
            pieces.append(pair[0])

@andymccurdy

andymccurdy Oct 25, 2018

Owner

pieces.extend(pair) is slightly more efficient.
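A small side-by-side sketch of the two forms, using made-up field data, shows that they build the same argument list:

```python
fields = {'sensor': 'a', 'temp': '21'}

# Two appends per pair, in the style of the diff under review.
with_append = []
for pair in fields.items():
    with_append.append(pair[0])
    with_append.append(pair[1])

# One extend per pair, as suggested.
with_extend = []
for pair in fields.items():
    with_extend.extend(pair)
```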

@RoeyPrat

RoeyPrat Oct 28, 2018

Collaborator

ok

"""
        pieces = [start, finish]
        if count is not None:
            if not isinstance(count, int) or count < 1:

@andymccurdy

andymccurdy Oct 25, 2018

Owner
Suggested change:
- if not isinstance(count, int) or count < 1:
+ if not isinstance(count, (int, long)) or count < 1:

py 2/3 compat

@RoeyPrat

RoeyPrat Oct 28, 2018

Collaborator

ok

            if not isinstance(param, int):
                raise RedisError("XCLAIM {} must be an integer"
                                 .format(param_name))
            pieces.append(str(param))

@andymccurdy

andymccurdy Oct 25, 2018

Owner

We need to supply the actual int values here in addition to the option names.

@RoeyPrat

RoeyPrat Oct 28, 2018

Collaborator

actually that is the int value. what i forgot to add was the param name...
renaming param to param_value, and adding param_name to pieces.
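The fix discussed here, sending each option name followed by its value, might look roughly like the sketch below. The helper name and signature are hypothetical; IDLE, TIME and RETRYCOUNT are the integer options that the Redis XCLAIM command accepts.

```python
def xclaim_option_pieces(idle=None, time=None, retrycount=None):
    """Hypothetical helper: emit NAME VALUE pairs for XCLAIM's integer options."""
    pieces = []
    options = (('IDLE', idle), ('TIME', time), ('RETRYCOUNT', retrycount))
    for param_name, param_value in options:
        if param_value is None:
            continue
        if not isinstance(param_value, int):
            raise ValueError('XCLAIM %s must be an integer' % param_name)
        # Both the option name and its value go on the wire.
        pieces.extend((param_name, str(param_value)))
    return pieces
```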

@@ -232,6 +232,61 @@ def int_or_none(response):
    return int(response)


def stream_key(response):

@andymccurdy

andymccurdy Oct 25, 2018

Owner

I don't see why we need this callback as it's only returning the response. I think it can be removed and XADD can be omitted from the callback list.

@RoeyPrat

RoeyPrat Oct 28, 2018

Collaborator

XADD removing unnecessary stream_key parse function

def stream_list(response):
    if response is None:
        return None
    result = []

@andymccurdy

andymccurdy Oct 25, 2018

Owner

The pairs_to_dict callback already does the dict construction that we need. This can simply be:

return [(r[0], pairs_to_dict(r[1])) for r in response]
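To make the one-liner concrete, here is a self-contained sketch. The `pairs_to_dict` defined below is a stand-in written for this example (a flat `[k1, v1, k2, v2]` list becomes a dict); the real helper lives in the client module and may differ in detail. The sample reply is invented.

```python
def pairs_to_dict(response):
    """Stand-in helper: [k1, v1, k2, v2] -> {k1: v1, k2: v2}."""
    it = iter(response)
    return dict(zip(it, it))


def stream_list(response):
    """Turn [[id, [k, v, ...]], ...] into [(id, {k: v, ...}), ...]."""
    if response is None:
        return None
    return [(r[0], pairs_to_dict(r[1])) for r in response]


# Invented raw reply in the shape XRANGE returns: id plus a flat field list.
raw = [
    [b'1540456647578-0', [b'key-1', b'value-1', b'key-2', b'value-2']],
    [b'1540456647578-1', [b'key-1', b'value-3']],
]
parsed = stream_list(raw)
```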

@RoeyPrat

RoeyPrat Oct 28, 2018

Collaborator

ok. stream_list should reuse pairs_to_dict

        return None
    result = dict()
    for r in response:
        result[r[0].decode('utf-8')] = stream_list(r[1])

@andymccurdy

andymccurdy Oct 25, 2018

Owner
Suggested change:
- result[r[0].decode('utf-8')] = stream_list(r[1])
+ result[nativestr(r[0])] = stream_list(r[1])

The response type can vary based on the decode_responses flag. Use nativestr to only decode if we're dealing with bytes on py3 or str on py2
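A rough Python 3 sketch of what such a helper does is shown below. This `nativestr` is a stand-in written for illustration, not a copy of the project's compat code: it decodes bytes and passes already-decoded strings through, which is why it is safe regardless of `decode_responses`.

```python
def nativestr(value):
    """Stand-in for the compat helper on Python 3: decode bytes, pass str through."""
    return value if isinstance(value, str) else value.decode('utf-8', 'replace')
```

Calling `.decode('utf-8')` directly would fail on Python 3 whenever the client has already decoded the reply, because `str` has no `decode` method there.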

@RoeyPrat

RoeyPrat Oct 28, 2018

Collaborator

ok.
multi_stream_list should use nativestr for compatibility

    return stream_list(response)


def multi_stream_list(response):

@andymccurdy

andymccurdy Oct 25, 2018

Owner

There are several options for the return structure of multi_stream_list:

# a list of 2-item stream/messages lists
[
    [
        'stream-1',
        [
            [
                '1540456647578-0',
                {'key-1': 'value-1', 'key-2': 'value-2'}
            ],
            [
                '1540456647578-1',
                {'key-1': 'value-1', 'key-2': 'value-2'}
            ],
        ],
    ],
    [
        'stream-2',
        [
            [
                '1540456647578-0',
                {'key-1': 'value-1', 'key-2': 'value-2'}
            ],
            [
                '1540456647578-1',
                {'key-1': 'value-1', 'key-2': 'value-2'}
            ],
        ],
    ],
]

# this allows all list-based iterating....
for stream, messages in r.xread(streams={'stream-1': 0, 'stream-2': 0}):
    for message_id, message_data in messages:
        ....


# alternatively, the existing code represents this return value
# as a dict of streams to lists of message id: dict attr pairs
{
    'stream-1': [
        [
            '1540456647578-0',
            {'key-1': 'value-1', 'key-2': 'value-2'}
        ],
        [
            '1540456647578-1',
            {'key-1': 'value-1', 'key-2': 'value-2'}
        ],
    ],
    'stream-2': [
        [
            '1540456662589-0',
            {'key-1': 'value-1', 'key-2': 'value-2'}
        ],
        [
            '1540456662589-1',
            {'key-1': 'value-1', 'key-2': 'value-2'}
        ],
    ]
}

# iterating here requires the extra .items() call
for stream, messages in r.xread(streams={'stream-1': 0, 'stream-2': 0}).items():
    for message_id, message_data in messages:
        ....
# given redis returns null when there are no messages to read,
# you can't reliably index directly into the dictionary, which seems to
# take away much of the value of the dictionary
# this blows up with a TypeError if there's no message to read
# since xread()'s return value will be None.
r.xread(streams={'stream-1': 99999999999})['stream-1']

I think the first option might be the best choice here. I don't see any value from making the return value a dict of {stream: messages} pairs when you can't reliably index into the dictionary. It seems like this just forces users to type more. Am I missing something?

@RoeyPrat

RoeyPrat Oct 28, 2018

Collaborator

ok.
allow list based iterating on XREADGROUP results

    return [[nativestr(r[0]), stream_list(r[1])] for r in response]


def parse_xpending(response):

@andymccurdy

andymccurdy Oct 29, 2018

Owner

To be more explicit, response callbacks can accept **options. These come from passing **kwargs to execute_command. In the xpending_range function, you could pass parse_detail=True to execute_command, and then here do something like:

if options.get('parse_detail', False):
    return parse_range_xpending(response)
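The options mechanism can be illustrated end to end with a toy client. Everything below is a hypothetical sketch: `FakeClient` only mimics the idea of forwarding keyword arguments from `execute_command` to the response callback, and the two parsers return simplified shapes invented for the example.

```python
def parse_xpending_summary(response):
    """Hypothetical summary parser: [count, min_id, max_id, consumers]."""
    return {'pending': response[0], 'min': response[1], 'max': response[2]}


def parse_range_xpending(response):
    """Hypothetical detail parser: one dict per pending message."""
    return [{'message_id': r[0], 'consumer': r[1]} for r in response]


def parse_xpending(response, **options):
    # The flag arrives via the keyword arguments given to execute_command.
    if options.get('parse_detail', False):
        return parse_range_xpending(response)
    return parse_xpending_summary(response)


class FakeClient(object):
    """Toy dispatcher that hands call options to the registered callback."""
    callbacks = {'XPENDING': parse_xpending}

    def __init__(self, canned):
        self.canned = canned  # pretend server reply

    def execute_command(self, command, *args, **options):
        return self.callbacks[command](self.canned, **options)


summary = FakeClient([2, b'1-0', b'2-0', []]).execute_command('XPENDING', 's', 'g')
detail = FakeClient([[b'1-0', b'c1', 10, 1]]).execute_command(
    'XPENDING', 's', 'g', '-', '+', 10, parse_detail=True)
```

One command name can thus share a single callback while still returning two different shapes, selected by the caller.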
        return self.execute_command('XPENDING', name, groupname)

    def xpending_range(self, name, groupname, start='-', end='+', count=-1,
                       consumername=None):

@coleifer

coleifer Nov 1, 2018

The count=-1 is something I had been using, but it is not supported going forward I don't believe. See:

antirez/redis#5459 (comment)

@coleifer

coleifer commented Nov 1, 2018

Just FYI, I maintain a project called walrus that implements (among other things) a subclass of the redis.Redis object and has support for the streams APIs.

Many of the signatures and behaviors are the same as in this PR, but I thought I'd mention a few slight differences for what they're worth.

XREAD / XREADGROUP are kind of awkward...they support multiple streams which can all have different IDs. So the natural value from the user is a dict of stream -> id as you have. What I found, though, is that I usually want to just read from a stream at the max ID (if XREAD) or from the last-read-message (if XREADGROUP), or maybe from a couple of streams at the max ID, so I allow the following values, and then normalize them to a dict of stream -> id:

  • stream name as a string -> becomes {stream: '$'} (or > for xreadgroup)
  • list of stream names -> becomes {stream1: '$', stream2: '$'} (or > for xreadgroup)
  • stream to explicit id -> no transformation needed
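The normalization described in the list above could be sketched as follows. The function name and its `default_id` parameter are hypothetical; this is not walrus's actual implementation, only one way to express the three cases.

```python
def normalize_streams(streams, default_id='$'):
    """Hypothetical helper: accept a name, a list of names, or a {name: id} dict."""
    if isinstance(streams, dict):
        return dict(streams)  # explicit ids: no transformation needed
    if isinstance(streams, (str, bytes)):
        return {streams: default_id}  # single stream name
    return {name: default_id for name in streams}  # list of stream names
```

For XREADGROUP the caller would pass `default_id='>'` instead of `'$'`.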

Additionally, the response from the XREADs is kind of weird, because it returns up to count rows from each stream passed in...The response is then read as a dict of {stream: [list of messages], other_stream: [list of other stream messages]}... which makes sense (and is what happens by default in my implementation). But it might also be nice if a single stream is passed-in to just skip the dict business and return the list? IDK...special cases and all that. I've implemented a high-level TimeSeries class that just interleaves the messages from multiple streams to form a single list with a total ordering and that's very handy, but not suitable for a low-level API like this.
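The interleaving idea mentioned at the end of this comment can be sketched in a few lines. The helper names are hypothetical, and the sketch assumes results arrive as a sequence of (stream, messages) pairs with string IDs of the form `<milliseconds>-<sequence>`; a dict's `.items()` would work as input too.

```python
def id_key(message_id):
    """Split '<milliseconds>-<sequence>' into a sortable pair of ints."""
    ms, _, seq = message_id.partition('-')
    return int(ms), int(seq)


def interleave(stream_results):
    """Merge [(stream, [(id, fields), ...]), ...] into one ID-ordered list."""
    merged = [
        (message_id, stream, fields)
        for stream, messages in stream_results
        for message_id, fields in messages
    ]
    # Sort numerically by ID; ties across streams fall back to the stream name.
    merged.sort(key=lambda item: (id_key(item[0]), item[1]))
    return merged
```

Sorting on the parsed integers matters: a plain string sort would place `'10-0'` before `'5-0'`.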

Changes look nice, just thought I'd join the mix.

            raise RedisError('XREAD streams must be a non empty dict')
        pieces.append(Token.get_token('STREAMS'))
        pieces.extend(streams.keys())
        pieces.extend(streams.values())

@coleifer

coleifer Nov 1, 2018

Is it guaranteed that these will line up in earlier Python? Since dicts aren't technically ordered (at least until 3.7) it might be safer to be explicit here.

keys, values = zip(*streams.items())

@andymccurdy

andymccurdy Nov 1, 2018

Owner

See https://docs.python.org/2/library/stdtypes.html#dict.items Specifically:

"If items(), keys(), values(), iteritems(), iterkeys(), and itervalues() are called with no intervening modifications to the dictionary, the lists will directly correspond."

@andymccurdy

andymccurdy Nov 1, 2018

Owner

but ya, streams.items() seems better. I'll fix. thx.
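A minimal sketch of the `items()`-based approach follows. The helper name is hypothetical and the plain string `'STREAMS'` stands in for the token object used in the diff; the point is only that keys and IDs are taken from a single pass over the dict.

```python
def streams_pieces(streams):
    """Hypothetical helper: build the STREAMS tail of an XREAD call."""
    if not isinstance(streams, dict) or not streams:
        raise ValueError('XREAD streams must be a non empty dict')
    # One pass over items() keeps each key aligned with its id.
    keys, ids = zip(*streams.items())
    return ['STREAMS'] + list(keys) + list(ids)
```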

entry in the PEL even if certain specified IDs are not already in the
PEL assigned to a different client.
justid: optional boolean, false by default. Return just an array of IDs
of messages successfully claimed, without returning the actual message

@coleifer

coleifer Nov 1, 2018

The redis docs mention some of these parameters are more targeted towards internal usage:

The command has multiple options, however most are mainly for internal use in order to transfer the effects of XCLAIM or other commands to the AOF file and to propagate the same effects to the slaves, and are unlikely to be useful to normal users:

Do you think it's worthwhile exposing more than just name/group/consumer/min idle/id list?

@andymccurdy

Owner

andymccurdy commented Nov 1, 2018

XREAD / XREADGROUP are kind of awkward...they support multiple streams which can all have different IDs....

Currently I'm fine with just supporting the single, explicit dict form.

Additionally, the response from the XREADs is kind of weird, because it returns up to count rows from each stream passed in...The response is then read as a dict of {stream: [list of messages], other_stream: [list of other stream messages]}... which makes sense (and is what happens by default in my implementation). But it might also be nice if a single stream is passed-in to just skip the dict business and return the list? IDK...special cases and all that. I've implemented a high-level TimeSeries class that just interleaves the messages from multiple streams to form a single list with a total ordering and that's very handy, but not suitable for a low-level API like this.

@RoeyPrat initially implemented this as a dict with stream names as keys and a list of messages as the value. We talked a bit and found that having a list of (stream, message_list) pairs made for easier iteration. In practice, I can't see anyone using any of the dict functions. You (almost always) want to consume all messages returned to you, especially with XREADGROUP. The new implementation allows for:

for stream, messages in r.xread(...):
    for message_id, fields in messages:
        ...

@andymccurdy andymccurdy merged commit a32a8e6 into andymccurdy:master Nov 1, 2018

1 check passed

continuous-integration/travis-ci/pr The Travis CI build passed
Details
@coleifer

coleifer commented Nov 1, 2018

We talked a bit and found that having a list of (stream, message_list) pairs made for easier iteration.

Brilliant, I must've misread the code. Probably no point in trying to interleave the messages to provide a total ordering, right?

Looking forward to getting these changes integrated into ol' walrus.

@andymccurdy

Owner

andymccurdy commented Nov 1, 2018

@coleifer It's early and I haven't had coffee yet. What do you mean by "interleaving the messages to provide a total ordering"?

@itamarhaber

Collaborator

itamarhaber commented Nov 1, 2018

I ass-u-me @coleifer means "order the messages by their ids"?

@itamarhaber

Collaborator

itamarhaber commented Nov 1, 2018

BTW congrats on this merge - hoping to see more action in this repo ;)

@coleifer

coleifer commented Nov 1, 2018

Yeap to both

@andymccurdy

Owner

andymccurdy commented Nov 1, 2018

@itamarhaber @RoeyPrat @coleifer

The following commands all return representations of stream messages: XCLAIM, XINFO STREAM (first-entry, last-entry), XRANGE, XREVRANGE, XREAD and XREADGROUP. Currently these messages are represented as a two item tuple in the form of (message_id, dict_of_fields). The stream name that contains the message is somewhere outside the message, either "higher up" in the data structure, or inferred based on the argument the user supplied when calling the command.

Another idea would be to instead represent each message as a dict in the following format:

{
    'stream': 'stream-name',
    'id': '1234-0',
    'fields': {'field-one': 'val-1'}
}

I think this has three benefits:

  1. The dict representation is more explicit. It's obvious what the fields mean, as opposed to the current implementation where you have to remember that message[0] is the message_id and message[1] is the field dict.

  2. Having the stream name within the message itself makes things easier to debug in user processes. e.g., a worker process needs only to log the message to provide enough debug information about the message, whereas currently it would need to separately also log the stream name.

  3. It makes iterating over XREAD and XREADGROUP responses easier. With this, the user doesn't need to perform two loops (currently the first loop iterates over (stream_name, list_of_messages) pairs and the second loop iterates over (message_id, fields) pairs). Instead, all the information they need is provided in each message, so these commands would simply return a list of message dicts.
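For comparison, the proposed per-message dict can be derived from the current return shape in application code. The adapter below is a hypothetical sketch, assuming the list of (stream, messages) pairs described in this thread and treating a None reply as "no messages".

```python
def flatten_messages(stream_results):
    """Hypothetical adapter: [(stream, [(id, fields), ...]), ...] -> list of dicts."""
    return [
        {'stream': stream, 'id': message_id, 'fields': fields}
        for stream, messages in (stream_results or [])
        for message_id, fields in messages
    ]
```

Nothing is lost in either direction, so the choice is mostly about which shape the low-level client should return by default.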

Thoughts?

@coleifer

coleifer commented Nov 1, 2018

I wondered about similar things...Redis return values can be awkward at times. My sense is that, since the message ID is discrete from any ID key/value within the message body, it makes sense to treat the message/body as a 2-tuple. It is also close to what the protocol is doing, so it should be easy to understand if you're coming from the protocol docs to the python client. Similarly with the list of (stream, message-list) for read responses. I get the sense that redis-py is first-and-foremost a straight up redis client, so any "interpretation" beyond basic translation into python types is left to the application developer (?). That's not to say you wouldn't add a higher-level Stream / whatever type -- similar to how pub/sub is handled -- but having the x____ commands stick close to the protocol seems good. People can always manipulate the data how they want, since nothing is lost.

        groupname: name of the consumer group.
        id: ID of the last item in the stream to consider already delivered.
        """
        return self.execute_command('XGROUP CREATE', name, groupname, id)

@coleifer

coleifer Nov 2, 2018

There's a new option to make the streams, "MKSTREAM".

Walrus impl for comparison:

    def xgroup_create(self, key, group, id='$', mkstream=False):
        """
        Create a consumer group.

        :param key: stream key -- must exist before creating a group if
            mkstream is ``False`` (default).
        :param group: consumer group name
        :param id: set the id of the last-received-message
        :param mkstream: create the stream automatically
        """
        cmd = ['XGROUP', 'CREATE', key, group, id]
        if mkstream:
            cmd.append('MKSTREAM')
        return self.execute_command(*cmd) == b'OK'

@andymccurdy

andymccurdy Nov 2, 2018

Owner

Is this something in beta? I see no mention of it on https://redis.io/commands/xgroup

@andymccurdy

andymccurdy Nov 2, 2018

Owner

added the MKSTREAM option in ff3bbdf. Thanks for the heads up.

coleifer added a commit to coleifer/walrus that referenced this pull request Nov 2, 2018

Use upstream redis-py streams and zpop* APIs.
andymccurdy/redis-py#1040 contains changes to add support for the
streams APIs, making it no longer necessary to maintain a separate
implementation. The high-level container-type APIs have not changed
significantly, however:

When reading from the context of a consumer group, the return value is
no longer a dict of {stream: [messages]}, but is rather a list of
[(stream, [messages]), ...].
@coleifer

coleifer commented Nov 2, 2018

Thanks for this excellent patch. I removed all the overrides from my own lib here:
ef25195a653cdd862fc64123e9686ea1480362ac

hell yeaaa: 7 files changed, 153 insertions(+), 653 deletions(-)

@coleifer

coleifer commented Nov 2, 2018

Just a heads-up this command is currently in unstable and I'd assume it'll make its way into the next 5.x release: XSETID <stream> <id> for setting the max-id for a stream outside the context of a consumer group.

Note that the comment regarding the command arguments is actually incorrect. I've created a ticket for this here: antirez/redis#5519
