<a href="https://colab.research.google.com/github/gacerioni/redis-workshop-mensageria-pubsub-streams/blob/master/redis_workshop_mensageria_pubsub_streams.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Workshop - Mensageria com Listas, Pub/Sub e Streams

![Redis](https://redis.io/wp-content/uploads/2024/04/Logotype.svg?auto=webp&quality=85,75&width=120)


Bem-vind[ao]s ao Workshop! Vamos explorar como utilizar o Redis para mensageria e comunicação eficiente entre serviços. Vamos começar com o conceito de filas, avançando para Pub/Sub e Streams.

Para uma experiência premium, como a que eu quero que vocês tenham, recomendo MUITO utilizar o Redis Insight (App ou Web) pra apoiar na visualização dos dados.

https://redis.com/redis-enterprise/redis-insight/

## Objetivos do Workshop

Este notebook irá fazer uma introdução ao uso de listas no Redis para implementar filas de mensagens. Em seguida, exploraremos o Pub/Sub e Streams para comunicação em tempo real e processamento de dados.



Espero que gostem! 🖖

## Setup Rápido - e testes pra ver se tá tudo redondo antes de iniciar o lab

In [None]:
# Vamos instalar a lib do redis escolhida para o teste
!pip install -q redis

# E instalar a CLI, via redis-tools, que inclui a famosa redis-cli
!apt-get update
!apt-get install -y redis-tools

#### Configurando e testando a conexão com o seu Redis Cloud

Coloque o endpoint host, port, e as credenciais pertinentes ao seu setup.

Vou deixar o meu DB mesmo aqui, como referência.

In [None]:
# Testando a redis-cli
import os

# Coloque aqui os dados do seu DB do Redis Cloud
REDIS_HOST="redis-19581.c308.sa-east-1-1.ec2.redns.redis-cloud.com"
REDIS_PORT=19581
REDIS_PASSWORD="nhtuquVSLbh2kUt2I86z5QwGu3KrcaYx"

# Caso o SSL esteja ativo pro endpoint, adicione --tls
# Recomendo não misturar lé com cré aqui, visto que não vamos ter nenhuma informação sensível passando pelo fio.
if REDIS_PASSWORD!="":
  os.environ["REDIS_CONN"]=f"-h {REDIS_HOST} -p {REDIS_PORT} -a {REDIS_PASSWORD} --no-auth-warning"
else:
  os.environ["REDIS_CONN"]=f"-h {REDIS_HOST} -p {REDIS_PORT}"

# Caso o SSL esteja ativo pro endpoint, use rediss:// como o URL prefix
REDIS_URL = f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}"
INDEX_NAME = f"qna:idx"

# Test a Redis connection
!redis-cli $REDIS_CONN PING

# DANGER ZONE - CASO QUEIRA DAR UM REFRESH GERAL
# este comando abaixo ficará comentado, pois ele deleta todos os dados da sua base do Redis Cloud
!redis-cli $REDIS_CONN FLUSHDB

In [None]:
# Testando via Python (redis-py)
import redis
r = redis.Redis(
  host=REDIS_HOST,
  port=REDIS_PORT,
  password=REDIS_PASSWORD,
  decode_responses=True)
r.ping()

# Parte Um - Introdução às Filas no Redis

**Filas são uma estrutura de dados fundamental para a comunicação assíncrona entre serviços.**

Em ciência da computação, uma fila é uma coleção de entidades mantidas em uma sequência, onde elementos são adicionados em uma extremidade da sequência e removidos da outra. Esta característica é conhecida como FIFO (First In, First Out), o que significa que o primeiro elemento adicionado à fila é o primeiro a ser removido.

Filas são amplamente utilizadas na programação para organizar tarefas de forma sequencial. Um exemplo moderno é um sistema de venda de ingressos de futebol, onde os pedidos são enfileirados e processados na ordem em que são recebidos, garantindo que cada cliente receba seu ingresso corretamente e na sequência certa.


## Criando uma Fila no Redis

No Redis, usamos listas para implementar filas. Vamos começar criando uma fila e adicionando algumas mensagens.

In [None]:
# Deletar a fila caso já exista, só pra não confundir a gente
r.delete("minhafila")

# Adicionar elementos à fila (enqueue)
r.lpush("minhafila", "Tarefa 1")
r.lpush("minhafila", "Tarefa 2")
r.lpush("minhafila", "Tarefa 3")
r.lpush("minhafila", "Tarefa 4")
r.lpush("minhafila", "Tarefa 5")

