## Model Predictive Control: MCP
模型预测控制

In [12]:
import gym

# 定义环境
class MyWrapper(gym.Wrapper):
  def __init__(self):
    env = gym.make('Pendulum-v1', render_mode='rgb_array')
    super().__init__(env)
    self.env = env
    self.step_n = 0
  
  def reset(self):
    state, _ = self.env.reset()
    self.step_n = 0
    return state
    
  def step(self, action):
    state, reward, terminated, truncated, info = self.env.step(action)
    done = terminated or truncated
    self.step_n += 1
    if self.step_n >= 200:
      done = True
    return state, reward, done, info 
  
env = MyWrapper()
env.reset()

array([-0.82245004,  0.56883734,  0.60749143], dtype=float32)

In [13]:
import numpy as np
import torch

class Pool:
	def __init__(self, limit):
		# 样本池
		self.datas = []
		self.limit = limit
  
	# 向样本池中添加数据
	def add(self, state, action, reward, next_state, over):
		if isinstance(state, np.ndarray) or isinstance(state, torch.Tensor):
			state = state.reshape(3).tolist()

		action = float(action)
		reward = float(reward)
		
		if isinstance(next_state, np.ndarray) or isinstance(
			next_state, torch.Tensor
		):
			next_state = next_state.reshape(3).tolist()

		over = bool(over)
  
		self.datas.append((state, action, reward, next_state, over))

		# 数据上限，超出时从最古老的开始删除
		while len(self.datas) > self.limit:
			self.datas.pop(0)
  
  # 获取一批数据样本
	def get_sample(self):
		# 从样本池中采样
		samples = self.datas
  
		# [b, 3]
		state = torch.FloatTensor([i[0] for i in samples]).reshape(-1, 3)
		# [b, 1]
		action = torch.FloatTensor([i[1] for i in samples]).reshape(-1, 1)
		# [b, 1]
		reward = torch.FloatTensor([i[2] for i in samples]).reshape(-1, 1)
		# [b, 4]
		next_state = torch.FloatTensor([i[3] for i in samples]).reshape(-1, 3)
		# [b, 1]
		over = torch.LongTensor([i[4] for i in samples]).reshape(-1, 1)
  
		# [b, 4]
		input = torch.cat([state, action], dim=1)
		# [b, 4]
		label = torch.cat([reward, next_state - state], dim=1)
		
		return input, label

	def __len__(self):
		return len(self.datas)

pool = Pool(100000)

# 初始化一局游戏的数据
def _():
	# 初始化游戏
	state = env.reset()

	# 玩到游戏结束为止
	over = False
	while not over:
		# 随机一个动作
		action = env.action_space.sample()[0]
  
		# 执行动作得到反馈
		next_state, reward, over, _ = env.step([action])
  
		# 记录数据样本
		pool.add(state, action, reward, next_state, over)
  
		# 更新游戏状态，开始下一个动作
		state = next_state
  

_()

a, b = pool.get_sample()

len(pool), pool.datas[0], a.shape, b.shape

(200,
 ([0.9117597937583923, 0.41072380542755127, 0.8333132863044739],
  0.6211926937103271,
  -0.24896565982900887,
  [0.8846868276596069, 0.4661858081817627, 1.2345350980758667],
  False),
 torch.Size([200, 4]),
 torch.Size([200, 4]))

In [14]:
import random

