In [1]:
import gymnasium as gym
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, Model

# Memory buffer for on-policy rollouts
class Memory:
    """Rollout storage made of six parallel Python lists.

    Index ``i`` of every list refers to the same transition. ``store``
    appends one transition; ``clear`` swaps all lists for fresh empty ones.
    """

    def __init__(self):
        # One list per recorded quantity, all starting out empty.
        self.obs, self.actions, self.logps = [], [], []
        self.rewards, self.values, self.dones = [], [], []

    def store(self, obs, action, logp, value, reward, done):
        """Append a single transition to the buffer.

        ``obs`` and ``action`` are snapshotted through their ``.copy()``
        method, so changes the caller makes to them afterwards do not reach
        the stored entries. The remaining arguments are stored as given.
        """
        self.obs += [obs.copy()]
        self.actions += [action.copy()]
        self.logps += [logp]
        self.values += [value]
        self.rewards += [reward]
        self.dones += [done]

    def clear(self):
        """Drop everything stored so far by re-running the initializer."""
        type(self).__init__(self)

In [2]:
class Actor(Model):
    """Policy network producing the mean and log-std of a diagonal Gaussian.

    The trunk is Dense(256, relu) -> LayerNormalization -> Dense(256, relu)
    -> LayerNormalization -> Dense(128, tanh), followed by a linear head of
    width ``act_dim`` for the mean. The log standard deviation is a trainable
    vector of length ``act_dim`` that does not depend on the input; it is
    initialised to -0.5.

    ``obs_dim`` is accepted by the constructor but is not used by it.
    """

    def __init__(self, obs_dim, act_dim):
        super().__init__()

        # Trunk layers, created in the order they are applied in ``call``.
        self.d1 = layers.Dense(256, activation='relu')
        self.n1 = layers.LayerNormalization()
        self.d2 = layers.Dense(256, activation='relu')
        self.n2 = layers.LayerNormalization()
        self.d3 = layers.Dense(128, activation='tanh')

        # Linear head for the action mean.
        self.mu = layers.Dense(act_dim)

        # State-independent log standard deviation, one entry per action dim.
        self.log_std = self.add_weight(
            shape=(act_dim,),
            initializer=tf.constant_initializer(-0.5),
            trainable=True,
            name='log_std',
        )

    def call(self, input):
        """Return ``(mu, log_std)`` for a batch of observations.

        ``log_std`` is the shared parameter vector broadcast along the batch
        axis so that it has one row per row of ``input``.
        """
        hidden = input
        for layer in (self.d1, self.n1, self.d2, self.n2, self.d3):
            hidden = layer(hidden)
        mean = self.mu(hidden)

        # The batch size is read at run time so any number of rows works.
        n_rows = tf.shape(input)[0]
        target_shape = (n_rows, self.log_std.shape[0])
        tiled_log_std = tf.broadcast_to(self.log_std[None, :], target_shape)
        return mean, tiled_log_std

In [3]:
import tensorflow_probability as tfp