# Verificar os elementos na fila
print(r.lrange("minhafila", 0, -1))

Neste exemplo, usamos o comando LPUSH para adicionar elementos à fila chamada "minhafila". Verificamos os elementos na fila usando o comando LRANGE, que retorna todos os elementos da lista.

## Consumo de Mensagens/Elementos da Fila  (Dequeue)

Depois de adicionar mensagens à fila, precisamos consumir essas mensagens para processá-las.

O Redis oferece o comando `RPOP` para remover e retornar o último elemento da lista, seguindo a ordem **FIFO** (first-in-first-out)

In [None]:
# Remover e consumir o próximo elemento da fila (dequeue)
mensagem = r.rpop("minhafila")
print(f"Mensagem consumida: {mensagem}")

# Verificar os elementos restantes na fila
print(r.lrange("minhafila", 0, -1))

# execute algumas vezes, para vc entender como a fila está sendo consumida e exaurida ao mesmo tempo

### 🤓 Vamos adicionar um exemplo mais prático e interessante!

Imagine que estamos processando pedidos de ingressos para um jogo de futebol. Cada mensagem na fila representa um pedido de ingresso que precisa ser processado.

In [None]:
import time
# Populando nossa lista com alguns pedidos de tickets novos
r.lpush("tickets:futeborrrr:fila", "Pedido 1",  "Pedido 2",  "Pedido 3",  "Pedido 4",  "Pedido 5",  "Pedido 6")

##### INICIO DO ROBOZINHO SIMPLES QUE PROCESSA OS PEDIDOS

# Processar pedidos de ingressos
def processar_pedido(pedido):
    time.sleep(0.5)
    print(f"Processando pedido: {pedido}")

# Consumir e processar mensagens na fila
while True:
    pedido = r.rpop("tickets:futeborrrr:fila")
    if not pedido:
        print("Nenhum pedido pendente.")
        break
    processar_pedido(pedido)

# Verificar os elementos restantes na fila
print(r.lrange("tickets:futeborrrr:fila", 0, -1))


Neste código, definimos uma função processar_pedido que simula o processamento de um pedido. Em seguida, usamos um loop para consumir e processar todas as mensagens na fila "minhafila" até que ela esteja vazia.

Com isso, você aprendeu a consumir mensagens de uma fila no Redis, removendo e processando cada mensagem na ordem correta. Vamos continuar com os próximos passos, como visualizar (peek) e implementar filas confiáveis, em blocos subsequentes.

## Visualizando os elementos da fila (Peek)

A operação de "peek" permite visualizar os elementos na fila sem removê-los. Isso pode ser útil para verificar o próximo item a ser processado ou para inspecionar o estado atual da fila.



In [None]:
fila = "tickets:futeborrrr:filapeek"

# Popular uma fila pra testar o peek
r.lpush(fila, "Pedido 1",  "Pedido 2",  "Pedido 3",  "Pedido 4",  "Pedido 5",  "Pedido 6")

# Visualizar o próximo elemento da fila (peek)
proximo_elemento = r.lrange(fila, -1, -1)
print(f"Próximo elemento a ser processado: {proximo_elemento}")

# Visualizar os três últimos elementos da fila
tres_ultimos_elementos = r.lrange(fila, -3, -1)
print(f"Três últimos elementos na fila: {tres_ultimos_elementos}")

# Visualizar todos os elementos na fila
todos_elementos = r.lrange(fila, 0, -1)
print(f"Todos os elementos na fila: {todos_elementos}")

## Implementando Filas Confiáveis

Para garantir que mensagens não sejam perdidas se um consumidor falhar, podemos implementar filas confiáveis no Redis. Isso envolve mover as mensagens para uma fila temporária até que o processamento seja confirmado.

### Pegar uma mensagem/elemento da fila para processar, enquanto a move para uma fila temporária - como uma triagem

In [None]:
# Use este bloco para iniciar ou resetar as filas deste exercicio
r.delete("tickets:counterstrike2:fila_primaria")
r.delete("tickets:counterstrike2:fila_temporaria")
r.lpush("tickets:counterstrike2:fila_primaria", "Pessoa 1", "Pessoa 2", "Pessoa 3", "Pessoa 4", "Pessoa 5")