# 定义主模型
class Model(torch.nn.Module):
  
  # swish 激活函数
  class Swish(torch.nn.Module):
    def __init__(self):
      super().__init__()

    def forward(self, x):
      return x * torch.sigmoid(x)
    
  # 定义工具层
  class FCLayer(torch.nn.Module):
    def __init__(self, in_size, out_size):
      super().__init__()
      self.in_size = in_size
      
      # 初始化参数
      std = in_size ** 0.5
      std *= 2
      std = 1 / std
      
      weight = torch.empty(5, in_size, out_size)
      torch.nn.init.normal_(weight, mean=0.0, std=std)
      
      # [5, in, out]
      self.weight = torch.nn.Parameter(weight)
      
      # [5, 1, out]
      self.bias = torch.nn.Parameter(torch.zeros(5, 1, out_size))
      
    def forward(self, x):
      # x -> [5, b, in]
      
      # [5, b, in] * [5, in, out] -> [5, b, out]
      x = torch.bmm(x, self.weight)
      
      # [5, b, out] + [5, 1, out] -> [5, b, out]
      x = x + self.bias
      
      return x
    
    
  def __init__(self):
    super().__init__()
    
    self.sequential = torch.nn.Sequential(
      self.FCLayer(4, 200),
      self.Swish(),
      self.FCLayer(200, 200),
      self.Swish(),
      self.FCLayer(200, 200),
      self.Swish(),
      self.FCLayer(200, 200),
      self.Swish(),
      self.FCLayer(200, 8),
      torch.nn.Identity(),
    )
    
    self.softplus = torch.nn.Softplus()
    self.optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
    
  def forward(self, x):
    # x -> [5, b, 4]
    
    # [5, b, 4] -> [5, b, 8]
    x = self.sequential(x)
    
    # [5, b, 8] -> [5, b, 4]
    mean = x[..., :4]
    
    # [5, b, 8] -> [5, b, 4]
    logvar = x[..., 4:]
    
    # [1, 1, 4] - [5, b, 4] -> [5, b, 4]
    logvar = 0.5 - logvar
    
    # [1, 1, 4] - [5, b, 4] -> [5, b, 4]
    logvar = 0.5 - self.softplus(logvar)
    
    # [5, b, 4] - [1, 1, 4] -> [5, b, 4]
    logvar = logvar + 10
    
    # [5, b, 4] + [1, 1, 4] -> [5, b, 4]
    logvar = self.softplus(logvar) - 10
    
    # [5, b, 4], [5, b, 4]
    return mean, logvar
  
  def train(self, input, label):
    # input -> [b, 4]
    # label -> [b, 4]
    
    # 反复训练N次
    for _ in range(len(input) // 64 * 20):
      #从全量数据中抽样64个,反复抽5遍,形成5份数据
      #[5, 64]
      select = [torch.randperm(len(input))[:64] for _ in range(5)]
      select = torch.stack(select)
      # [5, b, 4], [5, b, 4]
      input_select = input[select]
      label_select = label[select]
      del select
      
      # 模型计算
      # [5, b, 4] -> [5, b, 4], [5, b, 4]
      mean, logvar = model(input_select)
      
      # 计算loss
      # [b, 4] - [b, 4] * [b, 4] -> [b, 4]
      mse_loss = (mean - label_select) ** 2 * (-logvar).exp()
      
      # [b, 4] -> [b] -> scala
      mse_loss = mse_loss.mean(dim=1).mean()
      
      # [b, 4] -> [b] -> scala
      var_loss = logvar.mean(dim=1).mean()
      
      loss = mse_loss + var_loss
      
      self.optimizer.zero_grad()
      loss.backward()
      self.optimizer.step()
      
model = Model()
a, b = model(torch.randn(5, 64, 4))
a.shape, b.shape

(torch.Size([5, 64, 4]), torch.Size([5, 64, 4]))

In [15]:
class MPC:
  def _fake_step(self, state, action):
    # state -> [b, 3]
    # action -> [b, 1]
    
    # [b, 4]
    input = torch.cat([state, action], dim=1)
    
    # 重复5遍
    # [b, 4] -> [1, b, 4] -> [5, b, 4]
    input = input.unsqueeze(dim=0).repeat([5, 1, 1])
    
    # 模型计算
    # [5, b, 4] -> [5, b, 4],[5, b, 4]
    with torch.no_grad():
      mean, std = model(input)
    std = std.exp().sqrt()
    del input
    
    # means的后三列加上环境数据
    mean[:, :, 1:] += state
    
    # 重采样
    # [5, b, 4]
    sample = torch.distributions.Normal(0, 1).sample(mean.shape)
    sample = mean + sample * std
    
    # 0-4采样b个元素
    # [4, 4, 2, 4, 3, 4, 1, 3, 3, 0, 2, ......]
    select = [random.choice(range(5)) for _ in range(mean.shape[1])]
    
    # 重采样结果，0d: 0-4随机选择，2d: 0-b随机选择
    # [5, b, 4] -> [b, 4]
    sample = sample[select, range(mean.shape[1])]
    
    # 切分
    reward, next_state = sample[:, :1], sample[:, 1:]
    
    return reward, next_state
    
  def _cem_optimize(self, state, mean):
    state = torch.FloatTensor(state).reshape(1, 3)
    var = torch.ones(25)
    # state -> [1, 3]
    # mean -> [25]
    
    # 当前环境信息，复制50次
    # [1, 3] -> [50, 3]
    state = state.repeat(50, 1)
    
    # 循环5次，找出最优解
    for _ in range(5):
      # 采样50个标准正态分布数据作为action
      actions = torch.distributions.Normal(0, 1).sample([50, 25])
      
      # 乘以标准差，加上均值
      # [50, 25] * [25] -> [50, 25]
      actions *= var**0.5
      # [50, 25] * [25] -> [50, 25]
      actions += mean
      
      # 计算每条动作序列的累计奖励
      # [50, 1]
      reward_sum = torch.zeros(50, 1)
      
      # 遍历25个动作
      for i in range(25):
        # [50, 25] -> [50, 1]
        action = actions[:, i].unsqueeze(dim=1)
        
        # 没有真正玩游戏，预测reward和next_state
        # [50, 3], [50, 1] -> [50, 1], [50, 3]
        reward, state = self._fake_step(state, action)
        
        # [50, 1] + [50, 1] -> [50, 1]
        reward_sum += reward
        
      # 按照reward_sum从小到大排列
      # [50]
      select = torch.sort(reward_sum.squeeze(dim=1)).indices
      # [50, 25]
      actions = actions[select]
      del select
      
      # 取反馈最好的10个动作链
      # [10, 25]
      actions = actions[-10:]
      
      # 下一次随机时，希望贴近这些动作分布
      # [25]
      new_mean = actions.mean(dim=0)
      new_var = actions.var(dim=0)
      
      # 增量更新
      # [25] + [25] -> [25]
      mean = 0.1 * mean + 0.9 * new_mean
      # [25] + [25] -> [25]
      var = 0.1 * var + 0.9 * new_var
      
    return mean

  def mpc(self):
    # 初始动作的分布均值为0
    mean = torch.zeros(25)
    
    reward_sum = 0
    state = env.reset()
    over = False
    while not over:
      #在当前状态下,找25个最优动作的均值
      #[1, 3],[25],[25] -> [25]
      actions = self._cem_optimize(state, mean)
      
      # 执行第一个动作
      action = actions[0].item()
      
      # 执行动作
      next_state, reward, over, _ = env.step([action])
      
      # 增加数据
      pool.add(state, action, reward, next_state, over)
      
      state = next_state
      reward_sum += reward
    
      # 下一个动作的均值，在当前动作均值的基础上寻找
      # [25]
      mean = torch.empty(actions.shape)
      mean[:-1] = actions[1:]
      mean[-1] = 0
      
    return reward_sum
  
  
mpc = MPC()

a, b, = mpc._fake_step(torch.randn(200, 3), torch.randn(200, 1))

print(a.shape, b.shape)
print(mpc._cem_optimize(torch.randn(1, 3), torch.zeros(25)).shape)
    

torch.Size([200, 1]) torch.Size([200, 3])
torch.Size([25])


In [16]:
for i in range(10):
  input, label = pool.get_sample()
  model.train(input, label)
  reward_sum = mpc.mpc()
  print(i, len(pool), reward_sum)

0 400 -779.8466348604484
1 600 -1653.9163814208307
2 800 -525.2314782461391
3 1000 -501.9996663859694
4 1200 -518.6831688805908
5 1400 -121.71027123953911
6 1600 -129.56305028379327
7 1800 -538.8285927252066
8 2000 -123.9477862490982
9 2200 -122.84676558886278
