Rahmanda Afebrio Yuris Soesatyo - Chapter 18:Reinforcement Learning

1. Reinforcement Learning Basics, Policy Search, and OpenAI Gym

Reinforcement Learning (RL) memandang pengambilan keputusan sebagai proses interaksi berulang antara agen dan lingkungan. Di setiap langkah waktu, agen mengamati state, memilih sebuah action, lalu menerima reward sekaligus berpindah ke state berikutnya. Target akhirnya bukan sekadar reward sesaat, tapi memaksimalkan akumulasi reward jangka panjang yang sudah didiskon, sehingga agen belajar membuat keputusan yang bagus secara berkelanjutan.

Kerangka ini kepake di banyak banget skenario dunia nyata, mulai dari kontrol robot, game Atari kayak Ms. Pac-Man, game strategi kompleks seperti Go, sampai sistem otomatisasi suhu dan trading algoritmik. Yang beda-beda di tiap kasus biasanya definisi state, ruang aksi, aturan transisi environment, dan bentuk reward-nya. Tapi pola dasarnya selalu sama: agen coba-coba, dapat feedback, lalu memperbaiki perilakunya.

Perilaku agen sendiri diatur oleh policy, yaitu aturan yang memetakan state ke action. Policy bisa deterministik (state tertentu → aksi tertentu) atau stokastik (menghasilkan distribusi probabilitas aksi). Di pendekatan modern, policy hampir selalu dimodelkan sebagai neural network. Proses mencari policy terbaik ini disebut policy search, yang spektrumnya luas: dari cara simpel seperti brute-force, metode evolusioner (genetic algorithm, NEAT), sampai pendekatan berbasis gradien yang langsung mengoptimalkan parameter policy agar reward makin besar.

Buat praktik dan eksperimen, OpenAI Gym jadi standar de facto karena nyediain banyak environment siap pakai dengan antarmuka yang konsisten. Contoh klasiknya CartPole-v1, simulasi kereta dengan tongkat yang harus dijaga tetap seimbang. Environment ini punya metode reset() dan step(action), plus informasi observation_space dan action_space yang menjelaskan bentuk state dan aksi, jadi enak banget buat mulai belajar dan ngetes algoritma RL.

In [None]:
import gym
import numpy as np

env = gym.make("CartPole-v1")
obs = env.reset()

def basic_policy(obs):
    angle = obs[2]
    return 0 if angle < 0 else 1  # 0: left, 1: right

totals = []
for episode in range(500):
    episode_rewards = 0
    obs = env.reset()
    for step in range(200):
        action = basic_policy(obs)
        obs, reward, done, info = env.step(action)
        episode_rewards += reward
        if done:
            break
    totals.append(episode_rewards)

print(np.mean(totals), np.std(totals), np.min(totals), np.max(totals))
env.close()

2. Policy Gradients and CartPole with TensorFlow/Keras

Metode Policy Gradient (PG) mengoptimasi policy secara langsung dengan memaksimalkan ekspektasi return, tanpa membangun model eksplisit dari environment atau fungsi nilai. Ide dasarnya adalah memperkuat aksi yang menghasilkan return tinggi dan melemahkan aksi yang menghasilkan return rendah. Proses training umumnya melibatkan pengumpulan beberapa episode, perhitungan return terdiskonto untuk setiap aksi, dan penggunaan nilai tersebut sebagai sinyal pembelajaran.

Algoritma klasik REINFORCE mengimplementasikan pendekatan ini dengan mendefinisikan loss sebagai negatif dari log-probabilitas aksi yang diambil, dikalikan dengan return yang diperoleh. Gradien dari loss ini menjadi estimasi gradien reward terhadap parameter policy, sehingga update parameter mendorong peningkatan probabilitas aksi yang mengarah pada return tinggi.

Pada environment CartPole, policy dapat dimodelkan sebagai neural network sederhana yang menerima vektor observasi berdimensi empat dan menghasilkan probabilitas untuk salah satu aksi (misalnya mendorong ke kiri), biasanya melalui fungsi aktivasi sigmoid. Aksi kemudian di-sample secara stokastik dari distribusi ini, yang secara alami mendorong eksplorasi.

