In [1]:
import numpy as np
from tf_agents.environments import py_environment
from tf_agents.specs import array_spec
from tf_agents.trajectory import time_step


class CustomEnvironment(py_environment.PyEnvironment):
    def __init__(self):
        self._action_spec = array_spec.BoundedArraySpec(shape=(), dtype=np.int32, minimum=0, maximum=1)
        self._observation_spec = array_spec.BoundedArraySpec(shape=(1,), dtype=np.float32, minimum=0, maximum=1)
        self._state = np.zeros((1,), dtype=np.float32)
        self._episode_ended = False

    def action_spec(self):
        return self._action_spec

    def observation_spec(self):
        return self._observation_spec

    def _reset(self):
        self._state = np.zeros((1,), dtype=np.float32)
        self._episode_ended = False
        return time_step.restart(self._state)

    def _step(self, action):
        if self._episode_ended:
            return self.reset()

        if action == 0:
            self._state += 0.1
        else:
            self._state -= 0.1

        if self._state >= 1.0:
            self._episode_ended = True
            return time_step.termination(self._state, reward=1.0)
        elif self._state <= 0.0:
            self._episode_ended = True
            return time_step.termination(self._state, reward=-1.0)
        else:
            return time_step.transition(self._state, reward=0.0, discount=1.0)


KeyboardInterrupt: 

In [2]:
import numpy as np

In [5]:
_external_state_variable_shape = (3, 96)


In [6]:
np.prod(_external_state_variable_shape)

288

In [1]:
import tf_agents


In [5]:
import numpy as np
from tf_agents.environments import py_environment
from tf_agents.specs import array_spec
from tf_agents.trajectories import time_step


class BatteryChargingEnvironment(py_environment.PyEnvironment):

    def __init__(self):
        # Define action and observation specs
        self._external_state_variable_shape = (3, 96)
        self._external_state_variable_dtype = np.float32
        self._external_state_variable_max = 10.0  # Maximum value from external state variables

        #shape=() for scalar action, action of type float, e.g. can be 0.4 not just 0 or 1
        self._action_spec = array_spec.BoundedArraySpec(shape=(), dtype=np.float32,
                                                        minimum=-self._external_state_variable_max,
                                                        maximum=self._external_state_variable_max)
        self._internal_state_variable_spec = array_spec.BoundedArraySpec(shape=(), dtype=np.float32, minimum=-1.0,
                                                                        maximum=1.0)

        self._observation_spec = array_spec.ArraySpec(
            shape=(np.prod(self._external_state_variable_shape) + 1,),
            dtype=np.float32)

        # Define initial state variables
        self._external_state_variables = np.zeros(self._external_state_variable_shape,
                                                  dtype=self._external_state_variable_dtype)
        self._internal_state_variable = 0.0
        self._episode_ended = False

    def action_spec(self):
        return self._action_spec

    def observation_spec(self):
        return self._observation_spec

    def _reset(self):
        self._external_state_variables = np.zeros(self._external_state_variable_shape,
                                                  dtype=self._external_state_variable_dtype)
        self._internal_state_variable = 0.0
        self._episode_ended = False
        return time_step.restart(self._get_observation())

    def _step(self, action):
        if self._episode_ended:
            return self.reset()

        # Cap the action using the maximum value from external state variables
        action = np.clip(action, -self._external_state_variable_max, self._external_state_variable_max)

        # Update the internal state variable based on the capped action
        self._internal_state_variable += action

        # Generate a reward based on the current state
        reward = self._calculate_reward()

        # Update the external state variables (assuming they are externally updated)
        self._external_state_variables = self._externally_update_state_variables()

        # Check if episode is terminated
        if self._episode_ended:
            return time_step.termination(self._get_observation(), reward)
        else:
            return time_step.transition(self._get_observation(), reward=reward, discount=1.0)

    def _calculate_reward(self):
        # Implement your own reward calculation logic based on the current state
        # For example, you can calculate the reward based on the stock prices, portfolio value, etc.
        # Return a scalar reward value
        return 0.0

    def _externally_update_state_variables(self):
        # Implement the logic to externally update the state variables
        # Return the updated external state variables as a numpy array
        return np.zeros(self._external_state_variable_shape,
                        dtype=self._external_state_variable_dtype)  # Placeholder for external state update

    def _get_observation(self):
        # Concatenate the internal and external state variables to create the observation
        external_state_flat = self._external_state_variables.flatten()
        internal_state = np.array([self._internal_state_variable], dtype=np.float32)
        return np.concatenate([external_state_flat, internal_state], axis=0)


In [6]:
import numpy as np
import tensorflow as tf

