# 导入模块

In [None]:
import gym
import random
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt

# Q-learning

## Q-learning 算法

In [None]:
class agent_q_learning():
  """Tabular Q-learning agent with an epsilon-greedy behaviour policy.

  Action values are kept in ``self.q``, a dict keyed by ``(state, action)``
  and initialised to 0.0 for every pair of the discrete spaces described by
  ``env.observation_space.n`` and ``env.action_space.n``.
  """

  def __init__(self, env, q_param):
    """Store the environment and hyper-parameters, then zero the Q-table.

    Args:
      env: environment exposing ``observation_space.n``, ``action_space.n``
        and ``action_space.sample()``.
      q_param: ``None`` for the defaults (alpha=0.5, gamma=0.9), otherwise a
        mapping providing ``'alpha'`` and ``'gamma'``.
    """
    self.env = env
    if q_param is not None:
      self.alpha = q_param['alpha']
      self.gamma = q_param['gamma']
    else:
      self.alpha = 0.5
      self.gamma = 0.9
    self.epsilon = 0.1

    self.q = {}
    self.__init_q_table()

  def output_action(self, state, is_greedy = True):
    """Pick an action for ``state``.

    Despite its name, ``is_greedy=True`` enables epsilon-greedy exploration
    (a sampled action with probability ``epsilon``), while
    ``is_greedy=False`` always returns the action with the highest Q-value.
    """
    # The random draw is made on every call, even when exploration is off.
    explore = random.uniform(0, 1) < self.epsilon
    if explore and is_greedy:
      return self.env.action_space.sample()

    def action_value(candidate):
      return self.q[(state, candidate)]

    # max() keeps the first maximum, so ties go to the lowest action index.
    return max(range(self.env.action_space.n), key=action_value)

  def __init_q_table(self):
    """Set Q(s, a) = 0.0 for every state/action pair."""
    self.q.update(
        ((s, a), 0.0)
        for s in range(self.env.observation_space.n)
        for a in range(self.env.action_space.n))

  def update_q_table(self, state, action, next_state, next_maxaction, reward):
    """Apply one temporal-difference update to Q(state, action).

    The target is ``reward + gamma * Q(next_state, next_maxaction)``.
    """
    key = (state, action)
    current = self.q[key]
    td_target = reward + self.gamma * self.q[(next_state, next_maxaction)]
    self.q[key] = current + self.alpha * (td_target - current)


## Q-learning 执行和评估类

In [None]:
class evaluate_alg_q():
  """Train Q-learning agents and record the average return per episode.

  ``reward_array[i]`` holds the return of episode ``i`` averaged over ``cnt``
  independent runs once ``start_execute`` has finished.
  """

  def __init__(self, env, cnt=50, episodes=2000, q_param=None):
    """Remember the experiment settings.

    Args:
      env: environment used for training; the code assumes the classic Gym
        API where ``reset()`` returns the state and ``step()`` a 4-tuple.
      cnt: number of independent runs that are averaged together.
      episodes: number of episodes per run.
      q_param: forwarded to ``agent_q_learning`` (``None`` or a mapping with
        ``'alpha'`` and ``'gamma'``).
    """
    self.env = env
    self.cnt = cnt
    self.episodes = episodes
    self.q_param = q_param

    self.reward_array = np.zeros(episodes)

  def start_execute(self):
    """Run ``cnt`` fresh agents for ``episodes`` episodes and average returns."""
    # Start from zero so calling this method again does not mix experiments.
    self.reward_array = np.zeros(self.episodes)
    for _ in tqdm(range(self.cnt)):
      self.agent = agent_q_learning(self.env, self.q_param)
      for i in range(self.episodes):
        self.reward_array[i] += self.__run_episode()
    # Average every episode over the runs (not only the last episode).
    if self.cnt > 0:
      self.reward_array /= self.cnt

  def __run_episode(self):
    """Play one episode with online Q-learning updates; return its total reward."""
    total = 0
    state = self.env.reset()
    while True:
      action = self.agent.output_action(state)
      next_state, reward, done, _ = self.env.step(action)

      # The bootstrap action is the greedy action of the successor state.
      next_maxaction = self.agent.output_action(next_state, is_greedy=False)
      self.agent.update_q_table(state, action, next_state, next_maxaction, reward)

      state = next_state
      total += reward
      if done:
        return total

  def evaluate_plot_reward(self):
    """Plot the raw per-episode curve."""
    plt.plot(self.reward_array,'-')

  @staticmethod
  def _block_means(values, dev_num):
    """Average ``values`` over consecutive blocks of ``dev_num`` entries.

    A trailing partial block is dropped so any length is accepted.
    """
    values = np.asarray(values)
    size_row = len(values) // dev_num
    return values[:size_row * dev_num].reshape((size_row, dev_num)).mean(axis=1)

  def evaluate_plot_reward_dev(self, dev_num=10,style='-'):
    """Plot the curve smoothed over blocks of ``dev_num`` episodes.

    Returns:
      The list of line objects returned by ``plt.plot``.
    """
    reward_array_dev = self._block_means(self.reward_array, dev_num)
    envaluate_plt = plt.plot(reward_array_dev, style)
    return envaluate_plt

## Q-learning 创建实例和运行

In [None]:
# Train Q-learning on Taxi-v3: 5 independent runs of 2000 episodes each.
env=gym.make('Taxi-v3')
algorithm_q = evaluate_alg_q(env, cnt=5, episodes=2000)
algorithm_q.start_execute()
# algorithm_q.evaluate_plot_reward()
env.close()

In [None]:
# Plot the Q-learning curve averaged over consecutive blocks of 20 episodes.
algorithm_q.evaluate_plot_reward_dev(dev_num=20)

# Sarsa

## Sarsa 算法