class PPOAgent:
    """PPO agent with a Gaussian actor, a value critic and a rollout memory.

    ``act`` samples from the actor and post-processes the sample (scaling,
    exponential smoothing, clipping) before it is handed to the environment.
    ``store`` records transitions, ``compute_gae`` turns the stored rewards
    and values into advantages/returns, and ``update`` runs the clipped
    surrogate policy update followed by the value regression.

    Args:
        obs_dim: Size of a flattened observation.
        act_dim: Number of action dimensions.
        clip_ratio: PPO clipping range; the ratio is clipped to
            ``[1 - clip_ratio, 1 + clip_ratio]``.
        gamma: Discount factor used in ``compute_gae``.
        lam: GAE lambda used in ``compute_gae``.
        pi_lr: Adam learning rate for the actor.
        vf_lr: Adam learning rate for the critic.
        train_pi_iters: Gradient steps on the policy per ``update`` call.
        train_v_iters: Gradient steps on the critic per ``update`` call.
        action_scale: Factor applied to the raw sample in ``act``.
        smooth_coef: Weight of the previous action in the exponential
            moving average applied in ``act``.
        entropy_coef: Weight of the entropy bonus in the policy loss.
    """

    def __init__(self, obs_dim, act_dim, 
                 clip_ratio=0.2, gamma=0.99, lam=0.95,
                 pi_lr=1e-4, vf_lr=1e-4, train_pi_iters=80, train_v_iters=80, action_scale=0.9, smooth_coef=0.9,
                 entropy_coef=0.01):
        self.obs_dim = obs_dim
        self.act_dim = act_dim
        self.clip_ratio = clip_ratio
        self.gamma = gamma
        self.lam = lam
        self.train_pi_iters = train_pi_iters
        self.train_v_iters = train_v_iters
        self.action_scale = action_scale
        self.smooth_coef = smooth_coef
        self.entropy_coef = entropy_coef
        # State of the exponential moving average used to smooth actions.
        self.prev_action = np.zeros(act_dim, dtype=np.float32)
        # Bookkeeping that lets ``store`` map the executed action returned by
        # the latest ``act`` call back to the raw sample its logp belongs to.
        self._last_env_action = None
        self._last_raw_action = None

        # Build actor and critic
        self.actor = Actor(self.obs_dim, self.act_dim)
        self.critic = self._build_critic()
        self.actor_optimizer = tf.keras.optimizers.Adam(learning_rate=pi_lr)
        self.critic_optimizer = tf.keras.optimizers.Adam(learning_rate=vf_lr)
        self.memory = Memory()

    def _build_critic(self):
        """Build the functional value network: same trunk shape as the actor, scalar output."""
        inp = layers.Input(shape=(self.obs_dim,))
        x = layers.Dense(256, activation='relu')(inp)
        x = layers.LayerNormalization()(x)
        x = layers.Dense(256, activation='relu')(x)
        x = layers.LayerNormalization()(x)
        x = layers.Dense(128, activation='tanh')(x)
        v = layers.Dense(1)(x)
        return Model(inputs=inp, outputs=v)

    def act(self, obs):
        """Sample an action for a single observation.

        Args:
            obs: NumPy observation; it is reshaped to one row of float32.

        Returns:
            Tuple ``(action, logp, value)`` where ``action`` is the scaled,
            smoothed and clipped action to execute, ``logp`` is the
            log-probability of the *raw* Gaussian sample summed over action
            dimensions, and ``value`` is the critic's scalar estimate.

        Side effects:
            Updates ``self.prev_action`` (smoothing state) and
            ``self.pi_dist`` (distribution of the latest call).
        """
        tfd = tfp.distributions
        obs = obs.reshape(1, -1).astype(np.float32)
        mu, log_std = self.actor(obs)
        std = tf.exp(log_std)
        pi_dist = tfd.Normal(loc=mu, scale=std)
        # Kept as an attribute for callers that inspect the latest distribution.
        self.pi_dist = pi_dist
        pi = pi_dist.sample()
        logp = tf.reduce_sum(pi_dist.log_prob(pi), axis=-1)
        value = self.critic(obs)

        raw_action = pi.numpy()[0]

        # Action scaling (shrinks the magnitude of the sample).
        scaled = raw_action * self.action_scale

        # Exponential moving average smoothing against the previous action.
        smoothed = (self.smooth_coef * self.prev_action
                    + (1 - self.smooth_coef) * scaled)
        self.prev_action = smoothed

        # Clip to the range [-1, 1] before handing the action to the environment.
        action = np.clip(smoothed, -1, 1)

        # Remember which raw sample produced this executed action; see ``store``.
        self._last_raw_action = raw_action
        self._last_env_action = action

        return action, logp.numpy()[0], value.numpy()[0,0]

    def store(self, obs, action, logp, value, reward, done):
        """Record one transition in the rollout memory.

        When ``action`` is the very array returned by the most recent
        ``act`` call, the raw Gaussian sample behind it is stored instead.
        ``logp`` was computed for that raw sample, and ``update`` re-evaluates
        the log-probability of the stored action, so storing the
        post-processed action would make the PPO ratio differ from 1 even
        before the first gradient step. Any other ``action`` is stored as-is.
        """
        raw = self._last_raw_action
        if raw is not None and action is self._last_env_action:
            action = raw
        self.memory.store(obs, action, logp, value, reward, done)

    def compute_gae(self, last_value):
        """Compute GAE advantages and returns over the stored transitions.

        Args:
            last_value: Value used for the state that follows the last
                stored transition.

        Returns:
            Tuple ``(adv, returns)`` of float32 arrays with one entry per
            stored transition; ``returns`` is ``adv`` plus the stored values.
        """
        rewards = np.array(self.memory.rewards, dtype=np.float32)
        values = np.array(self.memory.values + [last_value], dtype=np.float32)
        dones = np.array(self.memory.dones, dtype=np.float32)
        T = len(self.memory.rewards)
        adv = np.zeros(T, dtype=np.float32)
        last_gae = 0
        for t in reversed(range(T)):
            # A done flag at step t removes both the bootstrap term and the
            # carry-over of the advantage accumulated from later steps.
            not_done = 1 - dones[t]
            delta = rewards[t] + self.gamma * values[t+1] * not_done - values[t]
            adv[t] = last_gae = delta + self.gamma * self.lam * not_done * last_gae
        returns = adv + values[:-1]
        return adv, returns

    def update(self):
        """Run the PPO policy and value updates on the stored rollout, then clear it.

        Does nothing when the memory is empty. The value of the last stored
        transition is used as the bootstrap value passed to ``compute_gae``.
        """
        if not self.memory.rewards:
            # Nothing was collected; indexing values[-1] below would fail.
            return

        obs = np.array(self.memory.obs, dtype=np.float32)
        actions = np.array(self.memory.actions, dtype=np.float32)
        old_logps = np.array(self.memory.logps, dtype=np.float32)
        values = np.array(self.memory.values, dtype=np.float32)
        last_value = values[-1]
        adv, returns = self.compute_gae(last_value)
        adv = (adv - adv.mean()) / (adv.std() + 1e-8)

        # Per-dimension entropy of a Gaussian is log_std + 0.5 * log(2*pi*e).
        entropy_const = float(0.5 * np.log(2 * np.pi * np.e))

        # Policy update
        for _ in range(self.train_pi_iters):
            with tf.GradientTape() as tape:
                mu, log_std = self.actor(obs)
                std = tf.exp(log_std)
                new_logp = -0.5 * tf.reduce_sum(((actions - mu)/std)**2 + 2*log_std + np.log(2*np.pi), axis=1)
                ratio = tf.exp(new_logp - old_logps)
                clipped_ratio = tf.clip_by_value(ratio, 1 - self.clip_ratio, 1 + self.clip_ratio)
                pi_loss = -tf.reduce_mean(tf.minimum(ratio * adv, clipped_ratio * adv))
                # Entropy of the *current* policy, computed inside the tape so
                # the bonus actually produces a gradient for log_std.
                entropy = tf.reduce_mean(log_std + entropy_const)
                pi_loss -= self.entropy_coef * entropy

            grads = tape.gradient(pi_loss, self.actor.trainable_variables)
            self.actor_optimizer.apply_gradients(zip(grads, self.actor.trainable_variables))

        # Value update
        for _ in range(self.train_v_iters):
            with tf.GradientTape() as tape:
                v = self.critic(obs)[:,0]
                v_loss = tf.reduce_mean((returns - v)**2)
            grads = tape.gradient(v_loss, self.critic.trainable_variables)
            self.critic_optimizer.apply_gradients(zip(grads, self.critic.trainable_variables))

        self.memory.clear()