# Define the Environment
class Environment:
    def __init__(self, price_forecast, demand_forecast, solar_forecast, capacity, max_charging_power):
        self.price_forecast = price_forecast
        self.demand_forecast = demand_forecast
        self.solar_forecast = solar_forecast
        self.capacity = capacity
        self.max_charging_power = max_charging_power
        self.time_step = 0
        self.battery_charge = 0

    def get_state(self):
        # Construct the current state representation
        state = [self.price_forecast[self.time_step],
                 self.demand_forecast[self.time_step],
                 self.solar_forecast[self.time_step],
                 self.battery_charge]
        return state

    def step(self, action):
        # Execute the action and update the environment
        price = self.price_forecast[self.time_step]
        demand = self.demand_forecast[self.time_step]
        solar = self.solar_forecast[self.time_step]

        # Calculate the reward
        reward = 0
        if action > 0:
            reward += -price * action  # Penalty for charging
        elif action < 0:
            reward += price * abs(action)  # Reward for discharging

        # Update the battery charge level
        self.battery_charge += action

        # Clip the battery charge within the capacity limits
        self.battery_charge = np.clip(self.battery_charge, 0, self.capacity)

        # Update the time step
        self.time_step += 1

        # Check if the episode is done
        done = self.time_step >= len(self.price_forecast)

        # Construct the next state representation
        next_state = [self.price_forecast[self.time_step],
                      self.demand_forecast[self.time_step],
                      self.solar_forecast[self.time_step],
                      self.battery_charge]

        return next_state, reward, done

# Define the Q-Network
class QNetwork:
    def __init__(self, state_dim, action_dim, learning_rate):
        self.model = tf.keras.Sequential([
            tf.keras.layers.Dense(32, activation='relu', input_shape=(state_dim,)),
            tf.keras.layers.Dense(32, activation='relu'),
            tf.keras.layers.Dense(action_dim)
        ])
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

    def train_step(self, state, action, target):
        with tf.GradientTape() as tape:
            q_values = self.model(state)
            predicted_q_values = tf.reduce_sum(tf.multiply(q_values, action), axis=1)
            loss = tf.reduce_mean(tf.square(predicted_q_values - target))

        gradients = tape.gradient(loss, self.model.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))

    def predict(self, state):
        return self.model(state)

# Define the Agent
class Agent:
    def __init__(self, state_dim, action_dim, learning_rate, discount_factor, epsilon):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.discount_factor = discount_factor
        self.epsilon = epsilon

        self.q_network = QNetwork(state_dim, action_dim, learning_rate)

    def choose_action(self, state):
        if np.random.rand() < self.epsilon:
            return np.random.randint(-environment.max_charging_power, environment.max_charging_power + 1)
        else:
            q_values = self.q_network.predict(np.array([state]))
            return np.argmax(q_values)

    def train(self, environment, num_episodes, max_steps):
        for episode in range(num_episodes):
            state = environment.get_state()
            episode_reward = 0

            for step in range(max_steps):
                action = self.choose_action(state)
                next_state, reward, done = environment.step(action)

                # Update the Q-value
                q_values_next = self.q_network.predict(np.array([next_state]))
                max_q_value_next = np.max(q_values_next)
                target = reward + self.discount_factor * max_q_value_next

                action_one_hot = np.zeros(self.action_dim)
                action_one_hot[action + environment.max_charging_power] = 1

                self.q_network.train_step(np.array([state]), np.array([action_one_hot]), np.array([target]))

                episode_reward += reward
                state = next_state

                if done:
                    break

            print("Episode:", episode + 1, "Reward:", episode_reward)

# Example usage
price_forecast = [1.0, 1.2, 0.8, 0.9, 1.1]  # Example price forecast data
demand_forecast = [0.8, 1.0, 0.9, 1.2, 1.1]  # Example demand forecast data
solar_forecast = [1.5, 1.2, 1.0, 0.7, 0.8]  # Example solar forecast data

capacity = 100  # Battery capacity
max_charging_power = 10  # Maximum charging power

environment = Environment(price_forecast, demand_forecast, solar_forecast, capacity, max_charging_power)

state_dim = 4
action_dim = 2 * max_charging_power + 1  # [-max_charging_power, ..., 0, ..., max_charging_power]
learning_rate = 0.001
discount_factor = 0.99
epsilon = 0.1

agent = Agent(state_dim, action_dim, learning_rate, discount_factor, epsilon)

num_episodes = 100
max_steps = len(price_forecast)

agent.train(environment, num_episodes, max_steps)




IndexError: index 28 is out of bounds for axis 0 with size 21