In [21]:
import numpy as np
import tqdm

### Create Synthetic Dataset on Predefined HMM

In [22]:
np.set_printoptions(suppress=True, precision=4)
np.set_printoptions(linewidth=100, threshold=np.inf)
np.set_printoptions(formatter={'int': '{:5d}'.format})

In [23]:
num_states = 10  # number of different hidden states
num_observations = 500  # number of distinct observations
num_sequences = 500  # number of sequence
max_length = 20

In [24]:
# matrix with size (num_states, num_states)
transition_probs = np.random.dirichlet(np.ones(num_states) * 1, size=num_states)
# matrix with size (num_states, num_observations)
emission_probs = np.random.dirichlet(np.ones(num_observations) * 2, size=num_states)
# initial probability, equal
initial_state_dist = np.ones(num_states) / num_states

In [25]:
emission_probs.shape

(10, 500)

In [26]:
def simulate_hmm(num_seq, min_length, max_length, pi, A, B):
    """
    Simulate the HMM and generate strings (observation sequences)
    :param num_seq: the number of HMM sequences
    :param pi: initial probability distribution
    :param A: transition matrix
    :param B: emission matrix
    :return: sequence of hidden states and observations
    """
    sequences = []
    hidden_states = []
    for _ in range(num_seq):
        sequence_len = np.random.randint(min_length, max_length + 1)
        current_state = np.random.choice(num_states, p=pi)
        observation_seq = []
        state_seq = []
        for _ in range(sequence_len):
            state_seq.append(current_state)
            observation = np.random.choice(num_observations, p=B[current_state])
            observation_seq.append(observation)
            current_state = np.random.choice(num_states, p=A[current_state])
        sequences.append(observation_seq)
        hidden_states.append(state_seq)

    return sequences, hidden_states

In [27]:
syn_observations, syn_hidden_states = simulate_hmm(num_seq=num_sequences, min_length=10, max_length=max_length,
                                                   pi=initial_state_dist, A=transition_probs, B=emission_probs)

In [28]:
def add_noise_to_states(hidden_states, number_states, flip_prob=0.5):
    """
    Add noise to hidden states.
    :param hidden_states: input sequence of hidden states
    :param number_states: the number of all distinct hidden states.
    :param flip_prob: the probability of changing one hidden state to another one.
    :return:
    """
    noisy_hidden_states = []
    for sequence in hidden_states:
        noisy_sequence = []
        for state in sequence:
            if np.random.rand() < flip_prob:
                # Flip the state to a different random state
                possible_states = list(range(number_states))
                possible_states.remove(state)  # Remove the current state from possibilities
                new_state = np.random.choice(possible_states)  # and choose another one
                noisy_sequence.append(new_state)
            else:
                noisy_sequence.append(state)
        noisy_hidden_states.append(noisy_sequence)
    return noisy_hidden_states

In [29]:
noisy_level = 0.5
noisy_hidden_states = add_noise_to_states(syn_hidden_states, num_states, flip_prob=noisy_level)

### Refine sequences of hidden states

In [30]:
def refine_seq(syn_hidden_states_, noisy_hidden_states_, if_print=False):
    # insert state 0 at the beginning of each hidden state sequence
    for i in range(len(syn_hidden_states_)):
        for j in range(len(syn_hidden_states_[i])):
            syn_hidden_states_[i][j] += 1
            noisy_hidden_states_[i][j] += 1

    return syn_hidden_states_, noisy_hidden_states_

In [31]:
from copy import deepcopy

syn_hidden_states_, noisy_hidden_states_ = refine_seq(deepcopy(syn_hidden_states), deepcopy(noisy_hidden_states))

In [32]:
file_path = f"../data/hmm_syn_dataset(norefine_state-{num_states}_obs-{num_observations}_size-{num_sequences}_maxL-{max_length}).npz"
seq_arr = np.array(syn_observations, dtype=object)
hid_arr = np.array(syn_hidden_states_, dtype=object)
noisy_hid_arr = np.array(noisy_hidden_states_, dtype=object)
trans_arr = transition_probs
emis_arr = emission_probs

In [33]:
np.savez(file_path, observation=seq_arr, real_hidden=hid_arr, noisy_hidden=noisy_hid_arr,
         real_trans=trans_arr, emis=emis_arr, noisy_level=noisy_level)