In [2]:
import gymnasium as gym
import stable_baselines3
from stable_baselines3 import DQN
from math import ceil
import numpy as np
import sklearn
from sklearn import metrics
from sklearn.metrics import confusion_matrix,classification_report,accuracy_score,ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import pickle 
from copy import deepcopy
from datetime import datetime

In [3]:
class StoreAndTerminateWrapper(gym.Wrapper):
  ''''
  :param env: (gym.Env) Gym environment that will be wrapped
  :param max_steps: (int) Max number of steps per episode
  '''
  def __init__(self, env):
    # Call the parent constructor, so we can access self.env later
    super(StoreAndTerminateWrapper, self).__init__(env)
    self.max_steps = 200
    # Counter of steps per episode
    self.current_step = 0
    self.mem = []
    self.TotalReward = 0.0 
    self.env = env
    self.first_state = 0
    self.first_obs = 0
    self.prev_obs = 0 
    self.states_list = []
    self.info = {}
  
  def reset(self):
    """
    Reset the environment 
    """
    # Reset the counter
    self.current_step = 0
    obs =self.env.reset()
    self.TotalReward = 0.0
    self.first_obs = obs
    return obs

  def step(self, action):
    """
    In this function we store the initial state as well as the memory of the agent
    :param action: ([float] or int) Action taken by the agent
    :return: (np.ndarray, float, bool, dict) observation, reward, is the episode over?, additional informations
    """
    if self.current_step == 0: #store initial state
      self.prev_obs = self.first_obs
      self.first_state = deepcopy(self.env)
      self.states_list.append(self.first_state)
    # print("t",self.env.state[0],"reward",self.TotalReward)
    # if self.env.state[0]==-1.2:
    #   print("-1.2")
    #   obs = self.reset()
    #   reward = -200
    #   done = True
    #   return obs, reward, done, False
    self.current_step += 1
    obs, reward, terminated, truncated, info = self.env.step(action)
    done = terminated or truncated
    self.TotalReward += reward
    self.mem.append(tuple((self.prev_obs,action)))
    self.prev_obs = obs
    if self.current_step >= self.max_steps:
      done = True
      # Update the info dict to signal that the limit was exceeded
    if obs[0]<=-1.2:
      done = True
      reward = -201 - self.TotalReward
      self.TotalReward =-200
      # print("fff",reward)
    if done:
      self.mem.append(tuple(('done',self.TotalReward)))
    self.info['mem'] = self.mem
    self.info['state'] = self.states_list
    # self.mem.append(tuple(obs,action))
    return obs, reward, done, info

  def set_state(self, state):
    """
    :param state: initial state of the episode
    :return: environment is updated and observations is returned
    """
    self.env = deepcopy(state)
    obs = np.array(list(self.env.unwrapped.state))
    self.current_step = 0
    self.TotalReward = 0.0
    self.first_obs = obs
    return obs

def abstract_state_general(model,state1,d):
  if type(state1) == str:
    if state1 == 'done':
      return 'end'
  q_values = model.step_model.step([state1])
  return tuple([ceil(q_value/d) for q_value in q_values[1][0]])

def Abstract_classes(ep,abstraction_d,model):
  d=abstraction_d
  abs_states1=[]
  for episode in ep:
    for state,action in episode:
      abs_st = abstract_state_general(model,state,d)
      if abs_st == 'end':
        continue
      abs_states1.append(abs_st)
  unique1=list(set(abs_states1))
  uni1 = np.array(unique1)
  a=len(abs_states1)
  b=len(set(abs_states1))
  print("abstract states:",b)
  print("Concrete states",a)
  print("ratio",b/a)
  return unique1,uni1

