In [None]:
from kafka import KafkaConsumer, KafkaProducer
from unittest.mock import MagicMock
import pytest
from psycopg2 import OperationalError
import importlib
import util as f
importlib.reload(f)
from dotenv import load_dotenv
load_dotenv()
import os
import uuid

In [43]:
topico = 'sensores-iot'
servidor= 'localhost:9092'

PRODUCER

In [97]:
def test_criar_producer():

    producer = f.criar_producer(servidor)
    assert producer is not None, "Producer retornou None"
    assert isinstance(producer, KafkaProducer), "Não é uma instância de KafkaProducer"

    print("Teste criar producer foi bem-sucedido!!!")

    print("Fechando conexão depois do teste....")
    producer.close()

test_criar_producer()

Teste criar producer foi bem-sucedido!!!
Fechando conexão depois do teste....


In [98]:
dados = f.gerar_dados_sensor()

def test_gerar_dados_sensor():
    #Verificar se retorna um dicionario
    assert isinstance(dados, dict)

    #Verificar se o dicionario tem as chaves corretas
    chaves_esperadas = {'sensor_id', 'temperatura', 'umidade', 'data_hora', 'localizacao'}
    assert set(dados.keys()) == chaves_esperadas, "Chaves não correspondentes"

    #Verificar se os tipos estão no formato correto
    assert isinstance(dados['sensor_id'], str), "sensor_id não é uma string"
    assert isinstance(dados['temperatura'], float), "temperatura não é um float"
    assert isinstance(dados['umidade'], float), "umidade não é um float"
    assert isinstance(dados['data_hora'], str), "data_hora não é uma string"
    assert isinstance(dados['localizacao'], str), "localizacao não é uma string"

    #Vericar se o intervalo esta o range de valores esperados
    assert 12.00 <= dados['temperatura'] <= 40.00, "temperatuta fora do intervalo esperado"
    assert 20.00 <= dados['umidade'] <= 80.00, "umidade fora do intervalo esperado"

    print("Teste para gerar dados para o sensor foi bem-sucedido!!!")

test_gerar_dados_sensor()
     

Teste para gerar dados para o sensor foi bem-sucedido!!!


In [102]:
def test_enviar_dados():
    mock_producer = MagicMock()

    f.enviar_dados(mock_producer, topico, dados)

    #Aqui verifica se o método send foi chamado corretamente com os parametros certos
    mock_producer.send.assert_called_once_with(topico, dados)

    print("Teste enviar dados foi bem-sucedido!!!")

test_enviar_dados()

[INFO] Dados enviados para o 'sensores-iot': {'sensor_id': 'bfcb6899-2406-40ba-be6f-4ce57c83bd82', 'temperatura': 13.93, 'umidade': 73.0, 'data_hora': '1981-05-01T18:48:02', 'localizacao': 'Pacheco'}
Teste enviar dados foi bem-sucedido!!!


CONEXÂO

In [103]:
def test_criar_conexao_postgresql():
    database=os.getenv('POSTGRES_DB')
    user=os.getenv('POSTGRES_USER')
    password=os.getenv('POSTGRES_PASSWORD')
    host="localhost"
    porta="5432"

    try:
        conexao = f.criar_conexao_postgresql(database, user, password, host, porta)
        print("[DEBUG] Conexao retornada:", conexao)
        assert conexao is not None
        assert conexao.closed == 0
    except OperationalError as e:
        pytest.fail(f"Erro ao conectar ao PostgreSQL: {e}")
    except AssertionError as e:
        print("[ERROR] Falha de assercao:", repr(e))
    except Exception as e:
        try:
            print(f"[ERROR] Erro inesperado: {e}")
        except UnicodeDecodeError:
            print(f"[ERROR] Erro inesperado (caracteres inválidos): {repr(e)}")
    finally:
        if conexao and conexao.closed == 0:
            conexao.close()
            print("Conexao com o PostgreSQL fechada após o teste.")

test_criar_conexao_postgresql()

[INFO] Conexao com o PostgreSQL estabelecida.
[DEBUG] Conexao retornada: <connection object at 0x000002227CB20590; dsn: 'user=sensores_user password=xxx dbname=sensores_db host=localhost port=5432 client_encoding=UTF8', closed: 0>
Conexao com o PostgreSQL fechada após o teste.


CRIAR SCHEMA

In [104]:
database=os.getenv('POSTGRES_DB')
user=os.getenv('POSTGRES_USER')
password=os.getenv('POSTGRES_PASSWORD')
host="localhost"
porta="5432"


conexao = f.criar_conexao_postgresql(database, user, password, host, porta)



[INFO] Conexao com o PostgreSQL estabelecida.


