# Import and Define the Control Problem

In [60]:
from collections import defaultdict
from typing import Optional

import numpy as np
import torch
import tqdm
from tensordict.nn import TensorDictModule
from tensordict.tensordict import TensorDict, TensorDictBase
from torch import nn

from torchrl.data import BoundedTensorSpec, CompositeSpec, UnboundedContinuousTensorSpec
from torchrl.envs import (
    CatTensors,
    EnvBase,
    Transform,
    TransformedEnv,
    UnsqueezeTransform,
)
from torchrl.envs.transforms.transforms import _apply_to_composite
from torchrl.envs.utils import check_env_specs, step_mdp

# Report which device torch can see; this value is only printed.
device = "cuda:0" if torch.cuda.is_available() else "cpu"
print(device)

# The tensors below are nevertheless pinned to the CPU.
device = "cpu"

# All quantum objects are stored as single-precision complex tensors.
complex_type = torch.complex64

# Initial and target states of the two-level system.
PSI_0 = np.array([1, 0])
PSI_F = np.array([0, 1])
PSI_0_tensor = torch.tensor(PSI_0, dtype=complex_type, device=device)
PSI_F_tensor = torch.tensor(PSI_F, dtype=complex_type, device=device)

# Drift (H0) and control (H1) Hamiltonians.
H0 = np.array([[1, 0], [0, -1]])
H1 = np.array([[0, 1], [1, 0]])
H0_tensor = torch.tensor(H0, dtype=complex_type, device=device)
H1_tensor = torch.tensor(H1, dtype=complex_type, device=device)

cuda:0


# Define the Various Things We Need for TorchRL

In [61]:
# Step function advances along the environment
def _step(tensordict):
    """Score the current state and propagate it over one control interval.

    Reads from ``tensordict``:
      * ``psi_real`` / ``psi_imag``: real and imaginary parts of the state,
      * ``action``: control amplitude multiplying ``H1`` (trailing dim of 1),
      * ``params``: nested entries ``H0``, ``H1`` and ``dt``.

    The state is advanced with ``U = exp(-i * dt * (H0 + control * H1))``.

    The reward is ``-(1 - fidelity)`` with ``fidelity = |<PSI_F|psi>|^2``
    evaluated on the state *before* propagation, so it lies in ``[-1, 0]``
    and is maximal when the state coincides with the target.

    Returns a new TensorDict with the propagated ``psi_real``/``psi_imag``,
    the unchanged ``params``, ``reward`` and an all-False ``done`` flag.
    """
    # Current quantum state and control value
    psi = tensordict["psi_real"] + 1.0j * tensordict["psi_imag"]
    control = tensordict["action"].squeeze(-1)

    # H0, H1 and the timestep
    H0 = tensordict["params", "H0"]
    H1 = tensordict["params", "H1"]
    dt = tensordict["params", "dt"]

    # Fidelity with the target state. The bra is conjugated explicitly and
    # the sum runs over the last axis only, so this is a proper inner product
    # and also works when leading batch dimensions are present.
    target = PSI_F_tensor.to(psi.device)
    overlap = torch.sum(torch.conj(target) * psi, dim=-1)
    fidelity = torch.abs(overlap) ** 2

    # The cost is the infidelity: rewarding ``-fidelity`` would push the
    # state away from the target instead of towards it.
    costs = 1.0 - fidelity

    # Propagate the state. The per-sample scalars get two trailing axes so
    # they broadcast against the (..., dim, dim) Hamiltonians.
    hamiltonian = H0 + control[..., None, None] * H1
    U = torch.linalg.matrix_exp(-1.0j * dt[..., None, None] * hamiltonian)
    new_psi = (U @ psi.unsqueeze(-1)).squeeze(-1)

    reward = -costs.reshape(*tensordict.shape, 1)
    done = torch.zeros_like(reward, dtype=torch.bool)
    out = TensorDict(
        {
            "psi_real": torch.real(new_psi),
            "psi_imag": torch.imag(new_psi),
            "params": tensordict["params"],
            "reward": reward,
            "done": done,
        },
        tensordict.shape,
    )
    return out

