<a href="https://colab.research.google.com/github/FREDSAYS-dev/Thesis/blob/main/THESIS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [18]:
import numpy as np
import pandas as pd

class SimpleEnv:
    """
    Minimal 1-D environment for NPC experiments.

    State: integer in [0, n_positions - 1]. Start at 0; goal at n_positions - 1.
    Actions: 0=left, 1=right.
    Rewards: base reward (step_cost) each step; goal_bonus at goal.
    Penalties: applied in after_step() hook based on flags.
    """

    def __init__(self, n_positions=5, step_cost=-0.1, goal_bonus=1.0, seed=42):
        self.n_positions = n_positions
        self.goal = n_positions - 1
        self.step_cost = step_cost
        self.goal_bonus = goal_bonus
        self.num_actions = 2
        self.seed(seed)

    def seed(self, seed):
        """Set the random seed for reproducibility."""
        import numpy as np
        self.rng = np.random.default_rng(seed)

    def reset(self):
        """Reset environment to the start state and return it."""
        self.state = 0
        self._history = [self.state]
        return self.state

    def step(self, action):
        """Apply the action and return (next_state, reward, done, info)."""
        old_state = self.state
        if action == 0:
            self.state = max(0, self.state - 1)
        elif action == 1:
            self.state = min(self.goal, self.state + 1)
        else:
            raise ValueError("Invalid action")

        reward = self.goal_bonus if self.state == self.goal else self.step_cost
        done = (self.state == self.goal)
        info = {'old_state': old_state, 'action': action}
        self._history.append(self.state)
        return self.state, reward, done, info

    def observe(self, transition):
        """
        Consume a real transition AFTER step has been called externally.

        Args:
            transition: dict with keys 'state', 'action', 'next_state', 'reward', 'done', 'info'
        """
        # Here you can append transitions to a dataset or compute metrics.
        pass

    def after_step(self, reward, flags):
        """
        Apply penalties based on flags after calling step().
        Args:
            reward: the base reward from step().
            flags: dict with boolean keys 'collision', 'stall', 'loop'
        Returns:
            Adjusted reward after penalties.
        """
        penalty = 0.0
        if flags.get('collision', False):
            penalty -= 1.0
        if flags.get('stall', False):
            penalty -= 0.2
        if flags.get('loop', False):
            penalty -= 0.5
        return reward + penalty


# Rule-based policy: always move right
def rule_based_policy(state):
    return 1


In [20]:
def aspect_log(policy_func):
    def wrapper(env, state):
        action = policy_func(env, state)
        next_state, reward, done = env.step(action)
        print(f"[LOG] state={state}, action={action}, next_state={next_state}, reward={reward}, done={done}")
        return action
    return wrapper

def aspect_mood(policy_func, mood_start=0.5, epsilon_base=0.3):

    import numpy as np
    mood = mood_start

    def wrapper(env, state, **kw):
        nonlocal mood
        epsilon = max(0.1, epsilon_base * (1.0 - mood))
        # let inner policy use epsilon if it wants
        action = policy_func(env, state) if 'epsilon' not in kw else policy_func(env, state, kw['epsilon'])
        aux = {"epsilon": epsilon, "mood": mood}
        # mood will be updated next call when caller injects last_reward
        if 'last_reward' in kw:
            mood = float(np.clip(mood + 0.1 * kw['last_reward'], 0.0, 1.0))
            aux["mood"] = mood
        return action, aux
    return wrapper



# Example: wrap policies
logged_policy = aspect_log(rule_based_policy)
mood_policy = aspect_mood(rule_based_policy)





In [16]:
env = SimpleEnv()
num_states, num_actions = 5, 2
Q = np.zeros((num_states, num_actions))
alpha = 0.1  # learning rate
gamma = 0.95  # discount factor
epsilon = 0.3  # exploration rate

transitions = []  # to collect state–action–reward–next_state for a dataset

for episode in range(200):
    state = env.reset()
    done = False
    while not done:
        # ε-greedy action selection
        if np.random.rand() < epsilon:
            action = np.random.randint(num_actions)
        else:
            action = np.argmax(Q[state])
        next_state, reward, done, info = env.step(action)

        # Q-learning update
        best_next_action = np.argmax(Q[next_state])
        td_target = reward + gamma * Q[next_state, best_next_action]
        Q[state, action] += alpha * (td_target - Q[state, action])
        # record transition for dataset
        transitions.append({
            'state': state,
            'action': action,
            'reward': reward,
            'next_state': next_state
        })
        state = next_state