In [None]:
class agent_sarsa():
  """Tabular Sarsa agent with an epsilon-greedy policy.

  Action values are kept in ``self.q``, a dict keyed by ``(state, action)``
  and initialised to 0.0 for every state/action pair of ``env``.
  """

  def __init__(self, env, sarsa_param):
    """Store the environment and hyper-parameters, then zero the Q-table.

    Args:
      env: environment exposing ``observation_space.n``, ``action_space.n``
        and ``action_space.sample()``.
      sarsa_param: ``None`` for the defaults (alpha=0.5, gamma=0.9),
        otherwise a mapping providing ``'alpha'`` and ``'gamma'``.
    """
    self.env = env
    if sarsa_param is None:
      self.alpha = 0.5
      self.gamma = 0.9
    else:
      self.alpha = sarsa_param['alpha']
      self.gamma = sarsa_param['gamma']

    self.epsilon = 0.1
    self.q = {}
    self.__q_table_init()

  def output_action(self, state):
    """Return an epsilon-greedy action for ``state``."""
    random_value = np.random.random()
    if random_value < self.epsilon:
      # Sample from the agent's own environment rather than relying on a
      # module-level ``env`` variable happening to exist.
      return self.env.action_space.sample()
    # max() keeps the first maximum, so ties go to the lowest action index.
    return max(range(self.env.action_space.n),
               key=lambda x: self.q[(state, x)])

  def update_q_table(self,state, action, next_state, next_action, reward):
    """Apply the Sarsa update using the action actually chosen next.

    The target is ``reward + gamma * Q(next_state, next_action)``.
    """
    td_target = reward + self.gamma * self.q[(next_state, next_action)]
    self.q[(state, action)] += self.alpha * (td_target - self.q[(state, action)])

  def __q_table_init(self):
    """Set Q(s, a) = 0.0 for every state/action pair."""
    for s in range(self.env.observation_space.n):
      for a in range(self.env.action_space.n):
        self.q[(s,a)] = 0.0


## Sarsa 执行和评估类

In [None]:
class evaluate_alg_sarsa():
  """Train Sarsa agents and record the average return per episode.

  ``reward_array[i]`` holds the return of episode ``i`` averaged over ``cnt``
  independent runs once ``start_execute`` has finished.
  """

  def __init__(self, env, cnt=50, episodes=2000, sarsa_param=None):
    """Remember the experiment settings.

    Args:
      env: environment used for training; the code assumes the classic Gym
        API where ``reset()`` returns the state and ``step()`` a 4-tuple.
      cnt: number of independent runs that are averaged together.
      episodes: number of episodes per run.
      sarsa_param: forwarded to ``agent_sarsa`` (``None`` or a mapping with
        ``'alpha'`` and ``'gamma'``).
    """
    self.env = env
    self.cnt = cnt
    self.episodes = episodes
    self.sarsa_param = sarsa_param

    self.reward_array = np.zeros(episodes)

  def start_execute(self):
    """Run ``cnt`` fresh agents for ``episodes`` episodes and average returns."""
    # Start from zero so calling this method again does not mix experiments.
    self.reward_array = np.zeros(self.episodes)
    for _ in tqdm(range(self.cnt)):
      self.agent = agent_sarsa(self.env, self.sarsa_param)
      for i in range(self.episodes):
        self.reward_array[i] += self.__run_episode()
    # Average every episode over the runs (not only the last episode).
    if self.cnt > 0:
      self.reward_array /= self.cnt

  def __run_episode(self):
    """Play one episode with online Sarsa updates; return its total reward."""
    total = 0
    state = self.env.reset()
    action = self.agent.output_action(state)
    while True:
      next_state, reward, done, _ = self.env.step(action)
      # On-policy: the next action is chosen in the state just reached and
      # is also the action executed on the following step.
      next_action = self.agent.output_action(next_state)

      self.agent.update_q_table(state, action, next_state, next_action, reward)

      state = next_state
      action = next_action
      total += reward
      if done:
        return total

  def evaluate_plot_reward(self):
    """Plot the raw per-episode curve."""
    plt.plot(self.reward_array)

  @staticmethod
  def _block_means(values, dev_num):
    """Average ``values`` over consecutive blocks of ``dev_num`` entries.

    A trailing partial block is dropped so any length is accepted.
    """
    values = np.asarray(values)
    size_row = len(values) // dev_num
    return values[:size_row * dev_num].reshape((size_row, dev_num)).mean(axis=1)

  def evaluate_plot_reward_dev(self, dev_num=10, style='-.'):
    """Plot the curve smoothed over blocks of ``dev_num`` episodes.

    Returns:
      The list of line objects returned by ``plt.plot``.
    """
    reward_array_dev = self._block_means(self.reward_array, dev_num)
    envaluate_plt = plt.plot(reward_array_dev,style)
    return envaluate_plt

## Sarsa创建实例和运行

In [None]:
# Train Sarsa on Taxi-v3: 5 independent runs of 2000 episodes each.
env=gym.make('Taxi-v3')
algorithm_sarsa = evaluate_alg_sarsa(env, cnt=5, episodes=2000)
algorithm_sarsa.start_execute()
# algorithm_sarsa.evaluate_plot_reward()
env.close()

In [None]:
# Plot the Sarsa curve averaged over consecutive blocks of 20 episodes.
algorithm_sarsa.evaluate_plot_reward_dev(20)

# 期望Sarsa

## 期望Sarsa算法

