##  Requirements 

In [6]:
import torch
import numpy as np
import gymnasium as gym
import argparse
import math
from random import seed
import random
from datetime import datetime
import pickle
import sklearn
import numpy
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB, CategoricalNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn import preprocessing
from sklearn.metrics import confusion_matrix
from sklearn.metrics import ConfusionMatrixDisplay
import matplotlib.pyplot as plt
from sklearn import metrics
from sklearn.metrics import accuracy_score
from itertools import product
from sklearn.utils import resample
from sklearn.model_selection import KFold, RepeatedKFold
from sklearn.metrics import f1_score
from sklearn import impute
import statistics
from scipy import stats
from copy import deepcopy
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.metrics import classification_report
from sklearn.ensemble import RandomForestClassifier
from math import ceil
import copy
import sys
import os
from sklearn.metrics import jaccard_score
import time
import multiprocessing
from pymoo.algorithms.moo.nsga2 import calc_crowding_distance
import subprocess
import logging
from csv import reader
import argparse


In [None]:
# Remove every `--f=...` argument from sys.argv before parsing: the parser
# below declares no such option.
# NOTE(review): presumably this is the kernel connection-file argument that
# Jupyter injects when the notebook is run interactively -- confirm.
if any(arg.startswith('--f=') for arg in sys.argv):
    sys.argv = [arg for arg in sys.argv if not arg.startswith('--f=')]

# Command-line options: an integer `--epsilon` (default 30) and a float
# `--abstract-level` (default 1).
parser = argparse.ArgumentParser()
parser.add_argument("--epsilon", type=int, default=30)
parser.add_argument("--abstract-level", type=float, default=1)
args = parser.parse_args()

print(f'epsilon: {args.epsilon}')
DISPLAY_SCREEN = False
# Seed both the stdlib and the NumPy global generators with the same constant.
random.seed(42)
np.random.seed(42)
# Module-level alias for the value of --abstract-level.
DD = args.abstract_level

epsilon: 30


## RL

In [None]:
class StoreAndTerminateWrapper(gym.Wrapper):
    '''
    Wrapper that logs every (observation, action) pair of the wrapped
    environment and truncates episodes.

    An episode is truncated once `max_steps` steps have been taken, or as soon
    as the first observation component is <= -1.2; in the latter case the
    episode total is forced to -200 and the returned reward is adjusted.

    :param env: (gym.Env) Gym environment that will be wrapped
    :param max_steps: (int) Max number of steps per episode (default 200)
    '''
    def __init__(self, env, max_steps=200):
        super().__init__(env)
        self.max_steps = max_steps
        self.current_step = 0
        self.env = env
        # Flat log: (previous observation, action) per step, followed by a
        # ('done', total reward) marker at the end of each episode.
        self.mem = []
        self.TotalReward = 0.0
        self.first_state = None
        self.first_obs = 0
        self.prev_obs = None
        # Deep copies of the wrapped env taken at the first step of each episode.
        self.states_list = []
        self.info = {}

    def reset(self, *args, **kwargs):
        """Reset the wrapped env and the per-episode counters; return (obs, info)."""
        self.current_step = 0
        obs, info = self.env.reset(*args, **kwargs)
        self.TotalReward = 0.0
        self.first_obs = obs
        return obs, info

    def step(self, action):
        """
        Step the wrapped env, log the transition and apply the truncation rules.

        Returns the usual (obs, reward, terminated, truncated, info) tuple.
        Note that the returned `info` is the wrapped env's dict; the log and
        the stored start states are exposed through `self.info` instead.
        """
        if self.current_step == 0:
            # First step of an episode: snapshot the env so the episode can
            # later be restarted from the same point via set_state().
            self.prev_obs = self.first_obs
            self.first_state = deepcopy(self.env)
            self.states_list.append(self.first_state)
        self.current_step += 1
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.TotalReward += reward
        self.mem.append((self.prev_obs, action))
        self.prev_obs = obs
        if self.current_step >= self.max_steps:
            truncated = True
        if obs[0] <= -1.2:
            # Boundary reached: end the episode and force its total to -200.
            truncated = True
            reward = -201 - self.TotalReward
            self.TotalReward = -200
        if terminated or truncated:
            self.mem.append(('done', self.TotalReward))
        self.info['mem'] = self.mem
        self.info['state'] = self.states_list
        return obs, reward, terminated, truncated, info

    def set_state(self, state):
        """
        Replace the wrapped env by a deep copy of `state` and restart counters.

        :param state: an environment snapshot such as those in `states_list`
        :return: the observation rebuilt from `unwrapped.state` of the copy
        """
        self.env = deepcopy(state)
        obs = np.array(list(self.env.unwrapped.state))
        self.current_step = 0
        self.TotalReward = 0.0
        self.first_obs = obs
        return obs