print("Learned Q-table:")
print(Q)


Q = np.zeros((env.n_positions, env.num_actions))




def q_policy(env, state):
    return np.argmax(Q[state])


logged_q_policy = aspect_log(q_policy)
mood_q_policy = aspect_mood(q_policy)

#  Evaluation loop
state = env.reset()
aux_for_next_call = {}

for _ in range(5):
    action, aux = mood_q_policy(env, state, **aux_for_next_call)


    next_state, reward, done, info = env.step(action)


    flags = {"collision": False, "stall": (next_state == state), "loop": False}
    reward = env.after_step(reward, flags)
    env.observe({"state": state, "action": action, "next_state": next_state,
                 "reward": reward, "done": done, "info": info})


    aux_for_next_call = {"last_reward": reward}

    state = next_state
    if done:
        break


Learned Q-table:
[[0.43406605 0.57212485]
 [0.40127556 0.70749998]
 [0.5590303  0.85      ]
 [0.69124921 1.        ]
 [0.         0.        ]]


In [17]:
class TwoAgentEnv:
    """
    Minimal 2-agent 1D world.
    - positions: [pos_agent0, pos_agent1], start [0, 2]
    - actions per agent: 0=left, 1=right
    - goal at position 4
    - collision penalty if both land on same non-goal position
    """
    def __init__(self):
        self.num_positions = 5
        self.goal = 4
        self.num_actions = 2
        self.reset()

    def reset(self):
        self.positions = [0, 2]
        return self.positions.copy()

    def step(self, actions):
        rewards = [0.0, 0.0]
        done = False

        # update positions
        for i, action in enumerate(actions):
            if action == 0:
                self.positions[i] = max(0, self.positions[i] - 1)
            else:
                self.positions[i] = min(self.num_positions - 1, self.positions[i] + 1)

        # collision penalty (if not at goal)
        if self.positions[0] == self.positions[1] and self.positions[0] != self.goal:
            rewards = [-1.0, -1.0]

        # goal reward
        for i in range(2):
            if self.positions[i] == self.goal:
                rewards[i] += 1.0
                done = True

        return self.positions.copy(), rewards, done

In [21]:
def random_policy(_env, _pos):
    """A simple policy for agent 1 (opponent or ally): returns 0 or 1 at random."""
    return np.random.randint(2)

def q_policy_agent0(env, positions, Q):
    """
    Learner’s policy for agent 0 based on a Q‑table.
    Only uses the first position (agent 0) to look up the Q‑value.
    """
    pos0 = positions[0]
    return int(np.argmax(Q[pos0]))

def combined_policy(env, positions, Q, opponent_policy=random_policy):
    """
    Combines the learner’s action for agent 0 and the opponent/helper’s action for agent 1.
    Returns a list [action0, action1].
    """
    action0 = q_policy_agent0(env, positions, Q)
    action1 = opponent_policy(env, positions[1])
    return [action0, action1]


In [6]:
def random_policy(_env, _state_for_that_agent):
    return np.random.randint(2)

def q_policy_agent0(env, state_pair, Q):
    # control only agent0 via its own position; state_pair = [pos0, pos1]
    pos0 = state_pair[0]
    return int(np.argmax(Q[pos0]))

def combined_policy(env, state_pair, Q, opponent_policy=random_policy):
    action0 = q_policy_agent0(env, state_pair, Q)
    action1 = opponent_policy(env, state_pair[1])
    return [action0, action1]

In [22]:

class NActionPolicy:
    """
    Small decision scope: returns an action and metadata about candidates.
    """

    def __init__(self, num_actions=2, greedy=True):
        self.num_actions = num_actions
        self.greedy = greedy

    def __call__(self, env, state, epsilon=0.1):
        actions = list(range(self.num_actions))
        probs = np.ones(self.num_actions) / self.num_actions
        q_values = np.zeros(self.num_actions)  # replace with real Q-values if available

        if np.random.rand() < epsilon:
            action = np.random.choice(actions, p=probs)
        else:
            action = int(np.argmax(q_values))

        aux = {
            'candidate_actions': actions,
            'probs': probs.tolist(),
            'chosen': action
        }
        return action, aux