In [None]:
class agent_sarsa_exp():
  """Tabular Expected Sarsa agent with an epsilon-greedy policy.

  Action values are kept in ``self.q``, a dict keyed by ``(state, action)``
  and initialised to 0.0 for every state/action pair of ``env``.
  """

  def __init__(self, env, sarsa_param):
    """Store the environment and hyper-parameters, then zero the Q-table.

    Args:
      env: environment exposing ``observation_space.n``, ``action_space.n``
        and ``action_space.sample()``.
      sarsa_param: ``None`` for the defaults (alpha=0.5, gamma=0.9),
        otherwise a mapping providing ``'alpha'`` and ``'gamma'``.
    """
    self.env = env
    if sarsa_param is None:
      self.alpha = 0.5
      self.gamma = 0.9
    else:
      self.alpha = sarsa_param['alpha']
      self.gamma = sarsa_param['gamma']

    self.epsilon = 0.1
    self.q = {}
    self.__q_table_init()

  def output_action(self, state, isgreedy=True):
    """Pick an action for ``state``.

    Despite its name, ``isgreedy=True`` enables epsilon-greedy exploration,
    while ``isgreedy=False`` always returns the highest-valued action.
    """
    # The random draw is made on every call, even when exploration is off.
    random_value = np.random.random()
    if random_value < self.epsilon and isgreedy:
      # Sample from the agent's own environment rather than relying on a
      # module-level ``env`` variable happening to exist.
      return self.env.action_space.sample()
    # max() keeps the first maximum, so ties go to the lowest action index.
    return max(range(self.env.action_space.n),
               key=lambda x: self.q[(state, x)])

  def update_q_table(self,state, action, next_state, next_maxaction, reward):
    """Apply the Expected Sarsa update to Q(state, action).

    The bootstrap value is the expectation of Q(next_state, .) under the
    epsilon-greedy policy: every action carries probability
    ``epsilon / n_actions`` and ``next_maxaction`` carries an extra
    ``1 - epsilon``.
    """
    n_actions = self.env.action_space.n
    uniform_part = (self.epsilon / n_actions) * sum(
        self.q[(next_state, each_action)] for each_action in range(n_actions))
    greedy_part = (1 - self.epsilon) * self.q[(next_state, next_maxaction)]
    expected_value = uniform_part + greedy_part

    self.q[(state, action)] += self.alpha * (
        reward + self.gamma * expected_value - self.q[(state, action)])

  def __q_table_init(self):
    """Set Q(s, a) = 0.0 for every state/action pair."""
    for s in range(self.env.observation_space.n):
      for a in range(self.env.action_space.n):
        self.q[(s,a)] = 0.0


## 期望Sarsa执行和评估类

In [None]:
class evaluate_alg_sarsa_exp():
  """Train Expected Sarsa agents and record the average return per episode.

  ``reward_array[i]`` holds the return of episode ``i`` averaged over ``cnt``
  independent runs once ``start_execute`` has finished.
  """

  def __init__(self, env, cnt=50, episodes=2000, sarsa_exp_param=None):
    """Remember the experiment settings.

    Args:
      env: environment used for training; the code assumes the classic Gym
        API where ``reset()`` returns the state and ``step()`` a 4-tuple.
      cnt: number of independent runs that are averaged together.
      episodes: number of episodes per run.
      sarsa_exp_param: forwarded to ``agent_sarsa_exp`` (``None`` or a
        mapping with ``'alpha'`` and ``'gamma'``).
    """
    self.env = env
    self.cnt = cnt
    self.episodes = episodes
    self.sarsa_exp_param = sarsa_exp_param

    self.reward_array = np.zeros(episodes)

  def start_execute(self):
    """Run ``cnt`` fresh agents for ``episodes`` episodes and average returns."""
    # Start from zero so calling this method again does not mix experiments.
    self.reward_array = np.zeros(self.episodes)
    for _ in tqdm(range(self.cnt)):
      self.agent = agent_sarsa_exp(self.env, self.sarsa_exp_param)
      for i in range(self.episodes):
        self.reward_array[i] += self.__run_episode()
    # Average every episode over the runs (not only the last episode).
    if self.cnt > 0:
      self.reward_array /= self.cnt

  def __run_episode(self):
    """Play one episode with online Expected Sarsa updates; return its total reward."""
    total = 0
    state = self.env.reset()
    action = self.agent.output_action(state)
    while True:
      next_state, reward, done, _ = self.env.step(action)
      # Both the behaviour action and the greedy action used inside the
      # expectation belong to the state just reached.
      next_action = self.agent.output_action(next_state)
      next_maxaction = self.agent.output_action(next_state, isgreedy=False)

      self.agent.update_q_table(state, action, next_state, next_maxaction, reward)

      state = next_state
      action = next_action
      total += reward
      if done:
        return total

  def evaluate_plot_reward(self):
    """Plot the raw per-episode curve."""
    plt.plot(self.reward_array,'--')

  @staticmethod
  def _block_means(values, dev_num):
    """Average ``values`` over consecutive blocks of ``dev_num`` entries.

    A trailing partial block is dropped so any length is accepted.
    """
    values = np.asarray(values)
    size_row = len(values) // dev_num
    return values[:size_row * dev_num].reshape((size_row, dev_num)).mean(axis=1)

  def evaluate_plot_reward_dev(self, dev_num=10, style='--'):
    """Plot the curve smoothed over blocks of ``dev_num`` episodes.

    Returns:
      The list of line objects returned by ``plt.plot``.
    """
    reward_array_dev = self._block_means(self.reward_array, dev_num)
    envaluate_plt = plt.plot(reward_array_dev, style)
    return envaluate_plt

## 期望Sarsa创建实例和运行

In [None]:
# Train Expected Sarsa on Taxi-v3 (5 runs of 2000 episodes each), then plot
# the curve averaged over consecutive blocks of 20 episodes.
env=gym.make('Taxi-v3')
algorithm_sarsa_exp = evaluate_alg_sarsa_exp(env, cnt=5, episodes=2000)
algorithm_sarsa_exp.start_execute()
# algorithm_sarsa_exp.evaluate_plot_reward()
env.close()
algorithm_sarsa_exp.evaluate_plot_reward_dev(20)

# 双Q学习

## 双Q学习算法

