In [1]:
import cheetah
from gym import spaces
import numpy as np
import torch
from torch import nn
from torch import optim

from environments import ARESlatticeStage3v1_9 as lattice
from environments import utils
from environments.absolute import ARESEAAbsolute

initializing ocelot...


In [2]:
# Sensor geometry that is assigned to the AREABSCR1 screen further down.
# NOTE(review): pixel_size is presumably given in metres per pixel - confirm.
screen_resolution = (2448, 2040)
pixel_size = (3.3198e-6, 2.4469e-6)

# Restrict the lattice to the stretch from AREASOLA1 to AREABSCR1 and convert
# that sub-lattice into a cheetah segment.
cell = utils.subcell_of(lattice.cell, "AREASOLA1", "AREABSCR1")
segment = cheetah.Segment.from_ocelot(cell)

# Configure the screen element in one place.
screen = segment.AREABSCR1
screen.resolution = screen_resolution
screen.pixel_size = pixel_size
screen.is_active = True
screen.binning = 4



In [3]:
# Actuator bounds are symmetric around zero. The entry order matches how
# track() unpacks its `actuators` argument: the k1 values of AREAMQZM1,
# AREAMQZM2 and AREAMQZM3, followed by the angles of AREAMCVM1 and AREAMCHM1.
actuator_limits = np.array([30, 30, 30, 3e-3, 6e-3], dtype=np.float32)
actuator_space = spaces.Box(low=-actuator_limits, high=actuator_limits)

# Bounds for the beam parameters a goal may take; same entry order as the
# array returned by track(): mu_x, mu_y, sigma_x, sigma_y.
goal_space = spaces.Box(
    low=np.array([-2e-3, -2e-3, 0, 0], dtype=np.float32),
    high=np.array([2e-3, 2e-3, 5e-4, 5e-4], dtype=np.float32),
)

In [4]:
# Number of random (incoming beam, actuator setting, goal) samples drawn below.
batch_size = 4

In [5]:
def track(actuators, incoming):
    """Apply actuator settings, track a beam and read it back at AREABSCR1.

    Note that this mutates the module-level ``segment``: the magnet settings
    written here stay in place after the call.

    Args:
        actuators: Sequence of five values. The first three are written to
            ``k1`` of AREAMQZM1, AREAMQZM2 and AREAMQZM3; the last two are
            written to ``angle`` of AREAMCVM1 and AREAMCHM1 (in that order).
        incoming: Beam that is passed to ``segment``.

    Returns:
        ``np.ndarray`` holding ``[mu_x, mu_y, sigma_x, sigma_y]`` of the
        screen's ``read_beam``.
    """
    segment.AREAMQZM1.k1, segment.AREAMQZM2.k1, segment.AREAMQZM3.k1 = actuators[:3]
    segment.AREAMCVM1.angle, segment.AREAMCHM1.angle = actuators[3:]

    # The returned beam is not needed; the result is read from the screen
    # (presumably populated as a side effect of tracking - confirm).
    segment(incoming)

    beam = segment.AREABSCR1.read_beam
    return np.array([beam.mu_x, beam.mu_y, beam.sigma_x, beam.sigma_y])

In [6]:
# One random incoming beam per sample.
incoming = [
    cheetah.Beam.make_random(
        n=int(1e5),
        mu_x=np.random.uniform(-3e-3, 3e-3),
        mu_y=np.random.uniform(-3e-4, 3e-4),
        mu_xp=np.random.uniform(-1e-4, 1e-4),
        mu_yp=np.random.uniform(-1e-4, 1e-4),
        sigma_x=np.random.uniform(0, 2e-3),
        sigma_y=np.random.uniform(0, 2e-3),
        sigma_xp=np.random.uniform(0, 1e-4),
        sigma_yp=np.random.uniform(0, 1e-4),
        sigma_s=np.random.uniform(0, 2e-3),
        sigma_p=np.random.uniform(0, 5e-3),
        energy=np.random.uniform(80e6, 160e6),
    )
    for _ in range(batch_size)
]

# Random starting actuators and goals, plus the beam each starting setting
# actually produces on the screen.
initial_actuators = [actuator_space.sample() for _ in range(batch_size)]
desired = [goal_space.sample() for _ in range(batch_size)]
achieved = [track(actuators=x, incoming=y) for x, y in zip(initial_actuators, incoming)]

# Normalize every quantity by the upper bound of its space.
actuators_normalized = [a / actuator_space.high for a in initial_actuators]
desired_normalized = [d / goal_space.high for d in desired]
achieved_normalized = [a / goal_space.high for a in achieved]