def ML_first_representation_func_based(Abs_d,functional_func,reward_func,model,input_episodes,unique1):
  """
  TO-DO : fix epsilon and threshold
  """
  d = Abs_d
  data1_x_b=[]
  data1_y_b= [] 
  data1_y_f_b = []
  for i, episode in enumerate(input_episodes):
    record = np.zeros(len(unique1))
    temp_flag = False
    for state, action in episode:
      ab = abstract_state_general(model,state,d)
      if ab == 'end':
        assert not temp_flag, f'Episode data problem, two terminations in one episode. Episode number{i}'
        temp_flag = True
        # print(action)
        # print(functional_func(episode))
        if functional_func(episode):
          data1_y_f_b.append(1)
        else:
          data1_y_f_b.append(0)
        if reward_func(episode):
          data1_y_b.append(1)
        else:
          data1_y_b.append(0)
        # print("end\n\n\n")
        # print(len(data1_y_b),"len(input_episodes)",len(input_episodes))
        continue
        # print(state[0])
      ind = unique1.index(ab)
      record[ind] = 1
      # print(state, action)
      assert len(data1_y_b)<len(input_episodes), "assert"
      # if you want the frequency go with the next line 
      # record[ind] += 1
    data1_x_b.append(record)

  return data1_x_b, data1_y_b, data1_y_f_b

def report(model2,x_train, y_train,x_test, y_test):
  print("********************** reporting the result of the model **************************")
  print('The score for train data is {0}'.format(model2.score(x_train,y_train)))
  print('The score for test data is {0}'.format(model2.score(x_test,y_test)))


  predictions_train = model2.predict(x_train)
  predictions_test = model2.predict(x_test)

  print("\n\n--------------------------------------recall---------------------------------")

  print('the test recall for the class yes is {0}'.format(metrics.recall_score(y_test,predictions_test, pos_label=1)))
  print('the test recall for the class no is {0}'.format(metrics.recall_score(y_test,predictions_test, pos_label=0)))

  print('the training recall for the class yes is {0}'.format(metrics.recall_score(y_train,predictions_train, pos_label=1)))
  print('the training recall for the class no is {0}'.format(metrics.recall_score(y_train,predictions_train, pos_label=0)))


  print("\n\n--------------------------------------precision------------------------------")


  print('the test precision for the class yes is {0}'.format(metrics.precision_score(y_test,predictions_test, pos_label=1)))
  print('the test precision for the class no is {0}'.format(metrics.precision_score(y_test,predictions_test, pos_label=0)))

  print('the training precision for the class yes is {0}'.format(metrics.precision_score(y_train,predictions_train, pos_label=1)))
  print('the training precision for the class no is {0}'.format(metrics.precision_score(y_train,predictions_train, pos_label=0)))

  print("\n\n")
  print(classification_report(y_test, predictions_test, target_names=['NO ','yes']))

  tn, fp, fn, tp = confusion_matrix(y_test, predictions_test).ravel()
  specificity = tn / (tn+fp)
  print("\n\nspecifity :",specificity)
  print("\n\n--------------------------------------confusion----------------------------")
  CM = metrics.confusion_matrix(y_test, predictions_test)
  print("The confusion Matrix:")
  print(CM)
  print('the accuracy score in {0}\n\n'.format(accuracy_score(y_test, predictions_test)))
  print("********************** plotting the confusion matrix & ROC curve **************************")
  ConfusionMatrixDisplay(model2, x_test, y_test)
  metrics.plot_roc_curve(model2, x_test, y_test) 
  plt.show()

# write function for load


def random_test_2(model, env, Num):
    # Gymnasium v0.29+的reset()返回(obs, info)元组
    obs, info = env.reset()  # 解包获得观测值
    counter = 1
    episode_reward = 0.0
    end = Num  # 初始化end防止未完成任何episode的情况
    
    for i in range(Num):
        # 只传递obs部分给predict
        action, _ = model.predict(obs, deterministic=True)
        # Gymnasium v0.29+的step()返回5个值（包括truncated）
        obs, reward, done, info = env.step(action)  
        episode_reward += reward
        if done:  # 检查episode结束
            counter += 1
            end = i
            episode_reward = 0.0
            obs, info = env.reset()  # 重置时再次解包
    
    # 处理记忆数据
    last_index = -1  # 默认值
    
    # 逆向查找最后一个'done'标记
    for u in range(1, len(env.info['mem'])+1):
        current_item = env.info['mem'][-u]
        # 处理可能出现的数组情况
        if isinstance(current_item[0], (np.ndarray, list)):
            # 如果是数组，检查是否包含'done'
            if 'done' in current_item[0]:
                last_index = -u
                break
        elif current_item[0] == 'done':
            last_index = -u
            break
    
    # 计算切片范围
    remaining_steps = Num - end if end != Num else 0
    randomtest = env.info['mem'][last_index:-remaining_steps] if remaining_steps > 0 else env.info['mem'][last_index:]
    
    # 处理状态数据
    ran_state = []
    if counter > 1:
        try:
            ran_state = env.info['state'][-counter+1:-1]
        except (IndexError, KeyError):
            ran_state = []
    
    return randomtest, ran_state