## Torch

In [8]:
class TorchModel():
    """
    Thin adapter around a torch network that maps states to one value per action.

    Inputs are moved to the device the network's parameters live on, so the
    adapter works for CPU and GPU networks alike.
    """

    def __init__(self, torch_net: torch.nn.Module):
        self.torch_net = torch_net

    def _device(self):
        """Return the device of the network (falls back to CUDA if available, else CPU)."""
        try:
            return next(self.torch_net.parameters()).device
        except StopIteration:
            # Parameter-free module: keep the historical preference for CUDA.
            return torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def _q_values(self, states):
        """Run the network on `states` and return its output as a NumPy array."""
        batch = torch.tensor(np.asarray(states), dtype=torch.float32,
                             requires_grad=False).to(self._device())
        with torch.no_grad():
            return self.torch_net(batch).cpu().detach().numpy()

    def abstract_state(self, state1, d):
        """
        Map concrete state(s) to abstract state(s) by bucketing network outputs.

        :param state1: one state, a sequence of states, or the string 'done'
        :param d: bucket width; each output is replaced by ceil(output / d)
        :return: 'end' for 'done'; a tuple for a single state; a list of
                 tuples for a batch of states
        """
        if isinstance(state1, str) and state1 == 'done':
            return 'end'
        q_value = self._q_values(state1)
        if q_value.ndim == 1:
            return tuple(np.ceil(q_value / d))
        return [tuple(i) for i in np.ceil(q_value / d)]

    def predict(self, obs, deterministic=True):
        """
        Choose an action for `obs`.

        :param deterministic: if True return the argmax of the network output;
            otherwise sample an action index with probability proportional to
            the outputs (requires the normalised outputs to be a valid
            probability vector).
        """
        q_value = self._q_values(obs)
        if deterministic:
            return np.argmax(q_value)
        # Sample over however many actions the network has, not a fixed [0, 1].
        return np.random.choice(len(q_value), p=q_value / q_value.sum())

    def action_probability(self, state):
        """
        Normalise the network outputs so that they sum to one per state.

        :return: a 1-D array for a single state, a 2-D array (one row per
                 state) for a batch
        """
        q_value = self._q_values(state)
        if q_value.ndim == 1:
            return q_value / q_value.sum()
        div_factor = q_value.sum(axis=1)
        return (q_value.T / div_factor).T

## RL function

In [10]:
def proportional_sampling_whitout_replacement(index, size):
    """
    Draw `size` distinct entries of `index` without replacement.

    Each entry is weighted by its own value divided by the sum of all entries.
    """
    total = sum(np.array(index))
    weights = [value / total for value in index]
    return np.random.choice(index, size=size, replace=False, p=weights)

