In [2]:
import numpy as np
from scipy.linalg import expm
import random
import copy

def euler(Q, p0, dt, steps):
    """
    Simulates a continuous-time Markov chain using the Euler method.

    Parameters:
    - Q: Transition rate matrix (numpy array of shape NxN)
    - p0: Initial probability distribution (numpy array of shape N)
    - dt: Time step
    - steps: Number of time steps

    Returns:
    - p_history: List of probability vectors over time
    """
    p = p0.copy()
    p_history = [p]

    for i in range(steps):
        p = p + (dt/(1-i*dt))* np.matmul(Q, p)  # Euler update step
        p = np.clip(p, 0, 1)       # Ensure probabilities stay in [0,1]
        p /= np.sum(p)             # Normalize to ensure sum(p) = 1
        p_history.append(p)


    return np.array(p_history)

def tweedie(Q, p0, dt, steps):
    p = p0.copy()
    p_history = [p]
    for i in range(steps):
        p = np.matmul(expm((dt/(1-i*dt))*Q),p)# Tweedie update step
        p = np.clip(p, 0, 1)       # Ensure probabilities stay in [0,1]
        p /= np.sum(p)             # Normalize to ensure sum(p) = 1
        p_history.append(p)

    
    return np.array(p_history)

def tweedie2(p0,lam,beta,N,dt, steps):
    p = p0.copy()
    p_history = [p]
    delta_M=np.zeros((N+1,))
    delta_M[-1]=1
    for i in range(steps):
        a=(1-i*dt)**(N*lam)
        b=(1-i*dt)**beta
        p = a*b*p0+b*(1-a)/N+(1-b)*delta_M# Tweedie update step
        p = np.clip(p, 0, 1)       # Ensure probabilities stay in [0,1]
        p /= np.sum(p)             # Normalize to ensure sum(p) = 1
        p_history.append(p)

    
    return np.array(p_history)

def Gillespie_fixed_t(x0,lam,beta,N,t):    
    mask_prob=1-(1-t)**beta
    flip_prob=1-(1-t)**(N*lam)
    if mask_prob<random.random():
        x=N
    elif flip_prob<random.random():
        x=np.random.randint(N)
    else:
        x=x0
    return x        

def stoch_euler_for_backward(Q, x0, dt, steps,L,Denoiser,lam,beta):
    """
    Simulates the backward MC using Euler (Alg.2 in Lou)

    Parameters:
    - Q: Transition rate matrix (numpy array of shape NxN)
    - x0: sample from initial distribution, in {0,...,N-1}^L
    - dt: Time step
    - steps: Number of time steps

    Returns:
    - x_hist: List of samples in {0,...,N-1}^L where each coordinate evolves independently according to Q 
    """
    x = np.array(x0)
    x_history = [copy.deepcopy(x)]
    N=Q.shape[0]-1
    tokens=range(N+1)

    for j in range(2,steps):
        p=np.zeros([N+1,L])
        score=Denoiser(x,1-j*dt,N,L,lam,beta) #denoiser output - should be (N+1)xL with entry [x,i] being (12) in Lou et.al.
        x=copy.deepcopy(x_history[-1])
        #can parallelize this for loop
        for i in range(L):
            p[x[i],i]=1
            p[:,i] = p[:,i]+ (1/j)* np.multiply(Q[x[i],:],score[:,i])  # Euler update step
            p[:,i] = np.clip(p[:,i], 0, 1)       # Ensure probabilities stay in [0,1]
            p[:,i] /= np.sum(p[:,i])             # Normalize to ensure sum(p) = 1
            x[i]=np.random.choice(tokens,size=1,p=p[:,i])[0]
        x_history.append(x)

    return x_history

