In [1]:
import os

import redis

# Connection settings come from the environment so that the password is never
# stored in the notebook itself. REDIS_HOST / REDIS_PORT fall back to a local
# default server; REDIS_PASSWORD is mandatory and a missing value fails fast
# with a KeyError instead of silently connecting without credentials.
# decode_responses=True makes the client return str values rather than bytes.
redis_client = redis.Redis(
    host=os.environ.get("REDIS_HOST", "localhost"),
    port=int(os.environ.get("REDIS_PORT", "6379")),
    password=os.environ["REDIS_PASSWORD"],
    decode_responses=True,
)

# Set a single value

In [2]:
# Store a plain string under "my_key"; the call's result is shown as the cell output.
redis_client.set(name='my_key', value='thevalueofmykey')

True

In [3]:
# Read back the value stored under "my_key".
redis_client.get(name='my_key')

'thevalueofmykey'

# Add entries to a stream and read them back

In [4]:
# Show the client's built-in documentation for xadd, the call used below to add entries to a stream.
help(redis_client.xadd)

Help on method xadd in module redis.commands.core:

xadd(name, fields, id='*', maxlen=None, approximate=True, nomkstream=False, minid=None, limit=None) method of redis.client.Redis instance
    Add to a stream.
    name: name of the stream
    fields: dict of field/value pairs to insert into the stream
    id: Location to insert this record. By default it is appended.
    maxlen: truncate old stream members beyond this size.
    Can't be specified with minid.
    approximate: actual stream length may be slightly more than maxlen
    nomkstream: When set to true, do not make a stream
    minid: the minimum id in the stream to query.
    Can't be specified with maxlen.
    limit: specifies the maximum number of entries to retrieve
    
    For more information check https://redis.io/commands/xadd



In [5]:
# Show the client's built-in documentation for xrange, the call used below to read entries back from a stream.
help(redis_client.xrange)

Help on method xrange in module redis.commands.core:

xrange(name, min='-', max='+', count=None) method of redis.client.Redis instance
    Read stream values within an interval.
    name: name of the stream.
    start: first stream ID. defaults to '-',
           meaning the earliest available.
    finish: last stream ID. defaults to '+',
            meaning the latest available.
    count: if set, only return this many items, beginning with the
           earliest available.
    
    For more information check https://redis.io/commands/xrange



In our example we want to send a list of events. Our Aggregate will read the list of events and process them.

In [6]:
import uuid

from dataclasses import dataclass, asdict


class Event:
    """Base class for event types; it defines no fields or behaviour of its own."""


@dataclass
class PrintMessage(Event):
    """Event carrying a text message together with an identifier.

    Being a dataclass, an instance can be converted to a plain dict with
    ``dataclasses.asdict``, which is how the events are handed to ``xadd``
    elsewhere in this notebook.
    """

    # Identifier of the event (this notebook fills it with str(uuid.uuid4())).
    id: str
    # Text payload of the event.
    msg: str

In [7]:
# Build three sample events, each tagged with a freshly generated UUID.
list_of_event_1 = [
    PrintMessage(str(uuid.uuid4()), text) for text in ("AAA", "BBB", "CCC")
]

# Append each event to the "batch_1" stream as a flat field/value dict.
for event in list_of_event_1:
    redis_client.xadd(name="batch_1", fields=asdict(event))

In [8]:
# Build a second set of three sample events, each with its own UUID.
list_of_event_2 = [
    PrintMessage(str(uuid.uuid4()), text) for text in ("111", "222", "333")
]

# Append each event to the "batch_2" stream as a flat field/value dict.
for event in list_of_event_2:
    redis_client.xadd(name="batch_2", fields=asdict(event))

In [9]:
# Print every entry currently stored in "batch_1". The bounds "-" and "+" are
# xrange's defaults and span the whole stream; each entry is an (id, fields) tuple.
for event in redis_client.xrange("batch_1", min="-", max="+"):
    print(event)

('1643750817733-0', {'id': '6d51ce28-b37e-4710-a966-f2e0c14ceb67', 'msg': 'AAA'})
('1643750817735-0', {'id': '72639493-006a-44d5-afe9-ed650a7638ac', 'msg': 'BBB'})
('1643750817737-0', {'id': '8da043fd-4e3c-4a4e-b498-4f84c721c028', 'msg': 'CCC'})
('1643751576654-0', {'id': '51ca59a1-380e-4752-a534-6940712e97f9', 'msg': 'AAA'})
('1643751576656-0', {'id': '17970c77-4638-4e95-9a78-5fdf0f57a7a8', 'msg': 'BBB'})
('1643751576660-0', {'id': 'a10fb75b-0367-4df3-b27f-0fac9c57a528', 'msg': 'CCC'})


In [12]:
# Print every entry of "batch_2" while collecting its stream ID (the first
# element of each (id, fields) tuple), then delete all of them in one call.
ids = []
for event in redis_client.xrange("batch_2"):
    print(event)
    ids.append(event[0])

# XDEL needs at least one ID: calling it with none makes the server reject the
# command, so re-running this cell on an already-empty stream would raise.
# Skip the call in that case and report 0 deletions; otherwise the number of
# deleted entries is shown as the cell output.
redis_client.xdel("batch_2", *ids) if ids else 0

('1643750817746-0', {'id': 'fb14bce2-01bc-4079-9888-f642324eab4d', 'msg': '111'})
('1643750817748-0', {'id': '4d07e016-1caa-4de0-ab31-d06623950a47', 'msg': '222'})
('1643750817750-0', {'id': '01cdbc9e-2026-4254-8112-dfda7adf350d', 'msg': '333'})
('1643751579331-0', {'id': '07589292-86b2-42c1-bf13-f390f55f1b23', 'msg': '111'})
('1643751579333-0', {'id': 'ff5bbc7c-f624-430a-8f53-c78a164bf802', 'msg': '222'})
('1643751579335-0', {'id': '96ee95c8-822e-4558-9957-3249beb764e9', 'msg': '333'})


6

In [13]:
# List what is currently stored in "batch_2".
redis_client.xrange(name="batch_2")

[]

In [11]:
# Show the client's built-in documentation for xdel (used to delete stream entries).
# A bare help() would start the interactive help prompt instead of printing this text.
help(redis_client.xdel)

Help on method xdel in module redis.commands.core:

xdel(name, *ids) method of redis.client.Redis instance
    Deletes one or more messages from a stream.
    name: name of the stream.
    *ids: message ids to delete.
    
    For more information check https://redis.io/commands/xdel