def population_sample(episodes, ind, pop_size, threshold, functional_fault_size, reward_fault_size):
    """
    Select end-of-episode indices of faulty episodes from a flat step log.

    `episodes` is the agent memory: one entry per step, where the last entry
    of every episode is a ('done', reward) marker. The function returns the
    indices of selected markers; use `episode_extract` to turn them into
    complete episodes.

    :param episodes: flat list of (state, action) steps and ('done', reward) markers
    :param ind: unused; kept for interface compatibility
    :param pop_size: desired total population size
    :param threshold: an episode whose final reward is below this value is a reward fault
    :param functional_fault_size: maximum number of functional-fault episodes to select
    :param reward_fault_size: maximum number of reward-fault episodes to select
    :return: (population, r_size), where population is the list of selected
             marker indices and r_size = pop_size - len(population) is the
             number of slots the caller still has to fill
    """
    epsilon = 0.1
    # Locate the markers explicitly: building an ndarray from the ragged log
    # (array states mixed with strings) is rejected by current NumPy.
    index = [
        i for i, step in enumerate(episodes)
        if isinstance(step[0], str) and step[0] == 'done'
    ]
    print(len(index), 'episodes from training')

    functional_fault = []
    reward_fault = []
    for i in index:
        _, r = episodes[i]
        # NOTE(review): `mtc_wrapped` is defined elsewhere. If `low[0]` is a
        # negative lower bound, abs(...) can never be smaller than
        # low[0] + epsilon, so no functional fault would ever be detected --
        # confirm the intended comparison.
        if abs(episodes[i - 1][0][0]) < (mtc_wrapped.low[0] + epsilon):
            functional_fault.append(i)
            print('function fault')
        if r < threshold:
            reward_fault.append(i)
            print('reward fault')

    population = []
    if len(functional_fault) < functional_fault_size:
        print('functional faults size is', len(functional_fault), ' and its less than desired number')
        population += functional_fault
        print('sampling more random episodes instead ...!')
    elif len(functional_fault) == functional_fault_size:
        population += functional_fault
    else:
        # More faults than requested: sample proportionally to the index value.
        sam1 = proportional_sampling_whitout_replacement(functional_fault, functional_fault_size)
        print(population)
        print("ff", len(functional_fault))
        population += list(sam1)

    if len(reward_fault) < reward_fault_size:
        print('reward faults size is', len(reward_fault), ' and its less than desired number')
        population += reward_fault
        print('sampling more random episodes instead ...!')
    elif len(reward_fault) == reward_fault_size:
        population += reward_fault
    else:
        sam2 = proportional_sampling_whitout_replacement(reward_fault, reward_fault_size)
        population += list(sam2)

    r_size = pop_size - len(population)
    print("RF", len(reward_fault))
    return population, r_size

def episode_extract(sampled_index, episodes):
    """
    Cut complete episodes out of a flat step log.

    :param sampled_index: indices of the ('done', reward) markers that end
        the wanted episodes
    :param episodes: flat list of (state, action) steps and ('done', reward) markers
    :return: list of episodes; each is the slice from the step after the
        previous marker (or from the start of the log) up to and including
        its own marker
    """
    epis = []
    for i in sampled_index:
        # Walk back to the first step of this episode. Stopping at position 0
        # keeps the very first step of the log, and the isinstance() test
        # avoids comparing array-valued states with a string.
        start = i
        while start > 0:
            previous = episodes[start - 1][0]
            if isinstance(previous, str) and previous == 'done':
                break
            start -= 1
        slice1 = episodes[start:(i + 1)]
        assert len(slice1) > 0, 'Attempt to return Empty episode'
        epis.append(slice1)
    return epis

def fitness_reward(episode):
    """
    Reward-based fitness: the number of entries in the episode minus one.

    The final entry of an episode is the ('done', reward) marker, so the
    result is the count of the remaining entries.
    """
    entry_count = len(episode)
    return entry_count - 1

def fitness_confidence(episode, model, mode):
    """
    Confidence-based fitness of an episode.

    For every state the action probabilities are sorted, and the highest is
    compared with the second highest; the per-state values are summed and
    divided by the reward stored in the final ('done', reward) entry.

    :param `episode`: sequence of (state, action) entries ending with ('done', reward)
    :param `model`: RL agent exposing `action_probability(states)`
    :param `mode`: 'm' for the difference, 'r' for the ratio of the two probabilities
    :return: the fitness value, or None (after a warning print) for any other mode
    """
    terminal = episode[-1]
    assert type(terminal[0]) is str and terminal[0] == 'done', "last state is not string 'done'"

    states = [step[0] for step in episode[:-1]]
    prob = model.action_probability(states)
    prob.sort(axis=1)  # in place, ascending along each row
    best, second_best = prob[:, -1], prob[:, -2]

    if mode == 'm':
        per_state = best - second_best
    elif mode == 'r':
        per_state = best / second_best
    else:
        print("WARNING nothing returned", episode)
        return None
    return per_state.sum() / terminal[1]

