In [1]:
from typing import List, Optional, Tuple
import numpy as np
from mealymarkov import MarkovMealyModel
import os
from dotenv import load_dotenv
load_dotenv()

# Destination files for the generated training sets, looked up in the process
# environment after load_dotenv() has run. Each value is None when its
# variable is not defined.
ozz_FILE_PATH = os.environ.get('100_SAVE_PATH')  # where the 100* sequences are written
zir_FILE_PATH = os.environ.get('ZIR_SAVE_PATH')  # where the ZIR sequences are written

# Example small model (n=3 states, V=2 tokens) that satisfies the constraints.
n, V = 3, 2

# T0 and T1 are the per-token matrices handed to the model as T_list[0] and
# T_list[1]. Only the non-zero entries are written out; every row of T0 + T1
# sums to 1 (row-stochastic).
T0 = np.zeros((n, n))
T0[0, 1] = 1
T0[1, 2] = 1
T0[2, 2] = 0.5

T1 = np.zeros((n, n))
T1[2, 0] = 0.5

model = MarkovMealyModel(n=n, V=V, T_list=[T0, T1])

# No eta^0 is passed in, so the model falls back to its default (uniform).
print("Initial eta^0 =", model.eta0)

# Draw a short, seeded sample: three tokens plus the states visited on the way.
tokens, states = model.sample_sequence(max_new_tokens=3, seed=42)

print("Generated tokens:", tokens)
print("States (eta^t) traversed:")
for i, eta_t in enumerate(states):
    # Round only for display; the stored states are left untouched.
    print(f"t={i} ->", np.round(eta_t, 4))

Initial eta^0 = [0.33333333 0.33333333 0.33333333]
Generated tokens: [0, 0, 1]
States (eta^t) traversed:
t=0 -> [0.3333 0.3333 0.3333]
t=1 -> [0.  0.4 0.6]
t=2 -> [0. 0. 1.]
t=3 -> [1. 0. 0.]


Generating training sequences

In [7]:
import numpy as np
import json
# Generate the training set for the "100*" process (the process agreed on in
# the previous meeting) and save it as JSON at ozz_FILE_PATH.
n = 3
V = 2
num_training_samples = 100
sequence_length = 50  # tokens drawn per training sequence
base_seed = 0         # sample i is drawn with seed base_seed + i, so reruns reproduce the file
sequences = {}

# T^0 and T^1 are constructed so that T^0 + T^1 is row-stochastic (rows sum to 1).
T0 = np.array([
    [0, 1, 0],
    [0, 0, 1],
    [0, 0, 0.5]
])

T1 = np.array([
    [0, 0, 0],
    [0, 0, 0],
    [0.5, 0, 0]
])

# Check the invariant stated above instead of trusting the comment.
np.testing.assert_allclose((T0 + T1).sum(axis=1), 1.0)

# Fail before sampling with a readable message; otherwise a missing variable
# only surfaces at the end as a TypeError from open(None, 'w').
if not ozz_FILE_PATH:
    raise RuntimeError("100_SAVE_PATH is not set; cannot save the 100* training sequences")

model = MarkovMealyModel(n=n, V=V, T_list=[T0, T1])
for i in range(num_training_samples):
    # A distinct seed per sample keeps the sequences different from each other
    # while making the whole dataset repeatable.
    tokens, _ = model.sample_sequence(max_new_tokens=sequence_length, seed=base_seed + i)
    sequences[i] = tokens

# json.dump stores the integer sample indices as string keys ("0", "1", ...).
with open(ozz_FILE_PATH, 'w') as fp:
    json.dump(sequences, fp, indent=4)

In [19]:

# Generate the training set for the ZIR process (the process agreed on in the
# previous meeting) and save it as JSON at zir_FILE_PATH.
n = 3
V = 2
num_training_samples = 100
sequence_length = 50  # tokens drawn per training sequence
base_seed = 1000      # sample i is drawn with seed base_seed + i, so reruns reproduce the file
sequences = {}

# T^0 and T^1 are constructed so that T^0 + T^1 is row-stochastic (rows sum to 1).
T0 = np.array([
    [0, 1, 0],
    [0, 0, 0],
    [0.5, 0, 0]
])

T1 = np.array([
    [0, 0, 0],
    [0, 0, 1],
    [0.5, 0, 0]
])

# Check the invariant stated above instead of trusting the comment.
np.testing.assert_allclose((T0 + T1).sum(axis=1), 1.0)

# Fail before sampling with a readable message; otherwise a missing variable
# only surfaces at the end as a TypeError from open(None, 'w').
if not zir_FILE_PATH:
    raise RuntimeError("ZIR_SAVE_PATH is not set; cannot save the ZIR training sequences")

model = MarkovMealyModel(n=n, V=V, T_list=[T0, T1])
for i in range(num_training_samples):
    # A distinct seed per sample keeps the sequences different from each other
    # while making the whole dataset repeatable.
    tokens, _ = model.sample_sequence(max_new_tokens=sequence_length, seed=base_seed + i)
    sequences[i] = tokens

# json.dump stores the integer sample indices as string keys ("0", "1", ...).
with open(zir_FILE_PATH, 'w') as fp:
    json.dump(sequences, fp, indent=4)

Verifying that a given sequence of probability distributions matches the Markov model's predictions

In [2]:
# Test the verify_sequence function
print('\n' + '='*50)
print('Testing verify_sequence function:')
print('='*50)

# Build the 100* model explicitly for this check. The probabilities below are
# that process's next-token predictions, but the global `model` is rebound to
# the ZIR process by the cell above, so relying on it only works when cells are
# run out of order and breaks under Restart & Run All.
ozz_T0 = np.array([
    [0, 1, 0],
    [0, 0, 1],
    [0, 0, 0.5]
])
ozz_T1 = np.array([
    [0, 0, 0],
    [0, 0, 0],
    [0.5, 0, 0]
])
ozz_model = MarkovMealyModel(n=3, V=2, T_list=[ozz_T0, ozz_T1])

# Create a test sequence and probability distribution
test_sequence = ['0', '1', '0']  # String representations of token indices
# One [P(token 0), P(token 1)] row per position of test_sequence.
test_probs = [
    [0.833, 0.166],
    [0.7, 0.3],
    # NOTE(review): this row sums to 2, so it is not a probability
    # distribution; [1, 0] may have been intended - confirm.
    [1, 1]
]

print(f"Test sequence: {test_sequence}")
print(f"Test probabilities: {test_probs}")

# Verify the sequence
is_converged, conv_pos = ozz_model.verify_sequence(test_sequence, test_probs, tolerance=0.1)

print("\nVerification result:")
print(f"  Is converged: {is_converged}")
print(f"  Convergence position: {conv_pos}")



Testing verify_sequence function:
Test sequence: ['0', '1', '0']
Test probabilities: [[0.833, 0.166], [0.7, 0.3], [1, 1]]
Position 0: KL divergence = 0.001001
		 transformer_probs = [0.833, 0.166], Markov_probs = [0.83333333 0.16666667]
Position 1: KL divergence = -0.000000
		 transformer_probs = [0.7, 0.3], Markov_probs = [0.7 0.3]
Position 2: KL divergence = -0.000000
		 transformer_probs = [1, 1], Markov_probs = [1. 0.]
Convergence analysis:
  Convergence position: 0
  Is converged: True
  Final KL divergence: -0.000000

Verification result:
  Is converged: True
  Convergence position: 0