In [5]:
def train_ppo(env_name="Ant-v5", epochs=1, steps_per_epoch=1000,
              render_mode='human', save_path="actor_model_PPO.keras"):
    """Train a PPOAgent on a Gymnasium environment and save its actor.

    The agent is updated whenever an episode ends (terminated or truncated)
    and at the last step of every epoch; the environment is reset after each
    update. The actor is saved to ``save_path`` once all epochs are done.

    Args:
        env_name: Gymnasium environment id.
        epochs: Number of outer iterations.
        steps_per_epoch: Environment steps taken per epoch.
        render_mode: Passed to ``gym.make``; use ``None`` to train without
            opening a render window.
        save_path: File the trained actor is written to.
    """
    env = gym.make(env_name, render_mode=render_mode)
    try:
        obs_dim = env.observation_space.shape[0]
        act_dim = env.action_space.shape[0]
        agent = PPOAgent(obs_dim, act_dim)

        for epoch in range(epochs):
            obs, info = env.reset()
            # The agent smooths actions against the previous one; start every
            # episode from a zero action so no state leaks across resets.
            agent.prev_action = np.zeros(act_dim, dtype=np.float32)
            ep_ret, ep_len = 0, 0
            for t in range(steps_per_epoch):
                action, logp, value = agent.act(obs)
                next_obs, reward, terminated, truncated, info = env.step(action)
                done = terminated or truncated

                # Only a true termination is stored as "done": a time-limit
                # truncation is not a terminal state, so the advantage
                # computation should still bootstrap from the next state.
                agent.store(obs, action, logp, value, reward, terminated)
                obs = next_obs
                ep_ret += reward
                ep_len += 1

                rollout_finished = done or (t == steps_per_epoch - 1)
                if not rollout_finished:
                    continue

                # PPOAgent.update() bootstraps from the last stored value, so
                # that slot is overwritten with the critic's estimate for the
                # state reached after the final step.
                _, _, last_val = agent.act(obs)
                agent.memory.values[-1] = last_val

                print(f"Epoch {epoch}: Return {ep_ret:.1f}, Length {ep_len}")
                agent.update()

                obs, info = env.reset()
                agent.prev_action = np.zeros(act_dim, dtype=np.float32)
                ep_ret, ep_len = 0, 0

        actor_model = agent.actor
        actor_model.save(save_path)
    finally:
        # Release the simulator/render window even if training fails midway.
        env.close()

