# Redis' Subscriber Implementation

In event-driven architectures, efficient message consumption is essential for real-time communication between the components of a system. A Redis subscriber can consume events through one of two features. Pub/Sub pushes each message to whichever clients are subscribed at that moment. Streams store messages in an append-only log that consumer groups read, share among several consumers, and acknowledge. This notebook uses Streams with consumer groups. Each new message is delivered to one consumer in the group and is tracked as pending until that consumer acknowledges it, which lets several consumers share the work in a distributed, scalable application. The main advantages of this approach are low-latency processing, horizontal scalability, a decoupled architecture, event-driven workflows, and reliable message processing, which make it well suited to real-time data streams. For more information about Redis Pub/Sub, see this [reference](https://redis.io/docs/latest/develop/interact/pubsub/).
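For contrast with the Streams-based consumers used in the rest of this notebook, here is a minimal sketch of a plain Pub/Sub subscriber. It assumes a redis-py client object is passed in, for example `redis.Redis(host='redis-service', port=6379)`. Pub/Sub has at-most-once delivery: a message reaches only the subscribers connected when it is published, and Redis keeps no history. That is why the notebook relies on Streams and acknowledgements for reliable processing. The function name `listen_pubsub` and the `max_messages` cut-off are hypothetical. They are added only so that the loop terminates.

```python
from typing import Any, Callable


def listen_pubsub(client: Any, channel: str,
                  handler: Callable[[bytes], None], max_messages: int) -> int:
    """Subscribe to a Pub/Sub channel and pass each payload to `handler`.

    `client` is assumed to be a redis-py client; only its `pubsub()` method
    is used. Stops after `max_messages` data messages (a hypothetical
    cut-off so the sketch terminates). Returns the number handled.
    """
    pubsub = client.pubsub()
    pubsub.subscribe(channel)
    handled = 0
    try:
        # listen() blocks and yields dicts with 'type', 'channel', 'data', ...
        for message in pubsub.listen():
            # Skip control messages such as the 'subscribe' confirmation.
            if message["type"] != "message":
                continue
            handler(message["data"])
            handled += 1
            if handled >= max_messages:
                break
    finally:
        # Release the subscription connection.
        pubsub.close()
    return handled
```

A subscriber that is offline while a message is published never sees that message. A stream consumer reconnecting to its group resumes from the group's last-delivered position instead.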

Objectives:

In this notebook, the user will:
- Set up a Redis connection to ensure reliable event consumption.
- Create a subscriber function that listens to Redis Streams.
- Process incoming messages and handle them accordingly.
- Implement consumer groups to distribute workload efficiently.
- Acknowledge processed messages to prevent re-processing.
- Test message reception to verify that the subscriber correctly handles events from Redis streams.

### Table of contents
- [Import dependencies](#import-dependencies)
- [Real-Time Single-Stream Event Processing with Redis Streams and Consumer Groups](#real-time-single-stream-event-processing-with-redis-streams-and-consumer-groups)
- [Scalable Multi-Stream Event Processing with Redis Consumer Groups](#scalable-multi-stream-event-processing-with-redis-consumer-groups)

## Import dependencies

```python
import redis
import json
```



## Real-Time Single-Stream Event Processing with Redis Streams and Consumer Groups
The user establishes a Redis connection and consumes messages from a single stream (`WATERBODIES`) through a consumer group. The script first connects to Redis and creates the consumer group if it does not already exist; `mkstream=True` also creates the stream if it is missing. The message-processing function (`read_stream`) fetches up to 10 new messages per call with `xreadgroup()`, blocking for at most 5 seconds, and then decodes and prints each event. After processing a message, it acknowledges it with `xack()`, which removes it from the group's pending entries list so that it is not reprocessed. The script calls `read_stream` in an infinite loop for continuous, real-time consumption, and further consumers can join the same group under different consumer names to share the load.
> Note: To consume several streams at once, use [Scalable Multi-Stream Event Processing with Redis Consumer Groups](#scalable-multi-stream-event-processing-with-redis-consumer-groups) instead. It creates the consumer group on every stream and acknowledges each message in the stream it was read from.
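To test a consumer, something has to append events to the stream. Here is a minimal producer sketch, assuming a redis-py client. `XADD` appends an entry and returns the ID Redis generates, which has the form `<milliseconds>-<sequence>`, like the IDs in the sample output at the end of this notebook. The field names `subject`, `producer` and `href` mirror that sample output. The helper name `publish_event` and its `maxlen` trimming default are assumptions and are not part of the original notebook.

```python
from typing import Any, Mapping, Optional


def publish_event(client: Any, stream: str, fields: Mapping[str, str],
                  maxlen: Optional[int] = 10_000) -> Any:
    """Append one event to `stream` and return the ID Redis assigns.

    `client` is assumed to be a redis-py client. `maxlen` together with
    `approximate=True` lets Redis trim old entries cheaply so the stream
    does not grow without bound (the default value is hypothetical).
    """
    return client.xadd(stream, dict(fields), maxlen=maxlen, approximate=True)
```

Usage, assuming the connection settings used below: `publish_event(redis.Redis(host='redis-service', port=6379, db=0), 'WATERBODIES', {'message': 'test'})`. The consumer loop should then print the event and acknowledge it.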

```python
# Configuration for the Redis connection
redis_host = 'redis-service'          # Update with your Redis host
redis_port = 6379                     # Default Redis port
redis_db = 0                          # Redis database number
stream_name = 'WATERBODIES'           # Name of the Redis stream
consumer_group = 'my-consumer-group'  # Consumer group name
consumer_name = 'my-consumer'         # Unique name for the consumer

# Connect to Redis
r = redis.Redis(host=redis_host, port=redis_port, db=redis_db)

# Create the consumer group if it doesn't already exist
# (mkstream=True also creates the stream if it is missing)
try:
    r.xgroup_create(stream_name, consumer_group, id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
    # Ignore the error if the group already exists
    if "BUSYGROUP Consumer Group name already exists" not in str(e):
        raise

# Read new messages from the stream
def read_stream():
    try:
        # '>' requests messages never delivered to any consumer in this group
        messages = r.xreadgroup(consumer_group, consumer_name, {stream_name: '>'}, count=10, block=5000)
        if messages:
            for stream, events in messages:
                for event_id, event_data in events:
                    # Decode the byte keys and values
                    event_data_str = {k.decode("utf-8"): v.decode("utf-8") for k, v in event_data.items()}

                    # Print the 'message' field if present, otherwise the whole payload
                    payload = event_data_str.get('message', event_data_str)
                    print(f"{stream.decode('utf-8')} - Event Id: {event_id.decode('utf-8')} Event Data: {payload}")

                    # Acknowledge the message in the stream it was read from
                    r.xack(stream, consumer_group, event_id)

    except Exception as e:
        print(f"Error reading stream: {e}")

# Continuously read from the stream
while True:
    read_stream()
```
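The loop above asks only for new entries (`'>'`). If the consumer crashes after reading a message but before calling `xack()`, that entry stays in the group's pending entries list and is not delivered again with `'>'`. One way to recover is to call `XREADGROUP` with ID `'0'` instead of `'>'`, which returns this consumer's own pending entries, oldest first. Here is a minimal sketch of that approach, assuming a redis-py client. It reprocesses and acknowledges the backlog at startup, before the main loop begins. The function name `drain_pending` and the `handler` callback are hypothetical.

```python
from typing import Any, Callable, Dict


def drain_pending(client: Any, stream: str, group: str, consumer: str,
                  handler: Callable[[bytes, Dict[bytes, bytes]], None],
                  batch: int = 10) -> int:
    """Reprocess and acknowledge entries this consumer read but never acked.

    Starting from ID '0', XREADGROUP returns pending entries instead of new
    ones; an empty batch means the backlog is cleared. Returns the number
    of entries acknowledged.
    """
    acked = 0
    last_id = "0"
    while True:
        reply = client.xreadgroup(group, consumer, {stream: last_id}, count=batch)
        # Reply shape: [[stream_name, [(entry_id, fields), ...]]]
        entries = reply[0][1] if reply else []
        if not entries:
            return acked
        for entry_id, fields in entries:
            handler(entry_id, fields)
            client.xack(stream, group, entry_id)
            acked += 1
            # Continue after the last processed ID on the next call.
            last_id = entry_id
```

Because each entry is acknowledged only after `handler` returns, this gives at-least-once processing. A handler can therefore see the same entry twice and should be idempotent.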


## Scalable Multi-Stream Event Processing with Redis Consumer Groups

The user establishes a Redis connection and consumes messages from several streams using Redis Streams and consumer groups. The script first connects to Redis, defines a list of stream names, and creates the consumer group on each stream if it does not already exist, so that every stream can be consumed independently. The message-processing function (`read_stream`) builds a `{stream: '>'}` mapping for all streams and passes it to a single `xreadgroup()` call, which returns new messages from any of them. The function then decodes the messages and prints the relevant details. After processing, it acknowledges each message with `xack()` in the stream it was read from, so that the message is not reprocessed. The script runs in an infinite loop and consumes events from all streams in real time, which suits distributed, event-driven systems.
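The nested loops in `read_stream` walk the structure that redis-py returns from `xreadgroup()` when `decode_responses` keeps its default value (`False`). That structure is a list of `[stream_name, [(entry_id, {field: value, ...}), ...]]` pairs, with every name and value as bytes. Moving the decoding step into a small pure function, as sketched below, makes it testable without a Redis server. The helper name `decode_reply` is hypothetical. Passing `decode_responses=True` to `redis.Redis(...)` is an alternative that makes redis-py return `str` values directly.

```python
from typing import Dict, List, Tuple

# (stream name, entry ID, decoded fields)
Event = Tuple[str, str, Dict[str, str]]


def decode_reply(reply) -> List[Event]:
    """Flatten a bytes-valued xreadgroup() reply into (stream, entry_id, fields)."""
    events: List[Event] = []
    # A blocking read that times out yields an empty or None reply.
    for stream, entries in reply or []:
        for entry_id, fields in entries:
            events.append((
                stream.decode("utf-8"),
                entry_id.decode("utf-8"),
                {k.decode("utf-8"): v.decode("utf-8") for k, v in fields.items()},
            ))
    return events
```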

This implementation differs from the single-stream consumer because it handles several streams at once. The single-stream setup reads from one predefined stream. This script instead creates the consumer group on every stream, so `xreadgroup()` does not fail with a `NOGROUP` error for a stream that lacks the group. It also acknowledges each message in the stream it came from rather than in a fixed stream. An `xack()` sent to the wrong stream fails silently: it returns 0, and the message stays pending. When this is combined with several consumers running in the same group, the approach is a better fit for high-throughput applications such as real-time analytics, microservice communication, and large-scale distributed event processing.
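Parallel consumption comes from running several consumers in the same group, because Redis delivers each new entry to only one of them. Two details matter in practice. First, each consumer needs a distinct name, since two processes that both use `'my-consumer'` would share a single pending list. Second, the `while True` loop needs a way to stop. Here is a minimal sketch that handles both, assuming redis-py clients. The names `unique_consumer_name` and `consume_until`, as well as the `threading.Event` stop flag, are hypothetical additions and are not part of the original notebook.

```python
import os
import socket
import threading
from typing import Any, Callable, Dict, Iterable


def unique_consumer_name(prefix: str = "consumer") -> str:
    """Build a per-process consumer name from the host name and PID."""
    return f"{prefix}-{socket.gethostname()}-{os.getpid()}"


def consume_until(client: Any, group: str, consumer: str, streams: Iterable[str],
                  handler: Callable[[bytes, bytes, Dict[bytes, bytes]], None],
                  stop: threading.Event, block_ms: int = 5000) -> int:
    """Read new entries for `consumer` until `stop` is set; return how many were acked."""
    stream_dict = {name: ">" for name in streams}
    acked = 0
    while not stop.is_set():
        # Blocks for at most block_ms, so `stop` is re-checked regularly.
        reply = client.xreadgroup(group, consumer, stream_dict, count=10, block=block_ms)
        for stream, entries in reply or []:
            for entry_id, fields in entries:
                handler(stream, entry_id, fields)
                # Acknowledge in the stream the entry came from.
                client.xack(stream, group, entry_id)
                acked += 1
    return acked
```

To run several consumers in one process, start one thread per consumer, each with its own client and a distinct name such as `f"{unique_consumer_name()}-{i}"`. Calling `stop.set()` lets every loop exit once its current blocking read returns. For consumers spread across machines, the host name and PID in the name already keep them apart.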



```python
# Configuration for the Redis connection
redis_host = 'redis-service'  # Update with your Redis host
redis_port = 6379             # Default Redis port
redis_db = 0                  # Redis database number
consumer_group = 'my-consumer-group'  # Consumer group name
consumer_name = 'my-consumer'         # Unique name for the consumer

# Connect to Redis
r = redis.Redis(host=redis_host, port=redis_port, db=redis_db)

# List of stream names
streams = ['STREAM', 'WATERBODIES', 'WATERBODIESFAILURE']

# Create a consumer group for each stream if it doesn't already exist
for stream in streams:
    try:
        r.xgroup_create(stream, consumer_group, id='0', mkstream=True)
    except redis.exceptions.ResponseError as e:
        # Ignore the error if the group already exists
        if "BUSYGROUP Consumer Group name already exists" not in str(e):
            raise

# Read messages from the streams
def read_stream():
    try:
        # Stream dictionary with '>' for new messages
        stream_dict = {stream: '>' for stream in streams}

        # Fetch new messages from the consumer group
        messages = r.xreadgroup(consumer_group, consumer_name, stream_dict, count=10, block=5000)
        if messages:
            for stream, events in messages:
                for event_id, event_data in events:
                    # Decode the event data
                    event_data_str = {k.decode("utf-8"): v.decode("utf-8") for k, v in event_data.items()}

                    print(f"{stream.decode('utf-8')} - Event Id: {event_id.decode('utf-8')} Event Data: {event_data_str}")

                    # Acknowledge the message in the current stream
                    r.xack(stream, consumer_group, event_id)

    except Exception as e:
        print(f"Error reading stream: {e}")

# Continuously read from the streams
while True:
    read_stream()
```


STREAM - Event Id: 1739961108086-0 Event Data: {'subject': 'https://earth-search.aws.element84.com/v1/collections/sentinel-2-l2a/items/S2B_10TFK_20211230_0_L2A', 'producer': 'project-a', 'href': 'https://earth-search.aws.element84.com/v1/collections/sentinel-2-l2a/items/S2B_10TFK_20211230_0_L2A'}
STREAM - Event Id: 1739961109089-0 Event Data: {'subject': 'https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs/items/S2D_10TFK_20210708_0_L2A', 'producer': 'project-a', 'href': 'https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs/items/S2D_10TFK_20210708_0_L2A'}
STREAM - Event Id: 1739961110090-0 Event Data: {'subject': 'https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs/items/S2A_10TFK_20210708_0_L2A', 'producer': 'project-a', 'href': 'https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs/items/S2A_10TFK_20210708_0_L2A'}
STREAM - Event Id: 1739961111092-0 Event Data: {'subject': 'https://earth-search.a