<a href="https://colab.research.google.com/github/flipfelly/Streaming-com-Kinesis/blob/main/Consumidor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Notebook Consumidor

Aqui vamos criar um consumidor que ficará "escutando" os dados em streaming criados pelo **produtor**. A célula continuará rodando, recebendo dados, até que seja interrompida manualmente ou que o fluxo de dados seja excluído da AWS.

---

### **Pré-requisito:**
- É esperado que você já tenha lido o **notebook Produtor** e executado as etapas nele.

---

### **Como conseguir o ShardID:**

1. O **shardID** é impresso no **notebook Produtor**.
2. Para obtê-lo, execute a célula principal do notebook **Produtor**.
3. O **shardID** será impresso em um formato como este:
   - Exemplo de print do notebook Produtor:
     ```
     {'ShardId': 'shardId-000000000002', ...}
     ```
4. **Copie apenas** a parte destacada: `shardId-000000000002`.
5. **Importante**: Não utilize o shardID do exemplo, mas sim o **resultado da sua aplicação**.

---
### ⚠️ **Atenção**:
- As **credenciais de acesso**, a **StreamName** e a **`region_name`** estão explicadas no **notebook Produtor**. Certifique-se de seguir essas instruções para garantir o acesso correto ao Kinesis.


In [2]:
!pip install boto3

Collecting boto3
  Downloading boto3-1.36.1-py3-none-any.whl.metadata (6.6 kB)
Collecting botocore<1.37.0,>=1.36.1 (from boto3)
  Downloading botocore-1.36.1-py3-none-any.whl.metadata (5.7 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Collecting s3transfer<0.12.0,>=0.11.0 (from boto3)
  Downloading s3transfer-0.11.1-py3-none-any.whl.metadata (1.7 kB)
Downloading boto3-1.36.1-py3-none-any.whl (139 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading botocore-1.36.1-py3-none-any.whl (13.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.3/13.3 MB[0m [31m62.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Downloading s3transfer-0.11.1-py3-none-any.whl (84 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.1/84.1 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00


# Explicação do Código

---

### 1. **Obtendo o Shard Iterator**

O **`get_shard_iterator`** é utilizado para obter o iterator (ponteiro) que permite ler os dados de um **shard** específico dentro de um fluxo. Ele recebe:
- **StreamName**: Nome do fluxo Kinesis.
- **ShardId**: ID do shard do qual os dados serão lidos.
- **ShardIteratorType**: Define onde a leitura começará, neste caso, a partir do dado mais recente (**LATEST**).

O retorno é o **ShardIterator**, que é utilizado para acessar os registros subsequentes.

---

### 2. **Loop de Leitura dos Dados**

A leitura dos dados ocorre dentro de um **loop** contínuo, que verifica se há mais dados a serem lidos (enquanto o **shard** não for `None`). A cada iteração:
- **`get_records(ShardIterator=shard)`**: Obtém os registros a partir do **shardIterator** atual.
- **`resultado['Records']`**: Extrai os registros dos dados recebidos.
- **`NextShardIterator`**: Obtém o próximo **shardIterator** para continuar a leitura nos registros seguintes.

---

### 3. **Exibição dos Registros**

Cada registro recebido é printado dentro de um **loop**:
- **`SequenceNumber`**: Exibe o número sequencial do registro.
- **`ApproximateArrivalTimestamp`**: Exibe o timestamp aproximado de chegada do registro ao Kinesis.
- **`PartitionKey`**: Exibe a chave de partição associada ao registro, que pode ser usada para identificar a origem dos dados.
- **`Data`**: Exibe os dados reais enviados para o fluxo, geralmente em formato binário, que podem ser processados conforme necessário.

---
## ⚠️ **Aviso Importante:**

**Não se assuste** com a mensagem de erro registrada neste notebook. Ela é gerada porque precisamos **interromper a célula manualmente**.

A mensagem **KeyboardInterrupt** é apenas uma indicação de que o processo foi interrompido de forma controlada.

---


In [5]:
import boto3

cliente = boto3.client('kinesis', aws_access_key_id = 'id da chave de acesso', aws_secret_access_key = "chave de acesso", region_name='codigo da região')

shard = cliente.get_shard_iterator(StreamName='Nome do stream', ShardId='shardId-000000000002 - Colocar id gerado', ShardIteratorType='LATEST')['ShardIterator']

while shard is not None:
    resultado = cliente.get_records(ShardIterator=shard)
    registros = resultado['Records']
    shard = resultado["NextShardIterator"]

    for registro in registros:
      print(registro["SequenceNumber"])
      print(registro["ApproximateArrivalTimestamp"])
      print(registro["PartitionKey"])
      print(registro["Data"])

49659682377937857660418786146689697367636689047640866850
2025-01-17 13:23:48.798000+00:00
02
b'{"idvendedor": "999", "nome": "Nelsol"}'
49659682377937857660418786226868075576118126340734976034
2025-01-17 13:24:06.081000+00:00
02
b'{"idvendedor": "123", "nome": "Marina"}'
49659682377937857660418786304854671273638241077525741602
2025-01-17 13:24:23.760000+00:00
02
b'{"idvendedor": "231", "nome": "Pedro"}'


KeyboardInterrupt: 