# Run training with the default arguments of train_ppo(), but only when this
# code is executed as the main module (not when it is imported).
if __name__ == "__main__":
    train_ppo()

Epoch 0: Return 158.9, Length 1000


In [6]:
import tensorflow as tf

# 1. Load the saved Keras model.
# NOTE(review): the training cell writes "actor_model_PPO.keras", while this
# cell reads "actor_model.keras" — confirm which file is the intended input.
model = tf.keras.models.load_model("actor_model.keras")

# 2. Default TFLite conversion (float32 model)
converter = tf.lite.TFLiteConverter.from_keras_model(model)
tflite_model = converter.convert()

# 3. Write the float32 model to disk
with open("actor_model.tflite", "wb") as f:
    f.write(tflite_model)

# 4. Apply dynamic range quantization
converter = tf.lite.TFLiteConverter.from_keras_model(model)  # re-create the converter
converter.optimizations = [tf.lite.Optimize.DEFAULT]
tflite_quant_model = converter.convert()

# 5. Write the quantized model to disk
with open("actor_model_quant_dynamic.tflite", "wb") as f:
    f.write(tflite_quant_model)

INFO:tensorflow:Assets written to: C:\Users\dlwns\AppData\Local\Temp\tmp4eft_lak\assets


INFO:tensorflow:Assets written to: C:\Users\dlwns\AppData\Local\Temp\tmp4eft_lak\assets


Saved artifact at 'C:\Users\dlwns\AppData\Local\Temp\tmp4eft_lak'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(1, 105), dtype=tf.float32, name='input_layer_4')
Output Type:
  TensorSpec(shape=(1, 8), dtype=tf.float32, name=None)
Captures:
  2771178222032: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2771178219344: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2771246268240: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2771246260560: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2771246267472: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2771246258832: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2771246254608: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2771246257104: TensorSpec(shape=(), dtype=tf.resource, name=None)
INFO:tensorflow:Assets written to: C:\Users\dlwns\AppData\Local\Temp\tmpa9ucjl2v\assets


INFO:tensorflow:Assets written to: C:\Users\dlwns\AppData\Local\Temp\tmpa9ucjl2v\assets


Saved artifact at 'C:\Users\dlwns\AppData\Local\Temp\tmpa9ucjl2v'. The following endpoints are available:

* Endpoint 'serve'
  args_0 (POSITIONAL_ONLY): TensorSpec(shape=(1, 105), dtype=tf.float32, name='input_layer_4')
Output Type:
  TensorSpec(shape=(1, 8), dtype=tf.float32, name=None)
Captures:
  2771178222032: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2771178219344: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2771246268240: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2771246260560: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2771246267472: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2771246258832: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2771246254608: TensorSpec(shape=(), dtype=tf.resource, name=None)
  2771246257104: TensorSpec(shape=(), dtype=tf.resource, name=None)


In [8]:
import tensorflow as tf
import numpy as np
import gymnasium as gym

# Roll out the dynamically quantized TFLite actor in Ant-v5 for one episode
# (at most 1000 steps) with on-screen rendering.
interpreter = tf.lite.Interpreter(model_path="actor_model_quant_dynamic.tflite")
interpreter.allocate_tensors()

input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()

env = gym.make('Ant-v5', render_mode='human')
try:
    # `info` instead of `_` so IPython's last-output variable is not shadowed.
    obs, info = env.reset()

    for t in range(1000):
        # The interpreter is fed a single float32 row per step.
        input_data = np.array(obs, dtype=np.float32).reshape(1, -1)
        interpreter.set_tensor(input_details[0]['index'], input_data)
        interpreter.invoke()
        action = interpreter.get_tensor(output_details[0]['index'])[0]

        # The network output is unbounded; keep it inside the action space,
        # as the training-time agent also clips before stepping the env.
        action = np.clip(action, env.action_space.low, env.action_space.high)

        obs, reward, terminated, truncated, info = env.step(action)
        if terminated or truncated:
            break

    # Wait for Enter so the render window stays open until the user is done.
    a = input()
finally:
    # Always release the simulator/render window, even if a step fails.
    env.close()