In [62]:
# Reset function resets a run to the original starting state
def _reset(self, tensordict):
    """Return a tensordict holding the initial state ``PSI_0`` and the params.

    If ``tensordict`` is ``None`` or empty, a fresh parameter set is generated
    with ``self.gen_params(batch_size=self.batch_size)``; otherwise the input
    is assumed to already contain a ``"params"`` entry.

    The initial state is broadcast to the batch shape of the parameters, so
    ``psi_real``/``psi_imag`` have shape ``(*batch, dim)``.
    """
    if tensordict is None or tensordict.is_empty():
        # if no tensordict is passed, we generate a single set of hyperparameters
        # Otherwise, we assume that the input tensordict contains all the relevant
        # parameters to get started.
        tensordict = self.gen_params(batch_size=self.batch_size)

    state_shape = (*tensordict.shape, *PSI_0_tensor.shape)

    def _initial(component):
        # Copy rather than return a view, so in-place edits of the returned
        # state can never modify the module-level PSI_0_tensor.
        return component.expand(state_shape).clone(
            memory_format=torch.contiguous_format
        )

    out = TensorDict(
        {
            "psi_real": _initial(torch.real(PSI_0_tensor)),
            "psi_imag": _initial(torch.imag(PSI_0_tensor)),
            "params": tensordict["params"],
        },
        batch_size=tensordict.shape,
    )
    return out

In [63]:
# Specifies the bounds of the environment
def _make_spec(self, td_params):
    """Attach observation, state, action and reward specs to the environment.

    ``td_params`` must contain a nested ``"params"`` entry with ``"max_amp"``,
    which bounds the action symmetrically around zero.
    """
    n_amplitudes = PSI_0.shape[0]

    def amplitude_spec():
        # Real and imaginary parts of the amplitudes are each bounded to [-1, 1].
        return BoundedTensorSpec(
            low=-1,
            high=1,
            shape=(n_amplitudes,),
            dtype=torch.float32,
        )

    # "params" is part of the observation so that it is carried along at each
    # step of a rollout.
    self.observation_spec = CompositeSpec(
        psi_real=amplitude_spec(),
        psi_imag=amplitude_spec(),
        params=make_composite_from_td(td_params["params"]),
        shape=(),
    )

    # The environment is stateless: the previous output is expected back as
    # input, so the state spec mirrors the observation spec.
    self.state_spec = self.observation_spec.clone()

    # A single control amplitude, limited to [-max_amp, max_amp].
    max_amp = td_params["params", "max_amp"]
    self.action_spec = BoundedTensorSpec(
        low=-max_amp,
        high=max_amp,
        shape=(1,),
        dtype=torch.float32,
    )

    reward_shape = (*td_params.shape, 1)
    self.reward_spec = UnboundedContinuousTensorSpec(shape=reward_shape)


def make_composite_from_td(td):
    """Build a CompositeSpec with the same nested structure as ``td``.

    Nested tensordicts become nested composite specs (recursively); every
    other entry becomes an unbounded continuous spec with the entry's dtype,
    device and shape. The composite takes its shape from ``td.shape``.
    """
    specs = {}
    for key, value in td.items():
        if isinstance(value, TensorDictBase):
            specs[key] = make_composite_from_td(value)
        else:
            specs[key] = UnboundedContinuousTensorSpec(
                dtype=value.dtype, device=value.device, shape=value.shape
            )
    return CompositeSpec(specs, shape=td.shape)

In [64]:
def _set_seed(self, seed: Optional[int]):
    """Seed torch's global RNG and keep the returned generator on ``self.rng``."""
    self.rng = torch.manual_seed(seed)

In [65]:
def gen_params(dt=0.01, batch_size=None) -> TensorDictBase:
    """Build the tensordict of physical parameters used by the environment.

    The result has a single nested ``"params"`` entry holding ``max_amp``
    (fixed at 1.0), the timestep ``dt`` and the Hamiltonians ``H0``/``H1``.
    When ``batch_size`` is truthy, the tensordict is expanded to that batch
    size and made contiguous; otherwise it is returned unbatched.
    """
    physical = TensorDict(
        {
            "max_amp": 1.0,
            "dt": dt,
            "H0": H0_tensor,
            "H1": H1_tensor,
        },
        [],
    )
    td = TensorDict({"params": physical}, [])
    if not batch_size:
        return td
    return td.expand(batch_size).contiguous()