In [None]:
class agent_double_q_learning():
  """One of the two value tables used by Double Q-learning.

  Each instance owns a Q-table ``self.q`` (dict keyed by ``(state, action)``,
  initialised to 0.0); the partner table is passed in explicitly where it is
  needed.
  """

  def __init__(self, env, q_param):
    """Store the environment and hyper-parameters, then zero the Q-table.

    Args:
      env: environment exposing ``observation_space.n``, ``action_space.n``
        and ``action_space.sample()``.
      q_param: ``None`` for the defaults (alpha=0.5, gamma=0.9), otherwise a
        mapping providing ``'alpha'`` and ``'gamma'``.
    """
    self.env = env
    if q_param is not None:
      self.alpha = q_param['alpha']
      self.gamma = q_param['gamma']
    else:
      self.alpha = 0.5
      self.gamma = 0.9
    self.epsilon = 0.1

    self.q = {}
    self.__init_q_table()

  def output_action(self, state, q1, q2=None, is_greedy = True):
    """Pick an action for ``state`` from one table or from the sum of two.

    Despite its name, ``is_greedy=True`` enables epsilon-greedy exploration,
    while ``is_greedy=False`` always returns the highest-scoring action.
    Actions are scored by ``q1`` alone, or by ``q1 + q2`` when ``q2`` is
    given.
    """
    # The random draw is made on every call, even when exploration is off.
    explore = random.uniform(0, 1) < self.epsilon
    if explore and is_greedy:
      return self.env.action_space.sample()

    if q2 is None:
      def score(candidate):
        return q1[(state, candidate)]
    else:
      def score(candidate):
        return q1[(state, candidate)] + q2[(state, candidate)]

    # max() keeps the first maximum, so ties go to the lowest action index.
    return max(range(self.env.action_space.n), key=score)

  def __init_q_table(self):
    """Set Q(s, a) = 0.0 for every state/action pair."""
    self.q.update(
        ((s, a), 0.0)
        for s in range(self.env.observation_space.n)
        for a in range(self.env.action_space.n))

  def update_q_table(self, state, action, next_state, next_maxaction, reward, q):
    """Update this table, bootstrapping from the table ``q`` passed in.

    The target is ``reward + gamma * q[(next_state, next_maxaction)]``.
    """
    key = (state, action)
    current = self.q[key]
    td_target = reward + self.gamma * q[(next_state, next_maxaction)]
    self.q[key] = current + self.alpha * (td_target - current)


## 双Q学习执行和评估类

In [None]:
class evaluate_alg_double_q():
  """Train Double Q-learning agents and record the average return per episode.

  ``reward_array[i]`` holds the return of episode ``i`` averaged over ``cnt``
  independent runs once ``start_execute`` has finished.
  """

  def __init__(self, env, cnt=50, episodes=2000, q_param=None):
    """Remember the experiment settings.

    Args:
      env: environment used for training; the code assumes the classic Gym
        API where ``reset()`` returns the state and ``step()`` a 4-tuple.
      cnt: number of independent runs that are averaged together.
      episodes: number of episodes per run.
      q_param: forwarded to ``agent_double_q_learning`` (``None`` or a
        mapping with ``'alpha'`` and ``'gamma'``).
    """
    self.env = env
    self.cnt = cnt
    self.episodes = episodes
    self.q_param = q_param
    self.reward_array = np.zeros(episodes)

  def start_execute(self):
    """Run ``cnt`` fresh table pairs for ``episodes`` episodes and average returns."""
    # Start from zero so calling this method again does not mix experiments.
    self.reward_array = np.zeros(self.episodes)
    for _ in tqdm(range(self.cnt)):
      self.agent_q1 = agent_double_q_learning(self.env, self.q_param)
      self.agent_q2 = agent_double_q_learning(self.env, self.q_param)
      for i in range(self.episodes):
        self.reward_array[i] += self.__run_episode()
    # Average every episode over the runs (not only the last episode).
    if self.cnt > 0:
      self.reward_array /= self.cnt

  def __run_episode(self):
    """Play one episode with online Double Q-learning updates; return its total reward."""
    total = 0
    state = self.env.reset()
    while True:
      # Behave epsilon-greedily with respect to Q1 + Q2.
      action = self.agent_q1.output_action(state, self.agent_q1.q,
                                           self.agent_q2.q)
      next_state, reward, done, _ = self.env.step(action)

      # Update one table chosen at random; it selects the greedy action in
      # the successor state and the other table supplies that action's value.
      if np.random.random() < 0.5:
        learner, evaluator = self.agent_q1, self.agent_q2
      else:
        learner, evaluator = self.agent_q2, self.agent_q1
      next_maxaction = learner.output_action(next_state, learner.q,
                                             is_greedy=False)
      learner.update_q_table(state, action, next_state,
                             next_maxaction, reward, evaluator.q)

      state = next_state
      total += reward
      if done:
        return total

  def evaluate_plot_reward(self):
    """Plot the raw per-episode curve."""
    plt.plot(self.reward_array,':')

  @staticmethod
  def _block_means(values, dev_num):
    """Average ``values`` over consecutive blocks of ``dev_num`` entries.

    A trailing partial block is dropped so any length is accepted.
    """
    values = np.asarray(values)
    size_row = len(values) // dev_num
    return values[:size_row * dev_num].reshape((size_row, dev_num)).mean(axis=1)

  def evaluate_plot_reward_dev(self, dev_num=10,style=':'):
    """Plot the curve smoothed over blocks of ``dev_num`` episodes.

    Returns:
      The list of line objects returned by ``plt.plot``.
    """
    reward_array_dev = self._block_means(self.reward_array, dev_num)
    envaluate_plt = plt.plot(reward_array_dev, style)
    return envaluate_plt

## 双Q学习创建实例和运行

In [None]:
# Train Double Q-learning on Taxi-v3: 5 independent runs of 2000 episodes each.
env=gym.make('Taxi-v3')
algorithm_double_q = evaluate_alg_double_q(env, cnt=5, episodes=2000)
algorithm_double_q.start_execute()
# algorithm_double_q.evaluate_plot_reward()
env.close()

In [None]:
# Plot the Double Q-learning curve averaged over consecutive blocks of 20 episodes.
algorithm_double_q.evaluate_plot_reward_dev(dev_num=20)

# 算法联合对比测试

In [None]:
# Overlay the raw per-episode curves of all four algorithms.
# The legend entries are listed in the same order as the plot calls.
algorithm_sarsa_exp.evaluate_plot_reward()
algorithm_sarsa.evaluate_plot_reward()
algorithm_q.evaluate_plot_reward()
algorithm_double_q.evaluate_plot_reward()
plt.legend(['exp','sarsa','q', 'double_q'])
plt.xlabel('episode')
plt.ylabel('average reward')