Return dihitung sebagai jumlah reward terdiskonto dengan faktor diskonto ( \gamma ) (umumnya sekitar 0.95), lalu dinormalisasi di seluruh aksi dan episode untuk mengurangi varians estimasi gradien. Normalisasi ini menghasilkan advantage yang lebih stabil, sehingga proses pembelajaran menjadi lebih konsisten dan konvergen.

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
import gym

env = gym.make("CartPole-v1")
n_inputs = env.observation_space.shape[0]

model = keras.models.Sequential([
    keras.layers.Dense(5, activation="elu", input_shape=[n_inputs]),
    keras.layers.Dense(1, activation="sigmoid"),
])

optimizer = keras.optimizers.Adam(learning_rate=0.01)
loss_fn = keras.losses.binary_crossentropy

def play_one_step(env, obs, model, loss_fn):
    with tf.GradientTape() as tape:
        left_proba = model(obs[np.newaxis])
        action = tf.cast(tf.random.uniform([1, 1]) > left_proba, tf.int32)
        y_target = 1.0 - tf.cast(action, tf.float32)
        loss = tf.reduce_mean(loss_fn(y_target, left_proba))
    grads = tape.gradient(loss, model.trainable_variables)
    obs, reward, done, info = env.step(int(action[0, 0]))
    return obs, reward, done, grads

def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    all_rewards, all_grads = [], []
    for episode in range(n_episodes):
        current_rewards, current_grads = [], []
        obs = env.reset()
        for step in range(n_max_steps):
            obs, reward, done, grads = play_one_step(env, obs, model, loss_fn)
            current_rewards.append(reward)
            current_grads.append(grads)
            if done:
                break
        all_rewards.append(current_rewards)
        all_grads.append(current_grads)
    return all_rewards, all_grads

def discount_rewards(rewards, discount_factor):
    discounted = np.array(rewards, dtype=np.float32)
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discount_factor * discounted[step + 1]
    return discounted

def discount_and_normalize_rewards(all_rewards, discount_factor):
    all_discounted = [discount_rewards(r, discount_factor) for r in all_rewards]
    flat = np.concatenate(all_discounted)
    mean, std = flat.mean(), flat.std()
    return [(d - mean) / (std + 1e-8) for d in all_discounted]

n_iterations = 150
n_episodes_per_update = 10
n_max_steps = 200
discount_factor = 0.95

for iteration in range(n_iterations):
    all_rewards, all_grads = play_multiple_episodes(
        env, n_episodes_per_update, n_max_steps, model, loss_fn
    )
    all_final_rewards = discount_and_normalize_rewards(
        all_rewards, discount_factor
    )
    all_mean_grads = []
    for var_index in range(len(model.trainable_variables)):
        mean_grads = tf.reduce_mean([
            final_reward * all_grads[episode_idx][step][var_index]
            for episode_idx, final_rewards in enumerate(all_final_rewards)
            for step, final_reward in enumerate(final_rewards)
        ], axis=0)
        all_mean_grads.append(mean_grads)
    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))

env.close()

3. Markov Decision Processes, Temporal-Difference Learning, Q-Learning, and Deep Q-Networks

Sebagian besar masalah dalam Reinforcement Learning bisa diformalkan sebagai Markov Decision Process (MDP). Di sini, lingkungan didefinisikan lewat sekumpulan state, sekumpulan aksi, probabilitas transisi antar-state, fungsi reward, dan faktor diskonto. Asumsi Markov intinya bilang kalau state berikutnya dan reward yang diterima cuma bergantung pada state dan aksi saat ini, bukan seluruh riwayat sebelumnya. Asumsi ini yang bikin banyak algoritma RL jadi masuk akal dan bisa dianalisis secara matematis.

Dalam kerangka MDP, konsep kunci yang sering muncul adalah Bellman Optimality Equation, yang mendeskripsikan nilai optimal sebuah state maupun pasangan state–aksi. Kalau fungsi transisi dan reward diketahui secara lengkap, nilai optimal ini bisa dihitung secara iteratif menggunakan algoritma seperti Value Iteration atau Q-Value Iteration. Algoritma-algoritma ini bekerja dengan cara menerapkan persamaan Bellman berulang kali sampai nilainya konvergen ke solusi optimal.

