<a href="https://colab.research.google.com/github/gacerioni/redis-workshop-series-ptbr-gabs/blob/main/redis_workshop_alem_do_cache_nov_2024.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Workshop - Redis muito além do Cache - 2024

![Redis](https://redis.io/wp-content/uploads/2024/04/Logotype.svg?auto=webp&quality=85,75&width=120)



Bem-vindo ao workshop hands-on "Redis Muito Além do Cache"! Aqui, você terá uma experiência prática com os principais data types do Redis, explorando além do uso básico como cache. Vamos focar em como o Redis pode ser usado como um banco de dados completo e versátil.


Para uma experiência premium, como a que eu quero que vocês tenham, recomendo MUITO utilizar o Redis Insight (App ou Web) pra apoiar na visualização dos dados.

https://redis.com/redis-enterprise/redis-insight/

---

## 🤯 Não tem um Redis ainda? Agora você vai ter! 🚀

O **Redis** é sempre o mesmo, independente da offering dele. Jamais iremos criar impedância ou complexidade entre as APIs, e isso inclui o Community Edition.

De qualquer forma, recomendo que vocês façam esse tutorial usando o [Redis Cloud](https://redis.io/try-free/). Free forever, sem pegadinha, e você já faz o onboarding na plataforma SaaS.



## Objetivos do Workshop

O objetivo deste notebook é apresentar as principais estruturas de dados do Redis e proporcionar uma prática interativa com cada uma delas. Nosso foco será na execução de comandos e na manipulação de dados em tempo real, com exemplos que você pode adaptar para suas próprias aplicações.

**Nota**: Já configuramos um ambiente de desenvolvimento no Google Colab para você começar imediatamente.

## Setup Inicial

Antes de começar a explorar, vamos configurar o ambiente e garantir que você esteja pronto para executar comandos Redis diretamente no Google Colab.

In [1]:
# Instala a biblioteca Redis para Python
!pip install -q redis

# Instala a CLI do Redis (redis-tools) para executar comandos diretamente
!apt-get update -qq
!apt-get install -y -qq redis-tools

# Configuração da conexão com o Redis Cloud (preencha com suas credenciais)
import os

REDIS_HOST = "redis-18443.c309.us-east-2-1.ec2.redns.redis-cloud.com"  # Exemplo: "redis-12345.c1.sa-east-1-3.ec2.cloud.redislabs.com"
REDIS_PORT = 18443             # Porta do seu Redis
REDIS_PASSWORD = "secret42"   # Senha do Redis

# Definindo a variável de ambiente para usar a redis-cli
os.environ["REDIS_CONN"] = f"-h {REDIS_HOST} -p {REDIS_PORT} -a {REDIS_PASSWORD} --no-auth-warning"

# Testando a conexão com o Redis Cloud
!redis-cli $REDIS_CONN PING

# Importando a biblioteca redis-py e testando a conexão
import redis

r = redis.Redis(
    host=REDIS_HOST,
    port=REDIS_PORT,
    password=REDIS_PASSWORD,
    decode_responses=True
)

# Testando a conexão com o Redis usando redis-py
if r.ping():
    print("Conexão com o Redis bem-sucedida!")
else:
    print("Erro ao conectar com o Redis.")

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/261.5 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m71.7/261.5 kB[0m [31m2.2 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m256.0/261.5 kB[0m [31m4.0 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m261.5/261.5 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25hW: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Selecting previously unselected package libjemalloc2:amd64.
(Reading database ... 123634 files and directories currently installed.)
Preparing to unpack .../0-libjemalloc2_5.2.1-4ubuntu1_amd64.deb ...
Unpacking libjemalloc2:amd64 (5.2.1-4ubuntu1) ...
Selecting previously unselected package liblua5.1-0:amd

Agora que você está com tudo configurado, vamos começar a explorar os data types básicos do Redis e realizar alguns exercícios interativos para entender como eles funcionam na prática.

# Data Types Básicos no Redis

**O Redis oferece uma variedade de estruturas de dados, cada uma otimizada para diferentes cenários de uso. Aqui, vamos explorar os principais data types com exemplos rápidos para entender suas funcionalidades.**

## 1. Strings
As **Strings** são o data type mais simples no Redis, armazenando qualquer sequência de bytes, como texto ou números. Elas são amplamente usadas para contadores, sessões de usuário, e caching de valores simples.

**Exemplo de Caso de Uso**: Cache Aside, contadores e KV-stores, Lock Distribuído, Rate Limiter, Session Storage, Feature Flags, Gaming, etc.

### Exemplo com Strings



In [2]:
# Armazenar um valor simples (pode ser uma configuração ou dado de cache)
r.set("config:modo", "produção")
print("Modo atual:", r.get("config:modo"))

# Atualizar o valor (simular uma mudança de configuração)
r.set("config:modo", "manutenção")
print("Modo atualizado:", r.get("config:modo"))

# Usar Strings como um contador para likes em um post
r.set("post:123:likes", 10)
print("Likes iniciais:", r.get("post:123:likes"))

# Incrementar e decrementar o contador de likes
r.incr("post:123:likes")
print("Likes após um incremento:", r.get("post:123:likes"))
r.decr("post:123:likes")
print("Likes após um decremento:", r.get("post:123:likes"))

# Simular uma expiração (TTL) para uma sessão de usuário
r.setex("sessao:usuario:456", 30, "ativo")
print("Sessão de usuário criada com TTL de 30 segundos.")

Modo atual: produção
Modo atualizado: manutenção
Likes iniciais: 10
Likes após um incremento: 11
Likes após um decremento: 10
Sessão de usuário criada com TTL de 30 segundos.


## 2. Lists

As **Lists** são coleções ordenadas de strings, úteis para filas de tarefas ou logs. Podem funcionar como uma fila (FIFO) ou uma pilha (LIFO). A API do Redis fornece muitos comandos seguros, atômicos e previsíveis para você implementar arquiteturas e filas robustas e prontas para PROD. Simples é bem diferente de frágil. 90% de todas as transações financeiras que você faz em um dia passaram por um Redis.

Caso de Uso: Implementar filas simples de mensagens, arquiteturas desacopladas, arquiteturas assíncronas, sistemas de vazão, integração com Lambdas, Triggers, Filas de Trabalho, etc.

### Exemplo com Lists


In [3]:
# Simular uma fila de tarefas com LPUSH e RPOP (FIFO)
r.lpush("fila:tarefas", "tarefa1", "tarefa2", "tarefa3")
print("Tarefas na fila (FIFO):", r.lrange("fila:tarefas", 0, -1))

# Remover e processar a tarefa mais antiga
tarefa = r.rpop("fila:tarefas")
print("Processando:", tarefa)
print("Tarefas restantes na fila:", r.lrange("fila:tarefas", 0, -1))

# Simular uma pilha de mensagens (LIFO)
r.rpush("pilha:mensagens", "mensagem1", "mensagem2", "mensagem3")
print("Mensagens na pilha (LIFO):", r.lrange("pilha:mensagens", 0, -1))

# Remover e processar a mensagem mais recente
mensagem = r.rpop("pilha:mensagens")
print("Processando mensagem:", mensagem)
print("Mensagens restantes na pilha:", r.lrange("pilha:mensagens", 0, -1))

Tarefas na fila (FIFO): ['tarefa3', 'tarefa2', 'tarefa1', 'tarefa3', 'tarefa2']
Processando: tarefa2
Tarefas restantes na fila: ['tarefa3', 'tarefa2', 'tarefa1', 'tarefa3']
Mensagens na pilha (LIFO): ['mensagem1', 'mensagem2', 'mensagem1', 'mensagem2', 'mensagem3']
Processando mensagem: mensagem3
Mensagens restantes na pilha: ['mensagem1', 'mensagem2', 'mensagem1', 'mensagem2']


## 3. Sets

Os **Sets** são coleções de strings únicas, ideais para armazenar elementos sem duplicatas. São úteis para deduplicação, gerenciamento de IDs únicos, ou até para operações de conjuntos, como interseção e união.

Caso de Uso: Listar usuários únicos online em uma aplicação, deduplicar itens, aplicar teoria dos conjuntos, ou até criar índices secundários personalizados.

### Exemplo com Sets

In [4]:
# Adicionar usuários únicos online
r.sadd("usuarios:online", "alice", "bob", "carol")
print("Usuários online:", r.smembers("usuarios:online"))

# Adicionar um usuário duplicado (não será adicionado novamente)
r.sadd("usuarios:online", "alice")
print("Usuários online (após tentativa de duplicação):", r.smembers("usuarios:online"))

# Remover um usuário que saiu
r.srem("usuarios:online", "bob")
print("Usuários online (após saída de Bob):", r.smembers("usuarios:online"))

# Operações de conjuntos: Simular interseção com outro set de usuários
r.sadd("usuarios:ativos", "alice", "dave", "carol")
interseccao = r.sinter("usuarios:online", "usuarios:ativos")
print("Usuários online e ativos:", interseccao)

Usuários online: {'alice', 'carol', 'bob'}
Usuários online (após tentativa de duplicação): {'alice', 'carol', 'bob'}
Usuários online (após saída de Bob): {'alice', 'carol'}
Usuários online e ativos: {'alice', 'carol'}


## 4. Sorted Sets

Os **Sorted Sets** são como sets, mas com uma pontuação associada a cada elemento, o que permite manter os elementos ordenados. Eles são ideais para rankings, agendamentos, e qualquer caso em que a ordem baseada em pontuação seja importante.

Caso de Uso: Classificação de jogadores em um jogo, leaderboards em geral, análises em tempo real, agendamento de tarefas, ou filas de prioridade.

### Exemplo com Sorted Sets



In [5]:
# Adicionar jogadores com suas pontuações
r.zadd("jogo:ranking", {"player1": 1500, "player2": 3000, "player3": 2500})
print("Ranking dos jogadores (do menor para o maior):", r.zrange("jogo:ranking", 0, -1, withscores=True))

# Consultar o ranking em ordem decrescente (do maior para o menor)
print("Ranking dos jogadores (do maior para o menor):", r.zrevrange("jogo:ranking", 0, -1, withscores=True))

# Atualizar a pontuação de um jogador
r.zincrby("jogo:ranking", 500, "player1")
print("Ranking atualizado (do maior para o menor):", r.zrevrange("jogo:ranking", 0, -1, withscores=True))

# Consultar a posição de um jogador
posicao = r.zrevrank("jogo:ranking", "player1")
print("Posição do player1 no ranking:", posicao + 1)  # +1 para uma posição humana (começando do 1)

# Remover um jogador do ranking
r.zrem("jogo:ranking", "player2")
print("Ranking após remoção de player2:", r.zrevrange("jogo:ranking", 0, -1, withscores=True))

Ranking dos jogadores (do menor para o maior): [('player1', 1500.0), ('player3', 2500.0), ('player2', 3000.0)]
Ranking dos jogadores (do maior para o menor): [('player2', 3000.0), ('player3', 2500.0), ('player1', 1500.0)]
Ranking atualizado (do maior para o menor): [('player2', 3000.0), ('player3', 2500.0), ('player1', 2000.0)]
Posição do player1 no ranking: 3
Ranking após remoção de player2: [('player3', 2500.0), ('player1', 2000.0)]


## 5. Hashes

Os **Hashes** são coleções de pares campo-valor, úteis para armazenar objetos, como perfis de usuários ou configurações. Você também pode setar TTLs para cada sub-key, abrindo ainda mais possibilidades e controle.

Caso de Uso: Armazenar informações de usuário de maneira estruturada, banco de dados primário, NoSQL, ORM, Session Storage, AIML Feature Stores, etc.

### Exemplo com Hashes

In [6]:
# Criar um hash com informações de usuário
r.hset("usuario:1001", mapping={"nome": "Gabriel", "email": "gabriel@example.com", "idade": "32"})
print("Perfil do usuário:", r.hgetall("usuario:1001"))

# Recuperar apenas um campo específico (email)
email = r.hget("usuario:1001", "email")
print("Email do usuário:", email)

# Atualizar a idade do usuário
r.hset("usuario:1001", "idade", "33")
print("Perfil atualizado:", r.hgetall("usuario:1001"))

# Remover um campo do hash (por exemplo, o email)
r.hdel("usuario:1001", "email")
print("Perfil após remoção do email:", r.hgetall("usuario:1001"))

# Definir um TTL para a chave do usuário (expira após 60 segundos)
r.expire("usuario:1001", 60)
print("TTL definido para a chave 'usuario:1001' (60 segundos)")

Perfil do usuário: {'nome': 'Gabriel', 'email': 'gabriel@example.com', 'idade': '32'}
Email do usuário: gabriel@example.com
Perfil atualizado: {'nome': 'Gabriel', 'email': 'gabriel@example.com', 'idade': '33'}
Perfil após remoção do email: {'nome': 'Gabriel', 'idade': '33'}
TTL definido para a chave 'usuario:1001' (60 segundos)


### Exercício com Hashes - HASH Sub-Key Expiration

Com o Redis 7.4, é possível aplicar expirações (TTL) a campos individuais, o que abre novas possibilidades para gerenciar dados temporários.

In [7]:
# Criar um hash simulando dados de sessão do usuário
r.hset("sessao:usuario:1337", mapping={
    "nome": "Gabriel",
    "sobrenome": "Cerioni",
    "token": "super-token-secreto<....>",            # Token que expira
    "ultimo_login": "2024-11-04",
    "pais_origem": "BRA",
    "geoloc_estimado": "-46.63389,-23.55028",        # Praça da Sé, SP
    "user_tier_level": 3                             # Imagine um sistema de Tier de 0-10 (noob to VIP)
})
print("Dados da sessão:", r.hgetall("sessao:usuario:1337"))

# Aplicar um TTL de 30 segundos apenas ao campo 'token' usando HEXPIRE
r.execute_command("HEXPIRE", "sessao:usuario:1337", 30, "FIELDS", 1, "token")
print("TTL de 30 segundos definido para o campo 'token'.")

# Verificar os dados da sessão (o token ainda está presente)
print("Dados da sessão (após definir TTL):")
for campo, valor in r.hgetall("sessao:usuario:1337").items():
    print(f"{campo}: {valor}")

# Após 30 segundos, o campo 'token' será automaticamente removido. Aguarde para ver o efeito.

# OPCIONAL - Remover o TTL do campo 'token', tornando-o persistente novamente
#r.execute_command("HPERSIST", "sessao:usuario:1337", "FIELDS", 1, "token")
#print("O campo 'token' agora é persistente novamente:", r.hgetall("sessao:usuario:1337"))

Dados da sessão: {'nome': 'Gabriel', 'sobrenome': 'Cerioni', 'ultimo_login': '2024-11-04', 'pais_origem': 'BRA', 'geoloc_estimado': '-46.63389,-23.55028', 'user_tier_level': '3', 'token': 'super-token-secreto<....>'}
TTL de 30 segundos definido para o campo 'token'.
Dados da sessão (após definir TTL):
token: super-token-secreto<....>
nome: Gabriel
sobrenome: Cerioni
ultimo_login: 2024-11-04
pais_origem: BRA
geoloc_estimado: -46.63389,-23.55028
user_tier_level: 3


### Teaser de Indexação com RediSearch
#### *Calma, você ainda vai aprender a usar o `Redis Search and Query Engine`!*

Agora que você aprendeu a manipular dados estruturados com Hashes, imagine o poder de realizar buscas otimizadas e complexas. No **Redis**, com **RediSearch**, é possível indexar campos de forma poderosa e eficiente, como neste exemplo rápido de indexação e busca que veremos mais a fundo na próxima sessão:





In [8]:
from redis.commands.search.field import TagField, NumericField, GeoField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from redis.commands.search.query import Query

# 1. Deletar o índice se ele já existir, para garantir que começamos do zero
try:
    r.ft("idx:sessao_usuarios").dropindex(delete_documents=False)
except:
    pass

# 2. Criar o índice para o hash 'sessao:usuario:'
r.ft("idx:sessao_usuarios").create_index(
    [
        TagField("pais_origem"),
        NumericField("user_tier_level"),
        GeoField("geoloc_estimado")
    ],
    definition=IndexDefinition(prefix=["sessao:usuario:"], index_type=IndexType.HASH)
)

# 3. Indexar um hash de exemplo com dados de sessão do usuário
r.hset("sessao:usuario:1337", mapping={
    "nome": "Gabriel",
    "sobrenome": "Cerioni",
    "token": "super-token-secreto<....>",
    "ultimo_login": "2024-11-04",
    "pais_origem": "BRA",
    "geoloc_estimado": "-46.63389,-23.55028",  # Praça da Sé, SP
    "user_tier_level": 3
})
print("✅ Dados de sessão indexados com sucesso!")

# 4. Executar uma busca única: usuários do Brasil, nível de tier entre 2 e 5, próximos ao centro de SP
query = Query("@pais_origem:{BRA} @user_tier_level:[2 5] @geoloc_estimado:[-46.63389 -23.55028 5 km]")

# 5. Executar a busca e exibir os resultados
res = r.ft("idx:sessao_usuarios").search(query)
print("🔍 Resultado da busca com RediSearch:")
for doc in res.docs:
    print(doc.__dict__)

✅ Dados de sessão indexados com sucesso!
🔍 Resultado da busca com RediSearch:
{'id': 'sessao:usuario:1337', 'payload': None, 'nome': 'Gabriel', 'sobrenome': 'Cerioni', 'ultimo_login': '2024-11-04', 'pais_origem': 'BRA', 'geoloc_estimado': '-46.63389,-23.55028', 'user_tier_level': '3', 'token': 'super-token-secreto<....>'}


# Data Types Avançados no Redis

Agora que você já conhece os tipos de dados básicos no Redis, chegou a hora de explorar algumas das funcionalidades mais poderosas e avançadas. Esses data types são perfeitos para cenários que exigem alta performance, escalabilidade e funcionalidades que vão muito além de simples caches.

Prepare-se para ver o Redis como um verdadeiro *Canivete Suíço* de soluções de dados, desde mensageria em tempo real até armazenamento eficiente de documentos JSON, e tudo isso com exemplos práticos que você poderá adaptar para seus projetos.

## RedisJSON - Redis como um NoSQL Primário

O **RedisJSON** é um módulo que permite armazenar, consultar e modificar documentos JSON diretamente no Redis. Ele é especialmente útil para quem precisa trabalhar com dados hierárquicos e quer a flexibilidade de documentos JSON, mas com a performance do Redis.

Ele é um bom candidato a ser o seu NoSQL Primário. Integra-se nativamente com ferramentais de CDC e ORM, onde sua aplicação considera o Redis como um DAO Repository ou algo similar, assim como qualquer outro banco de dados NoSQL.

### Caso de Uso: Catálogo de Produtos

Imagine que você esteja criando um catálogo de produtos para um e-commerce. Você pode armazenar informações complexas como detalhes do produto, preço, inventário e reviews em formato JSON. Isso permite consultas eficientes e a capacidade de atualizar dados específicos de maneira atômica.

In [9]:
# Importando a biblioteca para trabalhar com JSON no Redis
import redis
from redis.commands.json.path import Path

# Conectando ao Redis (ajuste as credenciais conforme necessário)
r = redis.Redis(
    host=REDIS_HOST,
    port=REDIS_PORT,
    password=REDIS_PASSWORD,
    decode_responses=True
)

# 1. Criar um documento JSON para um produto no catálogo
produto = {
    "id": "123",
    "nome": "Fone de Ouvido Bluetooth",
    "marca": "MarcaX",
    "preco": 299.99,
    "estoque": 50,
    "reviews": [
        {"usuario": "Alice", "comentario": "Ótimo produto!", "nota": 5},
        {"usuario": "Bob", "comentario": "Bom custo-benefício.", "nota": 4}
    ]
}

# 2. Armazenar o documento JSON no Redis
r.json().set("produto:123", Path.root_path(), produto)
print("✅ Produto armazenado com sucesso!")

# 3. Consultar o nome e o preço do produto
nome = r.json().get("produto:123", Path(".nome"))
preco = r.json().get("produto:123", Path(".preco"))
print(f"Nome: {nome}, Preço: R${preco}")

# 4. Atualizar o preço do produto
r.json().set("produto:123", Path(".preco"), 279.99)
print("💰 Preço atualizado para R$279.99!")

# 5. Adicionar um novo review ao produto
novo_review = {"usuario": "Carlos", "comentario": "Funciona muito bem!", "nota": 5}
r.json().arrappend("produto:123", Path(".reviews"), novo_review)
print("📝 Novo review adicionado!")

# 6. Consultar o documento atualizado
produto_atualizado = r.json().get("produto:123")
print("🔍 Produto atualizado:", produto_atualizado)

✅ Produto armazenado com sucesso!
Nome: Fone de Ouvido Bluetooth, Preço: R$299.99
💰 Preço atualizado para R$279.99!
📝 Novo review adicionado!
🔍 Produto atualizado: {'id': '123', 'nome': 'Fone de Ouvido Bluetooth', 'marca': 'MarcaX', 'preco': 279.99, 'estoque': 50, 'reviews': [{'usuario': 'Alice', 'comentario': 'Ótimo produto!', 'nota': 5}, {'usuario': 'Bob', 'comentario': 'Bom custo-benefício.', 'nota': 4}, {'usuario': 'Carlos', 'comentario': 'Funciona muito bem!', 'nota': 5}]}


## Redis Pub/Sub - Mensageria em Tempo Real

O **Pub/Sub** do Redis é uma ferramenta poderosa para sistemas de mensageria. Ele permite que aplicações publiquem mensagens em canais específicos e que outros sistemas ou serviços se inscrevam (subscribe) para receber essas mensagens em tempo real. Isso é muito útil para criar sistemas distribuídos, notificações em tempo real, ou arquiteturas de eventos.

### Caso de Uso: Notificações em Tempo Real

Imagine que você tem um sistema de monitoramento em uma aplicação de e-commerce, onde precisa alertar os usuários sobre mudanças de preço, eventos ou novas promoções. O **Redis Pub/Sub** facilita a comunicação em tempo real entre diferentes componentes do sistema.

### Exemplo com Pub/Sub

No exemplo abaixo, criaremos um simples sistema de publicação e assinatura:

In [10]:
import threading
import time

# Função para o assinante
def subscriber():
    pubsub = r.pubsub()
    pubsub.subscribe("notificacoes")

    print("📡 Assinante aguardando mensagens no canal 'notificacoes'...")

    for mensagem in pubsub.listen():
        if mensagem["type"] == "message":
            print(f"🔔 Nova mensagem recebida: {mensagem['data']}")

# Iniciar o assinante em uma thread separada
assinante_thread = threading.Thread(target=subscriber)
assinante_thread.start()

# Simular o publicador que envia mensagens
time.sleep(2)  # Simula um atraso antes de enviar a mensagem
r.publish("notificacoes", "Promoção: 50% de desconto em todos os produtos!")
time.sleep(1)
r.publish("notificacoes", "Novidade: Lançamos uma nova coleção de verão!")

📡 Assinante aguardando mensagens no canal 'notificacoes'...
🔔 Nova mensagem recebida: Promoção: 50% de desconto em todos os produtos!


1

🔔 Nova mensagem recebida: Novidade: Lançamos uma nova coleção de verão!


## Redis Streams - Comunicação Eficiente e Processamento de Dados

O **Redis Stream** é uma estrutura de dados poderosa que permite capturar e distribuir eventos em tempo real. Ele é útil em casos como processamento de logs, filas de eventos distribuídas, ou qualquer aplicação que precise de alta disponibilidade e escalabilidade.

### Caso de Uso: Sistema de Processamento de Pedidos

Imagine um e-commerce processando pedidos em tempo real. Cada pedido pode ser registrado como uma mensagem na Stream, e grupos de consumidores podem processar esses pedidos em paralelo, distribuindo a carga de trabalho de maneira eficiente.

### Exemplo com Redis Streams

#### 1. Adicionando Mensagens à Stream

Vamos criar uma Stream e adicionar mensagens que representam pedidos de um sistema de e-commerce.


In [11]:
# 1. Adicionando Mensagens à Stream
print("### Adicionando Mensagens à Stream ###")
r.delete("pedidos_stream")  # Deletar a Stream se já existir

# Adicionar mensagens
r.xadd("pedidos_stream", {"pedido_id": "001", "cliente": "Gabs", "valor": "100.00"})
r.xadd("pedidos_stream", {"pedido_id": "002", "cliente": "Magro", "valor": "250.50"})
r.xadd("pedidos_stream", {"pedido_id": "003", "cliente": "Bart", "valor": "89.99"})

# Verificar mensagens na Stream
entradas = r.xrange("pedidos_stream", "-", "+")
print("Entradas na Stream de Pedidos:")
for entrada in entradas:
    print(f"ID: {entrada[0]}, Dados: {entrada[1]}")


### Adicionando Mensagens à Stream ###
Entradas na Stream de Pedidos:
ID: 1735582513936-0, Dados: {'pedido_id': '001', 'cliente': 'Gabs', 'valor': '100.00'}
ID: 1735582513979-0, Dados: {'pedido_id': '002', 'cliente': 'Magro', 'valor': '250.50'}
ID: 1735582514023-0, Dados: {'pedido_id': '003', 'cliente': 'Bart', 'valor': '89.99'}


#### 2. Criando um Grupo de Consumidores

Vamos criar um grupo de consumidores chamado grupo_pedidos para processar as mensagens.

In [12]:
# 2. Criando um Grupo de Consumidores
print("\n### Criando um Grupo de Consumidores ###")
try:
    r.xgroup_create("pedidos_stream", "grupo_pedidos", id="0", mkstream=True)
    print("Grupo de consumidores 'grupo_pedidos' criado com sucesso!")
except redis.exceptions.ResponseError:
    print("Grupo de consumidores já existe, prosseguindo...")


### Criando um Grupo de Consumidores ###
Grupo de consumidores 'grupo_pedidos' criado com sucesso!


#### 3. Consumindo Mensagens com um Consumidor

Vamos usar o `XREADGROUP` para consumir mensagens no grupo de consumidores.

In [13]:
# 3. Consumindo Mensagens com um Consumidor
print("\n### Consumindo Mensagens com consumer1 ###")
entradas = r.xreadgroup("grupo_pedidos", "consumer1", {"pedidos_stream": ">"}, count=2, block=5000)

if entradas:
    print("Mensagens lidas pelo consumer1:")
    for stream, mensagens in entradas:
        for msg_id, dados in mensagens:
            print(f"ID: {msg_id}, Dados: {dados}")
            # Confirmar (acknowledge) o processamento da mensagem
            r.xack("pedidos_stream", "grupo_pedidos", msg_id)
            print(f"Mensagem {msg_id} reconhecida (acknowledged) pelo consumer1.")
else:
    print("Nenhuma mensagem disponível para consumer1.")


### Consumindo Mensagens com consumer1 ###
Mensagens lidas pelo consumer1:
ID: 1735582513936-0, Dados: {'pedido_id': '001', 'cliente': 'Gabs', 'valor': '100.00'}
Mensagem 1735582513936-0 reconhecida (acknowledged) pelo consumer1.
ID: 1735582513979-0, Dados: {'pedido_id': '002', 'cliente': 'Magro', 'valor': '250.50'}
Mensagem 1735582513979-0 reconhecida (acknowledged) pelo consumer1.


#### 4. Gerenciamento de Mensagens Pendentes

Podemos verificar mensagens pendentes que ainda não foram processadas e transferi-las para outro consumidor, se necessário.


In [14]:
# 4. Gerenciamento de Mensagens Pendentes
print("\n### Gerenciamento de Mensagens Pendentes ###")
pendentes = r.xpending("pedidos_stream", "grupo_pedidos")
print("Mensagens pendentes no grupo 'grupo_pedidos':", pendentes)


### Gerenciamento de Mensagens Pendentes ###
Mensagens pendentes no grupo 'grupo_pedidos': {'pending': 0, 'min': None, 'max': None, 'consumers': []}


## RedisTimeSeries - Gerenciando Séries Temporais de Forma Eficiente

O **RedisTimeSeries** é um módulo que permite coletar, armazenar, processar e consultar grandes volumes de dados de séries temporais de forma eficiente. Ele é ideal para aplicações de monitoramento, métricas de IoT, ou sistemas que precisam de dados históricos com agregações em tempo real.

### Caso de Uso: Monitoramento de Métricas de Aplicações (Estilo Prometheus)

Imagine que você queira monitorar o uso de CPU e memória de um cluster de servidores em tempo real. **RedisTimeSeries** é uma escolha ideal, pois permite armazenar essas métricas e fazer consultas rápidas para análise de tendências.

### Exemplo com RedisTimeSeries

#### 1. Criando Séries Temporais com Retenção de Dados

Vamos começar criando séries temporais para monitorar métricas de CPU e memória, com um tempo de retenção de dados de 30 dias.

In [15]:
# Função para criar uma série temporal de forma segura
def create_timeseries_if_not_exists(key, retention, labels):
    try:
        r.execute_command("TS.CREATE", key, "RETENTION", retention, "LABELS", *labels)
        print(f"Série temporal '{key}' criada com sucesso!")
    except redis.exceptions.ResponseError as e:
        if "already exists" in str(e):
            print(f"Série temporal '{key}' já existe, prosseguindo...")
        else:
            raise e

# Criar séries temporais para CPU e memória com 30 dias de retenção (2.592.000.000 ms)
create_timeseries_if_not_exists("metric:cpu_usage", 2592000000, ["type", "cpu", "region", "east"])
create_timeseries_if_not_exists("metric:memory_usage", 2592000000, ["type", "memory", "region", "east"])

Série temporal 'metric:cpu_usage' já existe, prosseguindo...
Série temporal 'metric:memory_usage' já existe, prosseguindo...


#### 2. Adicionando Dados em Tempo Real

Podemos adicionar dados de uso de CPU e memória com a marcação de tempo atual.

In [16]:
# Adicionando dados com o timestamp atual
r.execute_command("TS.ADD", "metric:cpu_usage", "*", 75.5)  # 75.5% de uso de CPU
r.execute_command("TS.ADD", "metric:memory_usage", "*", 512)  # 512 MB de uso de memória

print("Dados adicionados com sucesso às séries temporais!")

Dados adicionados com sucesso às séries temporais!


#### 3. Consulta Simples: Recuperar Dados em Intervalo de Tempo

Vamos consultar o uso de CPU no último minuto.


In [17]:
import time

# Recuperar o timestamp atual
current_time = int(time.time() * 1000)  # em milissegundos

# Consultar dados de uso de CPU nos últimos 60 segundos (60.000 ms)
cpu_usage = r.execute_command("TS.RANGE", "metric:cpu_usage", current_time - 60000, current_time)
print("Uso de CPU nos últimos 60 segundos:")
for data_point in cpu_usage:
    print(f"Timestamp: {data_point[0]}, Valor: {data_point[1]}")

Uso de CPU nos últimos 60 segundos:
Timestamp: 1735582514528, Valor: 75.5


#### 4. Agregações: Média de Uso por Minuto

Para ver a média de uso de CPU a cada minuto, usamos agregações.

In [18]:
# Agregar dados de uso de CPU a cada minuto (60.000 ms) com a média
cpu_avg = r.execute_command("TS.RANGE", "metric:cpu_usage", "-", "+", "AGGREGATION", "avg", 60000)
print("Média de uso de CPU por minuto:")
for data_point in cpu_avg:
    print(f"Timestamp: {data_point[0]}, Valor: {data_point[1]}")

Média de uso de CPU por minuto:
Timestamp: 1735581660000, Valor: 67.75
Timestamp: 1735581720000, Valor: 75
Timestamp: 1735582500000, Valor: 75.5


#### 5. Compactação de Dados

Podemos criar regras para compactar dados automaticamente, reduzindo o tamanho da série temporal.



In [19]:
# Garantir que a série compactada não exista antes de criar
try:
    r.execute_command("TS.CREATE", "metric:cpu_usage:avg_minute", "RETENTION", 0)  # Sem retenção para a série compactada
    print("Série compactada 'metric:cpu_usage:avg_minute' criada com sucesso.")
except redis.exceptions.ResponseError:
    print("A série 'metric:cpu_usage:avg_minute' já existe, prosseguindo...")

# Criar a regra de compactação de forma segura
try:
    r.execute_command("TS.CREATERULE", "metric:cpu_usage", "metric:cpu_usage:avg_minute", "AGGREGATION", "avg", 60000)
    print("Regra de compactação criada: uso de CPU será agregado a cada minuto.")
except redis.exceptions.ResponseError:
    print("A regra de compactação já existe, prosseguindo...")

# Adicionar alguns dados para verificar a compactação
current_time = int(time.time() * 1000)  # Timestamp em milissegundos
r.execute_command("TS.ADD", "metric:cpu_usage", current_time, 60.0)
r.execute_command("TS.ADD", "metric:cpu_usage", current_time + 30000, 70.0)  # 30 segundos depois
r.execute_command("TS.ADD", "metric:cpu_usage", current_time + 60000, 80.0)  # 60 segundos depois

# Consultar os dados compactados imediatamente
cpu_avg_compacted = r.execute_command("TS.RANGE", "metric:cpu_usage:avg_minute", "-", "+")
print("Dados compactados de uso de CPU (média por minuto):")
for data_point in cpu_avg_compacted:
    print(f"Timestamp: {data_point[0]}, Valor: {data_point[1]}")

A série 'metric:cpu_usage:avg_minute' já existe, prosseguindo...
A regra de compactação já existe, prosseguindo...
Dados compactados de uso de CPU (média por minuto):
Timestamp: 1735581660000, Valor: 60
Timestamp: 1735581720000, Valor: 75
Timestamp: 1735582500000, Valor: 68.5


## Estruturas de Dados Probabilísticas com Redis

O Redis oferece suporte a estruturas de dados probabilísticas que economizam espaço e são extremamente rápidas para operações específicas. Vamos explorar dois exemplos simples: **HyperLogLog** e **Bloom Filter**.

### 1. HyperLogLog

O **HyperLogLog** é uma estrutura de dados probabilística que estima a cardinalidade (número de elementos únicos) de um conjunto. Em vez de armazenar todos os elementos, ele fornece uma estimativa com um erro padrão de `0,81%`, usando apenas até `12 KB` de memória.

**Casos de Uso:** Contar visitantes únicos de um site, motores de fraude, consultas de pesquisa únicas ou qualquer aplicação que necessite de contagem de elementos únicos de forma eficiente, com altíssima performance e baixo custo.

#### Exemplo Simples com HyperLogLog



In [20]:
# Adicionar elementos a um HyperLogLog
r.pfadd("visitors", "Alice", "Bob", "Charlie", "Alice")
print(r.pfcount("visitors"))  # Saída: 3 (estimativa de visitantes únicos)

# Adicionar mais visitantes
r.pfadd("visitors", "Dave", "Eve")
print(r.pfcount("visitors"))  # Saída: 5 (estimativa atualizada)

# Unir HyperLogLogs (exemplo de caso de uso com múltiplas origens de visitantes)
r.pfadd("new_visitors", "Frank", "Grace")
r.pfmerge("all_visitors", "visitors", "new_visitors")
print(r.pfcount("all_visitors"))  # Saída: estimativa total

5
5
7


### 2. Bloom Filter

O **Bloom Filter** é uma estrutura de dados probabilística que verifica se um elemento pertence a um conjunto, mas com possibilidade de falsos positivos (afirmação errada de presença) e garantia de ausência correta. É ideal para casos onde queremos evitar operações custosas ao verificar a presença de um elemento.

**Casos de uso:** Detecção de fraudes financeiras, verificação rápida de nomes de usuários, evitar a exibição repetida de anúncios ou evitar a duplicação de nomes de modelos de produtos.

#### Exemplo Simples com Bloom Filter



In [21]:
# Tentar criar o Bloom Filter somente se ele não existir
try:
    r.bf().reserve("product_filter", 0.001, 1000)
    print("Bloom Filter 'product_filter' criado com sucesso!")
except redis.exceptions.ResponseError as e:
    if "item exists" in str(e):
        print("Bloom Filter 'product_filter' já existe, prosseguindo...")
    else:
        raise e

# Adicionar produtos ao filtro
r.bf().add("product_filter", "Bike Model X")
r.bf().add("product_filter", "Bike Model Y")

# Verificar se os produtos estão no filtro
print(r.bf().exists("product_filter", "Bike Model X"))  # Saída: True
print(r.bf().exists("product_filter", "Bike Model Z"))  # Saída: False

# Adicionar múltiplos produtos e verificar a existência
r.bf().madd("product_filter", "Bike Model Z", "Bike Model W")
print(r.bf().mexists("product_filter", "Bike Model Z", "Bike Model W"))  # Saída: [True, True]

Bloom Filter 'product_filter' já existe, prosseguindo...
1
1
[1, 1]