In [None]:
# Overlay the four curves averaged over consecutive blocks of 20 episodes.
# The legend entries are listed in the same order as the plot calls.
algorithm_q.evaluate_plot_reward_dev(dev_num=20)
algorithm_sarsa.evaluate_plot_reward_dev(20)
algorithm_double_q.evaluate_plot_reward_dev(dev_num=20)
algorithm_sarsa_exp.evaluate_plot_reward_dev(20)
plt.legend(['q learning', 'sarsa','double q learning','expect sarsa'])
plt.xlabel('episode')
plt.ylabel('average reward')

# 工具和测试代码

## 效果对比

In [None]:
# Compare Q-learning and Sarsa only (curves averaged over blocks of 20 episodes).
algorithm_q.evaluate_plot_reward_dev(20)
algorithm_sarsa.evaluate_plot_reward_dev(20)
plt.legend(['q-learning', 'sarsa'])

In [None]:
# Print the mean of the Q-learning reward_array over five consecutive
# 400-entry slices (indices 0..1999).
for i in range(5):
  print(algorithm_q.reward_array[i*400:(i+1)*400].mean())

In [None]:
# Print the mean of the Sarsa reward_array over five consecutive
# 400-entry slices (indices 0..1999).
for i in range(5):
  print(algorithm_sarsa.reward_array[i*400:(i+1)*400].mean())

In [None]:
# Print the mean of the Double Q-learning reward_array over five consecutive
# 400-entry slices (indices 0..1999).
for i in range(5):
  print(algorithm_double_q.reward_array[i*400:(i+1)*400].mean())

In [None]:
# Print the mean of the Expected Sarsa reward_array over five consecutive
# 400-entry slices (indices 0..1999).
for i in range(5):
  print(algorithm_sarsa_exp.reward_array[i*400:(i+1)*400].mean())

In [None]:
# Mean of the Sarsa reward_array excluding its last 500 entries; the same
# expression is used as the fitness value in the PSO classes below.
algorithm_sarsa.reward_array[:-500].mean()

## Sarsa粒子群算法，超参数选取

网址：https://zh.wikipedia.org/wiki/%E7%B2%92%E5%AD%90%E7%BE%A4%E4%BC%98%E5%8C%96

In [None]:
class pso_evaluate():
  """Particle swarm optimisation (PSO) of Sarsa's ``alpha`` and ``gamma``.

  All swarm state lives in ``self.pso_agent``, a dict keyed by
  ``(index, field)``. Index 0 holds the swarm-wide best; indices
  ``1 .. pso_cnt-1`` are the particles. The fields are:

    'optim_param': best parameters seen so far (dict with alpha/gamma)
    'now_param':   current position (dict with alpha/gamma)
    'pso_v':       current velocity (dict with alpha/gamma)
    'fit_value':   {'fit_best': ..., 'fit_now': ...}

  Fitness is maximised.
  """

  def __init__(self, pso_cnt, episode):
    """Create the swarm with random positions and velocities.

    Args:
      pso_cnt: number of particles.
      episode: number of PSO iterations performed by ``pso_execute``.
    """
    # Slot 0 stores the swarm-wide information, hence the extra entry.
    self.pso_cnt = pso_cnt+1
    self.episode = episode
    self.pso_param_head = ['alpha', 'gamma']
    # (lower, upper) bound per parameter, in the order of pso_param_head.
    self.pso_param_limit = [(0.2, 1), (0.6, 1.2)]

    self.pso_agent = {}
    self.__pso_init_param()

  def pso_execute(self):
    """Run the search: evaluate particles, refresh the bests, move the swarm."""
    for _ in range(self.episode):
      self.__pso_fitness_call1()
      self.__pso_fitness_call2()
      self.__pso_update_param()

  def pso_print_ans(self):
    """Print the best parameters found, followed by the full swarm state."""
    print('ans is :',self.pso_agent[(0,'optim_param')])
    print('check it!')
    print('all msg:',self.pso_agent)

  def __as_vector(self, params):
    """Convert a parameter dict to an array ordered like ``pso_param_head``."""
    return np.array([params[name] for name in self.pso_param_head], dtype=float)

  def __as_params(self, vector):
    """Convert an array ordered like ``pso_param_head`` back to a dict."""
    return dict(zip(self.pso_param_head, vector.tolist()))

  def __pso_update_param(self):
    """Move every particle one PSO step and clamp it to the search bounds."""
    pso_best_param = self.__as_vector(self.pso_agent[(0, 'optim_param')])
    for idx in range(1, self.pso_cnt):
      pso_v = self.__as_vector(self.pso_agent[(idx, 'pso_v')])
      pso_optim_param = self.__as_vector(self.pso_agent[(idx, 'optim_param')])
      pso_now_param = self.__as_vector(self.pso_agent[(idx, 'now_param')])

      # Inertia 0.5; one random scalar in [0, 2) scales the pull towards the
      # swarm best and another the pull towards the particle's own best.
      pso_v = 0.5*pso_v + \
              2*np.random.random()*(pso_best_param - pso_now_param) + \
              2*np.random.random()*(pso_optim_param - pso_now_param)
      pso_now_param = pso_now_param + pso_v

      self.pso_agent[(idx, 'pso_v')] = self.__as_params(pso_v)
      self.pso_agent[(idx, 'now_param')] = self.__as_params(pso_now_param)
      self.__pso_modify_param(idx)

  def __pso_modify_param(self, pso_agent_index):
    """Clamp the particle's current position to ``pso_param_limit``."""
    now_param = self.pso_agent[(pso_agent_index, 'now_param')]
    for title, (lower, upper) in zip(self.pso_param_head, self.pso_param_limit):
      if now_param[title] < lower:
        now_param[title] = lower
      if now_param[title] > upper:
        now_param[title] = upper

  def __pso_fitness_call1(self):
    """Evaluate every particle and refresh its personal best."""
    fit_value_array = []
    for idx in range(1, self.pso_cnt):
      # Resolved as ``self._pso_evaluate__fitness_evaluate`` (name mangling);
      # a subclass must define that exact attribute to replace the fitness.
      fit_value = self.__fitness_evaluate(idx)
      fit_value_array.append(fit_value)
      record = self.pso_agent[(idx, 'fit_value')]
      record['fit_now'] = fit_value
      if fit_value > record['fit_best']:
        record['fit_best'] = fit_value
        # Store a copy so later changes to the position cannot alter the best.
        self.pso_agent[(idx, 'optim_param')] = dict(self.pso_agent[(idx, 'now_param')])
    print(fit_value_array)

  def __fitness_evaluate(self,pso_agent_index):
    """Train Sarsa once with the particle's parameters and score the run.

    Returns:
      The mean of ``reward_array`` excluding its last 500 entries.
    """
    env=gym.make('Taxi-v3')
    try:
      sarsa_param = self.pso_agent[(pso_agent_index, 'now_param')]
      algorithm_sarsa = evaluate_alg_sarsa(env, cnt=1, episodes=2000, sarsa_param=sarsa_param)
      algorithm_sarsa.start_execute()
    finally:
      # Release the environment even if training fails.
      env.close()
    fit_value = algorithm_sarsa.reward_array[:-500].mean()
    return fit_value

  def __pso_fitness_call2(self):
    """Promote the best personal best to the swarm-wide record (slot 0)."""
    global_fit = self.pso_agent[(0, 'fit_value')]
    for idx in range(1, self.pso_cnt):
      particle_best = self.pso_agent[(idx, 'fit_value')]['fit_best']
      if particle_best > global_fit['fit_best']:
        global_fit['fit_best'] = particle_best
        # The parameters that achieved fit_best are the particle's personal
        # best, not necessarily its current position; copy to avoid aliasing.
        self.pso_agent[(0, 'optim_param')] = dict(self.pso_agent[(idx, 'optim_param')])

  def __pso_init_param(self):
    """Create slot 0 and all particles with random positions and velocities."""
    for i in range(self.pso_cnt):
      self.pso_agent[(i,'optim_param')] = dict.fromkeys(self.pso_param_head, 0.0)
      self.pso_agent[(i,'now_param')] = {
          k: np.random.uniform(low, high)
          for k, (low, high) in zip(self.pso_param_head, self.pso_param_limit)}
      # Initial velocity: random value up to half the width of the range.
      self.pso_agent[(i,'pso_v')] = {
          k: np.random.random()*(high-low)*0.5
          for k, (low, high) in zip(self.pso_param_head, self.pso_param_limit)}
      self.pso_agent[(i,'fit_value')] = {'fit_best':float('-inf'), 'fit_now':float('-inf')}