Masalahnya, di dunia nyata agen hampir nggak pernah tahu fungsi transisi dan reward secara eksplisit. Karena itu, pembelajaran biasanya dilakukan secara model-free, murni dari pengalaman interaksi dengan environment. Salah satu pendekatan penting di sini adalah Temporal-Difference (TD) Learning, yang menggabungkan ide Monte Carlo dan dynamic programming. Intinya, nilai diperbarui berdasarkan selisih antara prediksi saat ini dan target bootstrap yang berasal dari estimasi nilai di langkah berikutnya.

Algoritma TD yang paling terkenal adalah Q-Learning. Di metode ini, agen memperbarui estimasi nilai state–aksi (Q-value) menggunakan aturan pembaruan berbasis TD error dan learning rate. Q-Learning bersifat off-policy, artinya proses belajarnya tetap mengarah ke policy optimal meskipun agen saat itu sedang menjalankan policy eksplorasi yang berbeda, misalnya ε-greedy.

Ketika ruang state menjadi sangat besar atau bahkan kontinu, menyimpan Q-value dalam bentuk tabel sudah nggak realistis. Solusinya adalah Approximate Q-Learning, di mana fungsi Q didekati oleh fungsi parametrik. Kalau fungsi aproksimasi ini berupa neural network dalam, pendekatannya dikenal sebagai Deep Q-Network (DQN). DQN dilatih dengan meminimalkan error antara prediksi Q dan target TD, biasanya menggunakan Mean Squared Error. Pendekatan ini memungkinkan agen menangani environment berdimensi tinggi, seperti input visual mentah pada game Atari, dan jadi salah satu tonggak penting dalam perkembangan Reinforcement Learning modern.

In [None]:
import numpy as np

# Tabular example on tiny MDP (3 states, handcrafted transitions)
transition_probabilities = [
    [[0.7, 0.3, 0.0], [1.0, 0.0, 0.0], [0.8, 0.2, 0.0]],
    [[0.0, 1.0, 0.0], None,           [0.0, 0.0, 1.0]],
    [None,           [0.8, 0.1, 0.1], None          ],
]
rewards = [
    [[+10, 0, 0], [0, 0, 0], [0, 0, 0]],
    [[0, 0, 0],   [0, 0, 0], [0, 0, -50]],
    [[0, 0, 0],   [+40, 0, 0], [0, 0, 0]],
]
possible_actions = [[0, 1, 2], [0, 2], [1]]

Q_values = np.full((3, 3), -np.inf)
for s, actions in enumerate(possible_actions):
    Q_values[s, actions] = 0.0

def step(state, action):
    probas = transition_probabilities[state][action]
    next_state = np.random.choice([0, 1, 2], p=probas)
    reward = rewards[state][action][next_state]
    return next_state, reward

def exploration_policy(state):
    return np.random.choice(possible_actions[state])

alpha0, decay, gamma = 0.05, 0.005, 0.90
state = 0
for iteration in range(10000):
    action = exploration_policy(state)
    next_state, reward = step(state, action)
    next_value = np.max(Q_values[next_state])
    alpha = alpha0 / (1 + iteration * decay)
    Q_values[state, action] *= (1 - alpha)
    Q_values[state, action] += alpha * (reward + gamma * next_value)
    state = next_state

optimal_actions = np.argmax(Q_values, axis=1)
print("Optimal actions per state:", optimal_actions)



In [None]:

# DQN for CartPole
import gym
import numpy as np
import tensorflow as tf
from tensorflow import keras
from collections import deque

env = gym.make("CartPole-v0")
input_shape = [env.observation_space.shape[0]]
n_outputs = env.action_space.n

model = keras.models.Sequential([
    keras.layers.Dense(32, activation="elu", input_shape=input_shape),
    keras.layers.Dense(32, activation="elu"),
    keras.layers.Dense(n_outputs),
])

def epsilon_greedy_policy(state, epsilon=0):
    if np.random.rand() < epsilon:
        return np.random.randint(n_outputs)
    Q_values = model.predict(state[np.newaxis], verbose=0)
    return int(np.argmax(Q_values[0]))