def fitness_reward_probability(ml, binary_episode):
    """
    Third fitness function: guides the search toward episodes that are likely
    to contain a reward fault.

    Because the search minimises its objectives, the value returned is the
    first entry of the first row of `ml.predict_proba(binary_episode)` rather
    than the fault probability itself.
    NOTE(review): presumably that entry is the probability of the non-faulty
    class -- confirm against the label encoding used for training.

    :param `ml`: classifier exposing `predict_proba`
    :param `binary_episode`: episode encoded by presence of abstract states
    """
    class_probabilities = ml.predict_proba(binary_episode)
    first_row = class_probabilities[0]
    return first_row[0]

def fitness_functional_probability(ml, binary_episode):
    """
    Return the first entry of the first row of `ml.predict_proba(binary_episode)`.

    :param ml: classifier exposing `predict_proba`
    :param binary_episode: episode encoded by presence of abstract states
    """
    class_probabilities = ml.predict_proba(binary_episode)
    first_row = class_probabilities[0]
    return first_row[0]

def state_abstraction(model, state1, state2, d):
    """
    Tell whether two states fall into the same abstract class.

    Both states are passed through `model.step_model.step`; the values at
    position [1][0] of each result are compared one by one after dividing by
    `d` and rounding up. Each compared pair is printed.

    :return: True when every pair lands in the same bucket, False otherwise
    """
    first_values = model.step_model.step([state1])[1][0]
    second_values = model.step_model.step([state2])[1][0]
    for position in range(len(first_values)):
        left = first_values[position]
        print(left)
        right = second_values[position]
        print(right)
        if ceil(left / d) != ceil(right / d):
            return False
    return True

def report(model2, x_train, y_train, x_test, y_test):
    """
    Print and plot an evaluation report for a fitted binary classifier.

    Prints the train/test scores, per-class recall and precision (labels 1
    and 0), the classification report, specificity, confusion matrix and
    accuracy, then shows the confusion matrix and a ROC curve for 3 seconds.

    :param model2: fitted estimator exposing `score`, `predict` and `classes_`
    :param x_train: training features
    :param y_train: training labels
    :param x_test: test features
    :param y_test: test labels
    """
    # Interactive mode so the figures are shown without blocking.
    plt.ion()
    print("********************** reporting the result of the model **************************")
    print('The score for train data is {0}'.format(model2.score(x_train, y_train)))
    print('The score for test data is {0}'.format(model2.score(x_test, y_test)))

    predictions_train = model2.predict(x_train)
    predictions_test = model2.predict(x_test)

    print("\n\n--------------------------------------recall---------------------------------")

    print(
        'the test recall for the class yes is {0}'.format(metrics.recall_score(y_test, predictions_test, pos_label=1)))
    print('the test recall for the class no is {0}'.format(metrics.recall_score(y_test, predictions_test, pos_label=0)))

    print('the training recall for the class yes is {0}'.format(
        metrics.recall_score(y_train, predictions_train, pos_label=1)))
    print('the training recall for the class no is {0}'.format(
        metrics.recall_score(y_train, predictions_train, pos_label=0)))

    print("\n\n--------------------------------------precision------------------------------")

    print('the test precision for the class yes is {0}'.format(
        metrics.precision_score(y_test, predictions_test, pos_label=1)))
    print('the test precision for the class no is {0}'.format(
        metrics.precision_score(y_test, predictions_test, pos_label=0)))

    print('the training precision for the class yes is {0}'.format(
        metrics.precision_score(y_train, predictions_train, pos_label=1)))
    print('the training precision for the class no is {0}'.format(
        metrics.precision_score(y_train, predictions_train, pos_label=0)))

    print("\n\n")
    print(classification_report(y_test, predictions_test, target_names=['NO ', 'yes']))

    # Unpacking four values requires a 2x2 confusion matrix, i.e. exactly two classes.
    tn, fp, fn, tp = confusion_matrix(y_test, predictions_test).ravel()
    specificity = tn / (tn + fp)
    print("\n\nspecifity :", specificity)
    print("\n\n--------------------------------------confusion----------------------------")
    CM = metrics.confusion_matrix(y_test, predictions_test)
    print("The confusion Matrix:")
    print(CM)
    print('the accuracy score in {0}\n\n'.format(accuracy_score(y_test, predictions_test)))
    print("********************** plotting the confusion matrix & ROC curve **************************")
    ConfusionMatrixDisplay(CM, display_labels=model2.classes_).plot()
    # The ROC curve is computed from the hard class predictions, not from scores.
    fpr, tpr, thresholds = metrics.roc_curve(y_test, predictions_test)
    roc_auc = metrics.auc(fpr, tpr)
    display = metrics.RocCurveDisplay(fpr=fpr, tpr=tpr, roc_auc=roc_auc, estimator_name='example estimator')
    display.plot()
    # Keep the figures on screen for 3 seconds, then leave interactive mode.
    plt.pause(3)
    plt.ioff()