In [66]:
class QuantumEnv(EnvBase):
    """TorchRL environment assembled from the module-level helper functions.

    Args:
        td_params: parameter tensordict; generated with ``gen_params()`` when
            omitted.
        seed: seed forwarded to ``set_seed``; a random int64 is drawn when
            omitted.
        device: device forwarded to ``EnvBase``.
    """

    metadata = {
        "render_modes": ["human", "rgb_array"],
        "render_fps": 30,
    }
    batch_locked = False

    # Helpers: gen_params and _make_spec
    gen_params = staticmethod(gen_params)
    _make_spec = _make_spec

    # Mandatory methods: _step, _reset and _set_seed
    _step = staticmethod(_step)
    _reset = _reset
    _set_seed = _set_seed

    def __init__(self, td_params=None, seed=None, device="cpu"):
        params = self.gen_params() if td_params is None else td_params

        super().__init__(device=device, batch_size=[])
        self._make_spec(params)

        # Draw a random seed only when the caller did not provide one.
        chosen_seed = (
            torch.empty((), dtype=torch.int64).random_().item()
            if seed is None
            else seed
        )
        self.set_seed(chosen_seed)

In [67]:
# Build the environment with default parameters and run TorchRL's spec check on it.
env = QuantumEnv()
check_env_specs(env)

check_env_specs succeeded!


In [68]:
# Show the specs that were attached to the environment by _make_spec.
print("observation_spec:", env.observation_spec)
print("state_spec:", env.state_spec)
print("reward_spec:", env.reward_spec)

