# Com o serviço da Amazon Kinesis funcionando, podemos nos conectar

In [None]:
# instalar as bibliotecas necessárias

!pip install boto3
!pip install requests


## Vamos testar a conexão com o Stream

In [4]:
import boto3

def check_kinesis_connection(stream_name, endpoint_url):
    """
    Verifica a conexão com o Kinesis no LocalStack.

    :param stream_name: Nome da stream que você deseja verificar.
    :param endpoint_url: URL do endpoint do LocalStack para Kinesis.
    :return: Retorna True se a stream existir, caso contrário, retorna False.
    """
    try:
        # Configurando o cliente do Kinesis para o LocalStack
        kinesis_client = boto3.client(
            'kinesis',
            region_name='us-east-1',
            endpoint_url=endpoint_url,
            aws_access_key_id='dummy_access_key',
            aws_secret_access_key='dummy_secret_key'
        )


        # Tentando obter a descrição da stream
        response = kinesis_client.describe_stream(StreamName=stream_name)

        # Se a resposta contiver informações da stream, então a conexão foi bem-sucedida
        if 'StreamDescription' in response:
            return True
        else:
            return False

    except Exception as e:
        print(f"Erro ao tentar se conectar ao Kinesis no LocalStack: {e}")
        return False

# Usando a função
endpoint = "http://localhost:4566"  # Assumindo que o LocalStack está rodando no localhost na porta padrão
stream_name = "pokemon_stream"

if check_kinesis_connection(stream_name, endpoint):
    print(f"A stream {stream_name} existe e a conexão foi bem-sucedida!")
else:
    print(f"Problema ao se conectar ou a stream {stream_name} não existe.")


A stream pokemon_stream existe e a conexão foi bem-sucedida!


## vamos coletar os dados de pokemon e guardar no S3

In [5]:
import boto3
import requests
import json

def get_all_pokemon_from_pokeapi(limit=100):
    """Busca os pokémons da PokeAPI."""
    url = f"https://pokeapi.co/api/v2/pokemon?limit={limit}"
    response = requests.get(url)
    return response.json()

def store_to_localstack_s3(data, bucket_name, file_name):
    """Armazena os dados no LocalStack S3."""
    s3_client = boto3.client(
        's3',
        endpoint_url='http://localhost:4566',
        region_name='us-east-1',
        aws_access_key_id='dummy_access_key',
        aws_secret_access_key='dummy_secret_key'
    )

    # Certifique-se de que o bucket existe
    if bucket_name not in [bucket['Name'] for bucket in s3_client.list_buckets()['Buckets']]:
        s3_client.create_bucket(Bucket=bucket_name)

    s3_client.put_object(Bucket=bucket_name, Key=file_name, Body=json.dumps(data))

def main():
    data = get_all_pokemon_from_pokeapi()
    store_to_localstack_s3(data, 'pokebucket', 'pokemons.json')
    print("Pokémons armazenados com sucesso!")

if __name__ == "__main__":
    main()


Pokémons armazenados com sucesso!


# Vamos ler os dados do S3 e enviar para o Kinesis

In [6]:
import boto3
import json

def read_from_localstack_s3(bucket_name, file_name):
    """Lê os dados do LocalStack S3."""
    s3_client = boto3.client(
        's3',
        endpoint_url='http://localhost:4566',
        region_name='us-east-1',
        aws_access_key_id='dummy_access_key',
        aws_secret_access_key='dummy_secret_key'
    )

    response = s3_client.get_object(Bucket=bucket_name, Key=file_name)
    return json.loads(response['Body'].read())

def send_to_kinesis(data, stream_name):
    """Envia os dados para o Kinesis no LocalStack."""
    kinesis_client = boto3.client(
        'kinesis',
        endpoint_url='http://localhost:4566',
        region_name='us-east-1',
        aws_access_key_id='dummy_access_key',
        aws_secret_access_key='dummy_secret_key'
    )

    for item in data['results']:
        # Enviando cada Pokémon individualmente para o Kinesis
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(item),
            PartitionKey=item['name']  # usando o nome do Pokémon como chave de partição
        )

def main():
    data = read_from_localstack_s3('pokebucket', 'pokemons.json')
    send_to_kinesis(data, 'pokemon_stream')
    print("Pokémons enviados para Kinesis com sucesso!")

if __name__ == "__main__":
    main()


Pokémons enviados para Kinesis com sucesso!


# Consumir o Kinsesis

In [8]:
import boto3
import json

def read_from_kinesis(stream_name, shard_id):
    """Lê os registros do Kinesis no LocalStack."""
    kinesis_client = boto3.client(
        'kinesis',
        endpoint_url='http://localhost:4566',
        region_name='us-east-1',
        aws_access_key_id='dummy_access_key',
        aws_secret_access_key='dummy_secret_key'
    )

    # Obtenha um shard iterator
    shard_it = kinesis_client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType='TRIM_HORIZON'  # Lê desde o início da shard
    )["ShardIterator"]

    records = []
    while shard_it:
        response = kinesis_client.get_records(ShardIterator=shard_it, Limit=100)

        # Processar os registros
        for rec in response['Records']:
            payload = json.loads(rec['Data'])
            records.append(payload)

        # Próximo shard iterator
        shard_it = response.get('NextShardIterator')

    return records

def main():
    records = read_from_kinesis('pokemon_stream', 'shardId-000000000000')

    for record in records:
        print(record)  # Imprime cada registro

if __name__ == "__main__":
    main()