def fix_testing(testing_episodes, testing_states, Env2):
    """
    Group a flat test log into episodes and align it with the start states.

    The log is split after every ('done', ...) entry (a marker at position 0
    is skipped). `testing_states` is then trimmed in place: its first entry is
    dropped if it does not reproduce the first observation of the first
    episode, and its last entry is dropped if the lengths still differ.

    :param testing_episodes: flat list of steps and ('done', ...) markers
    :param testing_states: list of start states; modified in place
    :param Env2: environment exposing `set_state(state)` that returns an observation
    :return: (episodes_set, testing_states)
    """
    episodes_set = []
    current = []
    for position, step in enumerate(testing_episodes):
        is_marker = type(step[0]) is str and step[0] == 'done'
        if not is_marker:
            current.append(step)
            continue
        if position == 0:
            continue
        current.append(step)
        episodes_set.append(current)
        current = []

    def _first_start_matches():
        # Restores the first stored state in Env2 and compares the resulting
        # observation with the first logged observation.
        restored = Env2.set_state(testing_states[0])
        return (episodes_set[0][0][0] == restored).all()

    if not _first_start_matches():
        del testing_states[0]
    if not _first_start_matches():
        assert False, 'problem in starting states'

    if len(testing_states) != len(episodes_set):
        del testing_states[-1]
    if len(testing_states) != len(episodes_set):
        assert False, 'problem in data prepration'
    return episodes_set, testing_states


## ML

In [None]:
def Abstract_classes(ep, abstraction_d, model):
    """
    Collect the distinct abstract states visited by a set of episodes.

    Every non-string state of every episode is abstracted in one call to
    `model.abstract_state`; the number of abstract and concrete states and
    their ratio are printed.

    :param ep: list of episodes, each a sequence of (state, action) pairs
    :param abstraction_d: abstraction level passed to `model.abstract_state`
    :param model: object exposing `abstract_state(states, d)`
    :return: (list of distinct abstract states, the same as a NumPy array)
    """
    concrete_states = [
        state
        for episode in ep
        for state, _action in episode
        if type(state) is not str
    ]
    abstracted = model.abstract_state(concrete_states, abstraction_d)
    distinct = set(abstracted)
    unique1 = list(distinct)
    uni1 = np.array(unique1)
    concrete_count = len(abstracted)
    abstract_count = len(distinct)
    print("abstract states:", abstract_count)
    print("Concrete states", concrete_count)
    print("ratio", abstract_count / concrete_count)
    return unique1, uni1
# TODO: the function below still needs modification
def ML_first_representation(Abs_d, epsilon_functional_fault_boarder, uni1, model, ep, unique1):
    """
    Build the presence/absence dataset used to train the fault classifiers.

    Each episode becomes one binary feature vector of length len(uni1): the
    position `hash_table[abstract_state]` is set to 1 for every abstract state
    the episode visits. Abstract states without a position are skipped.

    Relies on two names defined elsewhere in the file: `hash_table` (abstract
    state -> feature position) and `is_fail_state` (functional-fault test on a
    list of states).

    :param Abs_d: abstraction level passed to `model.abstract_state`
    :param epsilon_functional_fault_boarder: unused; kept for interface compatibility
    :param uni1: array of abstract classes; only its length is used
    :param model: object exposing `abstract_state(states, d)`
    :param ep: list of episodes, each ending with a ('done', reward) entry
    :param unique1: unused; kept for interface compatibility
    :return: (feature vectors, reward-fault labels, functional-fault labels);
             a reward-fault label is 1 when the final reward is below -180
    """
    reward_fault_threshold = -180
    data1_x_b = []
    data1_y_b = []
    data1_y_f_b = []
    for episode in ep:
        record = np.zeros(len(uni1))

        data1_y_b.append(0 if episode[-1][1] >= reward_fault_threshold else 1)

        state_list = [k[0] for k in episode[:-1]]
        data1_y_f_b.append(1 if is_fail_state(state_list) else 0)

        for abstract in model.abstract_state(state_list, Abs_d):
            try:
                record[hash_table[abstract]] = 1
            except (KeyError, IndexError):
                # Abstract state with no (valid) feature position: skip it.
                # Anything else (e.g. a missing hash_table) is a real error
                # and must not be silently swallowed.
                continue
        data1_x_b.append(record)

    return data1_x_b, data1_y_b, data1_y_f_b


