<a href="https://colab.research.google.com/github/denisabrantesredis/denisd-redis-learning-sessions/blob/main/Streams/Streams.ipynb" target="_newt">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

<div style="display:flex;width:100%;">
<img src="https://redis.io/wp-content/uploads/2024/04/Logotype.svg?auto=webp&quality=85,75&width=120" alt="Redis" width="90"/>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
</div>

# Redis Learning Session - Streams

<img src="https://github.com/denisabrantesredis/denisd-redis-learning-sessions/blob/main/Streams/_assets/images/banner.png?raw=true" alt="Redis Data Types"/>

[Try a streams payment monitoring demo application](https://github.com/gacerioni/redis-mvp-lists-fifo)

In this notebook, we will explore the Streams capabilities provided by Redis.

## Installing the Pre-Reqs

In [None]:
!pip install -q redis

## Installing Redis Locally
If you are not using Redis Cloud as a database, uncomment and run the code below to install Redis locally. Then set your connection to 127.0.0.1.

In [None]:
# %%sh
# sudo apt-get install lsb-release curl gpg
# curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg
# sudo chmod 644 /usr/share/keyrings/redis-archive-keyring.gpg
# echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list
# sudo apt-get update
# sudo apt-get install redis > /dev/null 2>&1
# redis-server --daemonize yes

## Connecting to Redis

In [None]:
import redis
from google.colab import userdata

#### Set Up the Connection String

<img src="https://github.com/denisabrantesredis/denisd-redis-learning-sessions/blob/main/Streams/_assets/images/callout_secrets.png?raw=true" alt="Callout - Use Google Colab secrets instead"/>

In [None]:
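# Read each setting from the Colab secrets; fall back to a local Redis default
# when the secret is missing or the notebook has no access to it.
try:
  REDIS_HOST = userdata.get('REDIS_HOST')
except Exception:
  REDIS_HOST = "127.0.0.1"

try:
  REDIS_PORT = userdata.get('REDIS_PORT')
except Exception:
  REDIS_PORT = 6379

try:
  REDIS_PASSWORD = userdata.get('REDIS_PASSWORD')
except Exception:
  REDIS_PASSWORD = ""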

REDIS_URL = f"redis://default:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}"
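
One thing to watch with a URL built this way: a password containing characters such as `@` or `/` would produce an ambiguous URL. The sketch below percent-encodes the credentials first; it assumes that `redis.from_url` decodes them again when it parses the URL. The helper name `build_redis_url` is made up for this example.

```python
from urllib.parse import quote


def build_redis_url(host, port, password="", username="default"):
    """Build a redis:// URL, percent-encoding the credentials."""
    user = quote(username, safe="")
    secret = quote(password, safe="")
    return f"redis://{user}:{secret}@{host}:{port}"


# Hypothetical password, chosen only to show the encoding
print(build_redis_url("127.0.0.1", 6379, "p@ss/word"))
# redis://default:p%40ss%2Fword@127.0.0.1:6379
```

In the notebook, this could replace the f-string: `REDIS_URL = build_redis_url(REDIS_HOST, REDIS_PORT, REDIS_PASSWORD)`.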

#### Testing the Connection to Redis

<img src="https://github.com/denisabrantesredis/denisd-redis-learning-sessions/blob/main/Streams/_assets/images/callout_connection.png?raw=true" alt="Callout - Make sure connection works"/>

In [None]:
r = redis.from_url(REDIS_URL, decode_responses=True)

if r.ping():
    print("Connection successful!")
else:
    print("Connection issue!")

## Inserting Messages

Streams are an append-only data structure. The fundamental write command, called XADD, appends a new entry to the specified stream. If the specified stream does not exist, it gets created automatically by the command.

Each stream entry (message) consists of one or more field-value pairs (somewhat like a dictionary or a Redis hash). Each entry needs a unique identifier, which the Redis server can generate if you pass the `*` character as the ID. The generated ID combines the current timestamp (in milliseconds) with a sequence number, in the form `<millisecondsTime>-<sequenceNumber>`.

IDs increase monotonically: every new entry has a higher ID than all earlier entries. Letting the server generate the IDs is almost always the right approach; there is rarely a reason to specify an ID explicitly.
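
To make the ID format concrete, here is a minimal sketch that splits an ID into its timestamp and sequence parts and converts the timestamp into a date. The helper names and the sample IDs are made up for illustration; they are not part of redis-py.

```python
from datetime import datetime, timezone


def parse_stream_id(entry_id: str) -> tuple[int, int]:
    """Split '<milliseconds>-<sequence>' into two integers."""
    ms, _, seq = entry_id.partition("-")
    return int(ms), int(seq or 0)


def stream_id_to_datetime(entry_id: str) -> datetime:
    """Return the UTC time encoded in the first part of the ID."""
    ms, _ = parse_stream_id(entry_id)
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


# Hypothetical IDs: two entries in the same millisecond, then a later one
sample_ids = ["1526919030474-0", "1526919030474-1", "1526919030475-0"]
parsed = [parse_stream_id(i) for i in sample_ids]

print(parsed)
print(parsed == sorted(parsed))  # IDs compare as (timestamp, sequence) pairs
print(stream_id_to_datetime(sample_ids[0]))
```

Comparing the parsed tuples, rather than the raw strings, is what gives the correct ordering when two timestamps have a different number of digits.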

In [None]:
r.xadd("mystream", id="*", fields={"sensor_id": 1234, "temperature": 19.8})

#### Print Stream information

Here we can find the key that contains the `mystream` stream, and we can see how many messages it has.

In [None]:
print(r.keys("my*"))
print(r.xlen("mystream"))

Next, we add a few more entries to the stream and check the size again:

In [None]:
r.xadd("mystream", id="*", fields={"vehicle_id": 1, "name": "my cool car", "price": 20000})
r.xadd("mystream", id="*", fields={"vehicle_id": 2, "name": "my old car", "price": 100})
r.xadd("mystream", id="*", fields={"vehicle_id": 3, "name": "my family car", "price": 1000})

In [None]:
print(r.keys("my*"))
print(r.xlen("mystream"))

<img src="https://github.com/denisabrantesredis/denisd-redis-learning-sessions/blob/main/Streams/_assets/images/callout_insight.png?raw=true" alt="Callout - Check Redis Insight"/>

Open Redis Insight and confirm that the Stream key was generated. There should be only one key, called `mystream`, which contains all 4 entries we added. Reading entries does not remove them from a stream; they stay there until they are deleted or trimmed.

Needless to say, we shouldn't be inserting completely different messages to the same stream; we will fix that in a bit.

&nbsp;

&nbsp;

## Reading Messages

There are multiple ways to read data from Redis Streams: consumer apps can request all entries from the beginning of a stream; they can search for a subset of entries within specific time ranges (which is why the default ID is timestamp-based) or they can just get entries that are new since the last entry they received.

This diversity allows Streams to be used in different ways, to solve for different use cases.

#### Read all messages from a stream

Here we are using the special IDs `-` and `+` to instruct Redis to provide all entries in a stream, from beginning to end.

Note: this operation is not recommended for large streams, as retrieval might take too much time.

In [None]:
all_entries = r.xrange("mystream", "-", "+")

for entry in all_entries:
  print(entry)

If the field names are identical between entries, Streams will only store this data once, to optimize space usage.

Another option for this command is to start at the first entry. If the ID of the first entry is unknown, you can use the `0` character instead.

In [None]:
all_entries = r.xrange("mystream", 0, "+")

for entry in all_entries:
  print(entry)

We can also limit the number of retrieved entries using the `count` parameter. Notice that in this case, retrieval starts from the beginning of the stream.

In [None]:
all_entries = r.xrange("mystream", "-", "+", count=2)

for entry in all_entries:
  print(entry)

#### Retrieve specific entries

The `xrange` command can be used to retrieve a specific subset of entries, based on ID and count. In the Search lab, we saw how we can return 30 entries of IoT metrics from Streams, using the `xrange` command.

First, let's repeat the last command, but save the ID of the last message retrieved:

In [None]:
all_entries = r.xrange("mystream", "-", "+", count=2)

message_id = all_entries[-1][0]
message_id

Next, retrieve up to 2 entries, starting at this ID. Range bounds are inclusive, so the entry with this ID is returned again as the first result:

In [None]:
all_entries = r.xrange("mystream", message_id, "+", count=2)

for entry in all_entries:
  print(entry)
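
Because range bounds are inclusive, paging through a stream this way repeats the boundary entry on every page. Since Redis 6.2, prefixing an ID with `(` makes the bound exclusive. The sketch below pages through a stream with that prefix. `iter_stream_pages` is a made-up helper, and `FakeStreamClient` is a hypothetical in-memory stand-in that only mimics the part of `xrange` used here, so the example runs without a server.

```python
class FakeStreamClient:
    """Hypothetical stand-in for a Redis client; supports only xrange with max='+'."""

    def __init__(self, entries):
        self.entries = entries  # list of (id, fields), oldest first

    @staticmethod
    def _key(entry_id):
        ms, _, seq = entry_id.partition("-")
        return int(ms), int(seq or 0)

    def xrange(self, name, min="-", max="+", count=None):
        exclusive = min.startswith("(")
        lower = None if min == "-" else self._key(min.lstrip("("))
        result = []
        for entry_id, fields in self.entries:
            key = self._key(entry_id)
            if lower is not None and (key < lower or (exclusive and key == lower)):
                continue
            result.append((entry_id, fields))
        return result[:count]


def iter_stream_pages(client, stream, page_size=2):
    """Yield pages of entries, oldest first, without repeating any entry."""
    start = "-"
    while True:
        page = client.xrange(stream, min=start, max="+", count=page_size)
        if not page:
            return
        yield page
        # "(" excludes the last entry we already saw (Redis 6.2+)
        start = "(" + page[-1][0]


demo = FakeStreamClient([("1-0", {"n": "1"}), ("2-0", {"n": "2"}), ("3-0", {"n": "3"})])
pages = list(iter_stream_pages(demo, "mystream", page_size=2))
for page in pages:
    print([entry_id for entry_id, _ in page])
```

With a real connection, the same generator should work as `iter_stream_pages(r, "mystream")`, assuming the server is Redis 6.2 or later.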

#### Reverse Retrieval

The `xrevrange` command can be used to retrieve entries in reverse, from newer to older. This can be useful for requirements like 'retrieve the last 10 entries':

In [None]:
all_entries = r.xrevrange("mystream", message_id, "-", count=3)

for entry in all_entries:
  print(entry)

Notice how, even though we specified `count=3`, there is only 1 other entry older than our `message_id`, so the command only returns 2 entries.

## Listening for new messages

<b>IMPORTANT</b>: This will only work if you are using an external Redis Database (like Redis Cloud) and if you have Redis Insight configured.

The `xread` command will listen to new entries that are published to a stream. It can run indefinitely (which is not recommended), or within a timeout.

For this lab, we will be using the following parameters:

- `count=1`: the client will receive 1 entry at a time

- `block=2000`: the command will listen for new entries for a total of 2,000 milliseconds (2 seconds). Notice that the client will be blocked while it's listening, which is why this exercise won't work if you are using Redis inside the notebook; you won't be able to create new entries while the client is listening, because this is a blocking operation.

- `streams={"mystream": '$'}`: the dollar sign instructs Redis to only send new entries to the client, emulating the behavior of Pub/Sub.

In [None]:
entries = r.xread(count=1, block=2000, streams={"mystream": '$'})
print(f"after 2s block, got this: {entries}")
# Single quotes inside the f-string keep this valid on Python versions before 3.12
print(f"stream length: {r.xlen('mystream')}")

This call returns an empty result after 2 seconds, because there is no Producer app sending new entries. We can use Redis Insight to emulate a producer.

First, make sure Redis Insight is connected to the database. Then, go to the Workbench and paste the following command (<b>but don't run it yet!</b>):

`XADD mystream * vehicle_id 4 name 'my work car' price 50000`

Then, run the same command as before, but increase the block time to 10 seconds. While it's running, go to Redis Insight and run the command above to create the new entry.

In [None]:
entries = r.xread(count=1, block=10000, streams={"mystream": '$'})
print(f"after 10s block, got this: {entries}")
# Single quotes inside the f-string keep this valid on Python versions before 3.12
print(f"stream length: {r.xlen('mystream')}")

The output will show the new entry and the updated stream length of 5.
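
A caveat when calling `xread` in a loop: passing `$` on every call only returns entries that arrive while that call is blocking, so anything added between two calls is skipped. A common pattern is to use `$` once and then continue from the ID of the last entry received. The sketch below shows that pattern. `follow_stream` is a made-up helper, `ScriptedClient` is a hypothetical stand-in that replays canned responses, and the response shape (a list of `[stream, entries]` pairs) is assumed to match what redis-py returns with its default protocol settings.

```python
class ScriptedClient:
    """Hypothetical stand-in that replays canned xread responses."""

    def __init__(self, responses):
        self.responses = list(responses)
        self.seen_ids = []  # the ID passed on each call, for inspection

    def xread(self, streams, count=None, block=None):
        self.seen_ids.append(list(streams.values())[0])
        return self.responses.pop(0) if self.responses else []


def follow_stream(client, stream, last_id="$", block_ms=2000, max_rounds=3):
    """Poll a stream a few times, resuming from the last ID received."""
    received = []
    for _ in range(max_rounds):
        response = client.xread(streams={stream: last_id}, count=10, block=block_ms)
        for _name, entries in response or []:
            for entry_id, fields in entries:
                received.append((entry_id, fields))
                last_id = entry_id  # the next call continues after this entry
    return received


demo = ScriptedClient([
    [],                                             # first call times out
    [["mystream", [("5-0", {"vehicle_id": "4"})]]],  # second call gets one entry
    [],                                             # third call times out
])
received = follow_stream(demo, "mystream")
print(received)
print(demo.seen_ids)
```

The recorded IDs show the switch from `$` to the concrete ID after the first entry arrives.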

## Consumer Groups

A consumer group is used to retrieve data from a stream, serving multiple consumers, while providing certain guarantees:

- Each message is served to a different consumer so that it is not possible that the same message will be delivered to multiple consumers.

- Consumers are identified, within a consumer group, by a name. This means that even after a disconnect, the stream consumer group retains all the state, since the client will claim again to be the same consumer.

- Each consumer group has the concept of the first ID never consumed so that, when a consumer asks for new messages, it can provide just messages that were not previously delivered.

- Consuming a message, however, requires an explicit acknowledgment using a specific command. Redis interprets the acknowledgment as: this message was correctly processed so it can be evicted from the consumer group.

- A consumer group tracks all the messages that are currently pending, that is, messages that were delivered to some consumer of the consumer group, but are yet to be acknowledged as processed. Thanks to this feature, when accessing the message history of a stream, each consumer will only see messages that were delivered to it.
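
The guarantees above can be pictured with a toy in-memory model. This is a deliberately simplified sketch for intuition only: `ToyConsumerGroup` is a made-up class, it is not how Redis stores groups internally, and it ignores claiming, idle times, and delivery counters.

```python
class ToyConsumerGroup:
    """Simplified model of a consumer group; for illustration only."""

    def __init__(self, entries, start_index=0):
        self.entries = entries         # list of (id, fields), oldest first
        self.next_index = start_index  # position of the first never-delivered entry
        self.pending = {}              # entry ID -> consumer name (the pending list)

    def read_new(self, consumer, count=1):
        """Deliver entries that no consumer in this group has received yet."""
        batch = self.entries[self.next_index:self.next_index + count]
        self.next_index += len(batch)
        for entry_id, _fields in batch:
            self.pending[entry_id] = consumer
        return batch

    def ack(self, entry_id):
        """Remove an entry from the pending list; return how many were removed."""
        return 1 if self.pending.pop(entry_id, None) is not None else 0


entries = [
    ("1-0", {"message": "apple"}),
    ("2-0", {"message": "orange"}),
    ("3-0", {"message": "strawberry"}),
]
group = ToyConsumerGroup(entries)

alice_batch = group.read_new("Alice")
bob_batch = group.read_new("Bob")
print(alice_batch, bob_batch)  # each consumer gets a different entry
print(group.pending)           # both entries are pending until acknowledged

acked = group.ack("1-0")
print(acked, group.pending)
```

Note that acknowledging only touches the pending list; the entry itself stays in `entries`, which mirrors how an acknowledged entry stays in the stream.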

#### Housekeeping: Trimming the stream

Since we have messages in different formats in our stream, let's empty the stream before we continue.

The `xtrim` command is used to limit the size of a stream, by deleting older entries. It can be configured in different ways to support different requirements. In this case, we'll use the `maxlen=0` parameter to tell Redis to completely empty the stream.

In [None]:
print(r.xtrim("mystream", maxlen=0))
print(r.xlen("mystream"))

The output of the first command shows how many entries were removed (trimmed): 4, or 5 if you added the extra entry in the previous section.

<img src="https://github.com/denisabrantesredis/denisd-redis-learning-sessions/blob/main/Streams/_assets/images/callout_insight.png?raw=true" alt="Callout - Check Redis Insight"/>

You can use Redis Insight to check on the stream data. The `mystream` key should still be there, but it should be empty now. On a first run through this notebook, no consumer groups exist yet; we create them in the next step.

&nbsp;
&nbsp;

#### Creating Consumer Groups

First, let's create a few new messages to populate our stream:

In [None]:
print(r.xadd("mystream", id="*", fields={"id": 1, "message": "apple"}))
print(r.xadd("mystream", id="*", fields={"id": 2, "message": "orange"}))
print(r.xadd("mystream", id="*", fields={"id": 3, "message": "strawberry"}))
print(r.xadd("mystream", id="*", fields={"id": 4, "message": "apricot"}))
print(r.xadd("mystream", id="*", fields={"id": 5, "message": "banana"}))
print(r.xadd("mystream", id="*", fields={"id": 6, "message": "banana"}))
print(r.xadd("mystream", id="*", fields={"id": 7, "message": "banana"}))
print(r.xadd("mystream", id="*", fields={"id": 8, "message": "banana"}))
print(r.xlen("mystream"))

Next, let's create 2 consumer groups:

In [None]:
print(r.xgroup_create("mystream", "myGroupFromStart", 0))

print(r.xgroup_create("mystream", "myGroupFromEnd", "$"))

These 2 groups will behave differently: `myGroupFromStart` will receive all entries in the stream, starting from the first one (because it's using `0` as its starting ID), while `myGroupFromEnd` will only receive entries added after the group was created, since it's using the dollar sign in its definition.

## Reading Entries as a Consumer

The `xreadgroup` command is similar to the `xread` command, but it was designed to work with Consumer Groups. This is how we can ensure that 2 consumers in the same group will not retrieve the same entries, for instance.

Let's start by having a consumer called `Alice` retrieve 1 entry from the stream:

In [None]:
all_entries = r.xreadgroup("myGroupFromStart", consumername="Alice", count=1, streams={"mystream":'>'})

for entry in all_entries:
  print(entry)

Here we retrieved the first entry. As discussed in the session, this entry is now in the Pending Entries List, and we can check that by running the following command:

In [None]:
print(r.xpending("mystream", "myGroupFromStart"))

Notice that the output includes which consumer currently 'owns' the entry.

It will stay there until it is acknowledged by the consumer (`Alice`, or another one, if the entry is pending for too long and the consumers have logic to retrieve pending entries).

&nbsp;

Next, let's have another consumer in the same group (call this one `Bob`) retrieve an entry:

In [None]:
all_entries = r.xreadgroup("myGroupFromStart", consumername="Bob", count=1, streams={"mystream":'>'})

for entry in all_entries:
  print(entry)

Even though `Bob` ran the exact same command as `Alice`, it received the second entry in the stream, because the first one was already delivered to `Alice`. This is controlled by the `>` character, which tells Redis to send only entries that have not yet been delivered to any consumer in the group.

&nbsp;

Let's now have our consumer `Alice` retrieve another entry from the stream:

In [None]:
all_entries = r.xreadgroup("myGroupFromStart", consumername="Alice", count=1, streams={"mystream":'>'})

for entry in all_entries:
  print(entry)

The 3rd entry on the stream was retrieved by this command.

Now, let's see what happens when we try to get an entry for the `myGroupFromEnd` consumer group:

In [None]:
all_entries = r.xreadgroup("myGroupFromEnd", consumername="Mary", count=1, streams={"mystream":'>'})

for entry in all_entries:
  print(entry)

It returns no results, because there are no entries inserted after the consumer group was created.

## Pending Entries

There are multiple ways entries can be handled once they are delivered to a consumer:

- They can be acknowledged and kept (permanently or until they are trimmed);

- They can be acknowledged and deleted;

- Crashed consumers can re-connect to Redis, check their pending entries and resume processing;

- A monitoring process can reassign pending entries to other consumers if they are taking too long to process;

- Other consumers can proactively find and claim pending entries.

&nbsp;

The `xpending_range` command can be used to provide a list of pending entries for a Consumer Group in a stream, along with the length of time since the message was delivered, and a counter of how many times this message was delivered (which is extremely useful to identify 'poison pill' entries, which are payloads that cannot be processed and keep being bounced around between consumers).

In [None]:
pending_entries = r.xpending_range("mystream", "myGroupFromStart", "-", "+", 10)

for entry in pending_entries:
  print(entry)

Because we've used the `-` and `+` operators here, we are retrieving all pending messages (to a maximum of 10), from the beginning of the stream. It's possible to narrow down this command and use specific IDs (timestamps) to further refine the search.

It's also possible to narrow down this command to a single consumer:

In [None]:
pending_entries = r.xpending_range("mystream", "myGroupFromStart", "-", "+", 10, "Alice")

for entry in pending_entries:
  print(entry)
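
As a small illustration of the 'poison pill' check mentioned above, the delivery counter in each pending entry can be compared against a threshold. This sketch works on the list of dictionaries returned by `xpending_range` (with the `message_id`, `consumer`, `time_since_delivered` and `times_delivered` keys printed above). The helper name, the threshold, and the sample data are made up for the example.

```python
def find_poison_pills(pending_entries, max_deliveries=5):
    """Return the IDs of pending entries delivered more than max_deliveries times."""
    return [
        entry["message_id"]
        for entry in pending_entries
        if entry["times_delivered"] > max_deliveries
    ]


# Hypothetical data in the same shape as the xpending_range output
sample_pending = [
    {"message_id": "1-0", "consumer": "Alice", "time_since_delivered": 120000, "times_delivered": 1},
    {"message_id": "2-0", "consumer": "Bob", "time_since_delivered": 950000, "times_delivered": 7},
]

print(find_poison_pills(sample_pending))
# ['2-0']
```

What to do with the flagged IDs (log them, move them to a separate stream, acknowledge and drop them) depends on the application.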

## Claiming Entries

Entries can be claimed by (or on behalf of) other consumers. Let's say, for instance, that our `Alice` consumer is having problems, so we want to reassign all of Alice's pending tasks to the `Bob` consumer.

First, we create a list with all the entry IDs that are pending for `Alice`:

In [None]:
pending_entries = r.xpending_range("mystream", "myGroupFromStart", "-", "+", 10, "Alice")
# Collect the IDs of every entry currently pending for Alice
alice_entries = [entry['message_id'] for entry in pending_entries]

print(alice_entries)

Next, we assign these entries to `Bob`:

In [None]:
print(r.xclaim(name="mystream", 
               groupname="myGroupFromStart", 
               consumername="Bob", 
               min_idle_time=360, 
               message_ids=alice_entries)
               )

Notice that we are using the `xclaim` command, passing the list of message IDs. However, we are also setting the `min_idle_time` parameter (in milliseconds). There are 2 reasons why we do that:

1 - We don't want to risk reassigning entries that are still actively being processed (because the `xpending_range` command might have been issued 1 ms after the entry was delivered); and

2 - The `xclaim` command resets the entry's idle time (`time_since_delivered`), which means that if 2 consumers try to claim the same entry, the first will succeed, but the second will claim nothing, because the entry's idle time is now lower than `min_idle_time`.

&nbsp;

Finally, check the list of pending entries again, and we should see that all pending entries are now with the `Bob` consumer:

In [None]:
pending_entries = r.xpending_range("mystream", "myGroupFromStart", "-", "+", 10)

for entry in pending_entries:
  print(entry)

To simplify this process, you can use the `XAUTOCLAIM` command, which finds pending entries and claims them automatically, without the need to produce a list of `message_ids`.
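
A minimal sketch of that approach follows. It assumes the redis-py `xautoclaim` method returns a list whose first element is the cursor for the next call and whose second element is the list of claimed entries, and that the client was created with `decode_responses=True` (so the cursor is a string). `claim_stale_entries` is a made-up helper and `ScriptedAutoclaimClient` is a hypothetical stand-in that replays canned responses so the example runs without a server.

```python
class ScriptedAutoclaimClient:
    """Hypothetical stand-in that replays canned xautoclaim responses."""

    def __init__(self, responses):
        self.responses = list(responses)
        self.cursors = []  # the start_id passed on each call, for inspection

    def xautoclaim(self, name, groupname, consumername, min_idle_time,
                   start_id="0-0", count=None, justid=False):
        self.cursors.append(start_id)
        return self.responses.pop(0)


def claim_stale_entries(client, stream, group, new_owner, min_idle_ms=360, batch=10):
    """Claim every entry idle for at least min_idle_ms on behalf of new_owner."""
    claimed = []
    cursor = "0-0"
    while True:
        result = client.xautoclaim(stream, group, new_owner,
                                   min_idle_time=min_idle_ms,
                                   start_id=cursor, count=batch)
        cursor, entries = result[0], result[1]
        claimed.extend(entries)
        if cursor == "0-0":  # the server signals a complete scan with 0-0
            return claimed


demo = ScriptedAutoclaimClient([
    ["3-0", [("1-0", {"message": "apple"}), ("2-0", {"message": "orange"})]],
    ["0-0", [("3-0", {"message": "strawberry"})]],
])
claimed = claim_stale_entries(demo, "mystream", "myGroupFromStart", "Bob", batch=2)
print([entry_id for entry_id, _ in claimed])
print(demo.cursors)
```

With the notebook's connection, the call would look like `claim_stale_entries(r, "mystream", "myGroupFromStart", "Bob")`. Unlike the `xclaim` example, this claims stale entries from every consumer in the group, not just from `Alice`.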

&nbsp;

<img src="https://github.com/denisabrantesredis/denisd-redis-learning-sessions/blob/main/Streams/_assets/images/callout_insight.png?raw=true" alt="Callout - Check Redis Insight"/>

Check the stream in Redis Insight: go to the Consumer Groups tab and click on `myGroupFromStart` to see the consumers and their pending entries. Bob should be showing 3, while Alice shows 0.

&nbsp;

## Acknowledging Entries

Once an entry has been processed, it can be acknowledged by the consumer, so it will be removed from the Pending Entries List. 

There are different ways to handle acknowledged entries: if they need to be processed by other consumer groups, they can be acknowledged and removed from the Pending Entries List for this consumer group, but not others. If they don't need further processing, they can be acknowledged and deleted from the stream; if they are needed for audit purposes, they can be acknowledged and kept permanently.

First, let's add a few new entries to our stream:

In [None]:
print(r.xadd("mystream", id="*", fields={"id":  9, "message": "pear"}))
print(r.xadd("mystream", id="*", fields={"id": 10, "message": "pineapple"}))
print(r.xadd("mystream", id="*", fields={"id": 11, "message": "watermelon"}))
print(r.xadd("mystream", id="*", fields={"id": 12, "message": "peach"}))

Next, let's create a couple of consumers on the other consumer group (`myGroupFromEnd`):

In [None]:
all_entries = r.xreadgroup("myGroupFromEnd", consumername="Mary", count=1, streams={"mystream":'>'})

for entry in all_entries:
  print(entry)

In [None]:
all_entries = r.xreadgroup("myGroupFromEnd", consumername="John", count=1, streams={"mystream":'>'})

for entry in all_entries:
  print(entry)

Note that they received the new entries, instead of the entries that were already in the stream.

#### Delivering all entries

To explore the different acknowledgment options, we need to make sure all entries are delivered to a consumer. Let's run this loop that will ensure that all entries get delivered:

In [None]:
for i in range(10):
  r.xreadgroup("myGroupFromStart", consumername="Alice", count=1, streams={"mystream":'>'})
  r.xreadgroup("myGroupFromStart", consumername="Bob", count=1, streams={"mystream":'>'})
  r.xreadgroup("myGroupFromEnd", consumername="Mary", count=1, streams={"mystream":'>'})
  r.xreadgroup("myGroupFromEnd", consumername="John", count=1, streams={"mystream":'>'})

Next, check the size of the stream:

In [None]:
print(r.xlen("mystream"))

And check that every entry was delivered to a consumer:

In [None]:
group1_entries = r.xpending_range("mystream", "myGroupFromStart", "-", "+", 100)
group2_entries = r.xpending_range("mystream", "myGroupFromEnd", "-", "+", 100)

for entry in group1_entries:
  print(entry)

for entry in group2_entries:
  print(entry)

print(f"\n Total: {len(group1_entries) + len(group2_entries)} pending entries.")

Notice that the `myGroupFromEnd` consumer group will only receive the 4 new entries we've created, while the `myGroupFromStart` consumer group will receive all. So the final count of pending entries will be higher than the size of the stream.

#### Acknowledging entries for one consumer group

In this first example, we'll acknowledge the first entry delivered to the `myGroupFromEnd` consumer group, making sure not to delete it, so it can still be processed by the `myGroupFromStart` consumer group.

First, we need to grab the message ID:

In [None]:
pending_entries = r.xpending_range("mystream", "myGroupFromEnd", "-", "+", 10, "Mary")
message_id = pending_entries[0]['message_id']
print(message_id)

Then we use the `XACK` command, which removes one or more messages from the Pending Entries List (PEL) of a stream consumer group. A message is marked as 'pending' when it was delivered to some consumer but the server is not yet sure it was processed at least once.

Once a consumer successfully processes a message, it should call XACK so that such message does not get processed again, and as a side effect, the PEL entry about this message is also purged, releasing memory from the Redis server.

In [None]:
print(r.xack("mystream", "myGroupFromEnd", message_id))

Next, check if the message still appears in the Pending Entries List for both consumer groups:

In [None]:
group1_entries = r.xpending_range("mystream", "myGroupFromStart", "-", "+", 100)
group2_entries = r.xpending_range("mystream", "myGroupFromEnd", "-", "+", 100)

for entry in group1_entries:
  print(entry)

for entry in group2_entries:
  print(entry)

print(f"\n Total: {len(group1_entries) + len(group2_entries)} pending entries - message {message_id} should have been removed from the myGroupFromEnd consumer group.")

As expected, the entry is still waiting to be processed by the consumer from the `myGroupFromStart` consumer group.

Let's acknowledge the message in the other consumer group as well:

In [None]:
print(r.xack("mystream", "myGroupFromStart", message_id))

Now let's check our pending entries again:

In [None]:
group1_entries = r.xpending_range("mystream", "myGroupFromStart", "-", "+", 100)
group2_entries = r.xpending_range("mystream", "myGroupFromEnd", "-", "+", 100)

for entry in group1_entries:
  print(entry)

for entry in group2_entries:
  print(entry)

print(f"\n Total: {len(group1_entries) + len(group2_entries)} pending entries - message {message_id} should have been removed from both consumer groups.")

The entry was removed from both pending lists; however, the size of the stream remains the same:

In [None]:
print(r.xlen("mystream"))

This means that the entry is still stored in the stream, even though it was processed by all consumer groups.

The entry can be deleted with the `XDEL` command:

In [None]:
print(r.xdel("mystream", message_id))

Now we should see the stream size reduced by 1:

In [None]:
print(r.xlen("mystream"))

#### Automating Acknowledgments and Deletes

In some (or most) use cases, you may not want to preserve an entry after it was processed by all consumer groups, as it could bloat the size of the stream and require more memory. In this case, you can avoid having to take 2 different steps to acknowledge and delete the entry. You could perform both operations with the `XACKDEL` command.

<b>IMPORTANT:</b> This is a new command, available with Redis 8.2; if you are running previous versions of Redis, you will need to use different commands to acknowledge and delete entries. Your installed redis-py version also needs to be recent enough to provide the `xackdel` method.

The `XACKDEL` command combines the functionality of `XACK` and `XDEL` in Redis Streams. It acknowledges the specified entry IDs in the given consumer group and simultaneously attempts to delete the corresponding entries from the stream. It has a few options as to the criteria that should be used to delete the entry; in this case, we will use the `ACKED` option, which tells Redis to delete this entry if it was already acknowledged by all consumer groups.

More information about this command and its options can be found [here](https://redis.io/docs/latest/commands/xackdel/).

Let's get the ID of the next message pending for `Mary` in the `myGroupFromEnd` consumer group (her first one was acknowledged and deleted above):

In [None]:
pending_entries = r.xpending_range("mystream", "myGroupFromEnd", "-", "+", 10, "Mary")
message_id = pending_entries[0]['message_id']
print(message_id)

Now let's acknowledge this entry with the new `XACKDEL` command:

In [None]:
print(r.xackdel("mystream", "myGroupFromEnd", message_id, ref_policy="ACKED"))

Checking the Pending lists again, this message should still be pending with the `myGroupFromStart` consumer group:

In [None]:
group1_entries = r.xpending_range("mystream", "myGroupFromStart", "-", "+", 100)
group2_entries = r.xpending_range("mystream", "myGroupFromEnd", "-", "+", 100)

for entry in group1_entries:
  print(entry)

for entry in group2_entries:
  print(entry)

print(f"\n Total: {len(group1_entries) + len(group2_entries)} pending entries - message {message_id} should have been removed from the myGroupFromEnd consumer group.")

Meaning, the `XACKDEL` command did what it was supposed to do: acknowledge the message, but only delete it if it's no longer pending in any consumer groups (which is not the case here).

Let's run it again, this time for the `myGroupFromStart` consumer group:

In [None]:
print(r.xackdel("mystream", "myGroupFromStart", message_id, ref_policy="ACKED"))

Finally, let's get the list of pending entries and check the stream size:

In [None]:
group1_entries = r.xpending_range("mystream", "myGroupFromStart", "-", "+", 100)
group2_entries = r.xpending_range("mystream", "myGroupFromEnd", "-", "+", 100)

for entry in group1_entries:
  print(entry)

for entry in group2_entries:
  print(entry)

print(f"\n Total: {len(group1_entries) + len(group2_entries)} pending entries - message {message_id} should have been removed from both consumer groups.")

print(r.xlen("mystream"))

As we can see, the message was deleted after it was acknowledged.

#### Trimming Acknowledged Entries

This is a new capability of Redis 8.2: we can acknowledge entries as they are being processed, and then periodically trim all the entries that have been acknowledged by all consumer groups. The use case here is to give enough time for additional batch jobs or other routines to use the data in these entries, and then batch delete them periodically (daily, weekly, monthly, ...) so they will not be stored permanently.

First, let's acknowledge all entries in both consumer groups:

In [None]:
group1_entries = r.xpending_range("mystream", "myGroupFromStart", "-", "+", 100)
group2_entries = r.xpending_range("mystream", "myGroupFromEnd", "-", "+", 100)

for entry in group1_entries:
  message_id = entry['message_id']
  r.xack("mystream", "myGroupFromStart", message_id)
  print(f"Acknowledged message ID {message_id} from consumer group myGroupFromStart")


for entry in group2_entries:
  message_id = entry['message_id']
  r.xack("mystream", "myGroupFromEnd", message_id)
  print(f"Acknowledged message ID {message_id} from consumer group myGroupFromEnd")

print(r.xlen("mystream"))

Check the pending entries again:

In [None]:
group1_entries = r.xpending_range("mystream", "myGroupFromStart", "-", "+", 100)
group2_entries = r.xpending_range("mystream", "myGroupFromEnd", "-", "+", 100)

for entry in group1_entries:
  print(entry)

for entry in group2_entries:
  print(entry)

print(f"\n Total: {len(group1_entries) + len(group2_entries)} pending entries.")

print(r.xlen("mystream"))

No more pending items, but all entries are still in the stream. Next, let's add one final entry in our stream:

In [None]:
new_message_id = r.xadd("mystream", id="*", fields={"id":  13, "message": "cherry"})

Now, let's trim all the acknowledged entries:

In [None]:
print(r.xtrim("mystream", maxlen=0, ref_policy="ACKED", approximate=False))

Then, check the size of the stream:

In [None]:
print(r.xlen("mystream"))

<img src="https://github.com/denisabrantesredis/denisd-redis-learning-sessions/blob/main/Streams/_assets/images/callout_insight.png?raw=true" alt="Callout - Check Redis Insight"/>

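Redis Insight should be showing a stream with a single entry, and 2 consumer groups with 2 consumers each, and zero pending entries. The remaining entry is the one we just added: it has not been delivered to (or acknowledged by) any consumer group, so the `ACKED` policy keeps it.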
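
The rule applied by the `ACKED` policy can be sketched in plain Python. This is a simplified model based on the description above (an entry is removable only once every consumer group has received and acknowledged it); the server's actual implementation may differ in its details. The function names, group state layout, and sample IDs are all made up for the example.

```python
def parse_id(entry_id):
    """Turn '<ms>-<seq>' into a comparable (ms, seq) tuple."""
    ms, _, seq = entry_id.partition("-")
    return int(ms), int(seq or 0)


def removable_with_acked(entry_ids, groups):
    """Return the entries that every group has been delivered and has acknowledged."""
    removable = []
    for entry_id in entry_ids:
        delivered_everywhere = all(
            parse_id(entry_id) <= parse_id(state["last_delivered_id"])
            for state in groups.values()
        )
        pending_somewhere = any(entry_id in state["pending"] for state in groups.values())
        if delivered_everywhere and not pending_somewhere:
            removable.append(entry_id)
    return removable


# Hypothetical state: both groups have consumed up to 2-0; 3-0 was added afterwards
groups = {
    "myGroupFromStart": {"last_delivered_id": "2-0", "pending": set()},
    "myGroupFromEnd": {"last_delivered_id": "2-0", "pending": set()},
}
print(removable_with_acked(["1-0", "2-0", "3-0"], groups))
# ['1-0', '2-0']

# If one group still has 2-0 pending, that entry has to stay as well
groups["myGroupFromEnd"]["pending"].add("2-0")
print(removable_with_acked(["1-0", "2-0", "3-0"], groups))
# ['1-0']
```

In this model, `3-0` plays the role of the last entry added in the notebook: no group has received it yet, so it is never removable.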


&nbsp;

## Get Stream Information

We will end this lab with a few commands you can use to get information about the stream.

Use the `XINFO STREAM` command to get general data about the stream:

In [None]:
stream_info = r.xinfo_stream("mystream", full=True)
for key, value in stream_info.items():
  print(f"{key}: {value}")

Use the `XINFO GROUPS` command to get information about groups:

In [None]:
group_info = r.xinfo_groups("mystream")
for group in group_info:
  print(group)

Use the `XINFO CONSUMERS` command to get information about consumers:

In [None]:
consumer_info = r.xinfo_consumers("mystream", "myGroupFromStart")
for consumer in consumer_info:
  print(consumer)

&nbsp;


&nbsp;



# Congrats, this is the end of the lab!!