replay_buffer = deque(maxlen=2000)

def sample_experiences(batch_size):
    indices = np.random.randint(len(replay_buffer), size=batch_size)
    batch = [replay_buffer[i] for i in indices]
    states, actions, rewards, next_states, dones = [
        np.array([exp[field] for exp in batch]) for field in range(5)
    ]
    return states, actions, rewards, next_states, dones

def play_one_step(env, state, epsilon):
    action = epsilon_greedy_policy(state, epsilon)
    next_state, reward, done, info = env.step(action)
    replay_buffer.append((state, action, reward, next_state, done))
    return next_state, reward, done, info

batch_size = 32
discount_factor = 0.95
optimizer = keras.optimizers.Adam(learning_rate=1e-3)
loss_fn = keras.losses.mean_squared_error

def training_step(batch_size):
    states, actions, rewards, next_states, dones = sample_experiences(batch_size)
    next_Q_values = model.predict(next_states, verbose=0)
    max_next_Q_values = np.max(next_Q_values, axis=1)
    target_Q_values = rewards + (1 - dones) * discount_factor * max_next_Q_values
    mask = tf.one_hot(actions, n_outputs)
    with tf.GradientTape() as tape:
        all_Q = model(states)
        Q_values = tf.reduce_sum(all_Q * mask, axis=1, keepdims=True)
        loss = tf.reduce_mean(loss_fn(target_Q_values, Q_values))
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

for episode in range(600):
    obs = env.reset()
    for step in range(200):
        epsilon = max(1 - episode / 500, 0.01)
        obs, reward, done, info = play_one_step(env, obs, epsilon)
        if done:
            break
    if episode > 50:
        training_step(batch_size)

env.close()


4. Deep Q-Learning Variants and Improvements

Walaupun Deep Q-Network (DQN) berhasil memperluas Q-Learning ke ruang state berdimensi tinggi, versi dasarnya terkenal cukup “rewel” saat dilatih. Training sering tidak stabil dan rentan terhadap catastrophic forgetting. Akar masalahnya ada pada fakta bahwa jaringan saraf dipakai sekaligus sebagai learner dan sebagai pembuat target. Parameter model terus berubah, policy ikut berubah, dan distribusi data dari replay buffer pun ikut bergeser, sehingga target pembelajaran jadi tidak stasioner dan mudah memicu osilasi.

Salah satu perbaikan paling awal dan paling penting adalah Fixed Q-Value Targets. Ide sederhananya adalah memisahkan jaringan menjadi dua: online network dan target network. Online network dipakai untuk update parameter, sedangkan target network hanya bertugas menghasilkan target Q-value. Bobot target network tidak ikut berubah setiap step, melainkan disalin secara periodik dari online network. Dengan target yang relatif “diam” ini, proses training jadi jauh lebih stabil dan tidak terlalu liar.

Masalah lain pada DQN klasik adalah over-estimation bias, yang muncul karena operator maksimum cenderung melebih-lebihkan nilai Q. Untuk mengatasi ini, diperkenalkan Double DQN. Triknya adalah memisahkan proses memilih aksi dan mengevaluasi nilainya. Online network digunakan untuk menentukan aksi terbaik pada state berikutnya, sementara target network dipakai untuk menghitung nilai Q dari aksi tersebut. Pemisahan ini kelihatannya sepele, tapi efeknya besar dalam menurunkan bias estimasi dan meningkatkan performa secara konsisten.

Selain arsitektur dan target, cara mengambil data dari replay buffer juga berpengaruh besar. Prioritized Experience Replay (PER) mengubah strategi sampling dengan memberi prioritas lebih tinggi pada transisi yang punya TD error besar. Intinya, pengalaman yang “mengejutkan” dianggap lebih informatif dan lebih layak dipelajari ulang. Supaya pembelajaran tetap tidak bias, setiap sampel diberi importance-sampling weight yang mengoreksi kontribusinya terhadap loss. Hasilnya, agen belajar lebih cepat dan lebih efisien dari pengalaman penting.