## Genetic

In [None]:
def translator(episode, model, d, unique5):
    """
    Encode a concrete episode as a presence/absence vector of abstract states.

    Relies on `hash_table` (abstract state -> feature position), which is
    defined elsewhere in the file. Abstract states without a position are
    skipped.

    :param 'episode': input episode, a sequence of (state, action) pairs
    :param 'model': RL model exposing `abstract_state(states, d)`
    :param 'd': abstraction level
    :param 'unique5': abstract classes; only its length is used
    :return: a one-element list holding the encoded episode
    """
    record = np.zeros(len(unique5))
    state_list = [
        state for state, _action in episode
        if not (type(state) is str and state == 'done')
    ]

    for ab in model.abstract_state(state_list, d):
        try:
            record[hash_table[ab]] = 1
        except (KeyError, IndexError):
            # Unknown abstract state: leave the vector untouched. Other
            # errors (e.g. a missing hash_table) are real and must surface.
            continue
    return [record]

def transform(state):
    """
    Return a copy of `state` whose first component is scaled by random noise.

    The factor is drawn uniformly from [0.95, 1.05); `state` itself is left
    unchanged.
    """
    scale = np.random.uniform(low=0.95, high=1.05)
    perturbed = deepcopy(state)
    perturbed[0] = state[0] * scale
    return perturbed

def mutation_improved(population, model, env, objective_uncovered):
    """
    Select a parent by tournament, perturb one of its states and re-execute.

    The episode is cut at a random mutation point, the state at that point is
    replaced by `transform(state)`, and the remainder is produced by
    `re_execute`. `tournament_selection`, `Candidate` and `re_execute` are
    defined elsewhere in the file.

    :param 'population': population that we want to mutate
    :param 'model': RL model
    :param 'env': RL environment
    :param 'objective_uncovered': uncovered objectives for tournament selection
    :return: mutated candidate (the episode is re-executed from the mutation point)
    :raises AssertionError: if the selected parent is too short to mutate
    To-do:
    move deepcopy to the candidate class methods .set info
    """
    parent = tournament_selection(population, 10, objective_uncovered)  # tournament selection
    parent1 = deepcopy(parent.get_candidate_values())
    # The mutation point is drawn from [3, len - 3], which is only a valid
    # range when the episode has at least 6 entries.
    assert len(parent1) >= 6, "parent in mutation is shorter than 6"
    Mutpoint = random.randint(3, (len(parent1) - 3))
    new_state = transform(parent1[Mutpoint][0])
    action = model.predict(new_state)
    if action != int(parent1[Mutpoint][1]):
        print('Mutation lured the agent ... ')
    # Keep the prefix and append the perturbed state, tagged 'Mut'.
    new_parent = parent1[:Mutpoint]
    new_parent.append([new_state, 'Mut'])
    new_cand = Candidate(new_parent)
    new_cand.set_start_state(parent.get_start_state())

    re_executed_epis = re_execute(model, env, new_cand)

    re_executed_cand = Candidate(re_executed_epis)
    re_executed_cand.set_start_state(new_cand.get_start_state())
    re_executed_cand.set_info(deepcopy(parent.get_info()))
    re_executed_cand.set_info(["mutation is done! ", "mutpoint was:", Mutpoint])

    return re_executed_cand