def stoch_tweedie_for_backward(x0, dt, steps,N,L,Denoiser,lam,beta):
    """
    Simulates the backward MC using Tweedie (Alg.2 in Lou)

    Parameters:

    - x0: sample from initial distribution, in {0,...,N-1}^L
    - dt: Time step
    - steps: Number of time steps

    Returns:
    - x_hist: List of samples in {0,...,N-1}^L where each coordinate evolves independently according to Q 
    """
    x = np.array(x0)
    x_history = [copy.deepcopy(x)]
    tokens=range(N+1)
    for j in range(2,steps):
        p=np.zeros([N+1,L])
        score=Denoiser(x,1-j*dt,N,L,lam,beta) #denoiser output - should be (N+1)xL with entry [x,i] being (12) in Lou et.al.
        x=copy.deepcopy(x_history[-1])
        #can parallelize this for loop
        for i in range(L):
            exp_part=np.zeros(N+1)
            score_part=np.zeros(N+1)
            a_ratio_forward=(1-1/j)**(N*lam)
            b_ratio_forward = (1-1/j)**beta
            a_ratio_backward=1/a_ratio_forward #these will always be >1, seems a problem since we do 1-them? Also the forward an backward ratios end up canceling in the multiplication later anyway so can maybe do without them
            b_ratio_backward=1/b_ratio_forward
            if x[i]==N:
                exp_part=(1-b_ratio_forward)*np.ones(N+1)
                exp_part[N]=1
            else:
                exp_part=b_ratio_forward*(1-a_ratio_forward)/N*np.ones(N+1)
                exp_part[x[i]]+=b_ratio_forward*a_ratio_forward
                exp_part[N]=0
            score_part=b_ratio_backward*(1-a_ratio_backward)*np.sum(score[:N,i])/N*np.ones(N+1) #off diagonal contribution
            score_part+=b_ratio_backward*a_ratio_backward*score[:,i] #diagonal contribution
            score_part[N]=np.sum(score[:N,i])*(1-b_ratio_backward)+score[N,i] #correct the masked token part
            p[:,i] = np.multiply(score_part,exp_part)  # Tweedie update step
            for prob in p[:,i]:
                if prob<0:
                    print("Negative Probability:")
                    print(p[:,i])
            p[:,i] = np.clip(p[:,i], 0, 1)       # Ensure probabilities stay in [0,1]
            p[:,i] /= np.sum(p[:,i])             # Normalize to ensure sum(p) = 1
            x[i]=np.random.choice(tokens,size=1,p=p[:,i])[0]
        x_history.append(x)

    return x_history


def fake_denoiser(x,t,N,L,lam,beta):
    #just insert a random guess for M_theta
    score=np.zeros([N+1,L])
    for i in range(L):
        fake_Mtheta=np.random.rand(N,)/t #this should be actually the output of the model evaluated at x
        fake_Mtheta=np.concatenate((fake_Mtheta,np.zeros(1,)))
        if x[i]==N:
            b=(1-t)**beta
            a=(1-t)**(N*lam)
            score[:,i]=1/(1-t) *b*(1-a)/(N*(1-b))*fake_Mtheta
            score[N,i]=1
        else:
            score[:,i]=1/(1-t)*fake_Mtheta
            score[x[i],i]=1
    return score

def ell(x,y):
    return np.exp(x)-np.exp(y)+np.exp(y)*(y-x)

def ELBO(x0,xt,t,Mbar,N,L,lam,beta):
    #Mbar = log(M_theta) in the notes. This should be the output of our NN
    sigma=1/(1-t)
    b=sigma**(-beta)
    a=sigma**(-N*lam)
    f = b*(1-a)/((1-b)*N)
    lambar=np.log(1+N*a/(1-a))
    sum=0
    for i in range(L):
        for j in range(N):
            if xt[i]==j:
                pass #we need not train this since then we enfore Mbar to be 1
            elif xt[i]==N:
                if x0[i]==j:
                    toexp=lambar
                else:
                    toexp=0
                sum+=f*beta*ell(Mbar[j,i],toexp)
            else:
                if x0[i]==j:
                    toexp=lambar
                elif x0[i]==xt[i]:
                    toexp=-lambar
                else:
                    toexp=0
                sum+=lam*ell(Mbar[j,i],toexp)
    return sigma*sum


# Example Usage
N_states=3
lam = 2 #lambda
beta = .05 #beta
dt = 0.001 #time step   
p0 = np.array([1.0, 0.0, 0.0,0.0]).T  # Initial probability vector
x0=0 #initial token
steps = int(np.floor(1/dt))-1                      # Number of iterations
Q = lam*np.ones((N_states,N_states))  # Construct transition rate matrix
Q = np.concatenate((np.concatenate((Q,beta*np.ones((1,N_states))),axis=0),np.zeros((N_states+1,1))),axis=1)
for i in range(N_states+1):
    Q[i,i]=0
    col_sum=np.sum(Q[:,i])
    Q[i,i]=-col_sum