def fix_testing(testing_episodes,testing_states,Env2):
  buffer =[] 
  episodes_set = []
  j=0
  for i in range(len(testing_episodes)):
    if testing_episodes[i][0] == 'done':
      if i == 0:
        continue
      buffer.append(testing_episodes[i])
      episodes_set.append(buffer)
      buffer=[]
    else:
      buffer.append(testing_episodes[i])
      # np.array(mtc_wrapped.set_state(qq[0]),dtype="float32")
  if not (episodes_set[0][0][0]==np.array(Env2.set_state(testing_states[0]),dtype="float32")).all():
    del testing_states[0]
  if not (episodes_set[0][0][0]==np.array(Env2.set_state(testing_states[0]),dtype="float32")).all():
    assert False, 'problem in starting states'
  if len(episodes_set)!=len(testing_states):
    del testing_states[-1]
  if len(episodes_set)!=len(testing_states):
    assert False, 'problem in data prepration'
  return episodes_set , testing_states

def is_functional_fault(episode):
  epsilon = 0.1
  env = mtc_wrapped
  reward = episode[-1][1]
  last_state = episode[-2][0][0]
  if last_state<(env.low[0]+epsilon) and reward == -200:
    return True
  else:
    return False


def is_reward_fault(episode):
  RF_threshold = -180
  reward = episode[-1][1]
  # print(len(episode))
  if reward<RF_threshold and len(episode)>200:
    return True
  else:
    return False

def is_functional_fault_last_state(last_step,done_step):
  epsilon = 0.1
  env = mtc_wrapped
  assert done_step[0]=='done', "Wrong input!"
  reward = done_step[1]
  last_state = last_step[0][0]
  if last_state<(env.low[0]+epsilon) and reward == -200:
    return True
  else:
    return False


def is_reward_fault_last_state(last_step,done_step):
  RF_threshold = -180
  assert done_step[0]=='done', "Wrong input!"
  reward = done_step[1]
  last_state = last_step[0][0]
  # print(len(episode))
  if reward<RF_threshold and not is_functional_fault_last_state(last_step,done_step):
    return True
  else:
    return False

def load_p(name):
  with open(f'/content/drive/MyDrive/MC/{name}.pickle', 'rb') as file2:
    to_what = pickle.load(file2)
  return to_what
def local_load_p(name):
  with open(f'c:/Users/Student/Desktop/Data/{name}', 'rb') as file2:
    to_what = pickle.load(file2)
  return to_what

In [4]:
def translator(episode,model, d, unique5):
  """
  thid function takes the concrete episodes and returns the encoded episodes 
  based on the presence and absence of the individuals  
  :param 'episode': input episode
  :param 'model': RL model
  :param 'd': abstraction level = 1
  :param 'unique5': abstract classes 
  :return: encoded episodse based on the presence and absence

  """
  d=d
  record = np.zeros(len(unique5))
  for state, action in episode:
    ab = abstract_state_general(model,state,d)
    if ab == 'end':
      continue
    if ab in unique5:
      ind = unique5.index(ab)
      record[ind] = 1
  return [record]

def episode_player(episodes,d, abs_classes, model, monitor) -> list:
  ''' This function replays the episodes and returns the risk of each step in each episode
  :param 'episodes': input episodes
  :param 'd': abstraction level 
  :param 'abs_classes': abstract classes
  :param 'model': RL model
  :param 'monitor': ML model
  :return: risk of each step in each episode
  
  '''
  episodes_risk=[]
  for episode in episodes:
    risk_array=[]
    for step in range(len(episode)-1):
      monitoring_data = translator(episode[:step],model,d,abs_classes)
      Risk = monitor.predict_proba(monitoring_data)
      risk_array.append(Risk[0][1])
    episodes_risk.append(risk_array)
  return episodes_risk

def single_episode_player(episode,d, abs_classes, model, monitor) -> list:
  ''' This function replays one episodes and returns the risk of each step in episode
  :param 'episode': input episode
  :param 'd': abstraction level
  :param 'abs_classes': abstract classes
  :param 'model': RL model
  :param 'monitor': ML model
  :return: risk of each step in episode
  '''
  risk_array=[]
  for step in range(len(episode)-1):
    monitoring_data = translator(episode[:step],model,d,abs_classes)
    Risk = monitor.predict_proba(monitoring_data)
    risk_array.append(Risk[0][1])
  return risk_array