In [106]:
def test_criar_schema():
    schema = 'schema_teste'
    cursor = conexao.cursor()

    f.criar_schema(schema, conexao)

    cursor.execute(f"""SELECT schema_name 
                   FROM information_schema.schemata
                   WHERE schema_name = '{schema}'""")
    resultado = cursor.fetchone()
    assert resultado is not None, f"O schema '{schema}' não foi criado"
    assert resultado[0] == schema, f"O schema foi criado com nome incorreto: {resultado[0]}"

    cursor.close()
    print(f"Teste criar schema '{schema}' foi bem-sucedido!!!")


test_criar_schema()

[INFO] Criacao do schema 'schema_teste' foi bem-sucedido.
Teste criar schema 'schema_teste' foi bem-sucedido!!!


CRIAR TABELA

In [None]:
def test_criar_tabela():
    colunas = [
        "sensor_id UUID PRIMARY KEY",
        "temperatura FLOAT",
        "umidade FLOAT",
        "data_hora TIMESTAMP",
        "localizacao VARCHAR(100)"
    ]

    tabela = 'tabela_teste'
    schema = 'schema_teste'

    f.criar_tabela(colunas, schema, tabela, conexao)

    # Verifica se a tabela foi criada corretamente
    cursor = conexao.cursor()
    cursor.execute(f"""SELECT table_name 
                   FROM information_schema.tables
                   WHERE table_schema = '{schema}' AND table_name = '{tabela}'""")
    resultado = cursor.fetchone()
    assert resultado is not None, f"A tabela '{tabela}' não foi criada no schema '{schema}'"
    assert resultado[0] == tabela, f"A tabela foi criada com nome incorreto: {resultado[0]}"

    cursor.close()
    print(f"Teste criar tabela '{tabela}' no schema '{schema}' foi bem-sucedido!!!")

test_criar_tabela()

[INFO] Criacao de tabela 'tabela_teste' no schema 'schema_teste' foi bem-sucedido.
Teste criar tabela 'tabela_teste' no schema 'schema_teste' foi bem-sucedido!!!


CONSUMER

In [108]:
def test_criar_consumer():

    consumer = f.criar_consumer(topico, servidor)
    assert consumer is not None, "Consumer retornou None"
    assert isinstance(consumer, KafkaConsumer), "Consumer não é uma instância de KafkaConsumer"

    print("Teste criar consumer foi bem-sucedido!!!")
    print("Fechando consumer depois do teste...")
    consumer.close()

test_criar_consumer()

Teste criar consumer foi bem-sucedido!!!
Fechando consumer depois do teste...


In [109]:
def consumer_mock():
    sensor_uuid = str(uuid.uuid4())
    mensagem_mock = MagicMock()
    mensagem_mock.value = {
        "sensor_id": sensor_uuid,
        "temperatura": 22.5,
        "umidade": 60,
        "data_hora": "2025-05-29T10:30:00",
        "localizacao": "SP"
    }

    consumer = MagicMock()
    consumer.__iter__.return_value = [mensagem_mock]
    return consumer


In [110]:
def test_consumir_dados(consumer_mock):
    dados_coletados = list(f.consumir_dados(consumer_mock))

    assert len(dados_coletados) == 1, "Era esperado coletar 1 mensagem"
    assert isinstance(dados_coletados[0], dict), "Dados coletados não são um dicionário" 
    assert "sensor_id" in dados_coletados[0], "Os dados coletados devem possuir sensor_id"
    assert isinstance(dados_coletados[0]["sensor_id"], str), "sensor_id não é uma string UUID"

    print("Teste consumir dados foi bem-sucedido!!!")
    return dados_coletados

test_consumir_dados(consumer_mock())

[INFO] Dados recebidos : {'sensor_id': '2d116ea1-7a2e-47cf-bc84-ff3264d8c9d8', 'temperatura': 22.5, 'umidade': 60, 'data_hora': '2025-05-29T10:30:00', 'localizacao': 'SP'}
Teste consumir dados foi bem-sucedido!!!


[{'sensor_id': '2d116ea1-7a2e-47cf-bc84-ff3264d8c9d8',
  'temperatura': 22.5,
  'umidade': 60,
  'data_hora': '2025-05-29T10:30:00',
  'localizacao': 'SP'}]

In [111]:
def test_armazenamento_sucesso():
    # Criando mocks
    conexao_mock = MagicMock()
    cursor_mock = MagicMock()
    conexao_mock.cursor.return_value = cursor_mock

    # Dados simulados
    dados = {"sensor_id": "12345", "valor": 42}
    schema = "schema_teste"
    tabela = "tabela_teste"

    # Chamando a função com mocks
    f.armazenar_dados(dados, schema, tabela, conexao_mock)

    # Verificando se os métodos corretos foram chamados
    cursor_mock.execute.assert_called_once_with(
        f"INSERT INTO {schema}.{tabela} (sensor_id, valor) VALUES (%s, %s)",
        tuple(dados.values())
    )
    conexao_mock.commit.assert_called_once()
    cursor_mock.close.assert_called_once()

test_armazenamento_sucesso()

[INFO] Dados armazenados na tabela 'tabela_teste' do schema 'schema_teste'.
