Skip to content

Commit

Permalink
Implement XREAD (#147)
Browse files Browse the repository at this point in the history
  • Loading branch information
cunla committed May 8, 2023
1 parent 97674e2 commit f3fe513
Show file tree
Hide file tree
Showing 21 changed files with 158 additions and 91 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
python-version: "3.11"
- name: Install dependencies
env:
PYTHON_KEYRING_BACKEND: keyring.backends.null.Keyring
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- uses: actions/setup-python@v4
with:
cache-dependency-path: poetry.lock
python-version: "3.10"
python-version: "3.11"
- name: Install dependencies
env:
PYTHON_KEYRING_BACKEND: keyring.backends.null.Keyring
Expand Down Expand Up @@ -51,15 +51,15 @@ jobs:
max-parallel: 8
fail-fast: false
matrix:
redis-image: [ "redis:6.2.10", "redis:7.0.7" ]
redis-image: [ "redis:6.2.12", "redis:7.0.11" ]
python-version: [ "3.7", "3.8", "3.10", "3.11" ]
redis-py: [ "4.3.6", "4.4.4", "4.5.4" ]
include:
- python-version: "3.10"
- python-version: "3.11"
redis-image: "redis:6.2.10"
redis-py: "4.5.4"
lupa: true
- python-version: "3.10"
- python-version: "3.11"
redis-image: "redis/redis-stack:7.0.6-RC3"
redis-py: "4.5.4"
lupa: true
Expand Down
18 changes: 18 additions & 0 deletions docs/about/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,24 @@ description: Change log of all fakeredis releases

## Next release

## v2.12.0

### 🚀 Features
- Implement `XREAD` #147

## v2.11.2

### 🧰 Bug Fixes

- Unique FakeServer when no connection params are provided (#142)

## v2.11.1

### 🧰 Maintenance

- Minor fixes supporting multiple connections
- Update documentation

## v2.11.0

### 🚀 Features
Expand Down
8 changes: 4 additions & 4 deletions docs/redis-commands/Redis.md
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,10 @@ Return the number of messages in a stream.

Returns the messages from a stream within a range of IDs.

### [XREAD](https://redis.io/commands/xread/)

Returns messages from multiple streams with IDs greater than the ones requested. Blocks until a message is available otherwise.

### [XREVRANGE](https://redis.io/commands/xrevrange/)

Returns the messages from a stream within a range of IDs in reverse order.
Expand Down Expand Up @@ -1565,10 +1569,6 @@ Returns information about a stream.

Returns the information and entries from a stream consumer group's pending entries list.

#### [XREAD](https://redis.io/commands/xread/) <small>(not implemented)</small>

Returns messages from multiple streams with IDs greater than the ones requested. Blocks until a message is available otherwise.

#### [XREADGROUP](https://redis.io/commands/xreadgroup/) <small>(not implemented)</small>

Returns new or historical messages from a stream for a consumer in agroup. Blocks until a message is available otherwise.
Expand Down
4 changes: 2 additions & 2 deletions docs/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
mkdocs==1.4.2
mkdocs-material==9.1.8
mkdocs==1.4.3
mkdocs-material==9.1.9
2 changes: 1 addition & 1 deletion fakeredis/_command_args_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def _parse_params(
while i < len(actual_args):
found = False
for key in args_info:
if null_terminate(actual_args[i]).lower() == key:
if null_terminate(actual_args[i]) == key:
arg_position, _ = args_info[key]
results[arg_position], parsed = _parse_params(key, i, actual_args)
i += parsed
Expand Down
13 changes: 10 additions & 3 deletions fakeredis/_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ def parse_id(id_str: str):
return timestamp, sequence

@classmethod
def decode(cls, value):
def decode(cls, value, exclusive=False):
if value == b'-':
return cls(BeforeAny(), True)
elif value == b'+':
return cls(AfterAny(), True)
elif value[:1] == b'(':
return cls(cls.parse_id(value[1:]), True)
return cls(cls.parse_id(value), False)
return cls(cls.parse_id(value), exclusive)


class XStream:
Expand Down Expand Up @@ -88,10 +88,14 @@ def find_index(self, id_str: str) -> Tuple[int, bool]:
ind = bisect.bisect_left(list(map(lambda x: x[0], self._values)), ts_seq)
return ind, self._values[ind][0] == ts_seq

@staticmethod
def _encode_id(record):
return f'{record[0][0]}-{record[0][1]}'.encode()

@staticmethod
def _format_record(record):
results = list(record[1:][0])
return [f'{record[0][0]}-{record[0][1]}'.encode(), results]
return [XStream._encode_id(record), results]

def trim(self,
maxlen: Optional[int] = None,
Expand Down Expand Up @@ -125,3 +129,6 @@ def match(record):
if reverse:
return list(reversed(tuple(matches)))
return list(matches)

def last_item_key(self):
XStream._encode_id(self._values[-1])
1 change: 0 additions & 1 deletion fakeredis/commands_mixins/bitmap_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@


class BitmapCommandsMixin:
# BITMAP commands
# TODO: bitfield, bitfield_ro, bitpos
@staticmethod
def _bytes_as_bin_string(value):
Expand Down
4 changes: 2 additions & 2 deletions fakeredis/commands_mixins/connection_mixin.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from fakeredis import _msgs as msgs
from fakeredis._commands import command, DbIndex
from fakeredis._helpers import SimpleError, OK, SimpleString
from fakeredis._commands import (command, DbIndex)
from fakeredis._helpers import (SimpleError, OK, SimpleString)

PONG = SimpleString(b'PONG')

Expand Down
2 changes: 1 addition & 1 deletion fakeredis/commands_mixins/generic_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from fakeredis._commands import (
command, Key, Int, DbIndex, BeforeAny, CommandItem, SortFloat,
delete_keys, key_value_type, )
from fakeredis._helpers import compile_pattern, SimpleError, OK, casematch
from fakeredis._helpers import (compile_pattern, SimpleError, OK, casematch)
from fakeredis._zset import ZSet


Expand Down
4 changes: 2 additions & 2 deletions fakeredis/commands_mixins/hash_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import math

from fakeredis import _msgs as msgs
from fakeredis._commands import command, Key, Hash, Int, Float
from fakeredis._helpers import SimpleError, OK
from fakeredis._commands import (command, Key, Hash, Int, Float)
from fakeredis._helpers import (SimpleError, OK)


class HashCommandsMixin:
Expand Down
11 changes: 3 additions & 8 deletions fakeredis/commands_mixins/list_mixin.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import functools

from fakeredis import _msgs as msgs
from fakeredis._commands import (
Key, command, Int, CommandItem, Timeout, fix_range)
from fakeredis._helpers import (
OK, SimpleError, SimpleString, casematch)
from fakeredis._commands import (Key, command, Int, CommandItem, Timeout, fix_range)
from fakeredis._helpers import (OK, SimpleError, SimpleString, casematch)


def _list_pop(get_slice, key, *args):
"""Implements lpop and rpop.
`get_slice` must take a count and return a slice expression for the
range to pop.
`get_slice` must take a count and return a slice expression for the range to pop.
"""
# This implementation is somewhat contorted to match the odd
# behaviours described in https://github.com/redis/redis/issues/9680.
Expand All @@ -36,8 +33,6 @@ def _list_pop(get_slice, key, *args):


class ListCommandsMixin:
# List commands

def _bpop_pass(self, keys, op, first_pass):
for key in keys:
item = CommandItem(key, self._db, item=self._db.get(key), default=[])
Expand Down
1 change: 0 additions & 1 deletion fakeredis/commands_mixins/pubsub_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@


class PubSubCommandsMixin:
# Pubsub commands
def __init__(self, *args, **kwargs):
super(PubSubCommandsMixin, self).__init__(*args, **kwargs)
self._pubsub = 0 # Count of subscriptions
Expand Down
8 changes: 5 additions & 3 deletions fakeredis/commands_mixins/scripting_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import logging

from fakeredis import _msgs as msgs
from fakeredis._commands import command, Int
from fakeredis._helpers import SimpleError, SimpleString, null_terminate, OK, encode_command
from fakeredis._commands import (command, Int)
from fakeredis._helpers import (SimpleError, SimpleString, null_terminate, OK, encode_command)

LOGGER = logging.getLogger('fakeredis')
REDIS_LOG_LEVELS = {
Expand Down Expand Up @@ -84,6 +84,8 @@ def _convert_redis_result(self, lua_runtime, result):
]
return lua_runtime.table_from(converted)
elif isinstance(result, SimpleError):
if result.value.startswith('ERR wrong number of arguments'):
raise SimpleError(msgs.WRONG_ARGS_MSG7)
raise result
else:
raise RuntimeError("Unexpected return type from redis: {}".format(type(result)))
Expand Down Expand Up @@ -175,7 +177,7 @@ def eval(self, script, numkeys, *keys_and_args):
try:
result = lua_runtime.execute(script)
except SimpleError as ex:
if self.version == 6:
if self.version <= 6:
raise SimpleError(msgs.SCRIPT_ERROR_MSG.format(sha1.decode(), ex))
raise SimpleError(ex.value)
except LuaError as ex:
Expand Down
6 changes: 3 additions & 3 deletions fakeredis/commands_mixins/server_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ def save(self):

@command(())
def time(self):
now_us = round(time.time() * 1000000)
now_s = now_us // 1000000
now_us %= 1000000
now_us = round(time.time() * 1_000_000)
now_s = now_us // 1_000_000
now_us %= 1_000_000
return [str(now_s).encode(), str(now_us).encode()]

@command((DbIndex, DbIndex))
Expand Down
45 changes: 42 additions & 3 deletions fakeredis/commands_mixins/streams_mixin.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import functools
from typing import List

import fakeredis._msgs as msgs
from fakeredis._command_args_parsing import extract_args
from fakeredis._commands import Key, command
from fakeredis._helpers import SimpleError
from fakeredis._commands import Key, command, CommandItem
from fakeredis._helpers import SimpleError, casematch
from fakeredis._stream import XStream, StreamRangeTest


class StreamsCommandsMixin:
@command(name="XADD", fixed=(Key(),), repeat=(bytes,), )
def xadd(self, key, *args):

(nomkstream, limit, maxlen, minid), left_args = extract_args(
args, ('nomkstream', '+limit', '~+maxlen', '~minid'), error_on_unexpected=False)
if nomkstream and key.value is None:
Expand Down Expand Up @@ -71,3 +73,40 @@ def xrange(self, key, _min, _max, *args):
def xrevrange(self, key, _min, _max, *args):
(count,), _ = extract_args(args, ('+count',))
return self._xrange(key, _max, _min, True, count)

def _xread(self, stream_start_id_list: List, count: int, first_pass: bool):
max_inf = StreamRangeTest.decode(b'+')
res = list()
for (item, start_id) in stream_start_id_list:
stream_results = self._xrange(item, start_id, max_inf, False, count)
if first_pass and (count is None or len(stream_results) < count):
raise SimpleError(msgs.WRONGTYPE_MSG)
if len(stream_results) > 0:
res.append([item.key, stream_results])
return res

@staticmethod
def _parse_start_id(key: CommandItem, s: bytes) -> StreamRangeTest:
if s == b'$':
return StreamRangeTest.decode(key.value.last_item_key(), exclusive=True)
return StreamRangeTest.decode(s, exclusive=True)

@command(name="XREAD", fixed=(bytes,), repeat=(bytes,))
def xread(self, *args):
(count, timeout,), left_args = extract_args(args, ('+count', '+block',), error_on_unexpected=False)
if (len(left_args) < 3
or not casematch(left_args[0], b'STREAMS')
or len(left_args) % 2 != 1):
raise SimpleError(msgs.SYNTAX_ERROR_MSG)
left_args = left_args[1:]
num_streams = int(len(left_args) / 2)

stream_start_id_list = list()
for i in range(num_streams):
item = CommandItem(left_args[i], self._db, item=self._db.get(left_args[i]), default=None)
start_id = self._parse_start_id(item, left_args[i + num_streams])
stream_start_id_list.append((item, start_id,))
if timeout is None:
return self._xread(stream_start_id_list, count, False)
else:
return self._blocking(timeout, functools.partial(self._xread, stream_start_id_list, count))
50 changes: 25 additions & 25 deletions fakeredis/commands_mixins/string_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,45 @@ def _lcs(s1, s2):
pi = [[0] * (l2 + 1) for _ in range(0, l1 + 1)]

# Algorithm to calculate the length of the longest common subsequence
for i in range(1, l1 + 1):
for j in range(1, l2 + 1):
if s1[i - 1] == s2[j - 1]:
opt[i][j] = opt[i - 1][j - 1] + 1
pi[i][j] = 0
elif opt[i][j - 1] >= opt[i - 1][j]:
opt[i][j] = opt[i][j - 1]
pi[i][j] = 1
for r in range(1, l1 + 1):
for c in range(1, l2 + 1):
if s1[r - 1] == s2[c - 1]:
opt[r][c] = opt[r - 1][c - 1] + 1
pi[r][c] = 0
elif opt[r][c - 1] >= opt[r - 1][c]:
opt[r][c] = opt[r][c - 1]
pi[r][c] = 1
else:
opt[i][j] = opt[i - 1][j]
pi[i][j] = 2
opt[r][c] = opt[r - 1][c]
pi[r][c] = 2
# Length of the longest common subsequence is saved at opt[n][m]

# Algorithm to calculate the longest common subsequence using the Pi array
# Also calculate the list of matches
i, j = l1, l2
r, c = l1, l2
result = ''
matches = list()
s1ind, s2ind, curr_length = None, None, 0

while i > 0 and j > 0:
if pi[i][j] == 0:
result = chr(s1[i - 1]) + result
i -= 1
j -= 1
while r > 0 and c > 0:
if pi[r][c] == 0:
result = chr(s1[r - 1]) + result
r -= 1
c -= 1
curr_length += 1
elif pi[i][j] == 2:
i -= 1
elif pi[r][c] == 2:
r -= 1
else:
j -= 1
c -= 1

if pi[i][j] == 0 and curr_length == 1:
s1ind = i
s2ind = j
elif pi[i][j] > 0 and curr_length > 0:
matches.append([[i, s1ind], [j, s2ind], curr_length])
if pi[r][c] == 0 and curr_length == 1:
s1ind = r
s2ind = c
elif pi[r][c] > 0 and curr_length > 0:
matches.append([[r, s1ind], [c, s2ind], curr_length])
s1ind, s2ind, curr_length = None, None, 0
if curr_length:
matches.append([[s1ind, i], [s2ind, j], curr_length])
matches.append([[s1ind, r], [s2ind, c], curr_length])

return opt[l1][l2], result.encode(), matches

Expand Down

0 comments on commit f3fe513

Please sign in to comment.