In [None]:
# Aqui já pegamos uma mensagem para processar, enquanto a movemos da fila principal para uma fila temporária
mensagem_a_ser_processada = r.rpoplpush("tickets:counterstrike2:fila_primaria", "tickets:counterstrike2:fila_temporaria")

print("Elemento da fila que será processado: {0}".format(mensagem_a_ser_processada))

In [None]:
# E o elemento fica na temporaria enquanto o servico esta fazendo o que precisa fazer
r.lrange("tickets:counterstrike2:fila_temporaria", 0, -1)

### Confirmar o Processamento de uma Mensagem
Caso esteja tudo certo, você vai e apaga a mensagem da fila temporária, finalizando a triagem.



In [None]:
# Após processar a mensagem, removê-la da fila temporária
r.lrem("tickets:counterstrike2:fila_temporaria", 1, mensagem_a_ser_processada)

# Parte Dois - Pub/Sub no Redis

## Introdução ao Pub/Sub no Redis

O padrão **Publish/Subscribe** (Pub/Sub) no Redis é uma poderosa ferramenta de mensageria que permite a comunicação eficiente entre serviços. Diferente de filas tradicionais, onde mensagens são enviadas diretamente de um produtor para um consumidor, o Pub/Sub permite que mensagens sejam publicadas em canais específicos e consumidas por múltiplos assinantes. Isso é especialmente útil em cenários onde a mesma mensagem precisa ser entregue a vários consumidores simultaneamente, como em sistemas de notificações em tempo real.

Neste tutorial, aprenderemos como configurar e usar o Pub/Sub no Redis. Começaremos com exemplos simples de publicação e consumo de mensagens, e avançaremos para um caso de uso mais complexo com múltiplos consumidores. Através de exercícios práticos, você entenderá como o Pub/Sub pode ser utilizado para criar sistemas de comunicação escaláveis e desacoplados.

## Configurando Pub/Sub no Redis

Vamos com um exemplo simples de Pub/Sub, onde um produtor publica mensagens em um canal e um consumidor se inscreve para receber essas mensagens.

🔊 Bem na pegada fire-and-forget: não estava lá pra ouvir... perdeu!\
*E isso é o ideal, ok? Depois vamos ver Streams, não se preocupem!*

### Consumidor Assinando um Canal no Redis

In [None]:
# Vamos usar threading apenas pq o consumer precisa ficar rodando sem bloquear o Colab, ok?

# Conectar-se ao Redis, só pra ter certeza que a connection com o db está ok
import redis
r = redis.Redis(
  host=REDIS_HOST,
  port=REDIS_PORT,
  password=REDIS_PASSWORD,
  decode_responses=True)

def consumidor():
    pubsub = r.pubsub()
    pubsub.subscribe('meucanal')
    for mensagem in pubsub.listen():
        if mensagem['type'] == 'message':
            print(f"Mensagem recebida: {mensagem['data']}")

# Iniciar o consumidor em uma thread separada para não bloquear o notebook
import threading
consumidor_thread = threading.Thread(target=consumidor)
consumidor_thread.start()

### Produtor enviando mensagens no canal de pub/sub recém-criado

In [None]:
# Publicar uma mensagem no canal
r.publish('meucanal', 'Tranquilidade, galera?')

# Note que o Threading do Python aqui vai jogar pro processo principal;
#   com isso, a mensagem vai sair aqui mesmo!
# Em produção, imagine que são dois sitemas completamente apartados.

### Trocando de Ferramenta - Vamos ver Pub/Sub no Redis Insight!

Como vocês notaram, o Threading do Python vai acabar jogando a mensagem na tela assim que ela for publicada pelo publisher.

**Porém, vamos ver isso no Redis Insight agora. Vai clarear bastante!**

1-) Abra o Redis Insight no seu PC e se conecte com o database que está usando aqui. Provavelmente um Redis Cloud Free mesmo.

2-) Feito isso, abra o menu de Pub/Sub, no canto esquerdo da GUI:
![](https://github.com/gacerioni/redis-workshop-mensageria-pubsub-streams/blob/master/static/pubsub_menu.png?raw=true)

3-) Clique em **Subscribe**, o botão azul do lado superior direito da GUI

