### 1

In [None]:
%pip install pgmpy

In [None]:
from pgmpy.models import DiscreteBayesianNetwork
from pgmpy.factors.discrete import TabularCPD 
from pgmpy.inference import VariableElimination
import numpy as np

  from .autonotebook import tqdm as notebook_tqdm


In [6]:
model = DiscreteBayesianNetwork([('H', 'O'), ('W', 'O'), ('R', 'H', 'W'), ('E', 'H'), ('C','R')])

In [7]:
print("Independente locale:")
print(model.local_independencies('O'))
print(model.local_independencies('H'))
print(model.local_independencies('W'))
print(model.local_independencies('R'))
print(model.local_independencies('E'))
print(model.local_independencies('C'))

Independente locale:
(O ⟂ C, E, R | H, W)
(H ⟂ C, W | E, R)
(W ⟂ C, H, E, R)
(R ⟂ E, W | C)
(E ⟂ C, W, R)
(C ⟂ E, W)


In [21]:
cpd_o = TabularCPD(variable='O', variable_card=2, values=[[0.3], [0.7]])
cpd_H = TabularCPD(variable='H', variable_card=2,
                   values=[[0.9, 0.2], [0.1, 0.8]],
                   evidence=['O'], evidence_card=[2])
cpd_W = TabularCPD(variable='W', variable_card=2,
                   values=[[0.1, 0.6], [0.9, 0.4]],
                   evidence=['O'], evidence_card=[2])
cpd_R = TabularCPD(variable='R', variable_card=2,
                   values=[[0.6, 0.9, 0.3, 0.5], 
                           [0.4, 0.1, 0.7, 0.5]],
                   evidence=['H','W'], evidence_card=[2,2])
cpd_E = TabularCPD(variable='E', variable_card=2,
                   values=[[0.8,0.2], [0.2, 0.8]],
                   evidence=['H'], evidence_card=[2])
cpd_C = TabularCPD(variable='C', variable_card=2,
                   values=[[0.85, 0.40], [0.15, 0.60]],
                   evidence=['R'], evidence_card=[2])

In [22]:
model.add_cpds(cpd_o, cpd_H, cpd_W, cpd_R, cpd_E, cpd_C)



In [23]:
print("Model is consistent:", model.check_model())

ValueError: CPD associated with H doesn't have proper parents associated with it.

In [None]:
model.check_model()

infer = VariableElimination(model)

jc_HC = infer.query(variables=['H', 'C'])
jc_EC = infer.query(variables=['E', 'C'])

print("P(H, C):")
print(jc_HC)
print("\nP(E, C):")
print(jc_EC)

evidence = {'C': 0}

map_HW = infer.map_query(variables=['H', 'W'], evidence=evidence)
print(map_HW)

NameError: name 'game_model' is not defined

In [None]:
post_HW = infer.query(variables=['H', 'W'], evidence=evidence)
print("\nP(H, W | C=0):")
print(post_HW)

vals = post_HW.values
idx = np.argmax(vals)
assignment_idx = np.unravel_index(idx, vals.shape)
assignment = dict(zip(post_HW.variables, assignment_idx))
prob_map = vals[assignment_idx]
print("\nMAP assignment (from posterior) and its probability:")
print(assignment, prob_map)

### 2

In [24]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from hmmlearn import hmm

In [25]:
states = ["Walking", "Running","Resting"]
n_states = len(states)

observations = ["Low", "Medium", "High"]
n_observations = len(observations)

start_probability = np.array([1/3, 1/3, 1/3])

transition_probability = np.array([ 
    [0.6, 0.3, 0.1], #Walking
    [0.2, 0.7, 0.1], #Running
    [0.3, 0.2, 0.5] #Resting
])

emission_probability = np.array([ 
    [0.1, 0.7, 0.2], #Walking
    [0.05, 0.25, 0.7], #Running
    [0.8, 0.15, 0.05] #Resting
])

model = hmm.CategoricalHMM(n_components=n_states)
model.startprob_ = start_probability
model.transmat_ = transition_probability
model.emissionprob_ = emission_probability

De la bonus lab5:

In [26]:
def forward_algorithm(obs_sequence, start_prob, trans_prob, emit_prob): # Defines the custom Forward Algorithm to compute P(O|λ).
    T = len(obs_sequence) # Gets the length of the observation sequence.
    N = len(start_prob) # Gets the number of hidden states.

    alpha = np.zeros((T, N)) # Initializes the alpha matrix (forward probabilities).

    for i in range(N): # Initialization step (t=0):
        alpha[0][i] = start_prob[i] * emit_prob[i][obs_sequence[0]] # alpha_0(i) = π_i * b_i(o_1).

    for t in range(1, T): # Induction step (t=1 to T-1): Iterates through time.
        for j in range(N): # Iterates over the current state j.
            alpha[t][j] = sum(alpha[t-1][i] * trans_prob[i][j] # Sums over all previous states i: alpha_{t-1}(i) * a_ij.
                             for i in range(N)) * emit_prob[j][obs_sequence[t]] # Multiplies by the emission probability b_j(o_{t+1}).
def viterbi_algorithm(obs_sequence, start_prob, trans_prob, emit_prob): # Defines the custom Viterbi Algorithm to find the most likely state sequence.
    T = len(obs_sequence) # Gets the length of the observation sequence.
    N = len(start_prob) # Gets the number of hidden states.

    delta = np.zeros((T, N)) # Initializes the delta matrix (maximum path probability).
    psi = np.zeros((T, N), dtype=int) # Initializes the psi matrix (backpointers/predecessor states).

    for i in range(N): # Initialization step (t=0):
        delta[0][i] = start_prob[i] * emit_prob[i][obs_sequence[0]] # delta_0(i) = π_i * b_i(o_1).

    for t in range(1, T): # Recursion step (t=1 to T-1): Iterates through time.
        for j in range(N): # Iterates over the current state j.
            probs = [delta[t-1][i] * trans_prob[i][j] for i in range(N)] # Calculates the path probability coming from each previous state i.
            delta[t][j] = max(probs) * emit_prob[j][obs_sequence[t]] # Stores the maximum path probability to state j at time t.
            psi[t][j] = np.argmax(probs) # Stores the index of the best predecessor state i.

    best_path = np.zeros(T, dtype=int) # Initializes the array to store the final best state sequence.
    best_path[T-1] = np.argmax(delta[T-1]) # Termination step: Finds the index of the most likely final state.

    for t in range(T-2, -1, -1): # Path Backtracking step: Iterates backwards from T-2 to 0.
        best_path[t] = psi[t+1][best_path[t+1]] # Uses the backpointer to recover the previous most likely state.

    return best_path # Returns the sequence of hidden state indices.

In [None]:
observations_sequence = ["Medium","High","Low"]
obs_indices = [observations.index(o) for o in observations_sequence]

prob = forward_algorithm(obs_indices, start_probability, 
                        transition_probability, emission_probability)
print("P(observations) =", prob)

hidden_states = viterbi_algorithm(obs_indices, start_probability,
                                  transition_probability, emission_probability)
print("Most likely states:", [states[i] for i in hidden_states])

In [None]:
observations_sequence = ["Medium","High","Low"]
X = np.array([[observations.index(o)] for o in observations_sequence], dtype=int)
logprob = model.score(X)
print("P(observations) =", np.exp(logprob))

hidden_states = model.predict(X)
print("Most likely states:", [states[i] for i in hidden_states])