## Sarsa粒子群算法执行

In [None]:
# Search Sarsa's alpha/gamma with 5 particles for 20 PSO iterations, then
# print the best parameters found and the full swarm state.
pst_find  = pso_evaluate(pso_cnt=5, episode=20)
pst_find.pso_execute()
pst_find.pso_print_ans()

In [None]:
# Print the current search result again.
pst_find.pso_print_ans()

In [None]:
# Print the current search result again.
pst_find.pso_print_ans()

## Sarsa超参数效果测试

In [None]:
# Train Sarsa with fixed hyper-parameters: 5 runs of 2000 episodes each.
# NOTE(review): the values below are presumably copied from an earlier PSO
# run - confirm before relying on them.
env=gym.make('Taxi-v3')
sarsa_param = {'alpha': 0.5317706294307902, 'gamma': 0.99545498514414}
algorithm_sarsa1 = evaluate_alg_sarsa(env, cnt=5, episodes=2000, sarsa_param=sarsa_param)
algorithm_sarsa1.start_execute()
env.close()

In [None]:
# Compare default Sarsa with the tuned run (blocks of 20 episodes).
algorithm_sarsa.evaluate_plot_reward_dev(20)
algorithm_sarsa1.evaluate_plot_reward_dev(20,style='x-.')
plt.xlabel('episode')
plt.ylabel('average reward')
plt.legend(['sarsa', 'pso sarsa'], loc='lower right')

## Q-learning粒子群算法参数选取

In [None]:
class pso_evaluate_q(pso_evaluate):
  """PSO hyper-parameter search whose fitness is measured with Q-learning."""

  def __fitness_evaluate(self,pso_agent_index):
    """Train Q-learning once with the particle's parameters and score the run.

    Returns:
      The mean of ``reward_array`` excluding its last 500 entries.
    """
    env=gym.make('Taxi-v3')
    try:
      q_param = self.pso_agent[(pso_agent_index, 'now_param')]
      algorithm_q = evaluate_alg_q(env, cnt=1, episodes=2000, q_param=q_param)
      algorithm_q.start_execute()
    finally:
      # Release the environment even if training fails.
      env.close()
    fit_value = algorithm_q.reward_array[:-500].mean()
    return fit_value

  # Double-underscore names are mangled per class: the base class calls
  # ``self._pso_evaluate__fitness_evaluate``, while the method above is stored
  # as ``_pso_evaluate_q__fitness_evaluate`` and therefore never overrides it.
  # Bind it under the base-class name as well so the search really uses it.
  _pso_evaluate__fitness_evaluate = __fitness_evaluate


In [None]:
# Run the Q-learning PSO search with 5 particles for 20 iterations.
pst_find = pso_evaluate_q(pso_cnt=5, episode=20)
pst_find.pso_execute()

In [None]:
# Print the best parameters found and the full swarm state.
pst_find.pso_print_ans()

In [None]:
# Print the best parameters found and the full swarm state.
pst_find.pso_print_ans()

## Q-learning超参数效果测试

