In [1]:
pip install qutip


Collecting qutip
  Downloading qutip-4.7.1-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (14.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m14.8/14.8 MB[0m [31m37.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: qutip
Successfully installed qutip-4.7.1
[0mNote: you may need to restart the kernel to use updated packages.


In [2]:
pip install torchdiffeq

Collecting torchdiffeq
  Downloading torchdiffeq-0.2.3-py3-none-any.whl (31 kB)
Installing collected packages: torchdiffeq
Successfully installed torchdiffeq-0.2.3
[0mNote: you may need to restart the kernel to use updated packages.


In [3]:
from __future__ import print_function, division
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
import torch
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import sys
from qutip import *
from torchdiffeq import odeint
import os
import torch
import numpy as np
import math
import matplotlib.pyplot as plt
from matplotlib import cm
import imageio.v2 as imageio

In [4]:
def normalize(a):
	a_oo = a - np.real(a).min()
	return a_oo/np.abs(a_oo).max()

def get_state(theta, phi):
    ket0, ket1 = np.array([[1.],[0.]]), np.array([[0.],[1.]])
    bloch_state = np.cos(theta/2) * ket0 + np.exp(complex(0, phi))*ket1
    return Qobj(bloch_state)

def get_spherical(theta, phi):
    return np.array([np.sin(theta) * np.cos(phi), np.sin(theta) * np.sin(phi), np.cos(theta)])

def sample_bloch(n_samples=50, rand=True):
    if rand:
        thetas = np.sort(np.pi * np.random.rand(n_samples))
        phis = np.sort(2 * np.pi * np.random.rand(n_samples))
        
    else:
        thetas = np.linspace(0, np.pi, n_samples)
        phis = np.linspace(0, 2 * np.pi, n_samples)
    
    bloch_vec = np.dstack(np.meshgrid(thetas, phis)) # [n_samples, n_samples, 2]
    return bloch_vec.reshape(n_samples * n_samples, 2) # [n_samples^2, 2]

def sample_initial_states(n_samples=50, rand=True):
    " sample initial states "
    bloch_vecs = sample_bloch(n_samples, rand)
    states = [get_state(*bvec) for bvec in bloch_vecs]
    spherical = np.asarray([get_spherical(*bvec) for bvec in bloch_vecs])

    return states, bloch_vecs, spherical

def final_states_to_numpy(states):
    "convert list of quantum objects to numpy array [2, num_time_steps]"
    return np.concatenate([state.full() for state in states], axis=-1)

class StochasticTwoLevelDataset(Dataset):
    def __init__(self, num_batches=30, batched_samples=6, validation_samples=10, start=0, stop=2, last=10, time_steps=300, mc_samples=250, dataset_type='closed'): 
        self.total_time_steps = np.linspace(start, last, time_steps)
        self.initial_states, _, self.spherical = sample_initial_states(batched_samples, rand=True)
        self.validation_points = sample_initial_states(validation_samples, rand=False)
        self.num_per_batch = batched_samples ** 2
        self.num_batches = num_batches
        self.num_trajs = self.num_per_batch * self.num_batches
        self.dataset_type = dataset_type

        if dataset_type == 'closed':
            self.rand_parameters = np.zeros((num_batches, 2))
        elif dataset_type == 'open':
            self.rand_parameters = np.zeros((num_batches, 4))
        expect_data = []
        for i in range(num_batches):
            samp_z = np.random.uniform(1, 2.5, 1)
            samp_x = np.random.uniform(1, 2.5, 1)
            self.rand_parameters[i, 0] = samp_z
            self.rand_parameters[i, 1] = samp_x
            H = samp_z[0] * sigmaz() + samp_x[0] * sigmax()

            if dataset_type == 'closed':
                solve = lambda state : sesolve(H, state, self.total_time_steps, e_ops=[sigmax(), sigmay(), sigmaz()], progress_bar=None)
            elif dataset_type == 'open':
                decay_samp = np.random.uniform(0.1, 0.3, 2)
                self.rand_parameters[i, 2:] = decay_samp
                c_ops = [np.sqrt(decay_samp[0]) * sigmax(), np.sqrt(decay_samp[1]) * sigmaz()]
                solve = lambda state : mesolve(H, state, self.total_time_steps, e_ops=[sigmax(), sigmay(), sigmaz()], c_ops=c_ops)
                
            all_states = [solve(state).expect for state in self.initial_states]
            states = [np.asarray(states, dtype='double') for states in all_states] 
            states = np.asarray([np.column_stack([state[0], state[1], state[2]]) for state in states])
            expect_data.append(states)
            
        self.expect_data = np.asarray(expect_data)
        self.total_expect_data = self.expect_data.reshape(self.num_trajs, time_steps, 3)
        self.train_time_steps = self.total_time_steps[np.where(self.total_time_steps <= stop)]
        self.train_expect_data = self.total_expect_data[:,:self.train_time_steps.shape[0],:]

    def plot_trajs(self):
        for i in range(self.num_batches):
            for j in range(self.num_per_batch):
                ts = self.time_steps
                fig, (ax1, ax2, ax3) = plt.subplots(3, 1)

                ax1.plot(ts, self.expect_data[i, j, :, 0])
                ax1.set_ylim(-1, 1)
                ax1.set_ylabel('$\sigma_x$')

                ax2.plot(ts, self.expect_data[i, j, :, 1])
                ax2.set_ylim(-1, 1)
                ax2.set_ylabel('$\sigma_y$')

                ax3.plot(ts, self.expect_data[i, j, :, 2])
                ax3.set_ylim(-1, 1)
                ax3.set_ylabel('$\sigma_z$')
                if self.dataset_type == 'closed':
                    ax3.set_xlabel('H = {}z + {}x'.format(self.rand_parameters[i, 0], self.rand_parameters[i, 1]))
                else:
                    ax3.set_xlabel('H = {}z + {}x decay: {} {}'.format(*self.rand_parameters[i]))

                plt.savefig('plots/stochastic_closed_noise/traj_{}_{}.png'.format(i, j))
                plt.close(fig)

    def render_initial_states(self, directory):
        bloch = Bloch()
        colors = normalize(self.spherical)
        bloch.point_color = colors
        bloch.add_points([self.spherical[:, 0], self.spherical[:, 1], self.spherical[:, 2]], 'm')
        bloch.save(directory)

# two qubit functions

In [5]:
class LatentODEfunc(nn.Module):

    def __init__(self, latent_dim=4, nhidden=20):
        super(LatentODEfunc, self).__init__()
        self.elu = nn.ELU(inplace=True)
        self.fc1 = nn.Linear(latent_dim, nhidden)
        self.fc2 = nn.Linear(nhidden, nhidden)
        self.fc3 = nn.Linear(nhidden, nhidden)
        self.fc4 = nn.Linear(nhidden, latent_dim)
        self.nfe = 0

    def forward(self, t, x):
        self.nfe += 1
        out = self.fc1(x)
        out = self.elu(out)
        out = self.fc2(out)
        out = self.elu(out)
        out = self.fc3(out)
        out = self.elu(out)
        out = self.fc4(out)
        return out


class RecognitionRNN(nn.Module):

    def __init__(self, latent_dim=4, obs_dim=3, nhidden=25, nbatch=1):
        super(RecognitionRNN, self).__init__()
        self.nhidden = nhidden
        self.nbatch = nbatch
        self.i2h = nn.Linear(obs_dim + nhidden, nhidden)
        self.h2o = nn.Linear(nhidden, latent_dim * 2)

    def forward(self, x, h):
        combined = torch.cat((x, h), dim=1)
        h = torch.tanh(self.i2h(combined))
        out = self.h2o(h)
        return out, h

    def initHidden(self, batch=0):
        if batch == 0:
            return torch.zeros(self.nbatch, self.nhidden)
        else:
            return torch.zeros(batch, self.nhidden)


class Decoder(nn.Module):

    def __init__(self, latent_dim=4, obs_dim=2, nhidden=20, extra=False):
        super(Decoder, self).__init__()
        self.extra = extra
        self.tanh = nn.Tanh()
        self.fc1 = nn.Linear(latent_dim, nhidden)
        self.fc2 = nn.Linear(nhidden, nhidden)
        self.fc3 = nn.Linear(nhidden, obs_dim)

    def forward(self, z):
        out = self.fc1(z)
        out = self.tanh(out)
        if self.extra:
            out = self.fc2(out)
            out = self.tanh(out)
            out = self.fc3(out)
        else:
            out = self.fc3(out)
        return out

def log_normal_pdf(x, mean, logvar):
    const = torch.from_numpy(np.array([2. * np.pi])).float().to(x.device)
    const = torch.log(const)
    return -.5 * (const + logvar + (x - mean) ** 2. / torch.exp(logvar))


def normal_kl(mu1, lv1, mu2, lv2):
    v1 = torch.exp(lv1)
    v2 = torch.exp(lv2)
    lstd1 = lv1 / 2.
    lstd2 = lv2 / 2.

    kl = lstd2 - lstd1 + ((v1 + (mu1 - mu2) ** 2.) / (2. * v2)) - .5
    return kl





class latent_ode:
    def __init__(self, obs_dim=2, latent_dim=4, nhidden=20, rnn_nhidden=25, lr=1e-2, batch=1000, beta=1, extra_decode=False):
        self.obs_dim = obs_dim
        self.latent_dim = latent_dim
        self.nhidden = nhidden
        self.rnn_nhidden = rnn_nhidden
        self.beta = beta
        self.epsilon = None

        self.func = LatentODEfunc(latent_dim, nhidden)
        self.rec = RecognitionRNN(latent_dim, obs_dim, rnn_nhidden, batch)
        self.dec = Decoder(latent_dim, obs_dim, nhidden, extra=extra_decode)
        self.params = (list(self.func.parameters()) + list(self.dec.parameters()) + list(self.rec.parameters()))
        self.lr = lr
        self.optimizer = optim.Adam(self.params, lr=self.lr)

    def train(self, trajs, ts, num_epochs):
        # dataset parameters
        num_ts = ts.size(0)
        beta = self.beta
        for itr in range(1, num_epochs + 1):
            self.optimizer.zero_grad()
            h = self.rec.initHidden()
            for t in reversed(range(num_ts)):
                obs = trajs[:, t, :]
                out, h = self.rec.forward(obs, h)
            qz0_mean, qz0_logvar = out[:, :self.latent_dim], out[:, self.latent_dim:]
            epsilon = torch.randn(qz0_mean.size())
            z0 = epsilon * torch.exp(.5 * qz0_logvar) + qz0_mean

            # forward in time and solve ode for reconstructions
            pred_z = odeint(self.func, z0, ts).permute(1, 0, 2)
            pred_x = self.dec(pred_z)

            # compute loss
            noise_std_ = torch.zeros(pred_x.size()) + 0.3
            noise_logvar = 2. * torch.log(noise_std_)
            logpx = log_normal_pdf(
                trajs, pred_x, noise_logvar).sum(-1).sum(-1)
            pz0_mean = pz0_logvar = torch.zeros(z0.size())
            analytic_kl = beta * normal_kl(qz0_mean, qz0_logvar,
                                    pz0_mean, pz0_logvar).sum(-1)
            loss = torch.mean(-logpx + analytic_kl, dim=0)
            loss.backward()
            av_mse, *_ = self.MSE(trajs, ts)
            self.optimizer.step()

            if itr == num_epochs:
                self.epsilon = epsilon
            
            print('Epoch: {}, elbo: {:.4f}, mse: {:.4f}'.format(itr, loss, av_mse))

    def encode(self, trajs, ts, reconstruct=True):
        if (reconstruct):
            with torch.no_grad():
                num_ts = ts.size(0)
                # sample from trajectorys' approx. posterior
                h = self.rec.initHidden(batch=trajs.shape[0])
                for t in reversed(range(num_ts)):
                    obs = trajs[:, t, :]
                    out, h = self.rec.forward(obs, h)
                qz0_mean, qz0_logvar = out[:, :self.latent_dim], out[:, self.latent_dim:]
                z0 = qz0_mean #self.epsilon * torch.exp(.5 * qz0_logvar) + qz0_mean
        else:
            with torch.no_grad():
                num_ts = ts.size(0)
                # sample from trajectorys' approx. posterior
                h = self.rec.initHidden(batch=trajs.shape[0])
                for t in reversed(range(num_ts)):
                    obs = trajs[:, t, :]
                    out, h = self.rec.forward(obs, h)
                qz0_mean, qz0_logvar = out[:, :self.latent_dim], out[:, self.latent_dim:]
                epsilon = torch.randn(qz0_mean.size())
                z0 = epsilon * torch.exp(.5 * qz0_logvar) + qz0_mean

        return z0

    def decode(self, z0, ts):
        with torch.no_grad():
            if len(z0.shape) == 1:
                pred_z = odeint(self.func, z0, ts)
            else:
                pred_z = odeint(self.func, z0, ts).permute(1, 0, 2)
            pred_x = self.dec(pred_z)
        return pred_x

    def latent_dynamics(self, trajs, enc_ts, dec_ts, recontruct=True):
        z0 = self.encode(trajs, enc_ts, recontruct)
        print('z0',z0.shape)
        with torch.no_grad():
            if len(z0.shape) == 1:
                pred_z = odeint(self.func, z0, dec_ts)
            else:
                pred_z = odeint(self.func, z0, dec_ts).permute(1, 0, 2)
        return pred_z


    def MSE(self, trajs, train_ts):
        z0 = self.encode(trajs, train_ts)
        pred_x = self.decode(z0, train_ts)

        mse_errors = np.mean((trajs.numpy() - pred_x.numpy()) ** 2, axis=1)
        mse_errors = np.mean(mse_errors, axis=1)
        avg_mse = np.mean(mse_errors)

        return avg_mse, mse_errors
    


In [6]:
def save_dataset(data, dataset_name):
    torch.save(data, './{}.pt'.format(dataset_name))

def load_dataset(dataset_name):
    return torch.load('./{}.pt'.format(dataset_name))


In [7]:
def save_model(model, model_name):
    torch.save(model.func.state_dict(), './{}_func.pt'.format(model_name))
    torch.save(model.dec.state_dict(), './{}_dec.pt'.format(model_name))
    torch.save(model.rec.state_dict(), './{}_rec.pt'.format(model_name))
    torch.save(model.epsilon, './{}_epsilon.pt'.format(model_name))


In [8]:
seed = 12345
types = 'closed'
epochs = 300
obs_dim = 3
latent_dim = 10
rnn_nhidden = 48
nhidden = 48
lr = 4e-3
    

In [9]:
if __name__ == "__main__":
    torch.manual_seed(seed)
    np.random.seed(seed)
    
    # initializing the dataset
    data = StochasticTwoLevelDataset(dataset_type= types)
    save_dataset(data, '{}-{}'.format(types, seed))
    print('dataset {}-{} saved'.format(types, seed))

    #initializing the model
    trajs = torch.from_numpy(data.train_expect_data).float()
    ts = torch.from_numpy(data.train_time_steps).float()
    model = latent_ode(obs_dim=obs_dim, latent_dim= latent_dim, nhidden=nhidden, 
        rnn_nhidden=rnn_nhidden, lr=lr, batch=data.train_expect_data.shape[0])

    model.train(trajs, ts, epochs)
    save_model(model, '{}_{}_{}_{}-{}'.format(types, latent_dim, rnn_nhidden, nhidden, lr))

dataset closed-12345 saved
Epoch: 1, elbo: 339.3580, mse: 0.3210
Epoch: 2, elbo: 312.3820, mse: 0.3110
Epoch: 3, elbo: 283.8954, mse: 0.3070
Epoch: 4, elbo: 274.0133, mse: 0.3036
Epoch: 5, elbo: 259.8314, mse: 0.2987
Epoch: 6, elbo: 247.7667, mse: 0.2884
Epoch: 7, elbo: 232.3782, mse: 0.2718
Epoch: 8, elbo: 231.7753, mse: 0.2710
Epoch: 9, elbo: 231.0222, mse: 0.2733
Epoch: 10, elbo: 227.7909, mse: 0.2697
Epoch: 11, elbo: 223.2791, mse: 0.2644
Epoch: 12, elbo: 221.8174, mse: 0.2619
Epoch: 13, elbo: 221.7539, mse: 0.2616
Epoch: 14, elbo: 221.8112, mse: 0.2608
Epoch: 15, elbo: 220.0978, mse: 0.2605
Epoch: 16, elbo: 220.6349, mse: 0.2611
Epoch: 17, elbo: 219.3941, mse: 0.2616
Epoch: 18, elbo: 218.4250, mse: 0.2611
Epoch: 19, elbo: 217.3216, mse: 0.2593
Epoch: 20, elbo: 216.4761, mse: 0.2577
Epoch: 21, elbo: 215.0945, mse: 0.2571
Epoch: 22, elbo: 214.9403, mse: 0.2569
Epoch: 23, elbo: 215.0400, mse: 0.2570
Epoch: 24, elbo: 214.6177, mse: 0.2567
Epoch: 25, elbo: 213.5191, mse: 0.2558
Epoch: 

In [10]:
train_end = 2
expo_start = 1.9732
expo_good_end = 4
np.random.seed(0)
torch.manual_seed(0)

<torch._C.Generator at 0x7f2df3ee9470>

In [11]:
def load_model(model, model_name):
    model.func.load_state_dict(torch.load('./saved_models/{}_func.pt'.format(model_name)))
    model.dec.load_state_dict(torch.load('./saved_models/{}_dec.pt'.format(model_name)))
    model.rec.load_state_dict(torch.load('./saved_models/{}_rec.pt'.format(model_name)))
    try:
        model.epsilon = torch.load('./saved_models/{}_epsilon.pt'.format(model_name))
    except:
        pass
    model.func.eval()
    model.dec.eval()
    model.rec.eval()

In [12]:
def load(type):
    if type == 'open':
        data = torch.load('./open_6_53_53_0.007.pt')
        model = latent_ode(batch=1080, obs_dim=3, latent_dim=6, nhidden=53, rnn_nhidden=53, lr=0.007, beta=1, extra_decode=True)
        load_model(model, 'open_6_53_53_0.007')
    elif type == 'closed':
        data = torch.load('saved_datasets/closed-1.pt') 
        model = latent_ode(batch=1080, obs_dim=3, latent_dim=6, nhidden=48, rnn_nhidden=48, lr=0.004, extra_decode=True)
        load_model(model, 'closed_6_48_48_0.004')
    elif type == 'two':
        data = torch.load('./two_8_170_170_0.002.pt') 
        model = latent_ode(batch=1080, obs_dim=4, latent_dim=8, nhidden=170, rnn_nhidden=170, lr=0.002, extra_decode=True)
        load_model(model, 'two_8_170_170_0.002')

    return data, model