Varian lain yang cukup elegan adalah Dueling DQN, yang mengubah struktur jaringan itu sendiri. Alih-alih langsung memprediksi Q-value untuk setiap aksi, jaringan dipisah menjadi dua alur: satu untuk mempelajari state value dan satu lagi untuk advantage tiap aksi. Keduanya kemudian digabung untuk menghasilkan Q-value akhir. Pendekatan ini sangat berguna pada state di mana pilihan aksi tidak terlalu berpengaruh, karena jaringan bisa fokus belajar seberapa “bagus” state tersebut tanpa harus bergantung penuh pada perbedaan aksi.

Gabungan dari teknik-teknik ini—fixed targets, Double DQN, PER, dan dueling architecture—membuat DQN jauh lebih stabil, akurat, dan praktis digunakan. Bahkan, banyak implementasi modern langsung mengombinasikan semuanya, karena secara kolektif mereka mengatasi hampir semua kelemahan utama DQN versi awal.

In [None]:
# Double DQN training_step (modifikasi target)
target = keras.models.clone_model(model)
target.set_weights(model.get_weights())

def training_step_double(batch_size):
    states, actions, rewards, next_states, dones = sample_experiences(batch_size)
    next_Q_online = model.predict(next_states, verbose=0)
    best_next_actions = np.argmax(next_Q_online, axis=1)
    next_Q_target = target.predict(next_states, verbose=0)
    next_mask = tf.one_hot(best_next_actions, n_outputs).numpy()
    next_best_Q = np.sum(next_Q_target * next_mask, axis=1)
    target_Q_values = rewards + (1 - dones) * discount_factor * next_best_Q
    mask = tf.one_hot(actions, n_outputs)
    with tf.GradientTape() as tape:
        all_Q = model(states)
        Q_values = tf.reduce_sum(all_Q * mask, axis=1, keepdims=True)
        loss = tf.reduce_mean(loss_fn(target_Q_values, Q_values))
    grads = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

# Periodically sync target network
if episode % 50 == 0:
    target.set_weights(model.get_weights())

# Dueling DQN architecture for CartPole-like inputs
from tensorflow import keras
import tensorflow.keras.backend as K

n_outputs = 2
input_states = keras.layers.Input(shape=[4])
hidden1 = keras.layers.Dense(32, activation="elu")(input_states)
hidden2 = keras.layers.Dense(32, activation="elu")(hidden1)
state_values = keras.layers.Dense(1)(hidden2)
raw_advantages = keras.layers.Dense(n_outputs)(hidden2)
advantages = raw_advantages - K.max(raw_advantages, axis=1, keepdims=True)
Q_values = state_values + advantages
dueling_model = keras.Model(inputs=[input_states], outputs=[Q_values])

5. TF-Agents, Atari Breakout, and Modern RL Algorithms

Di bagian penutup bab, diperkenalkan TF-Agents, yaitu library Reinforcement Learning berbasis TensorFlow yang dirancang supaya eksperimen RL jadi lebih rapi dan terstruktur. TF-Agents menyediakan banyak komponen siap pakai, mulai dari environment wrapper, replay buffer yang efisien, sampai implementasi berbagai algoritma RL modern seperti DQN, Double DQN, REINFORCE, PPO, dan Soft Actor-Critic. Selain itu, ada juga driver utilities yang membantu mengotomatisasi proses interaksi agen dengan environment untuk mengumpulkan experience.

Sebagai studi kasus utama, bab ini membahas pelatihan agen DQN pada game Atari Breakout-v4 dari OpenAI Gym. Proses training mengikuti praktik standar yang diperkenalkan oleh DeepMind, termasuk preprocessing frame dengan mengubah gambar menjadi grayscale, menurunkan resolusi, dan menggabungkan beberapa frame sekaligus agar agen bisa menangkap dinamika gerak. Training juga memanfaatkan replay buffer berukuran besar, target network terpisah, serta pembaruan parameter secara berkala untuk menjaga stabilitas pembelajaran.

