In [1]:
import math

def safe_log(value):
    return -float('inf') if value == 0 else math.log(value)

def calculate_log_likelihood(path, sequence, init_probs, trans_probs, emit_probs):
    if len(path) != len(sequence):
        raise ValueError("Path and observation sequence lengths must match")
    
    total_log_prob = 0.0
    for index in range(len(sequence)):
        current_state = path[index]
        current_symbol = sequence[index]

        if index == 0:
            total_log_prob += safe_log(init_probs[current_state])
        else:
            prev_state = path[index - 1]
            transition_prob = trans_probs.get(prev_state, {}).get(current_state, 0)
            total_log_prob += safe_log(transition_prob)
        
        emission_prob = emit_probs.get(current_state, {}).get(current_symbol, 0)
        total_log_prob += safe_log(emission_prob)

    if path[-1] == 'I' and 'end' in trans_probs.get('I', {}):
        total_log_prob += safe_log(trans_probs['I']['end'])

    return total_log_prob

# States
states_list = ['E', '5', 'I']

# Initial state probabilities
initial_probabilities = {
    'E': 1.0,
    '5': 0.0,
    'I': 0.0
}

# Transition probabilities
transition_matrix = {
    'E': {'E': 0.9, '5': 0.1},
    '5': {'I': 1.0},
    'I': {'I': 0.9, 'end': 0.1}
}

# Emission probabilities
emission_matrix = {
    'E': {'A': 0.25, 'C': 0.25, 'G': 0.25, 'T': 0.25},
    '5': {'A': 0.05, 'C': 0.0, 'G': 0.95, 'T': 0.0},
    'I': {'A': 0.4, 'C': 0.1, 'G': 0.1, 'T': 0.4}
}

# Example path and observed sequence
state_path = "EEEEEEEEEEEEEEEEEE5IIIIIII"
observed_sequence = "CTTCATGTGAAAGCAGACGTAAGTCA"

# Compute and print log probability
log_likelihood = calculate_log_likelihood(
    state_path, observed_sequence,
    initial_probabilities, transition_matrix, emission_matrix
)

print(f"Log-likelihood of the observed sequence along the given path: {log_likelihood:.2f}")


Log-likelihood of the observed sequence along the given path: -41.22


In [2]:
import math

def safe_log(prob):
    return -float('inf') if prob == 0 else math.log(prob)

def viterbi_decode(sequence, state_list, start_probs, trans_probs, emit_probs):
    sequence_length = len(sequence)
    
    # Initialize score and backtrace tables
    score_table = [{} for _ in range(sequence_length)]
    backtrack_table = [{} for _ in range(sequence_length)]

    # Initialization step
    first_symbol = sequence[0]
    for state in state_list:
        emission = emit_probs[state].get(first_symbol, 0)
        score_table[0][state] = safe_log(start_probs[state]) + safe_log(emission)
        backtrack_table[0][state] = None

    # Dynamic programming step
    for t in range(1, sequence_length):
        symbol = sequence[t]
        for current_state in state_list:
            emission_prob = emit_probs[current_state].get(symbol, 0)
            if emission_prob == 0:
                continue

            max_score = -float('inf')
            best_prev_state = None

            for prev_state in state_list:
                trans_prob = trans_probs.get(prev_state, {}).get(current_state, 0)
                if trans_prob > 0:
                    prev_score = score_table[t - 1].get(prev_state, -float('inf'))
                    total_score = prev_score + safe_log(trans_prob) + safe_log(emission_prob)
                    if total_score > max_score:
                        max_score = total_score
                        best_prev_state = prev_state

            if best_prev_state is not None:
                score_table[t][current_state] = max_score
                backtrack_table[t][current_state] = best_prev_state

    # Backtracking to retrieve most likely path
    final_time = sequence_length - 1
    best_final_state = max(score_table[final_time], key=score_table[final_time].get)
    best_path = [best_final_state]
    
    current_state = best_final_state
    for t in range(final_time, 0, -1):
        prev_state = backtrack_table[t].get(current_state)
        if prev_state is None:
            break
        best_path.insert(0, prev_state)
        current_state = prev_state

    return ''.join(best_path)

# Run Viterbi decoding on the previously defined observation sequence and model
most_probable_path = viterbi_decode(
    observed_sequence,
    states_list,
    initial_probabilities,
    transition_matrix,
    emission_matrix
)

print(f"Most probable state path: {most_probable_path}")


Most probable state path: EEEEEEEEEEEEEEEEEEEEEEEEEE
