In [32]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import imdb
from tensorflow.keras.utils import pad_sequences

In [33]:
# --- CONFIGURA√á√ïES ---
hp = {
    'MAX_WORD_INDEX': 1000,
    'MAX_LEN': 300,       # Sequ√™ncias inicial (30)
    'N_STATES': 3 ,       # N
    'N_ITER': 30          # Itera√ß√µes iniciais (5)
}

print("1. Carregando dados...")
(train_data, train_labels), (test_data, test_labels) = imdb.load_data(num_words=hp['MAX_WORD_INDEX'])

# Separamos os dados por classe para treinar os HMM
# Modelo Positivo s√≥ v√™ reviews positivos (label 1)
# Modelo Negativo s√≥ v√™ reviews negativos (label 0)
train_pos = [x for x, y in zip(train_data, train_labels) if y == 1]
train_neg = [x for x, y in zip(train_data, train_labels) if y == 0]

# Quantidade para o treino
LIMIT_TRAIN = 300
train_pos = pad_sequences(train_pos[:LIMIT_TRAIN], maxlen=hp['MAX_LEN'])
train_neg = pad_sequences(train_neg[:LIMIT_TRAIN], maxlen=hp['MAX_LEN'])
X_test = pad_sequences(test_data[:100], maxlen=hp['MAX_LEN']) # Teste pequeno
y_test_sample = test_labels[:100]

# --- CLASSE HMM ---

class HMMManual:
    def __init__(self, n_states, n_emissions):
        self.N = n_states
        self.M = n_emissions

        # Inicializa√ß√£o aleat√≥ria e normalizada
        np.random.seed(42)
        self.pi = np.random.rand(self.N)
        self.pi /= np.sum(self.pi)

        self.A = np.random.rand(self.N, self.N)
        self.A /= self.A.sum(axis=1, keepdims=True)

        self.B = np.random.rand(self.N, self.M)
        self.B /= self.B.sum(axis=1, keepdims=True)

    def _forward(self, obs):
        """Calcula alpha (probabilidade de chegar no estado i no tempo t)"""
        T = len(obs)
        alpha = np.zeros((T, self.N))
        scale = np.zeros(T) # Fator de escala para evitar underflow

        # Inicializa√ß√£o
        alpha[0] = self.pi * self.B[:, obs[0]]
        scale[0] = np.sum(alpha[0]) + 1e-10 # +1e-10 evita divis√£o por zero
        alpha[0] /= scale[0]

        # Indu√ß√£o (indo)
        for t in range(1, T):
            for j in range(self.N):
                alpha[t, j] = np.sum(alpha[t-1] * self.A[:, j]) * self.B[j, obs[t]]

            scale[t] = np.sum(alpha[t]) + 1e-10
            alpha[t] /= scale[t]

        return alpha, scale

    def _backward(self, obs, scale):
        """Calcula beta (probabilidade do futuro dado o estado i no tempo t)"""
        T = len(obs)
        beta = np.zeros((T, self.N))

        # Inicializa√ß√£o (no tempo T, beta = 1)
        beta[T-1] = 1.0 / scale[T-1] # Usa a mesma escala do forward

        # Indu√ß√£o Reversa (vindo)
        for t in range(T-2, -1, -1):
            for i in range(self.N):
                # transi√ß√£o * emiss√£o_futura * beta_futuro
                beta[t, i] = np.sum(self.A[i, :] * self.B[:, obs[t+1]] * beta[t+1, :])

            beta[t] /= scale[t] # Normaliza com escala do forward

        return beta

    def train(self, sequences, n_iter=5):
        for it in range(n_iter):
            # A_num[i, j] = Esperan√ßa de transi√ß√µes de i para j
            A_num = np.zeros((self.N, self.N))
            A_den = np.zeros((self.N, 1))

            # B_num[j, k] = Esperan√ßa de estar em j e emitir k
            B_num = np.zeros((self.N, self.M))
            B_den = np.zeros((self.N, 1))

            Pi_new = np.zeros(self.N)

            # E-STEP: Calcula estat√≠sticas para cada sequ√™ncia
            for seq in sequences:
                # Remove padding (0) para o treino
                obs = [x for x in seq if x != 0]
                if len(obs) < 2: continue

                T = len(obs)
                alpha, scale = self._forward(obs)
                beta = self._backward(obs, scale)

                # Calcula Gamma (Prob de estar no estado i no tempo t)
                # gamma[t, i] ~ alpha * beta
                gamma = alpha * beta
                # Normaliza gamma
                gamma /= (np.sum(gamma, axis=1, keepdims=True) + 1e-10)

                # Calcula Xi (Prob de transi√ß√£o i -> j no tempo t)
                xi = np.zeros((T-1, self.N, self.N))
                for t in range(T-1):
                    denom = np.sum(alpha[t,:] * np.sum(self.A * self.B[:, obs[t+1]] * beta[t+1, :], axis=1)) + 1e-10
                    for i in range(self.N):
                        xi[t, i, :] = alpha[t, i] * self.A[i, :] * self.B[:, obs[t+1]] * beta[t+1, :] / denom

                # M-STEP (Acumula√ß√£o): Soma as ocorr√™ncias esperadas
                Pi_new += gamma[0]

                # Atualiza acumuladores de A
                A_num += np.sum(xi, axis=0)
                A_den += np.sum(gamma[:-1], axis=0).reshape(-1, 1)

                # Atualiza acumuladores de B
                for t in range(T):
                    symbol = obs[t]
                    B_num[:, symbol] += gamma[t]
                B_den += np.sum(gamma, axis=0).reshape(-1, 1)

            # ATUALIZA√á√ÉO FINAL DOS PAR√ÇMETROS
            self.pi = Pi_new / len(sequences)
            self.A = A_num / (A_den + 1e-10) # +1e-10 evita div por 0
            self.B = B_num / (B_den + 1e-10)

            print(f"  > Itera√ß√£o {it+1}/{n_iter} conclu√≠da.")

    def score(self, observation_sequence):
        """Retorna Log-Likelihood (usado para classificar)"""
        obs = [o for o in observation_sequence if o != 0]
        if len(obs) == 0: return -np.inf

        _, scale = self._forward(obs)
        # Log da probabilidade √© a soma dos logs das escalas
        return np.sum(np.log(scale + 1e-10))

