Imports

In [1]:
import h5py
from sklearn.preprocessing import StandardScaler
from extratorFeatures import *
import threading
import queue
import time
import os
from collections import deque
import pickle

 Modelo IA

In [19]:
# --- 1. Dataset ---

def carregar_dataset_features(caminho_arquivo_h5):
    """Carrega as features (X) e os rótulos (y) de um arquivo HDF5."""
    try:
        with h5py.File(caminho_arquivo_h5, 'r') as hf:
            X = hf['features'][:]
            y = hf['labels'][:]
        return X, y
    except (FileNotFoundError, KeyError) as e:
        print(f"Erro ao carregar o dataset '{caminho_arquivo_h5}': {e}")
        return None, None

def initDataset(dataset_path):
    """Carrega e pré-processa os dados, retornando o normalizador (scaler)."""
    X_data, y_data = carregar_dataset_features(dataset_path)
    if X_data is None:
        return None, None, None

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X_data)
    X_final = X_scaled.T
    y_final = y_data.reshape(1, y_data.shape[0])
    return X_final, y_final, scaler

# --- 2. Regressão Logística ---

def sigmoid(z):
    return 1 / (1 + np.exp(-z))

def initialize_with_zeros(dim):
    w = np.zeros((dim, 1))
    b = 0.0
    return w, b

def propagate(w, b, X, Y):
    m = X.shape[1]
    A = sigmoid(np.dot(w.T, X) + b)
    epsilon = 1e-15
    cost = (-1/m) * np.sum(Y * np.log(A + epsilon) + (1 - Y) * np.log(1 - A + epsilon))
    dw = (1/m) * np.dot(X, (A - Y).T)
    db = (1/m) * np.sum(A - Y)
    return {"dw": dw, "db": db}, np.squeeze(cost)

def optimize(w, b, X, Y, num_iterations, learning_rate, print_cost=False):
    costs = []
    for i in range(num_iterations):
        grads, cost = propagate(w, b, X, Y)
        w = w - learning_rate * grads["dw"]
        b = b - learning_rate * grads["db"]
        if i % 100 == 0:
            costs.append(cost)
            if print_cost:
                print(f"Custo após iteração {i}: {cost}")
    return {"w": w, "b": b}, grads, costs

def predict(w, b, X):
    A = sigmoid(np.dot(w.T, X) + b)
    return (A > 0.5) * 1

def predict_prob(w, b, X): return sigmoid(np.dot(w.T, X) + b)

def model(dataset_path, num_iterations=3000, learning_rate=0.01, print_cost=True):
    """Constrói, treina e retorna o modelo completo."""
    X_train, Y_train, scaler = initDataset(dataset_path)
    if X_train is None: return None

    w, b = initialize_with_zeros(X_train.shape[0])
    parameters, _, costs = optimize(w, b, X_train, Y_train, num_iterations, learning_rate, print_cost)

    Y_prediction_train = predict(parameters["w"], parameters["b"], X_train)
    train_accuracy = 100 - np.mean(np.abs(Y_prediction_train - Y_train)) * 100
    print(f"\nAcurácia no conjunto de treino: {train_accuracy:.2f} %")

    return {
        "costs": costs, "w": parameters["w"], "b": parameters["b"],
        "scaler": scaler, "train_accuracy": train_accuracy
    }

# --- 3. Funções de Teste e Validação ---

def calcular_precisao_dataset(dataset_path, model_results):
    """Calcula a acurácia do modelo em um novo dataset."""
    if "scaler" not in model_results:
        print("Erro: 'scaler' não encontrado.")
        return

    w, b, scaler = model_results["w"], model_results["b"], model_results["scaler"]
    X_test_data, Y_test_data = carregar_dataset_features(dataset_path)
    if X_test_data is None: return

    X_test_scaled = scaler.transform(X_test_data)
    Y_prediction_test = predict(w, b, X_test_scaled.T)
    test_accuracy = 100 - np.mean(np.abs(Y_prediction_test - Y_test_data.reshape(1, -1))) * 100
    print(f"Acurácia no dataset '{dataset_path}': {test_accuracy:.2f} %")
    return test_accuracy