print(Q)

[[-4.05  2.    2.    0.  ]
 [ 2.   -4.05  2.    0.  ]
 [ 2.    2.   -4.05  0.  ]
 [ 0.05  0.05  0.05 -0.  ]]


The below are numerical approximations for the forward density (on one token) using Euler, Tweedie1 with approximating the matrix exponential with expm, and Tweedie2 using the exact form of the matrix exponential we calculated by solving the Kolmogorov Equations. We take sigma(t)=1/(1-t), so that by time 1 all states should be masked with probability 1. Notice the large numerical error incurred via the time discritization when beta is chosen to be small: The last entry in p at time 1 is the probability of being in the masked state at time 1, which should be 1.

Probably we will want to be able to pass a time schedule instead of assuming we use sigma(t)=1/(1-t). Also probably want to add some buffer for blowup when t approaches 1.

Known approximation error for numerical approx. for time-inhomegenous linear ODEs?

In [3]:
p_history_e = euler(Q, p0, dt, steps)
p_history_t=tweedie(Q, p0, dt, steps)
p_history_t2=tweedie2(p0,lam,beta,N_states,dt, steps)
# Print results
for t, p in enumerate(zip(p_history_e,p_history_t,p_history_t2)):  
    print(f"t={t*dt:.3f}")
    print(f'Euler:{p[0]}')
    print(f'Tweedie1:{p[1]}')
    print(f'Tweedie2:{p[2]}')