observation_spec: CompositeSpec(
    psi_real: BoundedTensorSpec(
        shape=torch.Size([2]),
        space=ContinuousBox(
            low=Tensor(shape=torch.Size([2]), device=cpu, dtype=torch.float32, contiguous=True),
            high=Tensor(shape=torch.Size([2]), device=cpu, dtype=torch.float32, contiguous=True)),
        device=cpu,
        dtype=torch.float32,
        domain=continuous),
    psi_imag: BoundedTensorSpec(
        shape=torch.Size([2]),
        space=ContinuousBox(
            low=Tensor(shape=torch.Size([2]), device=cpu, dtype=torch.float32, contiguous=True),
            high=Tensor(shape=torch.Size([2]), device=cpu, dtype=torch.float32, contiguous=True)),
        device=cpu,
        dtype=torch.float32,
        domain=continuous),
    params: CompositeSpec(
        max_amp: UnboundedContinuousTensorSpec(
            shape=torch.Size([]),
            space=None,
            device=cpu,
            dtype=torch.float32,
            domain=continuous),
        dt: U

In [69]:
# Reset the environment and inspect the tensordict it returns.
td = env.reset()
print("reset tensordict", td)

reset tensordict TensorDict(
    fields={
        done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        params: TensorDict(
            fields={
                H0: Tensor(shape=torch.Size([2, 2]), device=cpu, dtype=torch.complex64, is_shared=False),
                H1: Tensor(shape=torch.Size([2, 2]), device=cpu, dtype=torch.complex64, is_shared=False),
                dt: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
                max_amp: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False)},
            batch_size=torch.Size([]),
            device=None,
            is_shared=False),
        psi_imag: Tensor(shape=torch.Size([2]), device=cpu, dtype=torch.float32, is_shared=False),
        psi_real: Tensor(shape=torch.Size([2]), device=cpu, dtype=torch.float32, is_shared=False),
        terminated: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False)},
    batc

In [70]:
# Take one step from the reset state via rand_step and inspect the result
# (the printed output shows an added "action" entry and a nested "next").
td = env.rand_step(td)
print("random step tensordict", td)

random step tensordict TensorDict(
    fields={
        action: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.float32, is_shared=False),
        done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
        next: TensorDict(
            fields={
                done: Tensor(shape=torch.Size([1]), device=cpu, dtype=torch.bool, is_shared=False),
                params: TensorDict(
                    fields={
                        H0: Tensor(shape=torch.Size([2, 2]), device=cpu, dtype=torch.complex64, is_shared=False),
                        H1: Tensor(shape=torch.Size([2, 2]), device=cpu, dtype=torch.complex64, is_shared=False),
                        dt: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False),
                        max_amp: Tensor(shape=torch.Size([]), device=cpu, dtype=torch.float32, is_shared=False)},
                    batch_size=torch.Size([]),
                    device=None,
                    i

In [11]:
class quantum_env:
    """Gym-style wrapper around a two-Hamiltonian quantum control problem.

    The system evolves under ``H0 + H1_coeff * H1``; each action is added to
    ``H1_coeff`` before the state is propagated for one interval ``dt``.

    Args:
        psi0: initial state.
        H0: drift Hamiltonian.
        H1: control Hamiltonian.
        psif: target state (stored, not yet used by ``step``).
        dt: duration of one propagation step.
        maxsteps: step budget (stored, not yet enforced by ``step``).
    """

    def __init__(self, psi0: qt.Qobj, H0: qt.Qobj, H1: qt.Qobj, psif: qt.Qobj, dt: float = 0.1, maxsteps: int = 100):

        self.psi0 = psi0
        self.H0 = H0
        self.H1 = H1
        self.psif = psif

        # Keep the time step and step budget: ``step`` reads ``self.dt``.
        self.dt = dt
        self.maxsteps = maxsteps

        # Flat real-valued copies of the quantum objects.
        self.psi0_arr = self.qObj_to_array(psi0)
        self.H0_arr = self.qObj_to_array(H0)
        self.H1_arr = self.qObj_to_array(H1)
        self.psif_arr = self.qObj_to_array(psif)

        self.action_space = quantum_action_space()

        self.reset()

    def _pack_state(self, psi_arr):
        """Return the flat state vector ``[H1_coeff, *psi_arr]``."""
        # np.concatenate takes ONE sequence of arrays; a second positional
        # argument would be interpreted as ``axis``.
        return np.concatenate((np.ravel(self.H1_coeff), psi_arr))

    def reset(self):
        """Restore the zero control and the initial state.

        Returns:
            ``(self.psi0_arr, info)`` where ``info`` is an empty dict.
        """
        # Reset control input and state input
        self.H1_coeff = 0
        self.state = self._pack_state(self.psi0_arr)
        self.psi = self.psi0
        info = {}

        # NOTE(review): this returns psi0_arr, not self.state (which also
        # carries the control coefficient) -- confirm which one is the
        # intended observation.
        return self.psi0_arr, info

    def qObj_to_array(self, psi):
        """Flatten ``psi.full()`` into ``[real parts..., imaginary parts...]``."""
        as_arr = psi.full()
        return np.concatenate((np.real(as_arr).flatten(), np.imag(as_arr).flatten()))

    def step(self, action):
        """Add ``action`` to the control coefficient and propagate by ``dt``.

        Updates ``self.psi`` and ``self.state`` in place.
        """
        self.H1_coeff += action

        res = qt.sesolve(self.H0 + self.H1_coeff*self.H1, self.psi, tlist=[0, self.dt])

        self.psi = res.states[-1]
        self.state = self._pack_state(self.qObj_to_array(self.psi))

        # NOTE(review): nothing is returned yet; a gym-style step would return
        # (observation, reward, terminated, truncated, info). psif and
        # maxsteps are presumably meant for the reward and truncation.
        return

class quantum_action_space:
    """Discrete set of control increments, evenly spaced on ``[vmin, vmax]``.

    Args:
        vmin: smallest increment.
        vmax: largest increment.
        gradation: number of evenly spaced increments (endpoints included).

    Attributes are documented inline in ``__init__``.
    """

    def __init__(self, vmin = -0.1, vmax = 0.1, gradation=11):
        self.vmin = vmin
        self.vmax = vmax

        # All admissible actions, and how many there are.
        self.actions = np.linspace(vmin, vmax, gradation)
        self.n = self.actions.size

    def sample(self):
        """Return one action drawn uniformly at random from ``self.actions``."""
        # np.random.sample(size) draws floats in [0, 1) and treats its
        # argument as an output shape; picking an element needs choice().
        return np.random.choice(self.actions)

SyntaxError: invalid syntax (689547695.py, line 22)

In [4]:
# Size of the action space.
# NOTE(review): judging by the cell counter and the outputs below, `env` here
# is presumably a gymnasium environment, not QuantumEnv -- confirm.
env.action_space.n

2

In [8]:
# Draw one action from the action space.
env.action_space.sample()

0

In [9]:
# IPython help lookup for env.step (not valid in plain Python).
env.step?

[0;31mSignature:[0m [0menv[0m[0;34m.[0m[0mstep[0m[0;34m([0m[0maction[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Steps through the environment and if the number of steps elapsed exceeds ``max_episode_steps`` then truncate.

Args:
    action: The environment step action

Returns:
    The environment step ``(observation, reward, terminated, truncated, info)`` with `truncated=True`
    if the number of steps elapsed >= max episode steps
[0;31mFile:[0m      ~/.conda/envs/torch/lib/python3.9/site-packages/gymnasium/wrappers/time_limit.py
[0;31mType:[0m      method

In [None]:
# Scratch cell exercising a gym-style API.
# NOTE(review): `action` is not defined anywhere in the visible notebook, and
# env.step is called twice in a row, so the unpacked tuple below comes from
# the second step -- confirm this is intended.
env.action_space.n
env.reset()
env.action_space.sample()
env.step(action.item())
observation, reward, terminated, truncated, _ = env.step(action.item())