# Each observation row is [actuators | desired | achieved]. Stack the rows
# into one ndarray before handing them to torch: building a tensor straight
# from a list of ndarrays is slow and makes PyTorch emit a UserWarning.
observation_rows = [
    np.concatenate(parts)
    for parts in zip(actuators_normalized, desired_normalized, achieved_normalized)
]
observations = torch.tensor(np.stack(observation_rows), dtype=torch.float32)

In [14]:
# An observation is the concatenation of the normalized actuators (5), the
# desired beam parameters (4) and the achieved beam parameters (4).
obs_dim = 13
act_dim = 5

# Small MLP mapping an observation to one value per actuator. The input width
# is taken from obs_dim so the two cannot drift apart.
policy = nn.Sequential(
    nn.Linear(obs_dim, 64),
    nn.ReLU(),
    nn.Linear(64, 32),
    nn.ReLU(),
    nn.Linear(32, act_dim),
)
optimizer = optim.Adam(policy.parameters())

In [17]:
# Ask the policy for one (normalized) actuator setting per observation.
# Note: this rebinds `actuators_normalized`, which was a list of arrays before.
actuators_normalized = policy(observations)
# Scale back with the actuator upper bounds. detach() cuts the values loose
# from the autograd graph so they can be converted to a numpy array.
actuators = actuators_normalized.detach().numpy() * actuator_space.high
actuators

array([[-3.0834339e+00,  1.8075350e+01, -8.2389021e+00, -1.2562300e-04,
         2.8751574e-03],
       [-5.4766650e+00,  1.5908647e+01, -4.8141012e+00,  4.4243887e-05,
         2.9782741e-03],
       [-6.6022120e+00,  1.6908152e+01, -7.0617170e+00,  1.7371456e-04,
         3.4988343e-03],
       [-1.2455313e+01,  1.8642729e+01, -9.3460073e+00,  2.5855444e-04,
         3.7138234e-03]], dtype=float32)

In [19]:
# Track every incoming beam again, this time with the actuator settings
# proposed by the policy.
new_achieved = [track(a, i) for a, i in zip(actuators, incoming)]

In [20]:
new_achieved

[array([0.00392575, 0.00015873, 0.00060491, 0.00231073]),
 array([2.95057707e-03, 3.29152354e-05, 1.74099550e-04, 1.93410995e-03]),
 array([0.00403608, 0.00036771, 0.0013661 , 0.00077377]),
 array([0.00128955, 0.00018809, 0.00146892, 0.0007146 ])]

In [24]:
def objective_fn(achieved, desired, weights=None):
    """Return the log of the weighted L1 distance between two beam vectors.

    Args:
        achieved: Array-like of achieved beam parameters.
        desired: Array-like of desired beam parameters, same shape as
            ``achieved``.
        weights: Optional array-like of per-entry weights. Defaults to
            ``[1, 1, 2, 2]``, i.e. the last two entries count twice as much
            as the first two.

    Returns:
        ``log(sum(weights * |achieved - desired|))``. The sum runs over all
        entries. If ``achieved`` equals ``desired`` the sum is zero and the
        result is ``-inf`` (numpy emits a RuntimeWarning).
    """
    if weights is None:
        weights = np.array([1, 1, 2, 2])

    offset = np.asarray(achieved) - np.asarray(desired)
    weighted_distance = (np.asarray(weights) * np.abs(offset)).sum()

    return np.log(weighted_distance)

In [25]:
# Score every sample against its goal; a lower value means the achieved beam
# is closer to the desired one.
objectives = [objective_fn(a, d) for a, d in zip(new_achieved, desired)]
objectives

[-4.465337708493084,
 -4.896657019723347,
 -4.663975287466898,
 -5.089642641279441]

In [27]:
# Average objective over the batch. np.mean yields a numpy float, so this
# value carries no autograd graph back to the policy parameters.
loss = np.mean(objectives)
loss

-4.778903164240692

In [29]:
# NOTE(review): `loss` was computed with np.mean and is a numpy.float64, so it
# has no backward(); this cell raises the AttributeError shown below. The
# objective is evaluated outside of autograd, so the policy cannot be trained
# by backpropagating through it directly.
policy.zero_grad()
loss.backward()
optimizer.step()  # was misspelled `optimzer`, which would have been a NameError

AttributeError: 'numpy.float64' object has no attribute 'backward'