# --- TRAINING LOOP ---

print("\n2. Treinando Modelo POSITIVO (Isso pode demorar um pouco)...")
hmm_pos = HMMManual(hp['N_STATES'], hp['MAX_WORD_INDEX'] + 1)
hmm_pos.train(train_pos, n_iter=hp['N_ITER'])

print("\n3. Treinando Modelo NEGATIVO...")
hmm_neg = HMMManual(hp['N_STATES'], hp['MAX_WORD_INDEX'] + 1)
hmm_neg.train(train_neg, n_iter=hp['N_ITER'])

print("\n4. Testando Acur√°cia...")
acertos = 0
for i in range(len(X_test)):
    seq = X_test[i]
    if len([x for x in seq if x != 0]) < 2: continue

    # Classifica√ß√£o Bayesiana (De quem √© mais prov√°vel ser essa frase)
    score_pos = hmm_pos.score(seq)
    score_neg = hmm_neg.score(seq)

    pred = 1.0 if score_pos > score_neg else 0.0

    if pred == y_test_sample[i]:
        acertos += 1

print(f"\n=== RESULTADO FINAL ===")
print(f"Acur√°cia no teste ({len(X_test)} amostras): {acertos/len(X_test):.2f}")

1. Carregando dados...

2. Treinando Modelo POSITIVO (Isso pode demorar um pouco)...
  > Itera√ß√£o 1/30 conclu√≠da.
  > Itera√ß√£o 2/30 conclu√≠da.
  > Itera√ß√£o 3/30 conclu√≠da.
  > Itera√ß√£o 4/30 conclu√≠da.
  > Itera√ß√£o 5/30 conclu√≠da.
  > Itera√ß√£o 6/30 conclu√≠da.
  > Itera√ß√£o 7/30 conclu√≠da.
  > Itera√ß√£o 8/30 conclu√≠da.
  > Itera√ß√£o 9/30 conclu√≠da.
  > Itera√ß√£o 10/30 conclu√≠da.
  > Itera√ß√£o 11/30 conclu√≠da.
  > Itera√ß√£o 12/30 conclu√≠da.
  > Itera√ß√£o 13/30 conclu√≠da.
  > Itera√ß√£o 14/30 conclu√≠da.
  > Itera√ß√£o 15/30 conclu√≠da.
  > Itera√ß√£o 16/30 conclu√≠da.
  > Itera√ß√£o 17/30 conclu√≠da.
  > Itera√ß√£o 18/30 conclu√≠da.
  > Itera√ß√£o 19/30 conclu√≠da.
  > Itera√ß√£o 20/30 conclu√≠da.
  > Itera√ß√£o 21/30 conclu√≠da.
  > Itera√ß√£o 22/30 conclu√≠da.
  > Itera√ß√£o 23/30 conclu√≠da.
  > Itera√ß√£o 24/30 conclu√≠da.
  > Itera√ß√£o 25/30 conclu√≠da.
  > Itera√ß√£o 26/30 conclu√≠da.
  > Itera√ß√£o 27/30 conclu√≠da.
  > Itera√ß√£o 28/30 conclu√≠da.