4-) E agora publique algumas mensagens:
 - tanto por aqui no Colab (basta rodar o bloco acima algumas vezes, mudando a mensagem
 - quanto direto no Redis Insight, pra vocês testarem por lá também.


![](https://github.com/gacerioni/redis-workshop-mensageria-pubsub-streams/blob/master/static/pubsub_demo.png?raw=true)


# Parte Três - Finalmente, Stream!

## Introdução ao Redis Streams

**Redis Streams** são estruturas de dados poderosas para registrar e distribuir eventos em tempo real. Consumer Groups permitem que múltiplos consumidores processem mensagens de um stream de forma escalável, garantindo que cada mensagem seja processada por apenas um consumidor no grupo. Isso é essencial para aplicações que necessitam distribuir a carga de trabalho de forma eficiente entre vários trabalhadores.

Temos garantia de entrega, gestão de acks, persistência, controle de cardinalidade, e uma estrutura de dados que se assemelha a um Redis `HASH`.\
Ou seja, cada elemento pode ser uma entidade completamente diferente na Stream.

## Criando uma Stream e adicionando algumas mensagens

Neste passo, vamos criar um stream no Redis e adicionar algumas mensagens representando registros de SKU, cada um com campos diferentes para simular variações nos dados.

In [None]:
# Reinicializar a stream - caso queira ter uma exp atomica
# comente a linha abaixo para preservar dados existentes, e ir crescendo a stream
r.delete("mystream")

# Adicionar registros de SKU ao stream
r.xadd("mystream", {"sku": "SKU123", "name": "Produto A", "price": "10.99"})
r.xadd("mystream", {"sku": "SKU124", "name": "Produto B", "price": "15.49", "discount": "5%"})
r.xadd("mystream", {"sku": "SKU125", "name": "Produto C", "price": "7.99"})
r.xadd("mystream", {"sku": "SKU126", "name": "Produto D", "price": "20.00", "category": "Eletrônicos"})
r.xadd("mystream", {"sku": "SKU127", "name": "Produto E", "price": "13.75", "stock": "50"})

# Verificar as mensagens no stream
entradas = r.xrange("mystream", "-", "+")
print("Entradas no stream: ")
for entrada in entradas:
    print(entrada)


## Escrita de Mensagens - Gerenciamento de Tamanho/Memória com XADD e XTRIM

Neste passo, vamos explorar como gerenciar o tamanho da stream utilizando os comandos `XADD` com `MAXLEN`, e também `XTRIM` de forma isolada. Isso ajuda a manter o stream em um tamanho gerenciável, removendo entradas antigas automaticamente.

In [None]:
# Reinicializar a stream - caso queira ter uma experiência atômica
# Comente a linha abaixo para preservar dados existentes e ir crescendo a stream
r.delete("mystream")

# Adicionar registros de SKU ao stream com limite de tamanho
# maxlen=3, approximate=False garante que o stream não ultrapasse 3 entradas de forma exata
r.xadd("mystream", {"sku": "SKU123", "name": "Produto A", "price": "10.99"}, maxlen=3, approximate=False)
r.xadd("mystream", {"sku": "SKU124", "name": "Produto B", "price": "15.49", "discount": "5%"}, maxlen=3, approximate=False)
r.xadd("mystream", {"sku": "SKU125", "name": "Produto C", "price": "7.99"}, maxlen=3, approximate=False)
r.xadd("mystream", {"sku": "SKU126", "name": "Produto D", "price": "20.00", "category": "Eletrônicos"}, maxlen=3, approximate=False)
r.xadd("mystream", {"sku": "SKU127", "name": "Produto E", "price": "13.75", "stock": "50"}, maxlen=3, approximate=False)

# Limitar o tamanho do stream com XTRIM - opcional, pode ser usado em vez de XADD com maxlen
# r.xtrim("mystream", maxlen=3, approximate=False)

# Verificar as mensagens no stream
# XRANGE percorre todas as entradas do stream, do início ao fim
entradas = r.xrange("mystream", "-", "+")
print("Entradas no stream (com maxlen=3): ")
for entrada in entradas:
    print(entrada)


## Explorando Comandos Comuns de Redis Streams

Neste passo, vamos explorar alguns comandos comuns para trabalhar com streams no Redis: `XREAD`, `XRANGE` e `XREVRANGE`.

### XREAD: Ler Entradas de um Stream

O comando `XREAD` pode ser usado para ler entradas a partir de uma posição específica no stream. A posição é determinada pelo ID da mensagem. Usando '0' como o ID inicial, você lê a partir do início do stream.

⚠ *Note que o exercicio anterior deve ter matado o **Produto A** e **B**, por conta do `maxlen=3`. Então deve estar começando do `Produto C` aí!*

In [None]:
# Ler duas entradas de um stream, começando do início da stream, aguardando por 300ms SE a Stream estiver vazia
entradas = r.xread({'mystream': '0'}, count=2, block=300)
print("Entradas lidas com XREAD: ")
for entrada in entradas:
    print(entrada)

**Explicando o código acima:** O comando `XREAD` lê entradas de um stream, permitindo que consumidores recebam novas mensagens.

- **'0':** Especifica que queremos começar a leitura do início do stream.
- **count=2:** Lê até duas entradas.
- **block=300:** Aguarda por até 300ms se o stream estiver vazio.

### XRANGE: Consultar um Intervalo de Entradas

O comando XRANGE retorna um intervalo específico de entradas em um stream.

Utilizamos "-" e "+" para indicar o intervalo completo.
```
"-" e "+" indicam o intervalo completo, do início ao fim.
```


In [None]:
# Consultar todas as entradas do stream
entradas = r.xrange("mystream", "-", "+")
print("Entradas no stream com XRANGE: ")
for entrada in entradas:
    print(entrada)


### XREVRANGE: Consultar um Intervalo em Ordem Reversa

O comando XREVRANGE retorna um intervalo específico de entradas em ordem reversa.

Utilizamos "+" e "-" para indicar o intervalo completo em ordem inversa.

```
"+" e "-" indicam o intervalo completo, do fim ao início.
```

In [None]:
# Consultar todas as entradas do stream em ordem reversa
entradas = r.xrevrange("mystream", "+", "-")
print("Entradas no stream com XREVRANGE: ")
for entrada in entradas:
    print(entrada)


## Consumer Groups no Redis Streams - uma breve introdução

**Consumer Groups** permitem que múltiplos consumidores processem mensagens de um stream de forma escalável. Cada mensagem é processada por apenas um consumidor dentro do grupo, garantindo eficiência e evitando duplicação de processamento. Ajuda muito em estratégias como fan-out.

### Criar e Gerenciar Consumer Groups

Criar um Grupo de Consumidores:

O comando `XGROUP CREATE` abaixo cria um grupo de consumidores chamado `mygroup` no stream mystream.

- id="0" indica que o grupo começará a ler a partir da primeira mensagem no stream.
- mkstream=True cria o stream se ele não existir.

In [None]:
# Criar um grupo de consumidores no stream 'mystream'
r.xgroup_create("mystream", "mygroup", id="0", mkstream=True)
print("Grupo de consumidores 'mygroup' criado no stream 'mystream'.")


**Vou deixar vocês mesmo encontrarem o Consumer Group criado no Redis Insight!**\
Observe que criamos o consumer group indicando a stream em si.

![](https://github.com/gacerioni/redis-workshop-mensageria-pubsub-streams/blob/master/static/stream_consumer_group_vanilla.png?raw=true)



------

Porém, um mesmo grupo de consumidores pode pegar mensagens de várias streams ao mesmo tempo.

Olha só como funciona o método `XREADGROUP` da nossa lib do Python.\
Note como ela aceita um dict contendo as streams e o modo de acesso delas:

![](https://github.com/gacerioni/redis-workshop-mensageria-pubsub-streams/blob/master/static/stream_consumer_xreadgroup.png?raw=true)


Desta forma, poderíamos fazer algo como isso:
```python
    def run(self):
        while True:
            try:
                streams = {stream: '>' for stream in STREAM_NAMES.values()}
                results = self.client.xreadgroup(GROUP_NAME, self.consumer_name, streams, count=1, block=1000)
                if results:
                    for stream, messages in results:
                        for message_id, message in messages:
                            # Simulate blablabla
                            <...>
```


### Continuando com Consumer Groups - Um Consumidor Entra no Grupo!

O comando `XREADGROUP` permite que um consumidor leia mensagens de um grupo de consumidores. Já é o join do grupo em si, atômico.

- **"`mygroup`":** Nome do grupo de consumidores.
- **"`consumer1`":** Nome do consumidor.
- **"`mystream": ">"`:** Lê novas mensagens do stream a partir do final.

In [None]:
# Consumidor 'consumer1' lê mensagens do grupo 'mygroup' no stream 'mystream'
entradas = r.xreadgroup("mygroup", "consumer1", {"mystream": ">"}, count=2, block=3000)
print("Entradas lidas por consumer1: ")
for entrada in entradas:
    print(entrada)

In [None]:
# NOVO CONSUMIDOR CHEGANDO PRA AJUDAR!
# Consumidor 'consumer2' lê mensagens do grupo 'mygroup' no stream 'mystream'
entradas = r.xreadgroup("mygroup", "consumer2", {"mystream": ">"}, count=2, block=3000)
print("Entradas lidas por consumer2: ")
for entrada in entradas:
    print(entrada)


#### Explicação

- `XGROUP CREATE`: Cria um grupo de consumidores, permitindo que múltiplos consumidores leiam mensagens de um stream de forma coordenada.
- `XREADGROUP`: Utilizado por consumidores para ler mensagens de um grupo, garantindo que cada mensagem seja processada por apenas um consumidor dentro do grupo

### Ainda relacionado com Consumer Groups - Gerenciar Mensagens Pendentes com PEL (Pending Entry List)

O comando `XPENDING` exibe informações sobre mensagens pendentes em um grupo de consumidores, incluindo quem está processando cada mensagem e há quanto tempo.

In [None]:
# Verificar mensagens pendentes no grupo 'mygroup'
pendentes = r.xpending("mystream", "mygroup")
print("Mensagens pendentes: ", pendentes)


**Acho que o `Consumer 1` está se esquecendo de algo, certo?** 🧐🤔

O comando `XACK` permite que um consumidor reconheça a conclusão do processamento de uma mensagem, removendo-a da lista de pendentes.\
**É assim que a gente sabe que deu tudo certo com o processamendo da mensagem pelo Consumer.**

In [None]:
# Reconhecer uma mensagem processada pelo 'consumer1'
r.xack("mystream", "mygroup", "1526569495631-0")
print("Mensagem reconhecida (acknowledged).")

### Trampo de Supervisão - Trabalhando com Consumer Groups e PEL

Neste passo, vamos configurar um cenário completo com dois consumidores, onde um consumidor processa e reconhece uma mensagem, e verificamos a lista de mensagens pendentes (PEL). Em seguida, transferimos uma mensagem pendente de um consumidor para outro.

Daí já reforçamos o conceito de uma vez!

#### Criar um Grupo de Consumidores e Adicionar Mensagens - Do Zero!

In [None]:
# Reinicializar a stream - comente esta linha para preservar dados existentes
r.delete("mystream")

# Adicionar registros de SKU ao stream com limite de tamanho
r.xadd("mystream", {"sku": "SKU123", "name": "Produto A", "price": "10.99"}, maxlen=3, approximate=False)
r.xadd("mystream", {"sku": "SKU124", "name": "Produto B", "price": "15.49", "discount": "5%"}, maxlen=3, approximate=False)
r.xadd("mystream", {"sku": "SKU125", "name": "Produto C", "price": "7.99"}, maxlen=3, approximate=False)

# Criar um grupo de consumidores no stream 'mystream'
r.xgroup_create("mystream", "mygroup", id="0", mkstream=True)
print("Grupo de consumidores 'mygroup' criado no stream 'mystream'.")


#### Vida normal - mas com uma falha no processamento de um dos elementos

Vamos criar um cenário onde o `Consumer 1` lê duas (2) mensagens, processa apenas uma e envia o `ACK` para essa única mensagem.

Em seguida, usamos `XPENDING` e `XCLAIM` para transferir a mensagem pendente para o `Consumer 2`.

In [None]:
# Consumidor 'consumer1' lê duas mensagens do grupo 'mygroup' no stream 'mystream'
entradas = r.xreadgroup("mygroup", "consumer1", {"mystream": ">"}, count=2, block=3000)
print("Entradas lidas por consumer1: ")
for entrada in entradas:
    print(entrada)

# Consumidor 'consumer1' reconhece a primeira mensagem processada
msg_id_ack = entradas[0][1][0][0]
r.xack("mystream", "mygroup", msg_id_ack)
print(f"Mensagem {msg_id_ack} reconhecida (acknowledged) pelo consumer1.")

# Verificar mensagens pendentes antes da redistribuição
pendentes = r.xpending("mystream", "mygroup")
print("Mensagens pendentes antes da redistribuição: ", pendentes)

#### PEL - Supervisor verifica e redistribui a mensagem pendente para o Consumer 2

Vamos imaginar que já se passou o tempo máximo para processar aquela mensagem, uma regra nossa.

**Neste caso, o `Consumer 1` entrou em Stall**. *Pode ser que tenha algo errado com ele, ou algo quebrou ao processar a última mensagem.*\
Portanto, vamos mandar essa mensagem pendente pro Consumer 2, e ver se ele consegue lidar com ela.

**Explicação em detalhes do que vai acontecer no bloco a seguir:**


1. **O Supervisor Verifica Mensagens Pendentes:** Usamos `XPENDING` para verificar mensagens pendentes. O critério do que é uma mensagem stalled é livre. Um segundo pode ser demais em alguns casos de uso.
2. **Transferir Mensagem Pendente:** Com `XCLAIM`, transferimos a mensagem pendente do consumer1 para consumer2.
3. **Consumidor 2 Lê Mensagens Pendentes e Novas:** consumer2 usa `XREADGROUP` para ler as mensagens pendentes e novas, garantindo que todas as mensagens sejam processadas. Cada consumer é uma maquininha de processar. Ela vai pegar a mensagem stalled e o que tiver que ser pego na stream (vc configura os parâmetros disso tbm)
4. **Reconhecer Mensagens:** o consumer2 envia um `XACK` para reconhecer as mensagens processadas.
5. **Verificar Lista de Pendências (PEL):** Verificamos a **PEL** antes e depois do reconhecimento das mensagens. Vamos exaurir essa lista, já que a stream é pequena e não está crescendo agora.

In [None]:
# Supervisor verifica mensagens pendentes
pendentes = r.xpending("mystream", "mygroup")
print("Mensagens pendentes: ", pendentes)

# Verificar se há mensagens pendentes e redistribuir para 'consumer2'
if pendentes['pending'] > 0:
    # Obter a lista detalhada de mensagens pendentes
    pendentes_detalhes = r.xpending_range("mystream", "mygroup", "-", "+", 10)
    for mensagem in pendentes_detalhes:
        mensagem_id_pendente = mensagem['message_id']
        consumidor_original = mensagem['consumer']

        # Transferir a mensagem pendente para 'consumer2'
        r.xclaim("mystream", "mygroup", "consumer2", min_idle_time=0, message_ids=[mensagem_id_pendente])
        print(f"Mensagem {mensagem_id_pendente} transferida de '{consumidor_original}' para 'consumer2'.")

# Consumidor 2 lê as mensagens pendentes e novas do grupo
entradas_consumer2 = r.xreadgroup("mygroup", "consumer2", {"mystream": "0"}, count=2, block=3000)
print("")
print("############################")
print("Entradas lidas por consumer2 após a redistribuição: ")
for entrada in entradas_consumer2:
    print(entrada)

# Printa a pending List ANTES DO ACK
print("")
print("############################")
print("PEL ANTES DO ACK DA PRIMEIRA MENSAGEM DO CONSUMER 2 - MANTENDO A ORDEM, SERIA O PRODUTO B: ")
print(r.xpending("mystream", "mygroup"))

# Consumidor 2 manda o ack, tudo certo
for entrada in entradas_consumer2[0][1]:
    mensagem_id_pendente = entrada[0]
    print("")
    print("############################")
    print("Dando ack na mensagem ", mensagem_id_pendente)
    r.xack("mystream", "mygroup", mensagem_id_pendente)

# Printa a pending List
print("")
print("############################")
print("PEL DEPOIS DO ACK - FICANDO APENAS A ÚLTIMA MENSAGEM - PRODUTO C: ")
print(r.xpending("mystream", "mygroup"))


# Parabéns!!!

Neste workshop, exploramos como usar Redis Streams e grupos de consumidores para criar um sistema de mensagens robusto e escalável. Abordamos:

- **Filas:** Criando e gerenciando filas com Redis Streams.
- **Pub/Sub:** Publicação e assinatura de mensagens para comunicação em tempo real.
- **Streams:** Adicionando, lendo e gerenciando mensagens com XADD, XREAD, XRANGE, XREVRANGE.
- **Grupos de Consumidores:** Processamento coordenado de mensagens com XREADGROUP, XCLAIM e XPENDING.

Esperamos que este conteúdo tenha fornecido uma visão clara de como implementar sistemas de mensageria eficientes usando Redis. Se tiver dúvidas ou quiser explorar mais,