def line_plot(data):
    plt.figure(figsize=(20, 6))
    for i in range(len(data)): 
        plt.plot( [i for i in range(len(data[i]))], data[i], label = f"Episode {i}")
    # plt.plot(y, x, label = "line 2")
        
    plt.legend()
    plt.show()

def plot_positions(episodes):
    plt.figure(figsize=(20, 6))
    
    for i in range(len(episodes)):
        position =[]
        position_arr =[]
        for j in range(len(episodes[i])-1):
            position.append(episodes[i][j][0][0])
        position_arr.append(position)
        plt.plot([i for i in range(len(position))], position, label = f"Episode {i}")
        
    plt.legend()
    plt.show()

def plot_velocity(episodes):
    plt.figure(figsize=(20, 6))
    
    for i in range(len(episodes)):
        velocity =[]
        velocity_arr =[]
        for j in range(len(episodes[i])-1):
            velocity.append(episodes[i][j][0][1])
        velocity_arr.append(velocity)
        plt.plot([i for i in range(len(velocity))], velocity, label = f"Episode {i}")
        
    plt.legend()
    plt.show()

def position_extractor(episode):
    position =[]
    for i in range(len(episode)-1):
        position.append(episode[i][0][0])
    return position
    
def velocity_extractor(episode):
    velocity =[]
    for i in range(len(episode)-1):
        velocity.append(episode[i][0][1])
    return velocity


def Plot_all(data, params,save=False,show=True,data_chunk=0,path='Plots/v2'):
    '''plot risk and position snd velocity in one figure with 3 subplots
    '''
    d,unique1,model,RF_FF_1rep = params
    fig, axs = plt.subplots(3,figsize=(20, 18))
    for i in range(len(data)):
        axs[0].plot([i for i in range(len(data[i])-1)], single_episode_player(data[i],d,unique1,model,RF_FF_1rep), label = f"Episode {i}")
        axs[1].plot([i for i in range(len(data[i])-1)], position_extractor(data[i]), label = f"Episode {i}")
        axs[2].plot([i for i in range(len(data[i])-1)], velocity_extractor(data[i]), label = f"Episode {i}")
    axs[0].legend()
    axs[0].set_title('Risk')
    axs[1].set_title('Position')
    axs[2].set_title('Velocity')
    axs[1].legend()
    axs[2].legend()
    axs[0].set_ylim(-0.1,1.1)
    current_time = datetime.now()
    ID = current_time.strftime("%Y%m%d%H%M%S")
    if save:
        fig.savefig(f'{path}/RPV_C{data_chunk}_{ID}.png')
    plt.close()

In [5]:
Dataset_path = "D:\\code\\RLtest\\1.zip"
mtc = gym.make('MountainCar-v0')
mtc_wrapped = StoreAndTerminateWrapper(mtc)
model = DQN('MlpPolicy',env=mtc_wrapped, verbose=1)
model = model.load(Dataset_path)

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


FileNotFoundError: [Errno 2] No such file or directory: 'D:\\code\\RLtest\\1.zip.zip'

In [6]:
RT,RTS = random_test_2(model,mtc_wrapped,300_000)
FRT,FRTS = fix_testing(RT,RTS,mtc_wrapped)
#save FRT and FRTS as pickle
training_size = 2200
assert len(FRT) >= training_size
Dataset_path2 = "D:\code\RLtest\\"
with open(f'{Dataset_path2}/Random_episodes/FRT_training.pickle', 'wb') as file2:
    pickle.dump(FRT[:training_size], file2)
with open(f'{Dataset_path2}/Random_episodes/FRTS_training.pickle', 'wb') as file2:
    pickle.dump(FRTS[:training_size], file2)

RF=0
FF=0
Buff_reward = 0
Buff_len = 0
for test_episode in FRT:
    Buff_reward += test_episode[-1][1]
    Buff_len += (len(test_episode)-1)
    if is_functional_fault(test_episode):
        FF+=1
    if is_reward_fault(test_episode):
        RF+=1

KeyboardInterrupt: 