def aspect_log(policy_func):
    """
    Log wrapper: prints state, chosen action and any extras returned by the policy.
    """
    def wrapper(env, state, **kw):
        action, aux = policy_func(env, state, **kw)
        print(f"[LOG] state={state}, action={action}, aux={aux}")
        return action, aux
    return wrapper

def aspect_mood(policy_func, mood_start=0.5, epsilon_base=0.3):
    """
    Mood wrapper: adjusts exploration rate based on mood.
    Mood increases with positive reward and decreases with negative reward.
    """
    mood = mood_start
    def wrapper(env, state, **kw):
        nonlocal mood
        epsilon = max(0.1, epsilon_base * (1.0 - mood))
        aux = {'epsilon': epsilon, 'mood': mood}
        action, inner_aux = policy_func(env, state, epsilon=epsilon)
        aux.update(inner_aux)
        # When calling run_episode, update aux['last_reward'] before next call
        if 'last_reward' in aux:
            mood = float(np.clip(mood + 0.1 * aux['last_reward'], 0.0, 1.0))
        return action, aux
    return wrapper


In [23]:
def aspect_log_multi(policy_func):
    def wrapper(env, state_pair):
        actions = policy_func(env, state_pair)
        next_state, rewards, done = env.step(actions)
        print(f"[LOG2] state={state_pair}, actions={actions}, next_state={next_state}, rewards={rewards}, done={done}")
        return actions
    return wrapper

In [24]:
env2 = TwoAgentEnv()
Q_agent0 = np.zeros((env2.num_positions, env2.num_actions))  # placeholder; train if needed

# wrap combined policy for logging
logged_combined = aspect_log_multi(lambda e, s: combined_policy(e, s, Q_agent0))

state = env2.reset()
for _ in range(10):
    _ = logged_combined(env2, state)   # aspect logs + steps
    state = env2.positions

AttributeError: 'TwoAgentEnv' object has no attribute 'num_positions'

In [10]:
transitions_df = pd.DataFrame(transitions)
print(transitions_df.head())
# transitions_df.to_csv('npc_transitions.csv', index=False)  # save for reuse


   state  action  reward  next_state
0      0       0    -0.1           0
1      0       1    -0.1           1
2      1       0    -0.1           0
3      0       0    -0.1           0
4      0       1    -0.1           1


In [11]:
mood = 0.5  # neutral mood
state = env.reset()
for step in range(15):
    # exploration rate depends on mood (higher mood → less exploration)
    epsilon_emotion = max(0.1, 1.0 - mood)
    if np.random.rand() < epsilon_emotion:
        action = np.random.randint(num_actions)
    else:
        action = np.argmax(Q[state])
    next_state, reward, done, info = env.step(action)

    # update mood
    mood = min(1.0, max(0.0, mood + 0.1 * reward))
    print(f"Step {step+1}: state={state}, action={'right' if action==1 else 'left'}, reward={reward}, mood={mood:.2f}")
    state = next_state
    if done:
        state = env.reset()


Step 1: state=0, action=right, reward=-0.1, mood=0.49
Step 2: state=1, action=right, reward=-0.1, mood=0.48
Step 3: state=2, action=right, reward=-0.1, mood=0.47
Step 4: state=3, action=left, reward=-0.1, mood=0.46
Step 5: state=2, action=right, reward=-0.1, mood=0.45
Step 6: state=3, action=right, reward=1.0, mood=0.55
Step 7: state=0, action=left, reward=-0.1, mood=0.54
Step 8: state=0, action=left, reward=-0.1, mood=0.53
Step 9: state=0, action=left, reward=-0.1, mood=0.52
Step 10: state=0, action=left, reward=-0.1, mood=0.51
Step 11: state=0, action=right, reward=-0.1, mood=0.50
Step 12: state=1, action=left, reward=-0.1, mood=0.49
Step 13: state=0, action=left, reward=-0.1, mood=0.48
Step 14: state=0, action=right, reward=-0.1, mood=0.47
Step 15: state=1, action=left, reward=-0.1, mood=0.46