In [4]:
def rollout(env, policy, batch_size):
    """Collect ``batch_size`` single-step interactions with ``env``.

    For every sample the environment is reset, the policy is queried once for
    the initial observation and the resulting action is applied with a single
    ``env.step`` call.

    Args:
        env: Environment with 1-D ``observation_space`` / ``action_space``
            shapes, a ``reset()`` returning an observation and a ``step()``
            returning a 4-tuple whose second entry is the objective.
        policy: Callable mapping a ``(1, obs_dim)`` float tensor to a
            ``(1, act_dim)`` tensor.
        batch_size: Number of samples to collect.

    Returns:
        Tuple ``(observations, actions, objectives)`` of tensors with shapes
        ``(batch_size, obs_dim)``, ``(batch_size, act_dim)`` and
        ``(batch_size,)``. ``objectives`` is filled with the plain values
        returned by ``env.step`` and is therefore not connected to the
        policy parameters through autograd.
    """
    observations = torch.zeros(batch_size, env.observation_space.shape[0])
    actions = torch.zeros(batch_size, env.action_space.shape[0])
    objectives = torch.zeros(batch_size)

    for i in range(batch_size):
        observation = torch.tensor(env.reset(), dtype=torch.float32)
        observations[i] = observation

        # The policy expects a leading batch dimension. Remove only that
        # dimension again: a bare squeeze() would also drop the action
        # dimension when act_dim == 1 and hand the env a 0-d action.
        action = policy(observation.unsqueeze(0)).squeeze(0)
        actions[i] = action

        _, objective, _, _ = env.step(action.detach().numpy())
        objectives[i] = objective

    return observations, actions, objectives

In [5]:
# Number of optimizer updates and number of single-step rollouts per update.
n_steps = 1000
batch_size = 64

# NOTE(review): `env` is not created by any earlier cell shown in this
# notebook (it only appears in the `__main__` block further down), so this
# loop relies on kernel state from elsewhere.
for i in range(n_steps):
    observations, actions, objectives = rollout(env, policy, batch_size)

    # `objectives` holds plain values returned by env.step, so the mean has no
    # grad_fn and backward() fails with the RuntimeError shown below.
    loss = objectives.mean()
    
    policy.zero_grad()
    loss.backward()
    optimizer.step()

    print(f"Finished step {i}")

RuntimeError: element 0 of tensors does not require grad and does not have a grad_fn

In [None]:
class GaussianActor(nn.Module):
    """Diagonal Gaussian policy with a state-independent standard deviation.

    The mean is produced by a small MLP from the observation; the log standard
    deviation is a learnable parameter with one entry per action dimension,
    initialised to 0.5.
    """

    def __init__(self, obs_dim, act_dim):
        """Build the mean network.

        Args:
            obs_dim: Size of an observation vector (int).
            act_dim: Size of an action vector (int).
        """
        super().__init__()

        self.log_std = nn.Parameter(0.5 * torch.ones(act_dim))
        self.mu_net = nn.Sequential(
            nn.Linear(obs_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, act_dim)
        )

    def forward(self, observation, action=None):
        """Return the action distribution for ``observation``.

        Args:
            observation: Tensor whose last dimension has size ``obs_dim``.
            action: Optional tensor of actions to evaluate under the
                distribution.

        Returns:
            The ``Normal`` distribution if ``action`` is ``None``; otherwise a
            tuple ``(distribution, log_probs)`` where ``log_probs`` is the
            log-probability of ``action`` summed over the last (action)
            dimension.
        """
        mu = self.mu_net(observation)
        std = torch.exp(self.log_std)
        # Referenced through the `torch` namespace: a bare `distributions`
        # name is not imported in this file and raised a NameError.
        pi = torch.distributions.Normal(mu, std)

        if action is None:
            return pi

        log_probs = pi.log_prob(action).sum(dim=-1)
        return pi, log_probs


class OneShotPolicyGradient:
    """Skeleton of a policy-gradient trainer built around a ``GaussianActor``.

    Only the set-up is implemented; ``learn`` currently just advances its step
    counter.
    """

    def __init__(self, env, batch_size=64):
        """Create the actor and its optimizer.

        Args:
            env: Environment whose ``observation_space`` and ``action_space``
                have 1-D shapes (e.g. after flattening the observation).
            batch_size: Number of environment steps accounted for per
                iteration of ``learn``.
        """
        self.env = env
        self.batch_size = batch_size

        # The spaces expose shape tuples, but the actor's linear layers need
        # plain integer sizes.
        obs_dim = env.observation_space.shape[0]
        act_dim = env.action_space.shape[0]
        self.actor = GaussianActor(obs_dim, act_dim)

        # parameters is a method and has to be called to hand Adam the
        # parameter iterator.
        self.optimizer = optim.Adam(self.actor.parameters())

    def learn(self, n_steps=1000):
        """Advance training until at least ``n_steps`` steps are accounted for.

        Args:
            n_steps: Total number of environment steps to train for.
        """
        step = 0
        while step < n_steps:
            # TODO: collect a batch of rollouts and update the actor here.
            step += self.batch_size


if __name__ == "__main__":
    # The import cell only pulls `spaces` from gym, so bring the wrapper into
    # scope here; without it the next lines fail with a NameError.
    from gym.wrappers import FlattenObservation

    env = ARESEAAbsolute()
    env = FlattenObservation(env)