In [None]:
!pip install tensorflow
!pip install tensorflow-probability
!pip install gymnasium
!pip install keras
!pip install keras-rl2
!pip install matplotlib

Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m11.8 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0
Collecting keras-rl2
  Downloading keras_rl2-1.0.5-py3-none-any.whl.metadata (304 bytes)
Downloading keras_rl2-1.0.5-py3-none-any.whl (52 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m52.1/52.1 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: keras-rl2
Successfully installed keras-rl2-1.0.5


In [None]:
import gymnasium as gym
import tensorflow as tf
import tensorflow_probability as tfp
import numpy as np

In [None]:
class tempenv(gym.Env):
    """Toy continuous-control environment over a 30-dimensional state vector.

    An action is a 3-vector that is added to the first three state
    components; the remaining components are never changed by ``step``.
    After every step the reward is ``-sum(|state|)``.

    Args:
        st_vec: Initial state (array-like). It is copied and stored as
            float32, so the caller's array is never modified and fractional
            actions are not truncated when an integer array is passed in.
        rew: Initial value of ``self.reward``; restored by ``reset``.

    NOTE(review): ``observation_space`` declares bounds of [-5, 5], but
    nothing in this class keeps the state inside them (the notebook's own
    initial state, ``np.arange(30) - 10``, is outside) -- confirm the
    intended bounds.
    """

    def __init__(self, st_vec, rew):
        super().__init__()
        # Bounds are created as float32 so they match the declared dtype.
        self.action_space = gym.spaces.Box(
            low=np.full(3, -1.0, dtype=np.float32),
            high=np.full(3, 1.0, dtype=np.float32),
            dtype=np.float32,
        )
        self.observation_space = gym.spaces.Box(
            low=-5.0,
            high=5.0,
            shape=(30,),
            dtype=np.float32,
        )
        # Private copies so that reset() can restore the starting point.
        self._initial_state = np.array(st_vec, dtype=np.float32)
        self._initial_reward = rew
        self.reward = rew
        self.state = self._initial_state.copy()

    def step(self, action):
        """Add ``action`` to the first three state components.

        Returns:
            ``(state, reward, terminated, truncated, info)``. ``truncated``
            is always ``False`` and ``info`` is always empty.
        """
        delta = np.asarray(action, dtype=np.float32).reshape(-1)

        # Work on a fresh array: states handed out earlier (e.g. stored by
        # the caller as "previous state") must not change retroactively.
        next_state = self.state.copy()
        next_state[:3] += delta[:3]
        self.state = next_state

        self.reward = float(-np.sum(np.abs(self.state)))

        # NOTE(review): the reward is never positive, so this condition can
        # never be true and episodes never terminate on their own. The
        # intended termination rule is not visible here -- confirm.
        done = self.reward > 5
        return self.state, self.reward, done, False, {}

    def reset(self, seed=None, options=None):
        """Restore the state and reward given to the constructor.

        Returns:
            ``(state, info)`` with an empty ``info`` dict.
        """
        super().reset(seed=seed)
        self.state = self._initial_state.copy()
        self.reward = self._initial_reward
        return self.state, {}

In [None]:
# Modeling the actor-critic: a value network (critic) and a Gaussian policy network (actor)

In [None]:
def value_function(state):
    """Stack the critic layers on top of ``state`` and return the value tensor.

    Two fully connected ELU layers of 400 units each are followed by a
    single linear unit.
    """
    features = state
    for width in (400, 400):
        features = tf.keras.layers.Dense(width, activation='elu')(features)

    return tf.keras.layers.Dense(1)(features)

The fundamental idea behind the policy network is to create a function that maps the state of the environment to a probability distribution over actions, so that a suitable action can be drawn for any given state. A neural network is a good fit because it can approximate this mapping well: the state is passed through the hidden layers, and their output is fed to further layers that produce the parameters of the action distribution (a normal distribution in our case).

In [None]:
def policy_network(state, env):
    """Build the Gaussian policy head on top of ``state``.

    Two ELU hidden layers feed two linear heads, ``mu`` and ``sigma``.
    ``sigma`` is made strictly positive with softplus plus a small constant.
    An action is sampled from ``Normal(mu, sigma)`` and clipped to the bounds
    of ``env.action_space``.

    Args:
        state: Symbolic input tensor the layers are applied to.
        env: Environment whose ``action_space`` supplies the action
            dimension and the clipping bounds.

    Returns:
        ``(action_tf_var, norm_dist)``: the clipped sampled action and the
        output of a Lambda layer that constructs the Normal distribution.
    """
    n_hidden1 = 40
    n_hidden2 = 40
    # Taken from the environment instead of being hard-coded.
    n_outputs = int(env.action_space.shape[0])
    action_low = np.asarray(env.action_space.low, dtype=np.float32)
    action_high = np.asarray(env.action_space.high, dtype=np.float32)

    hidden1 = tf.keras.layers.Dense(
        n_hidden1,
        activation='elu',
        kernel_initializer='glorot_uniform'
    )(state)

    hidden2 = tf.keras.layers.Dense(
        n_hidden2,
        activation='elu',
        kernel_initializer='glorot_uniform'
    )(hidden1)

    mu = tf.keras.layers.Dense(
        n_outputs,
        activation=None,
        kernel_initializer='glorot_uniform'
    )(hidden2)

    sigma = tf.keras.layers.Dense(
        n_outputs,
        activation=None,
        kernel_initializer='glorot_uniform'
    )(hidden2)

    # Every Lambda below states its output_shape explicitly: Keras raised
    # "We could not automatically infer the shape of the Lambda's output"
    # for this network when it was left out.
    sigma = tf.keras.layers.Lambda(
        lambda x: tf.nn.softplus(x) + 1e-5,
        output_shape=(n_outputs,)
    )(sigma)

    # One sample per batch row, so the result has the same shape as mu.
    action_tf_var = tf.keras.layers.Lambda(
        lambda x: tfp.distributions.Normal(loc=x[0], scale=x[1]).sample(),
        output_shape=(n_outputs,)
    )([mu, sigma])

    action_tf_var = tf.keras.layers.Lambda(
        lambda x: tf.clip_by_value(x, action_low, action_high),
        output_shape=(n_outputs,)
    )(action_tf_var)

    # NOTE(review): this Lambda returns a distribution object rather than a
    # tensor. Because output_shape is given, Keras presumably hands back an
    # ordinary symbolic tensor here, in which case callers cannot use
    # `.prob` / `.log_prob` on it -- confirm before relying on it.
    norm_dist = tf.keras.layers.Lambda(
        lambda x: tfp.distributions.Normal(loc=x[0], scale=x[1]),
        output_shape=(n_outputs,)
    )([mu, sigma])
    return action_tf_var, norm_dist

In [None]:
import numpy as np
import tensorflow as tf
import sklearn
from sklearn.preprocessing import StandardScaler
# Fit a scaler on 10000 observations drawn uniformly from the observation space.
# NOTE(review): `env` is not created by any earlier cell shown in this
# notebook (it is assigned in the training cell further down), so on a fresh
# kernel this cell raises NameError -- confirm the intended cell order.
state_space_samples = np.array(
    [env.observation_space.sample() for _ in range(10000)]
)
scaler = StandardScaler().fit(state_space_samples)

# Function to normalize states
def scale_state(state):
    """Standardize one state with the module-level ``scaler`` and return it flat.

    Args:
        state: 1-D array holding a single state.

    Returns:
        1-D array with the scaled state.

    Note: a later cell of this notebook defines ``scale_state(state, scaler)``
    with a different signature; whichever cell ran last wins.
    """
    # The fitted module-level `scaler` is used on purpose. Creating a new
    # StandardScaler here would shadow it with an unfitted instance, and
    # `transform` on an unfitted scaler fails.
    scaled = scaler.transform(state.reshape(1, -1))  # reshape: single sample
    return scaled.flatten()

In [None]:
from tensorflow.python.framework.ops import disable_eager_execution

# Eager execution is left enabled on purpose: the training loop reads model
# outputs back with `.numpy()`, which is only available in eager mode, so
# `disable_eager_execution()` is not called.

lr_actor = 0.00002
lr_critic = 0.001

# Symbolic inputs used to assemble the models.
state_input = tf.keras.Input(shape=(30,))   # environment state
action_input = tf.keras.Input(shape=(3,))   # action taken
delta_input = tf.keras.Input(shape=(1,))    # TD error
target_input = tf.keras.Input(shape=(1,))   # value target

# The symbolic state input is printed here; the `state_tensor` variable only
# exists after the training cell has run.
print("State tensor shape:", state_input.shape)
print("Action input shape:", action_input.shape)
print("Delta input shape:", delta_input.shape)

# Build the policy graph exactly once. A second call would create a second,
# independent set of layers. The Keras models themselves are assembled in the
# "Create models" section below.
# NOTE(review): `env` must already exist when this cell runs; in the visible
# notebook it is only created in the training cell -- confirm cell order.
action_output, norm_dist = policy_network(state_input, env)

# Critic graph
value_output = value_function(state_input)

# Actor (Policy) Loss
def actor_loss(norm_dist, action_input, delta_input):
    """Policy-gradient loss: negative log-likelihood weighted by ``delta_input``.

    Args:
        norm_dist: Object exposing ``log_prob(value)`` (e.g. a distribution).
        action_input: Actions whose log-likelihood is evaluated.
        delta_input: Weights (TD errors) multiplied onto the result.

    Returns:
        Element-wise ``-log_prob(action_input) * delta_input``; no reduction
        is applied.
    """
    # The log-density is evaluated directly. `log(prob + 1e-5)` flattens out
    # at log(1e-5) once the density becomes tiny, which removes the gradient
    # exactly for the actions that are far from the current mean.
    log_prob = norm_dist.log_prob(action_input)
    loss = -log_prob * delta_input
    return loss

# Critic (Value) Loss
def critic_loss(value_output, target_input):
    """Mean squared error between predicted values and their targets.

    Args:
        value_output: Predicted values, one per sample.
        target_input: Target values, one per sample.

    Returns:
        Scalar tensor with the mean squared difference.
    """
    # Both sides are flattened to one value per sample. Squeezing only the
    # prediction would subtract a (N,) tensor from a (N, 1) tensor, which
    # broadcasts to (N, N) and averages over all pairs.
    predicted = tf.reshape(value_output, [-1])
    target = tf.reshape(target_input, [-1])
    return tf.reduce_mean(tf.square(predicted - target))

# Assemble the Keras models from the symbolic graph built above.
# The actor declares the action and delta placeholders as inputs in addition
# to the state; the critic takes the state only.
actor_model = tf.keras.Model(
    inputs=[state_input, action_input, delta_input],
    outputs=action_output,
)
critic_model = tf.keras.Model(
    inputs=[state_input],
    outputs=value_output,
)

# One Adam optimizer per network, each with its own learning rate.
actor_optimizer = tf.keras.optimizers.Adam(learning_rate=lr_actor)
critic_optimizer = tf.keras.optimizers.Adam(learning_rate=lr_critic)

State tensor shape: (30,)
Action input shape: (None, 3)
Delta input shape: (None, 1)


NotImplementedError: Exception encountered when calling Lambda.call().

[1mWe could not automatically infer the shape of the Lambda's output. Please specify the `output_shape` argument for this Lambda layer.[0m

Arguments received by Lambda.call():
  • args=(['<KerasTensor shape=(None, 3), dtype=float32, sparse=False, name=keras_tensor_433>', '<KerasTensor shape=(None, 3), dtype=float32, sparse=False, name=keras_tensor_437>'],)
  • kwargs={'mask': ['None', 'None']}

In [None]:
def create_state_scaler(env, n_samples=10000):
    """Create and fit a scaler for state normalization.

    For each of ``n_samples`` rounds the environment is reset and then
    stepped once with a random action; both the reset state and the
    resulting state are recorded.

    Args:
        env: Environment providing ``reset``, ``step`` and ``action_space``.
        n_samples: Number of reset/step rounds (two states per round).

    Returns:
        A ``StandardScaler`` fitted on the recorded states.
    """
    state_space_samples = []
    for _ in range(n_samples):
        state, _ = env.reset()  # Get initial state
        # Store a copy: an environment may return its internal buffer and
        # modify it on the next step, which would make every recorded row
        # point at the same (latest) values.
        state_space_samples.append(np.array(state, dtype=np.float64))

        # Sample a random action and record the resulting state
        action = env.action_space.sample()
        next_state, _, _, _, _ = env.step(action)
        state_space_samples.append(np.array(next_state, dtype=np.float64))

    # Convert to a 2-D array and fit the scaler
    scaler = StandardScaler()
    scaler.fit(np.stack(state_space_samples))

    return scaler

def scale_state(state, scaler):
    """Run ``state`` through ``scaler.transform``.

    A one-dimensional ``state`` is treated as a single sample and is given a
    leading axis first; any other input is passed on unchanged.
    """
    is_single_sample = state.ndim == 1
    batch = state[None, :] if is_single_sample else state
    return scaler.transform(batch)


In [None]:
# Initialize environment
# A float state is used so that fractional actions are not truncated when
# they are added to the state.
initial_state = np.arange(30, dtype=np.float32) - 10
initial_reward = initial_state[5]
env = tempenv(st_vec=initial_state, rew=initial_reward)

# Create state scaler
scaler = create_state_scaler(env)

# Training settings
gamma = 0.99
num_episodes = 250
# The environment's termination condition compares a non-positive reward with
# +5, so it never fires; cap the episode length so every episode ends.
max_steps_per_episode = 200
episode_history = []

n_actions = env.action_space.shape[0]
# actor_model declares action/delta inputs although its output depends on the
# state only. Concrete zero tensors are passed for them; passing the symbolic
# placeholders makes the model return a symbolic tensor, which then fails in
# tf.squeeze ("A KerasTensor cannot be used as input to a TensorFlow function").
unused_action = tf.zeros((1, n_actions), dtype=tf.float32)
unused_delta = tf.zeros((1, 1), dtype=tf.float32)
log_two_pi = float(np.log(2.0 * np.pi))


def _to_batch(raw_state):
    """Scale one state and return it as a float32 tensor of shape (1, state_dim)."""
    scaled = np.asarray(scale_state(raw_state, scaler), dtype=np.float32)
    return tf.convert_to_tensor(scaled.reshape(1, -1))


def _gaussian_head(actor):
    """Return a model mapping state -> [mu, sigma] that shares ``actor``'s weights.

    policy_network feeds ``[mu, sigma]`` into its sampling Lambda layer, so
    that layer's inputs are the distribution parameters.
    NOTE(review): this is a workaround; returning mu and sigma from
    policy_network directly would be cleaner but changes its interface.
    """
    for layer in actor.layers:
        if not isinstance(layer, tf.keras.layers.Lambda):
            continue
        inbound = layer.input
        if isinstance(inbound, (list, tuple)) and len(inbound) == 2:
            return tf.keras.Model(inputs=actor.inputs[0], outputs=list(inbound))
    raise ValueError("actor_model has no two-input sampling layer to read (mu, sigma) from")


policy_head = _gaussian_head(actor_model)

for episode in range(num_episodes):
    state, _ = env.reset()
    reward_total = 0.0
    steps = 0
    done = False

    while not done and steps < max_steps_per_episode:
        state_tensor = _to_batch(state)

        # Sample an action from the current policy
        action = actor_model([state_tensor, unused_action, unused_delta])
        action = tf.squeeze(action, axis=0)

        # Take step in environment
        next_state, reward, terminated, truncated, _ = env.step(action.numpy())
        reward = float(reward)
        done = bool(terminated) or bool(truncated)
        steps += 1
        reward_total += reward

        # One-step TD target; no bootstrap from a terminal state
        next_state_tensor = _to_batch(next_state)
        V_next = tf.squeeze(critic_model(next_state_tensor))
        target = tf.stop_gradient(reward + gamma * V_next * (1.0 - float(terminated)))

        # Update critic: squared TD error. An explicit gradient step is used
        # because the models are never compiled, so train_on_batch cannot run.
        with tf.GradientTape() as critic_tape:
            V_curr = tf.squeeze(critic_model(state_tensor, training=True))
            critic_loss_val = tf.square(target - V_curr)
        critic_grads = critic_tape.gradient(critic_loss_val, critic_model.trainable_variables)
        critic_optimizer.apply_gradients(zip(critic_grads, critic_model.trainable_variables))

        # Advantage estimate, computed from the value before the critic update
        td_error = tf.stop_gradient(target - V_curr)

        # Update actor: -log pi(a|s) * td_error, with the Gaussian
        # log-density summed over the action dimensions
        with tf.GradientTape() as actor_tape:
            mu, sigma = policy_head(state_tensor, training=True)
            log_prob = tf.reduce_sum(
                -0.5 * tf.square((action - mu) / sigma) - tf.math.log(sigma) - 0.5 * log_two_pi,
                axis=-1,
            )
            actor_loss_val = -tf.reduce_mean(log_prob) * td_error
        actor_grads = actor_tape.gradient(actor_loss_val, policy_head.trainable_variables)
        actor_optimizer.apply_gradients(zip(actor_grads, policy_head.trainable_variables))

        state = next_state

    episode_history.append(reward_total)
    print(f"Episode: {episode}, Steps: {steps}, Reward: {reward_total:.2f}")

    # Check if solved
    # NOTE(review): tempenv's reward is never positive, so a mean above 90
    # cannot be reached -- confirm the intended success criterion.
    if len(episode_history) >= 101 and np.mean(episode_history[-100:]) > 90:
        print("\n*** Solved! ***")
        print(f"Mean reward over last 100 episodes: {np.mean(episode_history[-100:]):.2f}")
        break

  gym.logger.warn(
  gym.logger.warn(


ValueError: Tried to convert 'input' to a tensor and failed. Error: A KerasTensor cannot be used as input to a TensorFlow function. A KerasTensor is a symbolic placeholder for a shape and dtype, used when constructing Keras Functional models or Keras Functions. You can only use it as input to a Keras layer or a Keras operation (from the namespaces `keras.layers` and `keras.operations`). You are likely doing something like:

```
x = Input(...)
...
tf_fn(x)  # Invalid.
```

What you should do instead is wrap `tf_fn` in a layer:

```
class MyLayer(Layer):
    def call(self, x):
        return tf_fn(x)

x = MyLayer()(x)
```


  and should_run_async(code)