t=0.000
Euler:[1. 0. 0. 0.]
Tweedie1:[1. 0. 0. 0.]
Tweedie2:[1. 0. 0. 0.]
t=0.001
Euler:[9.9595e-01 2.0000e-03 2.0000e-03 5.0000e-05]
Tweedie1:[9.95962177e-01 1.99391228e-03 1.99391228e-03 4.99987500e-05]
Tweedie2:[1. 0. 0. 0.]
t=0.002
Euler:[9.91920373e-01 3.98978979e-03 3.98978979e-03 1.00047548e-04]
Tweedie1:[9.91944603e-01 3.97767584e-03 3.97767584e-03 1.00045045e-04]
Tweedie2:[0.99397727 0.00199094 0.00199094 0.00204086]
t=0.003
Euler:[9.87911036e-01 5.96941071e-03 5.96941071e-03 1.50142736e-04]
Tweedie1:[9.87947198e-01 5.95133164e-03 5.95133164e-03 1.50138978e-04]
Tweedie2:[0.98800866 0.00396388 0.00396388 0.00406358]
t=0.004
Euler:[9.83921906e-01 7.93890397e-03 7.93890397e-03 2.00285657e-04]
Tweedie1:[9.83969878e-01 7.91492051e-03 7.91492051e-03 2.00280643e-04]
Tweedie2:[0.98209355 0.00591904 0.00591904 0.00606837]
t=0.005
Euler:[9.79952902e-01 9.89831065e-03 9.89831065e-03 2.50476406e-04]
Tweedie1:[9.80012563e-01 9.86848318e-03 9.86848318e-03 2.50470132e-04]
Tweedie2:[0.9762313

Gillespie simulation for the forward process with 1 token up to fixed time t

In [4]:
t=random.random()
x_t=Gillespie_fixed_t(x0,lam,beta,N_states,t)
print(x_t)

3


Euler backwards process with a fixed denoiser

In [5]:
L=5
bad_backward_e=stoch_euler_for_backward(Q, [N_states for _ in range(L)], dt, steps,L,fake_denoiser,lam,beta)
for t, x in enumerate(bad_backward_e):  
    print(f"t={t*dt:.4f},x={x}")

t=0.0000,x=[3 3 3 3 3]
t=0.0010,x=[0 0 0 2 1]
t=0.0020,x=[2 2 2 1 2]
t=0.0030,x=[0 0 0 2 0]
t=0.0040,x=[2 1 2 1 1]
t=0.0050,x=[1 2 0 0 2]
t=0.0060,x=[1 2 2 1 1]
t=0.0070,x=[0 2 1 0 2]
t=0.0080,x=[1 2 0 1 1]
t=0.0090,x=[0 2 2 2 1]
t=0.0100,x=[1 0 2 1 0]
t=0.0110,x=[1 0 1 1 0]
t=0.0120,x=[0 0 0 2 1]
t=0.0130,x=[2 1 0 1 1]
t=0.0140,x=[1 0 2 2 1]
t=0.0150,x=[0 1 2 0 1]
t=0.0160,x=[1 2 0 0 2]
t=0.0170,x=[2 0 1 1 2]
t=0.0180,x=[1 0 1 1 2]
t=0.0190,x=[1 0 1 1 0]
t=0.0200,x=[1 0 1 0 2]
t=0.0210,x=[2 1 0 0 0]
t=0.0220,x=[1 2 0 2 2]
t=0.0230,x=[1 0 0 2 1]
t=0.0240,x=[1 0 2 2 0]
t=0.0250,x=[1 2 1 1 0]
t=0.0260,x=[0 2 1 0 2]
t=0.0270,x=[0 1 1 0 0]
t=0.0280,x=[2 2 2 0 2]
t=0.0290,x=[1 0 2 1 0]
t=0.0300,x=[0 2 1 0 2]
t=0.0310,x=[2 1 2 0 2]
t=0.0320,x=[0 0 2 2 1]
t=0.0330,x=[2 0 2 1 1]
t=0.0340,x=[2 2 1 2 2]
t=0.0350,x=[1 2 1 2 2]
t=0.0360,x=[2 2 0 2 2]
t=0.0370,x=[2 2 0 0 0]
t=0.0380,x=[1 0 0 1 2]
t=0.0390,x=[1 0 0 1 0]
t=0.0400,x=[1 1 2 1 1]
t=0.0410,x=[1 0 0 0 0]
t=0.0420,x=[2 2 0 0 0]
t=0.0430,x=

In [6]:
#I suspect there is an issue here, since everything seems to instantaneously unmask
dt=.001
bad_backward_t=stoch_tweedie_for_backward([N_states for _ in range(L)], dt, steps,N_states,L,fake_denoiser,lam,beta)
for t, x in enumerate(bad_backward_t):  
    print(f"t={t*dt:.4f},x={x}")

Negative Probability:
[-364.34466382  528.43413294 -142.625762    -20.46370713]
Negative Probability:
[-364.34466382  528.43413294 -142.625762    -20.46370713]
Negative Probability:
[-364.34466382  528.43413294 -142.625762    -20.46370713]
Negative Probability:
[-143.7783179   349.63492905 -179.39584775  -25.4607634 ]
Negative Probability:
[-143.7783179   349.63492905 -179.39584775  -25.4607634 ]
Negative Probability:
[-143.7783179   349.63492905 -179.39584775  -25.4607634 ]
Negative Probability:
[-21.51569803 -69.41425719 103.95455754 -12.02460232]
Negative Probability:
[-21.51569803 -69.41425719 103.95455754 -12.02460232]
Negative Probability:
[-21.51569803 -69.41425719 103.95455754 -12.02460232]
Negative Probability:
[ 137.98588064 -350.53711018  235.97997467  -22.42874513]
Negative Probability:
[ 137.98588064 -350.53711018  235.97997467  -22.42874513]
Negative Probability:
[-200.0821215   362.18950289 -151.29901387   -9.80836752]
Negative Probability:
[-200.0821215   362.18950289 -

In [7]:
expm(-.1*Q) #maybe the score is so that multiply by this we still get positive values???

array([[ 1.55583898, -0.27541323, -0.27541323,  0.        ],
       [-0.27541323,  1.55583898, -0.27541323,  0.        ],
       [-0.27541323, -0.27541323,  1.55583898,  0.        ],
       [-0.00501252, -0.00501252, -0.00501252,  1.        ]])

In [None]:
x0=[1 for _ in range(L)]
t=.9999
xt=[Gillespie_fixed_t(x0[i],lam,beta,N_states,t) for i in range(L)]
fakeMbar=np.random.rand(N_states,L)
ELBO(x0,xt,t,fakeMbar,N_states,L,lam,beta)
#the ELBO seems to be large for t close to 0 and 1, meaning the beginning and end of the generation process is very important compared to the middle

np.float64(16946.737986553613)