def mutation_improved_p(parent, model, env, m_rate):
    """
    Mutate a given parent with probability `m_rate`.

    With probability 1 - m_rate the parent is returned unchanged. Otherwise
    the episode is cut at a random mutation point, the state there is replaced
    by `transform(state)`, the remainder is produced by `re_execute`, and the
    global counter MUTATION_NUMBER is incremented. `Candidate` and
    `re_execute` are defined elsewhere in the file.

    :param 'parent': individual that we want to mutate
    :param 'model': RL model
    :param 'env': RL environment
    :param 'm_rate': mutation rate; recommended value is 1/len(parent)
    :return: the parent itself, or the mutated individual
    :raises AssertionError: if the parent is too short to mutate
    To-do:
    move deepcopy to the candidate .set info
    """
    global MUTATION_NUMBER
    chance = random.uniform(0, 1)
    if chance > m_rate:
        return parent

    parent1 = deepcopy(parent.get_candidate_values())
    # The mutation point is drawn from [1, len - 3], which is only a valid
    # range when the episode has at least 4 entries.
    assert len(parent1) >= 4, "parent in mutation is shorter than 4"
    Mutpoint = random.randint(1, (len(parent1) - 3))
    new_state = transform(parent1[Mutpoint][0])
    action = model.predict(new_state)
    if action != int(parent1[Mutpoint][1]):
        print('Mutation lured the agent ... ')
    # Keep the prefix and append the perturbed state, tagged 'Mut'.
    new_parent = parent1[:Mutpoint]
    new_parent.append([new_state, 'Mut'])
    new_cand = Candidate(new_parent)
    new_cand.set_start_state(parent.get_start_state())
    re_executed_epis = re_execute(model, env, new_cand)
    re_executed_cand = Candidate(re_executed_epis)
    re_executed_cand.set_start_state(new_cand.get_start_state())
    re_executed_cand.set_info(deepcopy(parent.get_info()))
    re_executed_cand.set_info(["mutation is done! ", "mutpoint was:", Mutpoint])
    MUTATION_NUMBER += 1
    return re_executed_cand
# TODO: the function below still needs modification
def Crossover_improved_v2(population, model, d, objective_uncovered):
    """
    Single-point crossover joined at a shared abstract state.

    A parent is chosen by tournament selection and a crossover point is drawn
    inside it. Up to 50 random candidates are then searched for a state with
    the same abstract class as the state at the crossover point; if none is
    found, a new parent is selected and the search restarts. The two episodes
    are swapped at the crossover / matching points to form two offspring.
    `tournament_selection` and `Candidate` are defined elsewhere in the file.

    :param 'population': population to select the parent and the mate from
    :param 'model': RL model exposing `abstract_state(states, d)`
    :param 'd': abstraction level
    :param 'objective_uncovered': uncovered objectives for tournament selection
    :return: the two offspring candidates
    To-do:
    finding the matching episode could be improved by storing a mapping
    between concrete and abstract states
    """
    # Mates are searched in data[search_start:len - 3]; positions found in
    # that window must be shifted back by search_start to index the episode.
    search_start = 2
    found_match = False
    while not found_match:
        parent = tournament_selection(population, 10, objective_uncovered)  # tournament selection
        parent1 = deepcopy(parent.get_candidate_values())
        parent1_start_point = deepcopy(parent.get_start_state())
        if len(parent1) < 6:
            continue
        crosspoint = random.randint(2, (len(parent1) - 3))
        abs_class = model.abstract_state(parent1[crosspoint][0], d)
        for _ in range(50):
            indx = random.randint(0, len(population) - 1)
            random_candidate = deepcopy(population[indx])
            random_cand_data = random_candidate.get_candidate_values()
            if len(random_cand_data) < 8:
                continue
            random_cand_start_point = random_candidate.get_start_state()
            window = random_cand_data[search_start:len(random_cand_data) - 3]
            random_ab = model.abstract_state([k[0] for k in window], d)
            judge = (np.array(random_ab) == np.array(abs_class)).all(axis=1)
            if judge.any():
                matches_list = np.where(judge)[0] + search_start
                found_match = True
                break

    index_match_in_matchlist = random.randint(0, len(matches_list) - 1)
    matchpoint = int(matches_list[index_match_in_matchlist])
    match_candidate = deepcopy(random_candidate)
    match = deepcopy(random_cand_data)
    match_start = deepcopy(random_cand_start_point)

    # Offspring 1: parent prefix followed by the mate's suffix.
    offspring1 = deepcopy(parent1[:crosspoint])
    offspring1 += deepcopy(match[matchpoint:])
    offspring1[-1] = ['done', (len(offspring1) - 1)]
    candid1 = Candidate(offspring1)
    candid1.set_start_state(parent1_start_point)
    candid1.set_info(deepcopy(parent.get_info()))
    candid1.set_info(["crossover is Done!", "the crossover point is:", crosspoint])

    # Offspring 2: mate prefix followed by the parent's suffix.
    offspring2 = deepcopy(match[:matchpoint])
    offspring2 += deepcopy(parent1[crosspoint:])
    offspring2[-1] = ['done', (len(offspring2) - 1)]
    candid2 = Candidate(offspring2)
    candid2.set_start_state(match_start)
    candid2.set_info(deepcopy(match_candidate.get_info()))
    candid2.set_info(["crossover is Done!", "the crossover point is:", matchpoint])

    if len(offspring1) < 3:
        print(offspring1)
        assert False, 'created offspring 1 in crossover is shorter than expected '

    if len(offspring2) < 3:
        print(offspring2)
        assert False, 'created offspring 2 in crossover is shorter than expected '

    return candid1, candid2