In [None]:
d = model("Datasets/dataset_features.h5", num_iterations = 50000, learning_rate = 0.005, print_cost = True)

Testes com videos com multithreading

In [29]:
def reader_thread(video_path, raw_frames_queue, stop_event):
    """
    Thread 1: Leitora de Vídeo
    - Apenas lê os frames do arquivo de vídeo.
    - Coloca os frames brutos em uma fila (raw_frames_queue).
    - Isola o gargalo de I/O.
    """
    print("[INFO] Thread Leitora iniciada.")
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Erro: Não foi possível abrir o vídeo em '{video_path}'")
        stop_event.set()
        return

    while not stop_event.is_set():
        if raw_frames_queue.full():
            # Se a fila estiver cheia, espera um pouco para não consumir toda a memória
            time.sleep(0.01)
            continue

        ret, frame = cap.read()
        if not ret:
            # Fim do vídeo
            print("[INFO] Fim do vídeo. Sinalizando para threads terminarem.")
            stop_event.set()
            break

        raw_frames_queue.put(frame)

    cap.release()
    print("[INFO] Thread Leitora finalizada.")


def processor_thread(raw_frames_queue, processed_frames_queue, model_results, stop_event, LARGURA_PROCESSAMENTO=640):
    """
    Thread 2: Processadora de Frames
    - Pega um frame da fila de frames brutos.
    - Realiza todo o processamento pesado (redimensionamento, detecção, inferência).
    - Desenha os resultados no frame.
    - Coloca o frame processado na fila de frames processados.
    """
    print("[INFO] Thread Processadora iniciada.")
    w, b, scaler = model_results["w"], model_results["b"], model_results["scaler"]

    while not stop_event.is_set():
        try:
            frame = raw_frames_queue.get(timeout=1)
        except queue.Empty:
            if stop_event.is_set():
                break
            continue

        altura_original, largura_original = frame.shape[:2]
        fator_escala = LARGURA_PROCESSAMENTO / largura_original
        nova_altura = int(altura_original * fator_escala)
        frame_processado = cv2.resize(frame, (LARGURA_PROCESSAMENTO, nova_altura), interpolation=cv2.INTER_AREA)

        contornos, img_hsv = extrair_contornos(frame_processado)

        if contornos:
            for contorno in contornos:
                features = extrair_features(img_hsv, contorno)

                if features is not None:
                    features_scaled = scaler.transform(features.reshape(1, -1))
                    prob = sigmoid(np.dot(w.T, features_scaled.T) + b)[0, 0]

                    if prob > 0.6:  # Limiar de decisão
                        color = (0, 0, 255)  # Vermelho

                        contorno_original = (contorno / fator_escala).astype(np.int32)
                        cv2.drawContours(frame, [contorno_original], -1, color, 2)

                        (x, y, _, _) = cv2.boundingRect(contorno_original)
                        cv2.putText(frame, f"{prob:.2f}", (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

        # Coloca o frame (com ou sem detecções) na fila de exibição
        if not processed_frames_queue.full():
            processed_frames_queue.put(frame)

    print("[INFO] Thread Processadora finalizada.")


def testar_video(video_path, model_results):
    """
    Thread Principal Otimizada: Exibição com FPS estável.
    - Desacopla a taxa de exibição da taxa de processamento.
    - Sempre exibe o frame mais recente disponível a uma taxa de quadros alvo.
    - Adiciona um contador de FPS para visualização.
    """
    # Filas e evento de parada (mesma configuração de antes)
    raw_frames_queue = queue.Queue(maxsize=10)
    processed_frames_queue = queue.Queue(maxsize=10)
    stop_event = threading.Event()

    # --- Criação e início das threads (mesma configuração de antes) ---
    reader = threading.Thread(target=reader_thread, args=(video_path, raw_frames_queue, stop_event))
    processor = threading.Thread(target=processor_thread, args=(raw_frames_queue, processed_frames_queue, model_results, stop_event))
    reader.daemon = True
    processor.daemon = True
    reader.start()
    processor.start()

    TARGET_FPS = 30.0
    TARGET_FRAME_TIME = 1.0 / TARGET_FPS

    fps_calculator = deque(maxlen=int(TARGET_FPS))

    latest_frame = None

    while not stop_event.is_set():
        frame_start_time = time.time()

        try:
            temp_frame = None
            while True:
                temp_frame = processed_frames_queue.get_nowait()
                # Se um novo frame foi pego, atualiza a variável
                if temp_frame is not None:
                    latest_frame = temp_frame
        except queue.Empty:
            # Fila vazia, continua com o último frame que tínhamos
            pass

        if latest_frame is not None:
            fps_calculator.append(frame_start_time)

            # Calcula o FPS com base nos timestamps
            if len(fps_calculator) > 1:
                time_diff = fps_calculator[-1] - fps_calculator[0]
                current_fps = len(fps_calculator) / time_diff if time_diff > 0 else 0.0
                cv2.putText(latest_frame, f"Display FPS: {current_fps:.1f}",
                            (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

            cv2.imshow('Teste em Video Multithread - FPS Estavel', latest_frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            print("[INFO] Tecla 'q' pressionada. Encerrando...")
            stop_event.set()
            break

        # Garantir a taxa de quadros ---
        elapsed_time = time.time() - frame_start_time
        sleep_time = TARGET_FRAME_TIME - elapsed_time
        if sleep_time > 0:
            time.sleep(sleep_time)

        if not reader.is_alive() and processed_frames_queue.empty():
             break

    reader.join(timeout=1)
    processor.join(timeout=1)
    cv2.destroyAllWindows()
    print("[INFO] Aplicação encerrada.")

In [4]:
testar_video("Videos/Normal/Normal1.mp4", d)

cv2.destroyAllWindows()

Erro ao abrir vídeo: 'video/Normal/Normal2.mp4'


In [30]:
testar_video("Videos/Fire/Fire1.mp4", d)

cv2.destroyAllWindows()

[INFO] Thread Leitora iniciada.
[INFO] Thread Processadora iniciada.
[INFO] Fim do vídeo. Sinalizando para threads terminarem.
[INFO] Thread Leitora finalizada.
[INFO] Thread Processadora finalizada.
[INFO] Aplicação encerrada.


Save e Load do modelo treinado

In [8]:
def salvar_modelo(model_results, caminho_arquivo):
    """
    Salva os componentes essenciais do modelo treinado em um arquivo usando pickle.

    Args:
        model_results (dict): Dicionário contendo 'w', 'b', e 'scaler'.
        caminho_arquivo (str): Caminho para salvar o arquivo do modelo (ex: 'modelo.pkl').
    """
    # Extrai apenas os componentes necessários para a previsão
    modelo_para_salvar = {
        'w': model_results['w'],
        'b': model_results['b'],
        'scaler': model_results['scaler']
    }

    try:
        with open(caminho_arquivo, 'wb') as f:
            pickle.dump(modelo_para_salvar, f)
        print(f"Modelo salvo com sucesso em '{caminho_arquivo}'")
    except Exception as e:
        print(f"Erro ao salvar o modelo: {e}")

def carregar_modelo(caminho_arquivo):
    """
    Carrega um modelo treinado de um arquivo pickle.

    Args:
        caminho_arquivo (str): Caminho do arquivo do modelo a ser carregado.

    Returns:
        dict: Dicionário contendo 'w', 'b', e 'scaler', ou None se falhar.
    """
    try:
        with open(caminho_arquivo, 'rb') as f:
            model_results = pickle.load(f)
        print(f"Modelo carregado com sucesso de '{caminho_arquivo}'")
        return model_results
    except FileNotFoundError:
        print(f"Erro: Arquivo do modelo não encontrado em '{caminho_arquivo}'")
        return None
    except Exception as e:
        print(f"Erro ao carregar o modelo: {e}")
        return None


In [None]:
salvar_modelo(d, "Modelos/modelo_features")

In [10]:
d = carregar_modelo("Modelos/modelo_features")

Modelo carregado com sucesso de 'Modelos/modelo_features'
