In [1]:
import gymnasium as gym
import random
import numpy as np
from dataclasses import dataclass
from skfuzzy.membership import trapmf
from typing import List, Tuple
import copy
from helper import explain_rule_strengths, FQLModelVerbose

# One named seed for both global RNGs used below (NumPy's and the stdlib's),
# so the value is defined in exactly one place.
SEED = 42
np.random.seed(SEED)
random.seed(SEED)

In [2]:
# Smoke test: drive MountainCar with uniformly random actions for 1000 steps.
env = gym.make("MountainCar-v0", render_mode="human", goal_velocity=0.1)

try:
    # reset(seed=...) seeds only the environment's own RNG; the action space
    # samples from a separate RNG, so seed it as well for a repeatable rollout.
    observation, info = env.reset(seed=42)
    env.action_space.seed(42)

    for _ in range(1000):
        action = env.action_space.sample()
        observation, reward, terminated, truncated, info = env.step(action)

        # Start a new episode as soon as the current one ends.
        if terminated or truncated:
            observation, info = env.reset()
finally:
    # Release the render window even if a step raises or the cell is interrupted.
    env.close()

  from pkg_resources import resource_stream, resource_exists


## Mathematical Explanation

### 1. Trapezoidal Membership Function

A trapezoidal membership function is defined by four parameters:

$$
\text{Trapezium}(a, b, c, d)
$$

Where:  
- $a$ = left (start of slope-up)  
- $b$ = left\_top (start of plateau)  
- $c$ = right\_top (end of plateau)  
- $d$ = right (end of slope-down)  

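The membership function $\mu(x)$ is given below for $a < b$ and $c < d$. When $a = b$ or $c = d$ (a "shoulder" set such as `Trapezium(0, 0, 10, 20)`), the corresponding slope is absent and `trapmf` returns $\mu = 1$ at that endpoint: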

$$
\mu(x) =
\begin{cases}
0, & x \le a \\
\frac{x - a}{b - a}, & a < x < b \\
1, & b \le x \le c \\
\frac{d - x}{d - c}, & c < x < d \\
0, & x \ge d
\end{cases}
$$

### 2. Membership Values for an Input Variable

For a crisp input value $x$ and a set of $n$ fuzzy sets $F_1, F_2, \dots, F_n$,  
the membership vector is:

$$
M(x) = \left[ \mu_{F_1}(x), \mu_{F_2}(x), \dots, \mu_{F_n}(x) \right]
$$

### 3. Rule Membership Calculation

If the system has $k$ input variables, each with $n_i$ fuzzy sets,  
the total number of rules is:

$$
N_{\text{rules}} = \prod_{i=1}^k n_i
$$

For a given crisp state:

$$
S = [x_1, x_2, \dots, x_k]
$$

Each rule corresponds to a combination of fuzzy set indices:

$$
R_j = (f_1, f_2, \dots, f_k), \quad f_i \in \{1, \dots, n_i\}
$$

The unnormalized membership of a rule is the product of the membership degrees:

$$
\mu_{R_j} = \prod_{i=1}^k \mu_{F_{i, f_i}}(x_i)
$$

### 4. Normalization of Rule Memberships

Finally, rule memberships are normalized to sum to 1:

$$
\mu_{R_j}^{\text{norm}} = \frac{\mu_{R_j}}{\sum_{m=1}^{N_{\text{rules}}} \mu_{R_m}}
$$

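This ensures the set of rule memberships forms a probability-like distribution over all rules, provided at least one rule fires. If every $\mu_{R_m}$ is zero (the state lies outside all fuzzy sets of some input), the implementation skips the division and returns the all-zero vector.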


In [3]:
@dataclass
class Trapezium:
    """
    Trapezoidal fuzzy set described by its four corner points.

    Attributes:
        left (float): Lower end of the base (start of the rising edge).
        left_top (float): Beginning of the plateau.
        right_top (float): End of the plateau.
        right (float): Upper end of the base (end of the falling edge).
    """
    left: float
    left_top: float
    right_top: float
    right: float

    def membership_value(self, input_value: float) -> float:
        """
        Evaluate this fuzzy set at a single crisp value.

        Args:
            input_value (float): Crisp value to fuzzify.

        Returns:
            float: Degree of membership as computed by ``trapmf``.
        """
        corners = [self.left, self.left_top, self.right_top, self.right]
        # trapmf is called with an array, so wrap the scalar in a one-element
        # array and unpack the single result it hands back.
        (degree,) = trapmf(np.array([input_value]), corners)
        return float(degree)


class InputStateVariable:
    """
    One crisp input of the fuzzy system together with the fuzzy sets defined on it.
    """

    def __init__(self, *fuzzy_sets: Trapezium):
        """
        Store the fuzzy sets that cover this input.

        Args:
            *fuzzy_sets (Trapezium): Fuzzy sets, in the order their
                memberships should be reported.
        """
        self.fuzzy_set_list: Tuple[Trapezium, ...] = fuzzy_sets

    def get_fuzzy_sets(self) -> Tuple[Trapezium, ...]:
        """
        Return the fuzzy sets exactly as they were passed to the constructor.

        Returns:
            tuple[Trapezium, ...]: The stored fuzzy sets.
        """
        return self.fuzzy_set_list

    def get_memberships(self, value: float) -> List[float]:
        """
        Fuzzify a crisp value against every fuzzy set of this input.

        Args:
            value (float): The crisp input value.

        Returns:
            list[float]: One membership degree per fuzzy set, in storage order.
        """
        degrees = []
        for fuzzy_set in self.fuzzy_set_list:
            degrees.append(fuzzy_set.membership_value(value))
        return degrees


class Build:
    """
    Fuzzy inference system assembled from a fixed set of input variables.
    """

    def __init__(self, *input_vars: InputStateVariable):
        """
        Store the input variables that make up the system.

        Args:
            *input_vars (InputStateVariable): Input variables, in the order
                their crisp values appear in a state vector.
        """
        self.input_vars: Tuple[InputStateVariable, ...] = input_vars

    def get_input(self) -> Tuple[InputStateVariable, ...]:
        """
        Return the input variables as given to the constructor.

        Returns:
            tuple[InputStateVariable, ...]: The stored input variables.
        """
        return self.input_vars

    def get_number_of_fuzzy_sets(self, input_variable: InputStateVariable) -> int:
        """
        Count the fuzzy sets defined on one input variable.

        Args:
            input_variable (InputStateVariable): The variable to inspect.

        Returns:
            int: How many fuzzy sets it holds.
        """
        fuzzy_sets = input_variable.get_fuzzy_sets()
        return len(fuzzy_sets)

    def get_number_of_rules(self) -> int:
        """
        Count the rules, i.e. every combination of one fuzzy set per input.

        Returns:
            int: Product of the per-input fuzzy-set counts.
        """
        rule_count = 1
        for variable in self.input_vars:
            rule_count = rule_count * self.get_number_of_fuzzy_sets(variable)
        return rule_count

    def get_rule_memberships(self, state: List[float]) -> List[float]:
        """
        Compute the firing strength of every rule for a crisp state.

        Args:
            state (list[float]): One crisp value per input variable, indexed
                in the same order as the input variables.

        Returns:
            list[float]: Rule strengths in ``np.ndindex`` order, normalized to
            sum to 1 when their total is positive and returned unnormalized
            otherwise.
        """
        # Fuzzify each input once; every rule reuses these degrees.
        per_variable = []
        for position, variable in enumerate(self.input_vars):
            per_variable.append(variable.get_memberships(state[position]))

        set_counts = [len(degrees) for degrees in per_variable]

        # A rule's strength is the product of one degree from each input.
        strengths = []
        for combination in np.ndindex(*set_counts):
            strength = 1.0
            for var_pos, set_pos in enumerate(combination):
                strength *= per_variable[var_pos][set_pos]
            strengths.append(strength)

        norm = sum(strengths)
        if norm > 0:
            return [strength / norm for strength in strengths]

        return strengths

In [4]:
# Two inputs with two fuzzy sets each, giving 2 x 2 = 4 rules.
temperature = InputStateVariable(
    Trapezium(left=0, left_top=0, right_top=10, right=20),    # Low
    Trapezium(left=15, left_top=25, right_top=35, right=35),  # High
)

speed = InputStateVariable(
    Trapezium(left=0, left_top=0, right_top=20, right=40),    # Slow
    Trapezium(left=30, left_top=50, right_top=70, right=70),  # Fast
)

system = Build(temperature, speed)
state = [18, 35]  # [temperature, speed]
labels = [["Low", "High"], ["Slow", "Fast"]]

# Print the step-by-step evaluation of all rule strengths for this state.
explain_rule_strengths(system, state, labels=labels, decimals=4)

                   FUZZY INFERENCE — Step-by-step evaluation                    

1) Crisp input values:
   Input  1: value = 18
   Input  2: value = 35

2) Memberships per input variable (for the given crisp values):

   Input 1 (value = 18):
      1. Low          | params = (0, 0, 10, 20) -> μ = 0.2000
      2. High         | params = (15, 25, 35, 35) -> μ = 0.3000

   Input 2 (value = 35):
      1. Slow         | params = (0, 0, 20, 40) -> μ = 0.2500
      2. Fast         | params = (30, 50, 70, 70) -> μ = 0.2500

3) Building rule combinations and computing unnormalized strengths:

   Rule | Antecedents                    | Memberships          |     Unnorm
   ----------------------------------------------------------------------
      1 | Low, Slow                      | 0.2000, 0.2500       |     0.0500
      2 | Low, Fast                      | 0.2000, 0.2500       |     0.0500
      3 | High, Slow                     | 0.3000, 0.2500       |     0.0750
      4 | High, Fast      

# Fuzzy Q-Learning — Mathematical Explanation

## Notation
- Let there be $k$ input variables (e.g., Temperature, Speed).
- Input variable $i$ has $n_i$ fuzzy sets.
- Each fuzzy set is a trapezoid $\mathrm{Trapezium}(a,b,c,d)$.
- A *rule* $R_j$ is a particular combination of one fuzzy set from each input.
- The total number of rules:
  $$
  N_{\text{rules}} = \prod_{i=1}^k n_i.
  $$

---

## 1. Trapezoidal membership function
A trapezoid is parameterised by $a,b,c,d$. The membership of crisp $x$ in that trapezoid is:

$$
\mu(x) =
\begin{cases}
0, & x \le a \\
\dfrac{x - a}{b - a}, & a < x < b \\
1, & b \le x \le c \\
\dfrac{d - x}{d - c}, & c < x < d \\
0, & x \ge d
\end{cases}
$$

---

## 2. Per-variable membership vector
For a given input variable $i$ and crisp value $x_i$, compute memberships across its fuzzy sets:

$$
M_i(x_i) = \big[ \mu_{i,1}(x_i), \mu_{i,2}(x_i), \dots, \mu_{i,n_i}(x_i)\big].
$$

---

## 3. Rule (antecedent) unnormalized strength
For rule $R_j$ corresponding to fuzzy-set indices $(f_1,\dots,f_k)$ (one index per input):

$$
\tilde{\mu}_{R_j} = \prod_{i=1}^k \mu_{i,f_i}(x_i).
$$

This is the product t-norm (AND) across inputs.

---

## 4. Normalize rule strengths
Normalize rule strengths to sum to 1 (so they behave like a distribution over rules):

$$
\mu_{R_j} = \frac{\tilde{\mu}_{R_j}}{\sum_{m=1}^{N_{\text{rules}}} \tilde{\mu}_{R_m}}.
$$

Let the normalized rule-strength vector be $\mathbf{R} = [\mu_{R_1}, \dots, \mu_{R_N}]$, where $N = N_{\text{rules}}$.

---

## 5. Per-rule action choices
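For each rule $j$ we pick an action index $M_j$ from $\{0,\dots,A-1\}$, where $A$ is the number of actions, using an ε-greedy policy on that rule's Q-row (this $M_j$ is unrelated to the membership vector $M_i(x_i)$ of Section 2):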

- with probability $\epsilon$: choose a random action.
- otherwise: choose $\arg\max_{a} Q[j,a]$.

---

## 6. Aggregate action selection (final action)
Compute action weights by combining rule strengths with per-rule Q-values:

$$
W(a) = \sum_{j=1}^{N} \mu_{R_j} \cdot Q[j,a]
$$

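Pick the final action greedily on these weights (the per-rule choices $M_j$ from Section 5 are not used here; they only determine which Q-entries are read and updated in Sections 7 and 9):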

$$
a^\star = \arg\max_a W(a)
$$

(To break ties, the implementation adds zero-mean Gaussian noise with standard deviation 0.1 to every $W(a)$ whenever the standard deviation of $W$ is below 0.1.)

---

## 7. Q-value for the *previous* state (scalar)
When the agent previously observed state $s_{t-1}$ it recorded:
- the rule strengths $\mathbf{R}^{t-1}$, and
- the per-rule chosen actions $M^{t-1}_j$.

The scalar (aggregated) Q-value used as baseline is:

$$
Q_{\text{prev}} = \sum_{j=1}^{N} \mu^{t-1}_{R_j} \cdot Q\big[j, M^{t-1}_j\big].
$$

---

## 8. State value for the *current* state
For current state $s_t$ with rule strengths $\mathbf{R}^t$, compute

$$
V_t = \sum_{j=1}^{N} \mu_{R_j}^t \cdot \max_a Q[j,a].
$$

---

## 9. Temporal-difference (TD) error and Q update
Given immediate reward $r_t$ (received for the transition from $s_{t-1}$ to $s_t$), discount $\gamma$ and learning rate $\alpha$:

TD error:

$$
\delta_t = r_t + \gamma \, V_t \;-\; Q_{\text{prev}}.
$$

Update the Q-table for each rule $j$ that was active in the previous state ($\mu^{t-1}_{R_j} > 0$):

$$
Q[j, M^{t-1}_j] \leftarrow Q[j, M^{t-1}_j] \;+\; \alpha \cdot \delta_t \cdot \mu^{t-1}_{R_j}.
$$

This multiplies the TD error by the rule's activation in the previous state (responsibility weighting).

In [None]:
class FQLModel:
    """
    Fuzzy Q-Learning Model.

    This class implements a fuzzy reinforcement learning agent using a Q-learning
    approach with a fuzzy inference system (FIS) for state representation.

    Attributes:
        gamma (float): Discount factor for future rewards.
        alpha (float): Learning rate.
        epsilon (float): Exploration rate (for ε-greedy policy).
        action_set_length (int): Number of possible actions.
        fis (Build): Fuzzy inference system for computing rule memberships.
        q_table (np.ndarray): Q-values table, shape = (num_rules, num_actions).
        R (List[float]): Truth values (rule activations) for the current state.
        R_ (List[float]): Truth values snapshotted by ``save_state_history``;
            during an update they describe the previous state.
        M (List[int]): Selected action index per rule.
        V (List[float]): State value history.
        Q (List[float]): Q-value history.
        Error (float): Temporal Difference (TD) error.
    """

    def __init__(self, gamma: float, alpha: float, epsilon: float, action_set_length: int, fis: "Build"):
        self.gamma = gamma
        self.alpha = alpha
        self.epsilon = epsilon
        self.action_set_length = action_set_length
        self.fis = fis

        # Initialize Q-table: rows = rules, columns = actions
        self.q_table = np.zeros((self.fis.get_number_of_rules(), action_set_length))

        # Internal state variables
        self.R: List[float] = []
        self.R_: List[float] = []
        self.M: List[int] = []
        self.V: List[float] = []
        self.Q: List[float] = []
        self.Error: float = 0.0

    def truth_value(self, state_value: List[float]) -> "FQLModel":
        """
        Compute truth values (rule activations) for a given state.

        Args:
            state_value (List[float]): Crisp state values for each input variable.

        Returns:
            FQLModel: Self (for method chaining).
        """
        self.R = self.fis.get_rule_memberships(state_value)
        return self

    def action_selection(self) -> int:
        """
        Select an action using an ε-greedy strategy across fuzzy rules.

        Two things happen here:
          1. ``self.M`` is rebuilt with one ε-greedy action index per rule.
          2. The returned action is the argmax of the activation-weighted
             Q-values; ``self.M`` does not enter that argmax.

        Returns:
            int: Index of the selected action.
        """
        self.M.clear()

        # Select action for each rule
        for rule_idx in range(len(self.R)):
            if random.random() < self.epsilon:
                # Exploration
                action_index = random.randint(0, self.action_set_length - 1)
            else:
                # Exploitation
                action_index = int(np.argmax(self.q_table[rule_idx]))
            self.M.append(action_index)

        # Aggregate Q-rows weighted by rule activations. One vector add per
        # active rule performs the same per-action accumulation, in the same
        # order, as an explicit inner loop over actions.
        action_weights = np.zeros(self.action_set_length)
        for rule_idx, truth_value in enumerate(self.R):
            if truth_value > 0:
                action_weights += truth_value * self.q_table[rule_idx]

        # Add small noise if actions are too similar
        if np.std(action_weights) < 0.1:
            action_weights += np.random.normal(0, 0.1, self.action_set_length)

        return int(np.argmax(action_weights))

    def calculate_q_value(self):
        """
        Append the aggregated Q-value of the saved state to ``self.Q``.

        The value is ``sum_j R_[j] * q_table[j, M[j]]``. For it to describe a
        single state/action pair, ``R_`` must already hold the activations of
        the state for which ``M`` was chosen, i.e. ``save_state_history()``
        has to run before this method.
        """
        q_curr = sum(
            truth_value * self.q_table[index, self.M[index]]
            for index, truth_value in enumerate(self.R_)
        )
        self.Q.append(q_curr)

    def calculate_state_value(self):
        """
        Compute the state value for the current state (max-Q over all actions for each rule).
        """
        v_curr = sum(
            self.R[index] * np.max(rule_q_values)
            for index, rule_q_values in enumerate(self.q_table)
        )
        self.V.append(v_curr)

    def update_q_value(self, reward: float) -> "FQLModel":
        """
        Update the Q-table using the Temporal Difference (TD) learning rule.

        Does nothing when either history list is empty, i.e. when there is no
        state value or no previous Q-value to form a TD error from.

        Args:
            reward (float): Immediate reward received.

        Returns:
            FQLModel: Self (for method chaining).
        """
        if not self.V or not self.Q:
            return self

        # TD Error
        self.Error = reward + self.gamma * self.V[-1] - self.Q[-1]

        # Update Q-values for rules activated in previous state
        for index, truth_value in enumerate(self.R_):
            if truth_value > 0:
                self.q_table[index, self.M[index]] += self.alpha * (self.Error * truth_value)

        return self

    def save_state_history(self):
        """
        Save current truth values (R) to R_ for the next update step.
        """
        self.R_ = copy.copy(self.R)

    def get_initial_action(self, state: List[float]) -> int:
        """
        Get the first action for an episode, clearing history.

        Args:
            state (List[float]): Initial crisp state values.

        Returns:
            int: Selected action index.
        """
        self.V.clear()
        self.Q.clear()
        self.truth_value(state)
        action = self.action_selection()
        # Snapshot the activations first so the Q-value below is built from
        # the same state the per-rule actions were chosen for (previously it
        # used whatever R_ was left over, or an empty list).
        self.save_state_history()
        self.calculate_q_value()
        return action

    def get_action(self, state: List[float]) -> int:
        """
        Select an action for the given state (no Q-update).

        Note: this overwrites ``self.R`` and ``self.M``, so calling it between
        two ``run`` calls changes the per-rule actions the next update uses.

        Args:
            state (List[float]): Crisp state values.

        Returns:
            int: Selected action index.
        """
        self.truth_value(state)
        return self.action_selection()

    def run(self, state: List[float], reward: float) -> int:
        """
        Perform one step of the fuzzy Q-learning algorithm.

        Order matters: the update must run while ``R_`` and ``M`` still
        describe the previous state; only afterwards are new per-rule actions
        chosen, the activations snapshotted, and the matching Q-value stored.

        Args:
            state (List[float]): Current crisp state values.
            reward (float): Immediate reward received.

        Returns:
            int: Selected action index for the next step.
        """
        self.truth_value(state)
        self.calculate_state_value()
        self.update_q_value(reward)
        action = self.action_selection()
        # Snapshot before computing Q so that R_ and M refer to the same state.
        self.save_state_history()
        self.calculate_q_value()
        return action

In [None]:
# Temperature input, covered by two fuzzy sets: Low and High
temperature = InputStateVariable(
    Trapezium(left=0, left_top=0, right_top=10, right=20),    # Low
    Trapezium(left=15, left_top=25, right_top=35, right=35),  # High
)

# Speed input, covered by two fuzzy sets: Slow and Fast
speed = InputStateVariable(
    Trapezium(left=0, left_top=0, right_top=20, right=40),    # Slow
    Trapezium(left=30, left_top=50, right_top=70, right=70),  # Fast
)

fis = Build(temperature, speed)

# Verbose FQL agent choosing among 3 actions
fql = FQLModelVerbose(
    gamma=0.9,
    alpha=0.1,
    epsilon=0.2,
    action_set_length=3,
    fis=fis,
)

# Toy reward used by the walkthrough below
def env_reward(state):
    """
    Score a ``[temperature, speed]`` state by its distance from (25, 40).

    Args:
        state: Two-element sequence ``(temperature, speed)``.

    Returns:
        The negated sum of squared deviations from 25 and 40 respectively;
        0 at the target and negative everywhere else.
    """
    temp, spd = state
    temp_penalty = (temp - 25) ** 2
    speed_penalty = (spd - 40) ** 2
    return -temp_penalty - speed_penalty

# Walkthrough driver: feeds the verbose agent one fixed initial state and then
# ten randomly drawn states, printing the action it picks at each step.
# Note that the next state is sampled independently of the chosen action, so
# this demonstrates the update mechanics rather than a real control loop.

# Initial state and first action (no previous state to update)
state = [18, 35]
print("=== INITIAL ACTION SELECTION ===")
# NOTE(review): step_verbose comes from helper.FQLModelVerbose, which is not
# visible here; it is assumed to perform one FQL step and return the action.
first_action = fql.step_verbose(state, reward=0.0)  # reward=0 for initialization
print(f"Initial chosen action: {first_action}\n\n")

# Run for a few steps
for step in range(10):
    # Reward is evaluated on the state the agent saw last, *before* `state`
    # is overwritten below; the order of these two statements matters.
    reward = env_reward(state)
    # New random state: temperature drawn from [0, 35], speed from [20, 70].
    # The agent observes it together with the reward of the previous state.
    state = [random.uniform(0, 35), random.uniform(20, 70)]
    action = fql.step_verbose(state, reward)
    print(f"Step {step+1:>2}: new state = {state}, chosen action = {action}, reward w.r.t prev state = {reward:.2f}\n")

=== INITIAL ACTION SELECTION ===
STATE (t): [18, 35]

Per-variable memberships:
  Input 1: S1:0.2000, S2:0.3000
  Input 2: S1:0.2500, S2:0.2500

Normalized rule strengths (R_t):
  Rule  1: R_t = 0.2000
  Rule  2: R_t = 0.2000
  Rule  3: R_t = 0.3000
  Rule  4: R_t = 0.3000

State value V_t = sum_j R_t[j] * max_a Q[j,a] = 0.0000
Aggregated Q_prev (for previous state): 0.0000
Received reward r_t = 0.0000

TD error δ = r_t + γ * V_t - Q_prev = 0.0000

No previous rule activations recorded — skipping Q update.

Per-rule action choices for current state (ε-greedy per rule):
  Rule  1: chosen action = 0 (exploit)
  Rule  2: chosen action = 1 (explore)
  Rule  3: chosen action = 0 (exploit)
  Rule  4: chosen action = 0 (explore)

Action weights had nearly zero variance, added tiny jitter to break ties.

Aggregate action weights (W[a] = sum_j R_t[j] * Q[j,a]):
  Action 0: W = 0.0050
  Action 1: W = -0.0014
  Action 2: W = 0.0065

Selected final action: 2

Initial chosen action: 2


STATE (t): 