# TODO: the function below still needs modification
def Crossover_improved_v2_random(population, model, d, objective_uncovered):
    """
    Crossover variant that picks the first parent uniformly at random.

    A parent and a crossover point are drawn at random. Up to 700 random
    candidates are then scanned for states with the same abstract class as
    the state at the crossover point; if none is found, a new parent is drawn
    and the search restarts. The two episodes are swapped at the crossover /
    matching points to form two offspring. `Candidate` is defined elsewhere
    in the file.

    :param population: population to draw the parent and the mate from
    :param model: RL model exposing `abstract_state(state, d)`
    :param d: abstraction level
    :param objective_uncovered: unused; kept for a signature parallel to
        `Crossover_improved_v2`
    :return: the two offspring candidates
    """
    found_match = False
    while not found_match:
        # randint is inclusive on both ends, so the upper bound is len - 1.
        parent_index = random.randint(0, len(population) - 1)
        parent1 = deepcopy(population[parent_index].get_candidate_values())
        parent1_start_point = deepcopy(population[parent_index].get_start_state())
        matches_list = []
        crosspoint = random.randint(1, (len(parent1) - 3))
        abs_class = list(model.abstract_state(parent1[crosspoint][0], d))
        attemp = 0
        for _ in range(700):
            attemp += 1
            indx = random.randint(0, len(population) - 1)
            random_candidate = deepcopy(population[indx])
            random_cand_data = random_candidate.get_candidate_values()
            random_cand_start_point = random_candidate.get_start_state()
            # Collect every position of this candidate that shares the
            # abstract class of the crossover state.
            for st_index in range(1, len(random_cand_data) - 3):
                random_ab = list(model.abstract_state(random_cand_data[st_index][0], d))
                if random_ab == abs_class:
                    matches_list.append(st_index)
                    found_match = True
            if found_match:
                break
    print("match found in --- attemps", attemp)
    index_match_in_matchlist = random.randint(0, len(matches_list) - 1)
    matchpoint = matches_list[index_match_in_matchlist]
    match = random_cand_data
    match_start = deepcopy(random_cand_start_point)

    # Offspring 1: parent prefix followed by the mate's suffix.
    offspring1 = deepcopy(parent1[:crosspoint])
    offspring1 += deepcopy(match[matchpoint:])
    offspring1[-1] = ['done', (len(offspring1) - 1)]
    candid1 = Candidate(offspring1)
    candid1.set_start_state(parent1_start_point)

    # Offspring 2: mate prefix followed by the parent's suffix.
    offspring2 = deepcopy(match[:matchpoint])
    offspring2 += deepcopy(parent1[crosspoint:])
    offspring2[-1] = ['done', (len(offspring2) - 1)]
    candid2 = Candidate(offspring2)
    candid2.set_start_state(match_start)
    return candid1, candid2

