
# Chapter 18: Reinforcement Learning
Reinforcement Learning (RL) is a machine learning paradigm distinct from supervised and unsupervised learning. In RL, an agent learns to make decisions by interacting directly with an environment through trial and error. The agent receives a reward or a penalty for each action it takes, and its goal is to maximize the total reward it collects over the long run.

RL is inspired by the way humans and animals learn through interaction with their surroundings. Unlike supervised learning, which requires labeled data, RL learns from the consequences of the actions taken. This makes RL well suited to problems that call for sequential decision-making in dynamic, uncertain environments.
## Fundamental Components of Reinforcement Learning
### 1. Agent and Environment
The agent is the entity that learns and makes decisions. It interacts with the environment, which is everything outside the agent that can affect or be affected by the agent's actions. The interaction loop between agent and environment underlies every RL algorithm:

- The agent observes the current state of the environment
- The agent selects an action according to its policy
- The environment responds with a reward and a transition to a new state
- The agent uses this information to improve its policy
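
The four steps above can be sketched as a short loop. The `CorridorEnv` class and the random policy here are hypothetical stand-ins chosen for brevity, not part of the chapter's own code; they only illustrate the order of the calls.

```python
import random

class CorridorEnv:
    """Hypothetical 1-D corridor: start at cell 0, goal at cell `length - 1`."""

    def __init__(self, length=5):
        self.length = length
        self.pos = 0

    def reset(self):
        self.pos = 0
        return self.pos

    def step(self, action):
        # action: 0 = move left, 1 = move right (clipped at the walls)
        delta = 1 if action == 1 else -1
        self.pos = min(max(self.pos + delta, 0), self.length - 1)
        done = self.pos == self.length - 1
        reward = 1.0 if done else -0.1
        return self.pos, reward, done


def run_episode(env, policy, max_steps=100):
    """Run one episode and return the list of (state, action, reward) transitions."""
    trajectory = []
    state = env.reset()                              # 1. observe the current state
    for _ in range(max_steps):
        action = policy(state)                       # 2. choose an action from the policy
        next_state, reward, done = env.step(action)  # 3. environment returns reward and new state
        trajectory.append((state, action, reward))
        state = next_state                           # 4. a learning agent would update its policy here
        if done:
            break
    return trajectory


random.seed(0)
random_policy = lambda state: random.choice([0, 1])
episode = run_episode(CorridorEnv(), random_policy)
print(f"steps: {len(episode)}, total reward: {sum(r for _, _, r in episode):.1f}")
```

A random policy never improves; the learning algorithms later in the chapter replace step 4 with an actual update.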

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import gym
from collections import defaultdict, deque
import random

# Example implementation of a simple environment: Grid World
class GridWorld:
    def __init__(self, width=5, height=5):
        self.width = width
        self.height = height
        self.agent_pos = [0, 0]  # Starting position
        self.goal_pos = [width-1, height-1]  # Goal position
        self.obstacles = [(2, 2), (1, 3), (3, 1)]  # Obstacle positions

    def reset(self):
        """Reset the environment to its initial state"""
        self.agent_pos = [0, 0]
        return self.get_state()

    def get_state(self):
        """Return the current state as a tuple"""
        return tuple(self.agent_pos)

    def is_valid_move(self, pos):
        """Check whether the position is valid"""
        x, y = pos
        if x < 0 or x >= self.width or y < 0 or y >= self.height:
            return False
        if tuple(pos) in self.obstacles:
            return False
        return True

    def step(self, action):
        """Execute an action and return (next_state, reward, done, info)"""
        # Actions: 0=up, 1=down, 2=left, 3=right.
        # Positions are (x, y) with y growing downward (see render), so the
        # deltas below are (dx, dy).
        moves = [(0, -1), (0, 1), (-1, 0), (1, 0)]

        new_pos = [self.agent_pos[0] + moves[action][0],
                   self.agent_pos[1] + moves[action][1]]

        # Blocked moves (walls or obstacles) leave the agent where it is
        hit_obstacle = tuple(new_pos) in self.obstacles
        if self.is_valid_move(new_pos):
            self.agent_pos = new_pos

        # Calculate reward
        reward = -0.1  # Small negative reward for each step (encourage efficiency)
        done = False

        if self.agent_pos == self.goal_pos:
            reward = 10.0  # Large positive reward for reaching goal
            done = True
        elif hit_obstacle:
            reward = -1.0  # Negative reward for bumping into an obstacle

        return self.get_state(), reward, done, {}

    def render(self):
        """Visualize current state"""
        grid = np.zeros((self.height, self.width))

        # Mark obstacles
        for obs in self.obstacles:
            grid[obs[1], obs[0]] = -1

        # Mark goal
        grid[self.goal_pos[1], self.goal_pos[0]] = 2

        # Mark agent
        grid[self.agent_pos[1], self.agent_pos[0]] = 1

        symbols = {-1: '█', 0: '·', 1: 'A', 2: 'G'}
        for row in grid:
            print(''.join(symbols[int(cell)] for cell in row))
        print()

# Test environment
env = GridWorld()
print("Initial state:")
env.render()

print("After moving right:")
state, reward, done, _ = env.step(3)
print(f"State: {state}, Reward: {reward}, Done: {done}")
env.render()

### 2. State, Action, and Reward
The state (s) represents the current situation of the environment as observed by the agent. It should carry enough information to make an optimal decision. An action (a) is one of the choices available to the agent in a given state. The reward (r) is a numerical feedback signal that the environment gives the agent after it takes an action.

The reward function maps a state-action pair to a reward value: R(s,a) → ℝ. Designing it is critical in RL because it determines the behavior the agent will learn. Common reward-design choices include:
- Sparse rewards: a reward is given only at goal states
- Dense rewards: a reward is given at every step to provide guidance
- Shaped rewards: the reward is designed to convey extra information about progress

In [None]:
# Examples of different kinds of reward functions
class RewardShaping:
    @staticmethod
    def sparse_reward(state, goal_state):
        """Sparse reward: rewarded only at the goal"""
        return 100.0 if state == goal_state else 0.0

    @staticmethod
    def distance_based_reward(state, goal_state):
        """Dense reward based on the distance to the goal"""
        distance = abs(state[0] - goal_state[0]) + abs(state[1] - goal_state[1])
        return -distance  # Negative distance as the reward

    @staticmethod
    def potential_based_shaping(state, next_state, goal_state, gamma=0.9):
        """Potential-based shaping term F(s, s') = gamma * phi(s') - phi(s)"""
        def phi(s):
            # Potential function: negative Manhattan distance to the goal
            return -(abs(s[0] - goal_state[0]) + abs(s[1] - goal_state[1]))

        return gamma * phi(next_state) - phi(state)

    @staticmethod
    def curiosity_reward(state, visited_states):
        """Curiosity-driven reward for exploration"""
        visit_count = visited_states.get(state, 0)
        return 1.0 / (1.0 + visit_count)  # Reward shrinks as the visit count grows

# Demonstrate the different reward functions
goal = (4, 4)
current_state = (2, 2)
next_state = (3, 2)  # Example successor: one step closer to the goal
visited = {(2, 2): 3, (1, 1): 1}

print("Reward Functions Comparison:")
print(f"Sparse: {RewardShaping.sparse_reward(current_state, goal)}")
print(f"Distance-based: {RewardShaping.distance_based_reward(current_state, goal)}")
print(f"Potential-based: {RewardShaping.potential_based_shaping(current_state, next_state, goal)}")
print(f"Curiosity: {RewardShaping.curiosity_reward(current_state, visited)}")

### 3. Policy and Value Functions
A policy (π) is the strategy the agent uses to choose an action in each state. A policy can be:

- Deterministic policy: π(s) = a (always picks the same action in a given state)
- Stochastic policy: π(a|s) = P(At = a | St = s) (the probability of picking action a in state s)

A value function measures how good a state or a state-action pair is:

- State value function: V^π(s) = E[Gt | St = s, π] (the expected return from state s when following policy π)
- Action value function (Q-function): Q^π(s,a) = E[Gt | St = s, At = a, π] (the expected return from taking action a in state s and following π afterwards)
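
The return $G_t$ in both definitions is not defined above. Assuming the standard discounted formulation, with discount factor $\gamma$ as in the MDP section, it is the discounted sum of future rewards, and the two value functions are linked through the policy:

$$
G_t = \sum_{k=0}^{\infty} \gamma^k R_{t+k+1}, \qquad V^\pi(s) = \sum_a \pi(a \mid s)\, Q^\pi(s, a)
$$

The second identity is what `calculate_state_value` computes from the Q-table and the policy probabilities.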

In [None]:
# Implementation of policy and value functions
class PolicyValueFunctions:
    def __init__(self, num_states, num_actions):
        self.num_states = num_states
        self.num_actions = num_actions

        # Initialize value functions
        self.V = defaultdict(float)  # State values
        self.Q = defaultdict(lambda: defaultdict(float))  # Action values

        # Initialize policy (uniform random initially)
        self.policy = defaultdict(lambda: np.ones(num_actions) / num_actions)

    def get_action_probabilities(self, state):
        """Get the action probabilities of the stochastic policy"""
        return self.policy[state]

    def get_greedy_action(self, state):
        """Get the greedy action based on the Q-values"""
        if state not in self.Q:
            return np.random.randint(self.num_actions)

        q_values = np.array([self.Q[state][a] for a in range(self.num_actions)])
        return np.argmax(q_values)

    def get_epsilon_greedy_action(self, state, epsilon=0.1):
        """Epsilon-greedy action selection"""
        if np.random.random() < epsilon:
            return np.random.randint(self.num_actions)  # Explore
        else:
            return self.get_greedy_action(state)  # Exploit

    def update_policy_from_q(self, state, epsilon=0.1):
        """Update the policy from the Q-values (epsilon-greedy)"""
        probs = np.full(self.num_actions, epsilon / self.num_actions)
        best_action = self.get_greedy_action(state)
        probs[best_action] += 1.0 - epsilon
        self.policy[state] = probs

    def calculate_state_value(self, state):
        """Calculate V(s) from Q(s,a) and the policy"""
        if state not in self.Q:
            return 0.0

        value = 0.0
        action_probs = self.get_action_probabilities(state)

        for action in range(self.num_actions):
            value += action_probs[action] * self.Q[state][action]

        return value

# Usage example
pv_functions = PolicyValueFunctions(num_states=25, num_actions=4)

# Set a few Q-values by hand
test_state = (2, 2)
pv_functions.Q[test_state][0] = 1.5  # Up
pv_functions.Q[test_state][1] = -0.5  # Down
pv_functions.Q[test_state][2] = 0.8   # Left
pv_functions.Q[test_state][3] = 2.1   # Right

print("Q-values for state (2,2):")
for action in range(4):
    print(f"Action {action}: {pv_functions.Q[test_state][action]}")

print(f"\nGreedy action: {pv_functions.get_greedy_action(test_state)}")
print(f"Epsilon-greedy action: {pv_functions.get_epsilon_greedy_action(test_state, 0.2)}")

pv_functions.update_policy_from_q(test_state, epsilon=0.1)
print(f"Updated policy probabilities: {pv_functions.policy[test_state]}")
print(f"State value: {pv_functions.calculate_state_value(test_state)}")

## Markov Decision Process (MDP)
RL problems are formally modeled as a Markov Decision Process (MDP). An MDP is defined by the tuple (S, A, P, R, γ):

- S: Set of states
- A: Set of actions
- P: Transition probability function P(s'|s,a)
- R: Reward function R(s,a,s')
- γ: Discount factor (0 ≤ γ ≤ 1)

The Markov property states that the next state depends only on the current state and action, not on the earlier history: P(St+1|St, At, St-1, At-1, ...) = P(St+1|St, At).

The discount factor γ determines how much future rewards matter relative to immediate rewards. With γ = 0 the agent cares only about the immediate reward, whereas as γ → 1 it weighs long-term consequences more heavily.
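
To make the effect of γ concrete, the sketch below computes the discounted return of one fixed reward sequence for several discount factors. The reward sequence is a made-up example (nine step costs followed by a goal reward), not output from the Grid World above.

```python
def discounted_return(rewards, gamma):
    """Return sum_k gamma**k * rewards[k] for a finite reward sequence."""
    total = 0.0
    # Iterate backwards so each step applies G = r + gamma * G_next.
    for reward in reversed(rewards):
        total = reward + gamma * total
    return total

# Hypothetical episode: nine small step costs, then a goal reward.
rewards = [-0.1] * 9 + [10.0]

for gamma in (0.0, 0.5, 0.9, 1.0):
    print(f"gamma={gamma:.1f}: G_0 = {discounted_return(rewards, gamma):.3f}")
```

With γ = 0 only the first step cost counts, while with γ = 1 the goal reward is counted in full no matter how late it arrives.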

In [None]:
# MDP implementation for Grid World
class GridWorldMDP:
    def __init__(self, env, gamma=0.9):
        self.env = env
        self.gamma = gamma
        self.states = self._get_all_states()
        self.actions = list(range(4))  # 0=up, 1=down, 2=left, 3=right

        # Build transition model
        self.P = self._build_transition_model()
        self.R = self._build_reward_model()

    def _get_all_states(self):
        """Get all possible states"""
        states = []
        for x in range(self.env.width):
            for y in range(self.env.height):
                if (x, y) not in self.env.obstacles:
                    states.append((x, y))
        return states

    def _build_transition_model(self):
        """Build transition probability model P(s'|s,a)"""
        P = {}

        for state in self.states:
            P[state] = {}
            for action in self.actions:
                P[state][action] = {}

                # Simulate the action from this state
                self.env.agent_pos = list(state)
                next_state, _, _, _ = self.env.step(action)

                # Deterministic transitions (probability = 1.0)
                P[state][action][next_state] = 1.0

        return P

    def _build_reward_model(self):
        """Build reward model R(s,a,s')"""
        R = {}

        for state in self.states:
            R[state] = {}
            for action in self.actions:
                R[state][action] = {}

                # Get expected reward
                self.env.agent_pos = list(state)
                next_state, reward, _, _ = self.env.step(action)
                R[state][action][next_state] = reward

        return R

    def get_transition_prob(self, state, action, next_state):
        """Get P(s'|s,a)"""
        return self.P.get(state, {}).get(action, {}).get(next_state, 0.0)

    def get_reward(self, state, action, next_state):
        """Get R(s,a,s')"""
        return self.R.get(state, {}).get(action, {}).get(next_state, 0.0)

    def get_expected_reward(self, state, action):
        """Get expected reward E[R|s,a]"""
        expected_reward = 0.0

        for next_state in self.states:
            prob = self.get_transition_prob(state, action, next_state)
            reward = self.get_reward(state, action, next_state)
            expected_reward += prob * reward

        return expected_reward

# MDP usage example
env = GridWorld(width=4, height=4)
mdp = GridWorldMDP(env)

print("MDP Information:")
print(f"Number of states: {len(mdp.states)}")
print(f"Number of actions: {len(mdp.actions)}")
print(f"Discount factor: {mdp.gamma}")

# Test transition probabilities
test_state = (1, 1)
test_action = 3  # Right
print(f"\nTransitions from state {test_state} with action {test_action}:")
for next_state in mdp.states:
    prob = mdp.get_transition_prob(test_state, test_action, next_state)
    if prob > 0:
        reward = mdp.get_reward(test_state, test_action, next_state)
        print(f"  → {next_state}: P={prob:.2f}, R={reward:.2f}")

## Value Iteration Algorithm
Value Iteration is a dynamic programming algorithm that computes the optimal value function by iteratively updating value estimates. It is based on the Bellman optimality equation:

$$
V^*(s) = \max_a \sum_{s'} P(s' \mid s, a)\left[R(s, a, s') + \gamma V^*(s')\right]
$$

Value iteration repeats these updates until convergence, then extracts the optimal policy from the optimal value function.
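
As a compact illustration of the update, here is a vectorized NumPy sketch on a hypothetical three-state chain. The array layout `P[a, s, s2]` and `R[a, s, s2]` is an assumption made for this example and differs from the dictionary-based model used in the chapter's classes.

```python
import numpy as np

# Hypothetical 3-state chain; state 2 is terminal (absorbing, zero reward).
# P[a, s, s2] = P(s2 | s, a) and R[a, s, s2] = R(s, a, s2).
n_states, n_actions, gamma = 3, 2, 0.9
P = np.zeros((n_actions, n_states, n_states))
R = np.zeros((n_actions, n_states, n_states))

P[0] = np.eye(n_states)   # action 0: stay in place
P[1, 0, 1] = 1.0          # action 1: advance 0 -> 1
P[1, 1, 2] = 1.0          #           advance 1 -> 2
P[1, 2, 2] = 1.0          #           terminal state stays put
R[1, 1, 2] = 1.0          # reward only for entering the terminal state

def value_iteration(P, R, gamma, theta=1e-8, max_iterations=1000):
    """Return (V, greedy_policy) after repeated Bellman optimality updates."""
    V = np.zeros(P.shape[1])
    for _ in range(max_iterations):
        # Q[a, s] = sum_s2 P(s2|s,a) * (R(s,a,s2) + gamma * V(s2))
        Q = np.sum(P * (R + gamma * V[None, None, :]), axis=2)
        V_new = Q.max(axis=0)
        if np.max(np.abs(V_new - V)) < theta:
            return V_new, Q.argmax(axis=0)
        V = V_new
    return V, Q.argmax(axis=0)

V_star, greedy_policy = value_iteration(P, R, gamma)
print("V* =", V_star)
print("greedy policy =", greedy_policy)
```

Because the terminal state is modeled as absorbing with zero reward, its value stays at 0 without a special case.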

In [None]:
# Value Iteration implementation
class ValueIteration:
    def __init__(self, mdp, theta=1e-6):
        self.mdp = mdp
        self.theta = theta  # Convergence threshold
        self.V = {state: 0.0 for state in mdp.states}  # Initialize values to 0
        self.policy = {}

    def bellman_update(self, state):
        """Perform a Bellman update for a single state"""
        # goal_pos is a list while states are tuples, so convert before comparing
        if state == tuple(self.mdp.env.goal_pos):
            return 0.0  # Terminal state

        max_value = float('-inf')

        for action in self.mdp.actions:
            action_value = 0.0

            # Calculate E[R + γV(s')] for this action
            for next_state in self.mdp.states:
                prob = self.mdp.get_transition_prob(state, action, next_state)
                reward = self.mdp.get_reward(state, action, next_state)

                action_value += prob * (reward + self.mdp.gamma * self.V[next_state])

            max_value = max(max_value, action_value)

        return max_value

    def value_iteration(self, max_iterations=1000):
        """Run value iteration algorithm"""
        iterations = 0
        value_history = []

        for i in range(max_iterations):
            delta = 0.0
            new_V = {}

            # Update all states
            for state in self.mdp.states:
                old_value = self.V[state]
                new_V[state] = self.bellman_update(state)
                delta = max(delta, abs(new_V[state] - old_value))

            self.V = new_V
            value_history.append(dict(self.V))
            iterations += 1

            # Check convergence
            if delta < self.theta:
                print(f"Converged after {iterations} iterations")
                break

        return value_history

    def extract_policy(self):
        """Extract the optimal policy from the value function"""
        policy = {}

        for state in self.mdp.states:
            if state == tuple(self.mdp.env.goal_pos):
                policy[state] = 0  # Arbitrary action for terminal state
                continue

            best_action = None
            best_value = float('-inf')

            for action in self.mdp.actions:
                action_value = 0.0

                for next_state in self.mdp.states:
                    prob = self.mdp.get_transition_prob(state, action, next_state)
                    reward = self.mdp.get_reward(state, action, next_state)

                    action_value += prob * (reward + self.mdp.gamma * self.V[next_state])

                if action_value > best_value:
                    best_value = action_value
                    best_action = action

            policy[state] = best_action

        self.policy = policy
        return policy

    def visualize_values(self):
        """Visualize value function"""
        grid = np.zeros((self.mdp.env.height, self.mdp.env.width))

        for state in self.mdp.states:
            x, y = state
            grid[y, x] = self.V[state]

        plt.figure(figsize=(8, 6))
        plt.imshow(grid, cmap='viridis', origin='upper')
        plt.colorbar(label='State Value')
        plt.title('State Values from Value Iteration')

        # Add the value as text in each cell
        for state in self.mdp.states:
            x, y = state
            plt.text(x, y, f'{self.V[state]:.2f}',
                    ha='center', va='center', color='white', fontsize=8)

        plt.show()

    def visualize_policy(self):
        """Visualize optimal policy"""
        action_symbols = ['↑', '↓', '←', '→']

        grid = np.full((self.mdp.env.height, self.mdp.env.width), ' ', dtype=str)

        for state in self.mdp.states:
            x, y = state
            if state == tuple(self.mdp.env.goal_pos):
                grid[y, x] = 'G'
            elif self.policy[state] is not None:
                grid[y, x] = action_symbols[self.policy[state]]

        # Mark obstacles
        for obs in self.mdp.env.obstacles:
            x, y = obs
            grid[y, x] = '█'

        print("Optimal Policy:")
        for row in grid:
            print(' '.join(row))

# Run Value Iteration
env = GridWorld(width=5, height=5)
mdp = GridWorldMDP(env, gamma=0.9)
vi = ValueIteration(mdp)

print("Running Value Iteration...")
value_history = vi.value_iteration()

print("\nFinal State Values:")
for state in sorted(vi.V.keys()):
    print(f"V{state} = {vi.V[state]:.3f}")

# Extract and visualize the optimal policy
optimal_policy = vi.extract_policy()
print(f"\nOptimal Policy extracted")
vi.visualize_policy()
vi.visualize_values()

## Policy Iteration Algorithm
Policy Iteration is an alternative approach that alternates between two steps:

- Policy Evaluation: compute the value function of the current policy
- Policy Improvement: update the policy greedily with respect to that value function

Policy iteration often converges in fewer iterations than value iteration, although each iteration is more computationally expensive.
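
One way to see where the per-iteration cost comes from: for a fixed policy the Bellman equation is linear, V = r_π + γ P_π V, so policy evaluation can be done exactly by solving a linear system. The sketch below assumes a small, hypothetical three-state chain and uses `np.linalg.solve`; the chapter's own implementation evaluates the policy iteratively instead.

```python
import numpy as np

def evaluate_policy_exactly(P_pi, r_pi, gamma):
    """Solve (I - gamma * P_pi) V = r_pi for V.

    P_pi[s, s2] is the transition probability under the fixed policy and
    r_pi[s] is the expected one-step reward under that policy.
    """
    n_states = P_pi.shape[0]
    return np.linalg.solve(np.eye(n_states) - gamma * P_pi, r_pi)

# Hypothetical 3-state chain under the policy "always advance";
# state 2 is absorbing with zero reward.
P_pi = np.array([[0.0, 1.0, 0.0],
                 [0.0, 0.0, 1.0],
                 [0.0, 0.0, 1.0]])
r_pi = np.array([0.0, 1.0, 0.0])

V_pi = evaluate_policy_exactly(P_pi, r_pi, gamma=0.9)
print("V_pi =", V_pi)
```

A direct solve scales roughly cubically with the number of states, which is why iterative evaluation is usually preferred for larger state spaces.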

In [None]:
# Policy Iteration implementation
class PolicyIteration:
    def __init__(self, mdp, theta=1e-6):
        self.mdp = mdp
        self.theta = theta

        # Initialize random policy
        self.policy = {}
        for state in mdp.states:
            self.policy[state] = np.random.choice(mdp.actions)

        # Initialize value function
        self.V = {state: 0.0 for state in mdp.states}

    def policy_evaluation(self, max_iterations=1000):
        """Evaluate current policy"""
        for i in range(max_iterations):
            delta = 0.0
            new_V = {}

            for state in self.mdp.states:
                if state == tuple(self.mdp.env.goal_pos):
                    new_V[state] = 0.0  # Terminal state
                    continue

                # Calculate the value under the current policy
                action = self.policy[state]
                value = 0.0

                for next_state in self.mdp.states:
                    prob = self.mdp.get_transition_prob(state, action, next_state)
                    reward = self.mdp.get_reward(state, action, next_state)
                    value += prob * (reward + self.mdp.gamma * self.V[next_state])

                new_V[state] = value
                delta = max(delta, abs(new_V[state] - self.V[state]))

            self.V = new_V

            if delta < self.theta:
                break

        return i + 1  # Return number of iterations

    def policy_improvement(self):
        """Improve the policy based on the current value function"""
        policy_stable = True

        for state in self.mdp.states:
            if state == tuple(self.mdp.env.goal_pos):
                continue

            old_action = self.policy[state]

            # Find best action
            best_action = None
            best_value = float('-inf')

            for action in self.mdp.actions:
                action_value = 0.0

                for next_state in self.mdp.states:
                    prob = self.mdp.get_transition_prob(state, action, next_state)
                    reward = self.mdp.get_reward(state, action, next_state)
                    action_value += prob * (reward + self.mdp.gamma * self.V[next_state])

                if action_value > best_value:
                    best_value = action_value
                    best_action = action

            self.policy[state] = best_action

            if old_action != best_action:
                policy_stable = False

        return policy_stable

    def policy_iteration(self, max_iterations=100):
        """Run policy iteration algorithm"""
        iteration_history = []

        for i in range(max_iterations):
            print(f"Policy Iteration {i+1}")

            # Policy Evaluation
            eval_iterations = self.policy_evaluation()
            print(f"  Policy evaluation converged in {eval_iterations} iterations")

            # Policy Improvement
            policy_stable = self.policy_improvement()

            # Store current state
            iteration_history.append({
                'iteration': i + 1,
                'policy': dict(self.policy),
                'values': dict(self.V),
                'stable': policy_stable
            })

            if policy_stable:
                print(f"Policy converged after {i+1} iterations")
                break

        return iteration_history

    def compare_with_value_iteration(self, vi_result):
        """Compare results with Value Iteration"""
        print("\nComparison with Value Iteration:")
        print("State Value Differences:")

        max_diff = 0.0
        for state in self.mdp.states:
            diff = abs(self.V[state] - vi_result.V[state])
            max_diff = max(max_diff, diff)
            if diff > 1e-3:  # Only show significant differences
                print(f"  {state}: PI={self.V[state]:.4f}, VI={vi_result.V[state]:.4f}, diff={diff:.4f}")

        print(f"Maximum difference: {max_diff:.6f}")

        # Compare policies
        policy_differences = 0
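        for state in self.mdp.states:
            if state == tuple(self.mdp.env.goal_pos):
                continue  # Terminal state: its stored action is arbitrary
            if self.policy[state] != vi_result.policy[state]: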
                policy_differences += 1
                print(f"Policy difference at {state}: PI={self.policy[state]}, VI={vi_result.policy[state]}")

        print(f"Policy differences: {policy_differences} out of {len(self.mdp.states)} states")

# Run Policy Iteration
print("Running Policy Iteration...")
pi = PolicyIteration(mdp, theta=1e-6)
pi_history = pi.policy_iteration()

print("\nPolicy Iteration Results:")
action_symbols = ['↑', '↓', '←', '→']

print("Final Policy:")
grid = np.full((mdp.env.height, mdp.env.width), ' ', dtype=str)

for state in mdp.states:
    x, y = state
    if state == tuple(mdp.env.goal_pos):
        grid[y, x] = 'G'
    else:
        grid[y, x] = action_symbols[pi.policy[state]]

for obs in mdp.env.obstacles:
    x, y = obs
    grid[y, x] = '█'

for row in grid:
    print(' '.join(row))

# Compare with the Value Iteration results
pi.compare_with_value_iteration(vi)

## Q-Learning Algorithm
Q-Learning is a model-free reinforcement learning algorithm that can learn an optimal policy without knowing the environment's transition model. It uses Temporal Difference (TD) learning to update Q-values from experience.

Q-Learning update rule:

$$
Q(s,a) \leftarrow Q(s,a) + \alpha\left[r + \gamma \max_{a'} Q(s',a') - Q(s,a)\right]
$$

Here α is the learning rate, γ is the discount factor, and the term in brackets is the TD error.
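
A single update can be worked through by hand. The sketch below applies the rule once to a hypothetical 2×2 Q-table stored as a NumPy array; the table values and the helper name `q_learning_update` are illustrative assumptions.

```python
import numpy as np

def q_learning_update(Q, state, action, reward, next_state, done, alpha=0.1, gamma=0.9):
    """Apply one Q-learning update in place and return the TD error."""
    # Terminal transitions have no bootstrap term.
    target = reward if done else reward + gamma * np.max(Q[next_state])
    td_error = target - Q[state, action]
    Q[state, action] += alpha * td_error
    return td_error

# Hypothetical table with 2 states (rows) and 2 actions (columns).
Q = np.array([[0.0, 0.5],
              [1.0, 2.0]])

td = q_learning_update(Q, state=0, action=1, reward=-0.1, next_state=1, done=False)
print(f"TD error: {td:.2f}, updated Q[0, 1]: {Q[0, 1]:.2f}")
# TD error: 1.20, updated Q[0, 1]: 0.62
```

The target is -0.1 + 0.9 × 2.0 = 1.7, so the TD error is 1.7 - 0.5 = 1.2 and the entry moves one tenth of the way toward the target.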

In [None]:
# Q-Learning implementation
class QLearning:
    def __init__(self, num_states, num_actions, learning_rate=0.1, discount_factor=0.9,
                 epsilon=0.1, epsilon_decay=0.995, min_epsilon=0.01):
        self.num_actions = num_actions
        self.lr = learning_rate
        self.gamma = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.min_epsilon = min_epsilon

        # Initialize Q-table
        self.Q = defaultdict(lambda: np.zeros(num_actions))

        # Tracking
        self.episode_rewards = []
        self.episode_lengths = []
        self.td_errors = []

    def get_action(self, state, training=True):
        """Get an action using the epsilon-greedy policy"""
        if training and np.random.random() < self.epsilon:
            return np.random.randint(self.num_actions)  # Explore
        else:
            return np.argmax(self.Q[state])  # Exploit

    def update(self, state, action, reward, next_state, done):
        """Update the Q-value using the Q-learning rule"""
        current_q = self.Q[state][action]

        if done:
            target_q = reward  # No future rewards
        else:
            target_q = reward + self.gamma * np.max(self.Q[next_state])

        # Calculate TD error
        td_error = target_q - current_q
        self.td_errors.append(abs(td_error))

        # Update Q-value
        self.Q[state][action] += self.lr * td_error

        return td_error

    def decay_epsilon(self):
        """Decay exploration rate"""
        self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay)

    def train(self, env, num_episodes=1000, max_steps_per_episode=200):
        """Train Q-learning agent"""
        print(f"Training Q-Learning for {num_episodes} episodes...")

        for episode in range(num_episodes):
            state = env.reset()
            total_reward = 0
            steps = 0

            for step in range(max_steps_per_episode):
                # Get action
                action = self.get_action(state, training=True)

                # Take action
                next_state, reward, done, _ = env.step(action)

                # Update Q-value
                td_error = self.update(state, action, reward, next_state, done)

                # Update state
                state = next_state
                total_reward += reward
                steps += 1

                if done:
                    break

            # Store episode statistics
            self.episode_rewards.append(total_reward)
            self.episode_lengths.append(steps)

            # Decay epsilon
            self.decay_epsilon()

            # Print progress
            if (episode + 1) % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                avg_length = np.mean(self.episode_lengths[-100:])
                print(f"Episode {episode+1}: Avg Reward = {avg_reward:.2f}, "
                      f"Avg Length = {avg_length:.1f}, Epsilon = {self.epsilon:.3f}")

        print("Training completed!")

    def test(self, env, num_episodes=10, render=False):
        """Test trained agent"""
        test_rewards = []
        test_lengths = []

        for episode in range(num_episodes):
            state = env.reset()
            total_reward = 0
            steps = 0

            if render:
                print(f"\nTest Episode {episode + 1}:")
                env.render()

            while True:
                action = self.get_action(state, training=False)  # No exploration
                next_state, reward, done, _ = env.step(action)

                state = next_state
                total_reward += reward
                steps += 1

                if render:
                    print(f"Action: {['Up', 'Down', 'Left', 'Right'][action]}")
                    env.render()

                if done or steps > 200:
                    break

            test_rewards.append(total_reward)
            test_lengths.append(steps)

            if render:
                print(f"Episode {episode + 1}: Reward = {total_reward}, Steps = {steps}")

        avg_reward = np.mean(test_rewards)
        avg_length = np.mean(test_lengths)

        print(f"\nTest Results over {num_episodes} episodes:")
        print(f"Average Reward: {avg_reward:.2f} ± {np.std(test_rewards):.2f}")
        print(f"Average Length: {avg_length:.1f} ± {np.std(test_lengths):.1f}")

        return test_rewards, test_lengths

    def plot_training_progress(self):
        """Plot training metrics"""
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # Episode rewards
        axes[0, 0].plot(self.episode_rewards, alpha=0.3, label='Episode Reward')

        # Moving average
        window = min(100, len(self.episode_rewards) // 10)
        if window > 1:
            moving_avg = np.convolve(self.episode_rewards, np.ones(window)/window, mode='valid')
            axes[0, 0].plot(range(window-1, len(self.episode_rewards)), moving_avg,
                           label=f'Moving Average ({window})', linewidth=2)

        axes[0, 0].set_xlabel('Episode')
        axes[0, 0].set_ylabel('Reward')
        axes[0, 0].set_title('Episode Rewards')
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)

        # Episode lengths
        axes[0, 1].plot(self.episode_lengths, alpha=0.3, label='Episode Length')

        if window > 1:
            moving_avg_length = np.convolve(self.episode_lengths, np.ones(window)/window, mode='valid')
            axes[0, 1].plot(range(window-1, len(self.episode_lengths)), moving_avg_length,
                           label=f'Moving Average ({window})', linewidth=2)

        axes[0, 1].set_xlabel('Episode')
        axes[0, 1].set_ylabel('Steps')
        axes[0, 1].set_title('Episode Lengths')
        axes[0, 1].legend()
        axes[0, 1].grid(True, alpha=0.3)

        # TD Errors
        if self.td_errors:
            axes[1, 0].plot(self.td_errors, alpha=0.3)

            if window > 1 and len(self.td_errors) > window:
                moving_avg_td = np.convolve(self.td_errors, np.ones(window)/window, mode='valid')
                axes[1, 0].plot(range(window-1, len(self.td_errors)), moving_avg_td,
                               linewidth=2, color='red')

            axes[1, 0].set_xlabel('Update Step')
            axes[1, 0].set_ylabel('|TD Error|')
            axes[1, 0].set_title('Temporal Difference Errors')
            axes[1, 0].grid(True, alpha=0.3)

        # Q-value heatmap for a sample of states
        sample_states = list(self.Q.keys())[:min(20, len(self.Q))]
        if sample_states:
            q_matrix = np.array([self.Q[state] for state in sample_states])
            im = axes[1, 1].imshow(q_matrix, cmap='viridis', aspect='auto')
            axes[1, 1].set_xlabel('Action')
            axes[1, 1].set_ylabel('State (sample)')
            axes[1, 1].set_title('Q-Values Heatmap (Sample States)')
            plt.colorbar(im, ax=axes[1, 1])

        plt.tight_layout()
        plt.show()

    def get_policy(self):
        """Extract the greedy policy from the Q-values"""
        policy = {}
        for state in self.Q:
            policy[state] = np.argmax(self.Q[state])
        return policy

    def visualize_q_values(self, env):
        """Visualize the Q-values for the grid world"""
        action_symbols = ['↑', '↓', '←', '→']

        print("Q-Values and Policy:")
        print("Format: [Up, Down, Left, Right] -> Best Action")

        for y in range(env.height):
            for x in range(env.width):
                state = (x, y)

                if state in env.obstacles:
                    print("   OBSTACLE  ", end="  ")
                elif state == tuple(env.goal_pos):
                    print("    GOAL     ", end="  ")
                elif state in self.Q:
                    q_vals = self.Q[state]
                    best_action = np.argmax(q_vals)
                    print(f"[{q_vals[0]:4.1f},{q_vals[1]:4.1f},{q_vals[2]:4.1f},{q_vals[3]:4.1f}]{action_symbols[best_action]}", end=" ")
                else:
                    print("    UNVIS    ", end="  ")
            print()

# Q-Learning implementation and training
print("="*60)
print("Q-LEARNING DEMONSTRATION")
print("="*60)

# Create the environment and the agent
env = GridWorld(width=5, height=5)
q_agent = QLearning(num_states=25, num_actions=4,
                   learning_rate=0.1, discount_factor=0.9,
                   epsilon=0.9, epsilon_decay=0.995)

# Train agent
q_agent.train(env, num_episodes=500, max_steps_per_episode=100)

# Test trained agent
print("\nTesting trained agent:")
test_rewards, test_lengths = q_agent.test(env, num_episodes=5, render=True)

# Visualize results
q_agent.plot_training_progress()
q_agent.visualize_q_values(env)

# Compare the Q-Learning policy with the optimal policy from Value Iteration
print("\nComparison with Value Iteration:")
q_policy = q_agent.get_policy()
differences = 0

for state in vi.policy:
    if state in q_policy:
        if q_policy[state] != vi.policy[state]:
            differences += 1
            print(f"Policy difference at {state}: Q-Learning={q_policy[state]}, VI={vi.policy[state]}")

print(f"Total policy differences: {differences}")

##SARSA Algorithm
SARSA (State-Action-Reward-State-Action) is an on-policy TD learning algorithm. It updates Q-values using the action that the current policy will actually take in the next state, rather than the best available action as in Q-Learning.

SARSA update rule:
Q(s,a) ← Q(s,a) + α[r + γQ(s',a') - Q(s,a)]

The key difference from Q-Learning is that SARSA bootstraps from Q(s',a'), where a' is the action that will actually be taken in s', whereas Q-Learning bootstraps from max_a' Q(s',a').
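
To make the difference between the two targets concrete, here is a minimal worked example of a single update. All numbers (`q_sa`, `reward`, `q_next`, `next_action`) are hypothetical values chosen only for illustration; they do not come from the Grid World runs in this notebook.

```python
import numpy as np

# Hypothetical values, for illustration only
alpha, gamma = 0.1, 0.9
q_sa = 2.0                                # current estimate Q(s, a)
reward = 1.0                              # reward r received after taking a in s
q_next = np.array([0.5, 3.0, 1.0, 2.0])   # Q(s', .) for the four actions
next_action = 2                           # a' chosen by the epsilon-greedy policy (exploratory here)

# SARSA (on-policy): bootstrap from the action that will actually be taken
sarsa_target = reward + gamma * q_next[next_action]
sarsa_q = q_sa + alpha * (sarsa_target - q_sa)

# Q-Learning (off-policy): bootstrap from the greedy action in s'
qlearning_target = reward + gamma * np.max(q_next)
qlearning_q = q_sa + alpha * (qlearning_target - q_sa)

print(f"SARSA target      = {sarsa_target:.2f}, updated Q = {sarsa_q:.3f}")
print(f"Q-Learning target = {qlearning_target:.2f}, updated Q = {qlearning_q:.3f}")
```

Because the exploratory action a' has a lower value than the greedy one, the SARSA target (1.90) is lower than the Q-Learning target (3.70). When a' happens to be the greedy action, the two updates coincide.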

In [None]:
# SARSA algorithm implementation
class SARSA:
    def __init__(self, num_states, num_actions, learning_rate=0.1, discount_factor=0.9,
                 epsilon=0.1, epsilon_decay=0.995, min_epsilon=0.01):
        self.num_actions = num_actions
        self.lr = learning_rate
        self.gamma = discount_factor
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.min_epsilon = min_epsilon

        # Initialize Q-table
        self.Q = defaultdict(lambda: np.zeros(num_actions))

        # Tracking
        self.episode_rewards = []
        self.episode_lengths = []
        self.td_errors = []

    def get_action(self, state, training=True):
        """Epsilon-greedy action selection"""
        if training and np.random.random() < self.epsilon:
            return np.random.randint(self.num_actions)
        else:
            return np.argmax(self.Q[state])

    def update(self, state, action, reward, next_state, next_action, done):
        """Update the Q-value using the SARSA rule"""
        current_q = self.Q[state][action]

        if done:
            target_q = reward
        else:
            target_q = reward + self.gamma * self.Q[next_state][next_action]

        td_error = target_q - current_q
        self.td_errors.append(abs(td_error))

        self.Q[state][action] += self.lr * td_error

        return td_error

    def decay_epsilon(self):
        """Decay exploration rate"""
        self.epsilon = max(self.min_epsilon, self.epsilon * self.epsilon_decay)

    def train(self, env, num_episodes=1000, max_steps_per_episode=200):
        """Train SARSA agent"""
        print(f"Training SARSA for {num_episodes} episodes...")

        for episode in range(num_episodes):
            state = env.reset()
            action = self.get_action(state, training=True)  # Initial action
            total_reward = 0
            steps = 0

            for step in range(max_steps_per_episode):
                # Take action
                next_state, reward, done, _ = env.step(action)

                if done:
                    # Update with terminal state
                    td_error = self.update(state, action, reward, next_state, None, done)
                    total_reward += reward
                    steps += 1
                    break

                # Get next action (important: this is what makes it SARSA)
                next_action = self.get_action(next_state, training=True)

                # Update Q-value
                td_error = self.update(state, action, reward, next_state, next_action, done)

                # Move to next state-action pair
                state = next_state
                action = next_action
                total_reward += reward
                steps += 1

            self.episode_rewards.append(total_reward)
            self.episode_lengths.append(steps)
            self.decay_epsilon()

            if (episode + 1) % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                avg_length = np.mean(self.episode_lengths[-100:])
                print(f"Episode {episode+1}: Avg Reward = {avg_reward:.2f}, "
                      f"Avg Length = {avg_length:.1f}, Epsilon = {self.epsilon:.3f}")

        print("SARSA Training completed!")

    def compare_with_qlearning(self, q_learning_agent, env, num_test_episodes=100):
        """Compare SARSA performance with Q-Learning"""
        print("\nComparing SARSA vs Q-Learning:")

        # Test SARSA
        sarsa_rewards = []
        for episode in range(num_test_episodes):
            state = env.reset()
            total_reward = 0
            steps = 0

            while steps < 200:
                action = self.get_action(state, training=False)
                next_state, reward, done, _ = env.step(action)
                state = next_state
                total_reward += reward
                steps += 1

                if done:
                    break

            sarsa_rewards.append(total_reward)

        # Test Q-Learning
        qlearning_rewards = []
        for episode in range(num_test_episodes):
            state = env.reset()
            total_reward = 0
            steps = 0

            while steps < 200:
                action = q_learning_agent.get_action(state, training=False)
                next_state, reward, done, _ = env.step(action)
                state = next_state
                total_reward += reward
                steps += 1

                if done:
                    break

            qlearning_rewards.append(total_reward)

        # Statistical comparison
        sarsa_mean = np.mean(sarsa_rewards)
        sarsa_std = np.std(sarsa_rewards)
        qlearning_mean = np.mean(qlearning_rewards)
        qlearning_std = np.std(qlearning_rewards)

        print(f"SARSA: {sarsa_mean:.2f} ± {sarsa_std:.2f}")
        print(f"Q-Learning: {qlearning_mean:.2f} ± {qlearning_std:.2f}")

        # Plot comparison
        plt.figure(figsize=(12, 5))

        plt.subplot(1, 2, 1)
        plt.hist(sarsa_rewards, alpha=0.7, label='SARSA', bins=20)
        plt.hist(qlearning_rewards, alpha=0.7, label='Q-Learning', bins=20)
        plt.xlabel('Episode Reward')
        plt.ylabel('Frequency')
        plt.title('Reward Distribution Comparison')
        plt.legend()

        plt.subplot(1, 2, 2)
        episodes = range(len(self.episode_rewards))
        plt.plot(episodes, self.episode_rewards, alpha=0.3, label='SARSA')
        plt.plot(range(len(q_learning_agent.episode_rewards)),
                q_learning_agent.episode_rewards, alpha=0.3, label='Q-Learning')

        # Moving averages
        window = 50
        if len(self.episode_rewards) > window:
            sarsa_ma = np.convolve(self.episode_rewards, np.ones(window)/window, mode='valid')
            plt.plot(range(window-1, len(self.episode_rewards)), sarsa_ma,
                    label='SARSA (MA)', linewidth=2)

        if len(q_learning_agent.episode_rewards) > window:
            q_ma = np.convolve(q_learning_agent.episode_rewards, np.ones(window)/window, mode='valid')
            plt.plot(range(window-1, len(q_learning_agent.episode_rewards)), q_ma,
                    label='Q-Learning (MA)', linewidth=2)

        plt.xlabel('Episode')
        plt.ylabel('Reward')
        plt.title('Training Progress Comparison')
        plt.legend()

        plt.tight_layout()
        plt.show()

# Train SARSA agent
sarsa_agent = SARSA(num_states=25, num_actions=4,
                   learning_rate=0.1, discount_factor=0.9,
                   epsilon=0.9, epsilon_decay=0.995)

sarsa_agent.train(env, num_episodes=500, max_steps_per_episode=100)

# Compare SARSA with Q-Learning
sarsa_agent.compare_with_qlearning(q_agent, env, num_test_episodes=100)

##Deep Q-Network (DQN)
Deep Q-Network (DQN) uses a neural network to approximate the Q-function, which lets RL handle high-dimensional state spaces. Several techniques are important for making this work:

- Experience Replay: stores experiences in a replay buffer and samples them at random for training, which breaks the correlation between consecutive transitions
- Target Network: uses a separate network to compute the target Q-values and updates it only periodically
- Double DQN: a later extension of DQN that reduces overestimation bias by separating action selection from action evaluation (the implementation in this section uses the standard max-based target)
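
The Double DQN idea from the list above can be sketched with plain NumPy arrays standing in for network outputs. This is a minimal sketch, not the notebook's implementation: the helper names `dqn_targets` and `double_dqn_targets` and the batch values are hypothetical, and `q_online_next` / `q_target_next` are assumed to be the predictions of the online and target networks for the next states.

```python
import numpy as np

def dqn_targets(rewards, dones, q_target_next, gamma=0.99):
    """Standard DQN target: r + gamma * max_a' Q_target(s', a')."""
    return rewards + gamma * (1.0 - dones) * q_target_next.max(axis=1)

def double_dqn_targets(rewards, dones, q_online_next, q_target_next, gamma=0.99):
    """Double DQN target: the online network selects a', the target network evaluates it."""
    best_actions = q_online_next.argmax(axis=1)
    evaluated = q_target_next[np.arange(len(rewards)), best_actions]
    return rewards + gamma * (1.0 - dones) * evaluated

# Hypothetical batch of 3 transitions with 2 actions each
rewards = np.array([1.0, 0.0, -1.0])
dones = np.array([0.0, 0.0, 1.0])          # the last transition is terminal
q_online_next = np.array([[1.0, 2.0], [0.5, 0.1], [0.0, 0.0]])
q_target_next = np.array([[3.0, 1.5], [0.2, 0.9], [5.0, 5.0]])

print(dqn_targets(rewards, dones, q_target_next, gamma=0.9))
print(double_dqn_targets(rewards, dones, q_online_next, q_target_next, gamma=0.9))
```

For this batch the standard targets are 3.7, 0.81 and -1.0, while the Double DQN targets are 2.35, 0.18 and -1.0. The Double DQN value can never exceed the standard one, because the evaluated action's target-network value is at most the maximum over actions.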

In [None]:
import tensorflow as tf
from collections import deque

# Deep Q-Network implementation
class DQN:
    def __init__(self, state_size, action_size, learning_rate=0.001,
                 gamma=0.99, epsilon=1.0, epsilon_decay=0.995, min_epsilon=0.01,
                 memory_size=10000, batch_size=32, target_update_freq=100):

        self.state_size = state_size
        self.action_size = action_size
        self.lr = learning_rate
        self.gamma = gamma
        self.epsilon = epsilon
        self.epsilon_decay = epsilon_decay
        self.min_epsilon = min_epsilon
        self.batch_size = batch_size
        self.target_update_freq = target_update_freq

        # Experience replay memory
        self.memory = deque(maxlen=memory_size)

        # Neural networks
        self.q_network = self._build_network()
        self.target_network = self._build_network()

        # Initialize target network
        self.update_target_network()

        # Training statistics
        self.losses = []
        self.episode_rewards = []
        self.training_step = 0

    def _build_network(self):
        """Build the neural network for Q-function approximation"""
        model = tf.keras.Sequential([
            tf.keras.layers.Dense(128, activation='relu', input_shape=(self.state_size,)),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(self.action_size, activation='linear')
        ])

        model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=self.lr),
                     loss='mse')

        return model

    def remember(self, state, action, reward, next_state, done):
        """Store an experience in the replay memory"""
        self.memory.append((state, action, reward, next_state, done))

    def get_action(self, state, training=True):
        """Epsilon-greedy action selection"""
        if training and np.random.random() < self.epsilon:
            return np.random.randint(self.action_size)

        q_values = self.q_network.predict(state.reshape(1, -1), verbose=0)
        return np.argmax(q_values[0])

    def replay(self):
        """Train the network on a batch sampled from experience replay"""
        if len(self.memory) < self.batch_size:
            return

        # Sample random batch
        batch = random.sample(list(self.memory), self.batch_size)
        states = np.array([e[0] for e in batch])
        actions = np.array([e[1] for e in batch])
        rewards = np.array([e[2] for e in batch])
        next_states = np.array([e[3] for e in batch])
        dones = np.array([e[4] for e in batch])

        # Current Q-values
        current_q_values = self.q_network.predict(states, verbose=0)

        # Next-state Q-values from the target network
        target_q_values = self.target_network.predict(next_states, verbose=0)

        # Overwrite the Q-values of the actions that were taken with their TD targets
        for i in range(self.batch_size):
            if dones[i]:
                current_q_values[i][actions[i]] = rewards[i]
            else:
                current_q_values[i][actions[i]] = rewards[i] + self.gamma * np.max(target_q_values[i])

        # Train network
        history = self.q_network.fit(states, current_q_values,
                                   epochs=1, verbose=0, batch_size=self.batch_size)

        self.losses.append(history.history['loss'][0])
        self.training_step += 1

        # Update target network
        if self.training_step % self.target_update_freq == 0:
            self.update_target_network()

        # Decay epsilon (note: once per training step, not once per episode)
        if self.epsilon > self.min_epsilon:
            self.epsilon *= self.epsilon_decay

    def update_target_network(self):
        """Copy weights from the main network to the target network"""
        self.target_network.set_weights(self.q_network.get_weights())

    def state_to_vector(self, state, env):
        """Convert a grid position to a feature vector"""
        # One-hot encoding of the position
        vector = np.zeros(env.width * env.height)
        x, y = state
        index = y * env.width + x
        vector[index] = 1.0

        # Add distance features
        goal_x, goal_y = env.goal_pos
        distance_features = [
            abs(x - goal_x) / env.width,   # Normalized x distance
            abs(y - goal_y) / env.height,  # Normalized y distance
            (abs(x - goal_x) + abs(y - goal_y)) / (env.width + env.height)  # Manhattan distance
        ]

        return np.concatenate([vector, distance_features])

    def train(self, env, num_episodes=1000, max_steps_per_episode=200):
        """Train DQN agent"""
        print(f"Training DQN for {num_episodes} episodes...")

        state_size = env.width * env.height + 3  # Position + distance features

        for episode in range(num_episodes):
            state = env.reset()
            state_vector = self.state_to_vector(state, env)
            total_reward = 0
            steps = 0

            for step in range(max_steps_per_episode):
                # Get action
                action = self.get_action(state_vector, training=True)

                # Take action
                next_state, reward, done, _ = env.step(action)
                next_state_vector = self.state_to_vector(next_state, env)

                # Store experience
                self.remember(state_vector, action, reward, next_state_vector, done)

                # Train network
                self.replay()

                state_vector = next_state_vector
                total_reward += reward
                steps += 1

                if done:
                    break

            self.episode_rewards.append(total_reward)

            # Print progress
            if (episode + 1) % 100 == 0:
                avg_reward = np.mean(self.episode_rewards[-100:])
                avg_loss = np.mean(self.losses[-100:]) if self.losses else 0
                print(f"Episode {episode+1}: Avg Reward = {avg_reward:.2f}, "
                      f"Avg Loss = {avg_loss:.4f}, Epsilon = {self.epsilon:.3f}")

        print("DQN Training completed!")

    def test(self, env, num_episodes=10, render=False):
        """Test trained DQN agent"""
        test_rewards = []

        for episode in range(num_episodes):
            state = env.reset()
            state_vector = self.state_to_vector(state, env)
            total_reward = 0
            steps = 0

            if render:
                print(f"\nDQN Test Episode {episode + 1}:")
                env.render()

            while steps < 200:
                action = self.get_action(state_vector, training=False)
                next_state, reward, done, _ = env.step(action)
                next_state_vector = self.state_to_vector(next_state, env)

                state_vector = next_state_vector
                total_reward += reward
                steps += 1

                if render:
                    print(f"Action: {['Up', 'Down', 'Left', 'Right'][action]}")
                    env.render()

                if done:
                    break

            test_rewards.append(total_reward)

            if render:
                print(f"Episode {episode + 1}: Reward = {total_reward}, Steps = {steps}")

        avg_reward = np.mean(test_rewards)
        print(f"\nDQN Test Results: Average Reward = {avg_reward:.2f} ± {np.std(test_rewards):.2f}")

        return test_rewards

    def plot_training_progress(self):
        """Plot DQN training metrics"""
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))

        # Episode rewards
        axes[0, 0].plot(self.episode_rewards, alpha=0.3, label='Episode Reward')

        window = min(100, len(self.episode_rewards) // 10)
        if window > 1:
            moving_avg = np.convolve(self.episode_rewards, np.ones(window)/window, mode='valid')
            axes[0, 0].plot(range(window-1, len(self.episode_rewards)), moving_avg,
                           label=f'Moving Average ({window})', linewidth=2)

        axes[0, 0].set_xlabel('Episode')
        axes[0, 0].set_ylabel('Reward')
        axes[0, 0].set_title('DQN Episode Rewards')
        axes[0, 0].legend()
        axes[0, 0].grid(True, alpha=0.3)

        # Training losses
        if self.losses:
            axes[0, 1].plot(self.losses, alpha=0.3, label='Loss')

            if len(self.losses) > 50:
                loss_ma = np.convolve(self.losses, np.ones(50)/50, mode='valid')
                axes[0, 1].plot(range(49, len(self.losses)), loss_ma,
                               label='Moving Average (50)', linewidth=2)

            axes[0, 1].set_xlabel('Training Step')
            axes[0, 1].set_ylabel('Loss')
            axes[0, 1].set_title('DQN Training Loss')
            axes[0, 1].legend()
            axes[0, 1].grid(True, alpha=0.3)

        # Epsilon decay (reconstructed). replay() decays epsilon once per
        # training step, so the x-axis is training steps, not episodes.
        # Assumes the initial epsilon was 1.0, as in the demo run.
        epsilon_history = []
        eps = 1.0
        for i in range(len(self.losses)):
            epsilon_history.append(eps)
            if eps > self.min_epsilon:
                eps *= self.epsilon_decay

        axes[1, 0].plot(epsilon_history)
        axes[1, 0].set_xlabel('Training Step')
        axes[1, 0].set_ylabel('Epsilon')
        axes[1, 0].set_title('Exploration Rate Decay')
        axes[1, 0].grid(True, alpha=0.3)

        # Q-value visualization for the center state
        # Note: this block reads the notebook-level `env` variable, not an attribute of the agent
        center_state = (2, 2)  # Middle of the 5x5 grid
        if hasattr(env, 'width') and hasattr(env, 'height'):
            center_vector = self.state_to_vector(center_state, env)
            q_values = self.q_network.predict(center_vector.reshape(1, -1), verbose=0)[0]

            actions = ['Up', 'Down', 'Left', 'Right']
            axes[1, 1].bar(actions, q_values)
            axes[1, 1].set_ylabel('Q-Value')
            axes[1, 1].set_title(f'Q-Values for State {center_state}')
            axes[1, 1].grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

# Train DQN agent
print("="*60)
print("DEEP Q-NETWORK DEMONSTRATION")
print("="*60)

env = GridWorld(width=5, height=5)
state_size = env.width * env.height + 3  # Position encoding + distance features

dqn_agent = DQN(state_size=state_size, action_size=4,
                learning_rate=0.001, gamma=0.99,
                epsilon=1.0, epsilon_decay=0.995,
                memory_size=10000, batch_size=32)

# Train DQN
dqn_agent.train(env, num_episodes=800, max_steps_per_episode=100)

# Test DQN
print("\nTesting DQN agent:")
dqn_rewards = dqn_agent.test(env, num_episodes=5, render=True)

# Plot results
dqn_agent.plot_training_progress()