In [None]:
# Train Q-learning with fixed hyper-parameters: 5 runs of 2000 episodes each.
# NOTE(review): gamma is above 1 here (it lies inside the PSO search range
# of 0.6..1.2) - confirm this is intended.
env=gym.make('Taxi-v3')
q_param = {'alpha': 0.5374978944586886, 'gamma': 1.0119420190638535}
algorithm_q1 = evaluate_alg_q(env, cnt=5, episodes=2000, q_param=q_param)
algorithm_q1.start_execute()
env.close()

In [None]:
# Compare default Q-learning with the tuned run (blocks of 20 episodes).
algorithm_q.evaluate_plot_reward_dev(20)
algorithm_q1.evaluate_plot_reward_dev(20,style='x-')
plt.legend(['q learning', 'pso q learning'])
plt.xlabel('episode')
plt.ylabel('average reward')

## 期望Sarsa粒子群算法参数选取

In [None]:
class pso_evaluate_sarsa_exp():
  """Particle swarm optimisation (PSO) of Expected Sarsa's ``alpha``/``gamma``.

  All swarm state lives in ``self.pso_agent``, a dict keyed by
  ``(index, field)``. Index 0 holds the swarm-wide best; indices
  ``1 .. pso_cnt-1`` are the particles. The fields are:

    'optim_param': best parameters seen so far (dict with alpha/gamma)
    'now_param':   current position (dict with alpha/gamma)
    'pso_v':       current velocity (dict with alpha/gamma)
    'fit_value':   {'fit_best': ..., 'fit_now': ...}

  Fitness is maximised.
  """

  def __init__(self, pso_cnt, episode):
    """Create the swarm with random positions and velocities.

    Args:
      pso_cnt: number of particles.
      episode: number of PSO iterations performed by ``pso_execute``.
    """
    # Slot 0 stores the swarm-wide information, hence the extra entry.
    self.pso_cnt = pso_cnt+1
    self.episode = episode
    self.pso_param_head = ['alpha', 'gamma']
    # (lower, upper) bound per parameter, in the order of pso_param_head.
    self.pso_param_limit = [(0.2, 1), (0.6, 1.2)]

    self.pso_agent = {}
    self.__pso_init_param()

  def pso_execute(self):
    """Run the search: evaluate particles, refresh the bests, move the swarm."""
    for _ in range(self.episode):
      self.__pso_fitness_call1()
      self.__pso_fitness_call2()
      self.__pso_update_param()

  def pso_print_ans(self):
    """Print the best parameters found, followed by the full swarm state."""
    print('ans is :',self.pso_agent[(0,'optim_param')])
    print('check it!')
    print('all msg:',self.pso_agent)

  def __as_vector(self, params):
    """Convert a parameter dict to an array ordered like ``pso_param_head``."""
    return np.array([params[name] for name in self.pso_param_head], dtype=float)

  def __as_params(self, vector):
    """Convert an array ordered like ``pso_param_head`` back to a dict."""
    return dict(zip(self.pso_param_head, vector.tolist()))

  def __pso_update_param(self):
    """Move every particle one PSO step and clamp it to the search bounds."""
    pso_best_param = self.__as_vector(self.pso_agent[(0, 'optim_param')])
    for idx in range(1, self.pso_cnt):
      pso_v = self.__as_vector(self.pso_agent[(idx, 'pso_v')])
      pso_optim_param = self.__as_vector(self.pso_agent[(idx, 'optim_param')])
      pso_now_param = self.__as_vector(self.pso_agent[(idx, 'now_param')])

      # Inertia 0.5; one random scalar in [0, 2) scales the pull towards the
      # swarm best and another the pull towards the particle's own best.
      pso_v = 0.5*pso_v + \
              2*np.random.random()*(pso_best_param - pso_now_param) + \
              2*np.random.random()*(pso_optim_param - pso_now_param)
      pso_now_param = pso_now_param + pso_v

      self.pso_agent[(idx, 'pso_v')] = self.__as_params(pso_v)
      self.pso_agent[(idx, 'now_param')] = self.__as_params(pso_now_param)
      self.__pso_modify_param(idx)

  def __pso_modify_param(self, pso_agent_index):
    """Clamp the particle's current position to ``pso_param_limit``."""
    now_param = self.pso_agent[(pso_agent_index, 'now_param')]
    for title, (lower, upper) in zip(self.pso_param_head, self.pso_param_limit):
      if now_param[title] < lower:
        now_param[title] = lower
      if now_param[title] > upper:
        now_param[title] = upper

  def __pso_fitness_call1(self):
    """Evaluate every particle and refresh its personal best."""
    fit_value_array = []
    for idx in range(1, self.pso_cnt):
      fit_value = self.__fitness_evaluate(idx)
      fit_value_array.append(fit_value)
      record = self.pso_agent[(idx, 'fit_value')]
      record['fit_now'] = fit_value
      if fit_value > record['fit_best']:
        record['fit_best'] = fit_value
        # Store a copy so later changes to the position cannot alter the best.
        self.pso_agent[(idx, 'optim_param')] = dict(self.pso_agent[(idx, 'now_param')])
    print(fit_value_array)

  def __fitness_evaluate(self,pso_agent_index):
    """Train Expected Sarsa once with the particle's parameters and score the run.

    Returns:
      The mean of ``reward_array`` excluding its last 500 entries.
    """
    env=gym.make('Taxi-v3')
    try:
      sarsa_exp_param = self.pso_agent[(pso_agent_index, 'now_param')]
      algorithm_sarsa = evaluate_alg_sarsa_exp(env, cnt=1, episodes=2000, sarsa_exp_param=sarsa_exp_param)
      algorithm_sarsa.start_execute()
    finally:
      # Release the environment even if training fails.
      env.close()
    fit_value = algorithm_sarsa.reward_array[:-500].mean()
    return fit_value

  def __pso_fitness_call2(self):
    """Promote the best personal best to the swarm-wide record (slot 0)."""
    global_fit = self.pso_agent[(0, 'fit_value')]
    for idx in range(1, self.pso_cnt):
      particle_best = self.pso_agent[(idx, 'fit_value')]['fit_best']
      if particle_best > global_fit['fit_best']:
        global_fit['fit_best'] = particle_best
        # The parameters that achieved fit_best are the particle's personal
        # best, not necessarily its current position; copy to avoid aliasing.
        self.pso_agent[(0, 'optim_param')] = dict(self.pso_agent[(idx, 'optim_param')])

  def __pso_init_param(self):
    """Create slot 0 and all particles with random positions and velocities."""
    for i in range(self.pso_cnt):
      self.pso_agent[(i,'optim_param')] = dict.fromkeys(self.pso_param_head, 0.0)
      self.pso_agent[(i,'now_param')] = {
          k: np.random.uniform(low, high)
          for k, (low, high) in zip(self.pso_param_head, self.pso_param_limit)}
      # Initial velocity: random value up to half the width of the range.
      self.pso_agent[(i,'pso_v')] = {
          k: np.random.random()*(high-low)*0.5
          for k, (low, high) in zip(self.pso_param_head, self.pso_param_limit)}
      self.pso_agent[(i,'fit_value')] = {'fit_best':float('-inf'), 'fit_now':float('-inf')}