In [37]:
# --- Teste ---
import numpy as np
from tensorflow.keras.datasets import imdb

# Garantir que usamos o mesmo mapeamento que o Keras usou no treino
word_index = imdb.get_word_index()
# O Keras reserva os primeiros √≠ndices: 0=Padding, 1=Start, 2=Unknown
word_index = {k: (v + 3) for k, v in word_index.items()}
word_index["<PAD>"] = 0
word_index["<START>"] = 1
word_index["<UNK>"] = 2

def classificar_frase(texto):
    # Limpeza
    texto_limpo = texto.lower().replace('.', '').replace(',', '').replace('!', '').replace('?', '')
    palavras = texto_limpo.split()

    # Tradu√ß√£o
    seq_numerica = []
    for w in palavras:
        if w in word_index:
            idx = word_index[w]
            if idx >= hp['MAX_WORD_INDEX']:
                seq_numerica.append(2) # 2 = Desconhecido
            else:
                seq_numerica.append(idx)
        else:
            seq_numerica.append(2) # Palavra que n√£o existe no IMDB vira <UNK>

    # Formata√ß√£o para o HMM
    seq_formatada = np.array([x for x in seq_numerica if x != 0])

    if len(seq_formatada) == 0:
        return "Frase vazia ou palavras fora do vocabul√°rio conhecido."

    # Bayes (Score nos dois modelos)
    log_prob_pos = hmm_pos.score(seq_formatada) # Modelo Positivo
    log_prob_neg = hmm_neg.score(seq_formatada) # Modelo Negativo

    # Decis√£o
    print(f"Frase: \"{texto}\"")
    print(f"Score Positivo: {log_prob_pos:.2f}")
    print(f"Score Negativo: {log_prob_neg:.2f}")

    if log_prob_pos > log_prob_neg:
        return ">> RESULTADO: üòÑ POSITIVO"
    else:
        return ">> RESULTADO: üò° NEGATIVO"



frase_teste = "I hated this movie it is the worst film ever terrible story"

# Executa a classifica√ß√£o
print(classificar_frase(frase_teste))

Frase: "I hated this movie it is the worst film ever terrible story"
Score Positivo: -66.62
Score Negativo: -62.21
>> RESULTADO: üò° NEGATIVO