Dalam TF-Agents, interaksi dengan environment direpresentasikan menggunakan objek TimeStep, yang berisi informasi seperti tipe langkah (awal, tengah, atau akhir episode), reward, discount, dan observasi. Library ini juga mendefinisikan specification formal seperti observation_spec, action_spec, dan time_step_spec, sehingga bentuk data yang masuk dan keluar bisa divalidasi dengan jelas dan meminimalkan error saat menghubungkan environment dengan algoritma.

Pengumpulan data dilakukan melalui driver seperti DynamicStepDriver atau DynamicEpisodeDriver. Driver ini menjalankan policy di environment, mengirimkan aksi, menyimpan trajectory ke replay buffer, sekaligus memperbarui metrik training secara otomatis. Dengan pola ini, logika interaksi dengan environment dipisahkan dari logika pembelajaran, membuat kode lebih modular, mudah dibaca, dan gampang dikembangkan.

Selain DQN, bab ini juga menyinggung beberapa algoritma Reinforcement Learning modern lainnya. Pendekatan Actor–Critic memisahkan peran policy dan estimasi nilai untuk menurunkan varians gradien. Soft Actor-Critic menambahkan konsep entropi ke dalam objektif sehingga policy menjadi lebih stabil dan eksploratif. Proximal Policy Optimization membatasi perubahan policy lewat mekanisme clipped loss agar training tidak terlalu agresif. Terakhir, dibahas juga curiosity-driven exploration, yang menambahkan reward intrinsik berbasis ketidakpastian atau error prediksi untuk mendorong agen mengeksplorasi environment dengan reward yang jarang muncul. Semua pendekatan ini mencerminkan arah utama RL modern, yang fokus pada stabilitas training, efisiensi data, dan kemampuan diskalakan ke masalah yang semakin kompleks.

In [None]:
import tensorflow as tf
from tf_agents.environments import suite_gym
from tf_agents.environments.wrappers import ActionRepeat
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.agents.dqn import dqn_agent
from tf_agents.networks import q_network
from tf_agents.drivers.dynamic_step_driver import DynamicStepDriver
from tf_agents.policies.random_tf_policy import RandomTFPolicy
from tf_agents.utils.common import function

# Load Breakout environment via TF-Agents (wrapper around Gym)
tf_env = suite_gym.load("Breakout-v4")

# Q-network (CNN) on preprocessed 84x84x4 observations (assume preprocessing wrapper applied)
preproc_env = tf_env  # placeholder if you add preprocessing wrappers
q_net = q_network.QNetwork(
    preproc_env.observation_spec(),
    preproc_env.action_spec(),
    fc_layer_params=(512,)
)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
train_step = tf.Variable(0)

agent = dqn_agent.DqnAgent(
    preproc_env.time_step_spec(),
    preproc_env.action_spec(),
    q_network=q_net,
    optimizer=optimizer,
    td_errors_loss_fn=tf.keras.losses.Huber(reduction="none"),
    train_step_counter=train_step,
)
agent.initialize()

replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec=agent.collect_data_spec,
    batch_size=preproc_env.batch_size,
    max_length=1000000
)
replay_buffer_observer = replay_buffer.add_batch

update_period = 4
collect_driver = DynamicStepDriver(
    preproc_env,
    agent.collect_policy,
    observers=[replay_buffer_observer],
    num_steps=update_period
)

# Warm up replay buffer with random policy
initial_collect_policy = RandomTFPolicy(preproc_env.time_step_spec(),
                                        preproc_env.action_spec())
init_driver = DynamicStepDriver(
    preproc_env,
    initial_collect_policy,
    observers=[replay_buffer.add_batch],
    num_steps=20000
)
init_driver.run()

dataset = replay_buffer.as_dataset(
    sample_batch_size=64,
    num_steps=2,
    num_parallel_calls=3
).prefetch(3)
iterator = iter(dataset)

collect_driver.run = function(collect_driver.run)
agent.train = function(agent.train)

def train_agent(n_iterations):
    time_step = None
    policy_state = agent.collect_policy.get_initial_state(preproc_env.batch_size)
    for iteration in range(n_iterations):
        time_step, policy_state = collect_driver.run(time_step, policy_state)
        trajectories, buffer_info = next(iterator)
        train_loss = agent.train(trajectories)

# Example (would take long in practice):
# train_agent(1000000)