## 期望Sarsa粒子群算法执行

In [None]:
# Search Expected Sarsa's alpha/gamma with 5 particles for 20 PSO
# iterations, then print the best parameters and the full swarm state.
pst_find  = pso_evaluate_sarsa_exp(pso_cnt=5, episode=20)
pst_find.pso_execute()
pst_find.pso_print_ans()

## 期望Sarsa粒子群算法测试

In [None]:
# Train Expected Sarsa with fixed hyper-parameters: 5 runs of 2000 episodes.
# NOTE(review): the values below are presumably copied from an earlier PSO
# run - confirm before relying on them.
env=gym.make('Taxi-v3')
sarsa_exp_param = {'alpha': 0.8793776792728718, 'gamma': 0.9964040263575571}
algorithm_sarsa_exp1 = evaluate_alg_sarsa_exp(env, cnt=5, episodes=2000, sarsa_exp_param=sarsa_exp_param)
algorithm_sarsa_exp1.start_execute()
env.close()

In [None]:
# Compare default Expected Sarsa with the tuned run (blocks of 20 episodes).
algorithm_sarsa_exp.evaluate_plot_reward_dev(20)
algorithm_sarsa_exp1.evaluate_plot_reward_dev(20, style='x--')
plt.legend(['expect sarsa', 'pso expect sarsa'])

## Double Q-learning粒子群算法参数选取

In [None]:
class pso_evaluate_double_q(pso_evaluate):
  """PSO hyper-parameter search whose fitness is measured with Double Q-learning."""

  def __fitness_evaluate(self,pso_agent_index):
    """Train Double Q-learning once with the particle's parameters and score the run.

    Returns:
      The mean of ``reward_array`` excluding its last 500 entries.
    """
    env=gym.make('Taxi-v3')
    try:
      q_param = self.pso_agent[(pso_agent_index, 'now_param')]
      algorithm_q = evaluate_alg_double_q(env, cnt=1, episodes=2000, q_param=q_param)
      algorithm_q.start_execute()
    finally:
      # Release the environment even if training fails.
      env.close()
    fit_value = algorithm_q.reward_array[:-500].mean()
    return fit_value

  # Double-underscore names are mangled per class: the base class calls
  # ``self._pso_evaluate__fitness_evaluate``, while the method above is stored
  # as ``_pso_evaluate_double_q__fitness_evaluate`` and never overrides it.
  # Bind it under the base-class name as well so the search really uses it.
  _pso_evaluate__fitness_evaluate = __fitness_evaluate

In [None]:
# Run the Double Q-learning PSO search with 5 particles for 20 iterations.
pst_find = pso_evaluate_double_q(pso_cnt=5, episode=20)
pst_find.pso_execute()

In [None]:
# Print the best parameters found and the full swarm state.
pst_find.pso_print_ans()

In [None]:
# Train Double Q-learning with fixed hyper-parameters: 5 runs of 2000 episodes.
# NOTE(review): the values below are presumably copied from an earlier PSO
# run - confirm before relying on them.
env=gym.make('Taxi-v3')
q_param = {'alpha': 0.5265945680168334, 'gamma': 0.9764647562876259}
algorithm_double_q1 = evaluate_alg_double_q(env, cnt=5, episodes=2000, q_param=q_param)
algorithm_double_q1.start_execute()
env.close()

## Double Q-learning粒子群算法效果测试

In [None]:
# Compare default Double Q-learning with the tuned run (blocks of 20 episodes).
algorithm_double_q.evaluate_plot_reward_dev(20)
algorithm_double_q1.evaluate_plot_reward_dev(20, style='x:')
plt.legend(['double q learning', 'pso double q learning'])
plt.xlabel('episode')
plt.ylabel('average reward')

# 综合效果

In [None]:

# Overlay the four tuned runs (blocks of 20 episodes).
# The legend entries are listed in the same order as the plot calls.
algorithm_q1.evaluate_plot_reward_dev(20)
algorithm_sarsa1.evaluate_plot_reward_dev(20)
algorithm_double_q1.evaluate_plot_reward_dev(20)
algorithm_sarsa_exp1.evaluate_plot_reward_dev(20)


plt.xlabel('episode')
plt.ylabel('average reward')

plt.legend(['pso q learning','pso sarsa','pso double q learning', 'pso expect sarsa'])

In [None]:
# For each tuned run, print the mean of reward_array over five consecutive
# 400-entry slices (indices 0..1999); an empty line separates the algorithms.
# Order: Q-learning, Sarsa, Double Q-learning, Expected Sarsa.
for i in range(5):
  print(algorithm_q1.reward_array[i*400:(i+1)*400].mean())
print('')
for i in range(5):
  print(algorithm_sarsa1.reward_array[i*400:(i+1)*400].mean())
print('')
for i in range(5):
  print(algorithm_double_q1.reward_array[i*400:(i+1)*400].mean())
print('')
for i in range(5):
  print(algorithm_sarsa_exp1.reward_array[i*400:(i+1)*400].mean())
print('')