In [1]:
import os
import sys

import tensorflow as tf
from tf_agents.agents import ddpg
from tf_agents.agents.ddpg import ddpg_agent
from tf_agents.drivers import dynamic_step_driver
from tf_agents.environments import tf_py_environment
from tf_agents.eval import metric_utils
from tf_agents.metrics import tf_metrics
from tf_agents.replay_buffers import tf_uniform_replay_buffer
from tf_agents.utils import common
import wandb

sys.path.insert(0, '..')
import utils.dataloader as dataloader
import environments.battery as battery_env
import environments.household as household_env







In [2]:
"""
Train and evaluate a DDPG agent
"""

# Param for iteration
num_iterations = 5000  # the training loop runs while the global step is below this value
customer = 1  # forwarded to dataloader.get_customer_data when the data sets are loaded
# NOTE(review): the legend "0 = battery, 1 = household" appears to describe the
# `env` switch that is set in a later cell, not `customer` -- confirm.
# Params for collect
initial_collect_steps = 1000  # num_steps of the driver that pre-fills the replay buffer
collect_steps_per_iteration = 2000  # num_steps of the driver run once per training iteration
replay_buffer_capacity = 1000000  # max_length of the uniform replay buffer
ou_stddev = 0.2  # passed to DdpgAgent(ou_stddev=...)
ou_damping = 0.15  # passed to DdpgAgent(ou_damping=...)

# Params for target update
target_update_tau = 0.05  # passed to DdpgAgent(target_update_tau=...)
target_update_period = 5  # passed to DdpgAgent(target_update_period=...)

# Params for train
train_steps_per_iteration = 1  # NOTE(review): not referenced by any visible cell
batch_size = 48 * 7  # sample_batch_size of the replay dataset; presumably one week of 48 daily slots -- confirm
actor_learning_rate = 1e-4  # learning rate of the actor's Adam optimizer
critic_learning_rate = 1e-3  # learning rate of the critic's Adam optimizer
dqda_clipping = None  # passed to DdpgAgent(dqda_clipping=...)
td_errors_loss_fn = tf.compat.v1.losses.huber_loss  # passed to DdpgAgent(td_errors_loss_fn=...)
gamma = 0.99  # passed to DdpgAgent(gamma=...)
reward_scale_factor = 1.0  # passed to DdpgAgent(reward_scale_factor=...)
gradient_clipping = None  # passed to DdpgAgent(gradient_clipping=...)

# Params for eval and checkpoints
num_eval_episodes = 1  # episodes per evaluation run; also used as metric buffer size
num_test_episodes = 1  # episodes for the final test run
eval_interval = 50  # an evaluation runs whenever the global step is a multiple of this

In [3]:
# Load data
# All three splits share the same price file; only the load and fuel-mix files differ.
_PRICE_CSV = '../../data/price_wo_outlier.csv'


def _load_split(load_csv, mix_csv):
    """Load one data split for the configured ``customer``.

    Args:
        load_csv: Path handed to ``dataloader.loadData``.
        mix_csv: Path handed to ``dataloader.loadMix``.

    Returns:
        Whatever ``dataloader.get_customer_data`` returns for the loaded
        load, price and mix data.
    """
    return dataloader.get_customer_data(
        dataloader.loadData(load_csv),
        dataloader.loadPrice(_PRICE_CSV),
        dataloader.loadMix(mix_csv),
        customer,
    )


data_train = _load_split('../../data/load1011.csv', "../../data/fuel2021.csv")
data_eval = _load_split('../../data/load1112.csv', "../../data/fuel2122.csv")
data_test = _load_split('../../data/load1213.csv', "../../data/fuel2223.csv")


In [13]:
# Spot check: display the value at row 2, column 0 of the first element of data_train.
data_train[0].iloc[2,0]

1.339

In [14]:

import numpy as np
import tensorflow as tf
from tf_agents.environments import py_environment
from tf_agents.specs import array_spec
from tf_agents.trajectories import time_step as ts
import wandb


class Battery(py_environment.PyEnvironment):
    """Battery-storage environment driven by load, PV and price time series.

    At every step the agent chooses a single value (bounded by the battery
    power) that is added to the state of energy (SoE): positive values charge,
    negative values discharge. The SoE is clipped to ``[0, capacity]``. The
    remaining net load (load - PV + energy actually charged) is bought from the
    grid at the current price or fed in at a reduced price.

    Observation (4 float32 values): ``[soe, load, pv, electricity_price]``.
    Reward: ``profit - cost - |requested action - realised SoE change|``.

    Args:
        load_data: Table exposing ``.iloc``; column 0 holds the load per time slot.
        pv_data: Table exposing ``.iloc``; column 0 holds the PV generation per time slot.
        price_data: Table exposing ``.iloc``; column 0 holds the electricity price per time slot.
        test: If True, every step (and the final bill) is logged to wandb.
    """

    def __init__(self, load_data, pv_data, price_data, test=False):

        # Time steps
        self._current_timestep = -1
        # One year of time slots (2 * 24 * 365), capped at the shortest series
        # so a shorter data set ends the episode instead of raising IndexError.
        self._max_timesteps = min(48 * 365, len(load_data), len(pv_data), len(price_data))
        # Number of slots before the end of the data at which the episode stops.
        # NOTE(review): presumably a leftover from a forecast horizon -- confirm.
        self._end_margin = 7
        self._episode_ended = False

        # Logging
        self._test = test  # Whether step data is logged to wandb.
        self._test_writer = None  # Not used by this class.

        # Battery parameters
        self._capacity = 13.5
        self._power_battery = 2.3
        self._init_charge = 0.0
        self._soe = 0.0
        self._power_grid = 25.0  # Not used by this class.
        self._total_electricity_bill = 0.0
        # Fraction of the purchase price that is paid for energy fed into the grid.
        self._feed_in_factor = 0.7

        # Observation and action space
        self._action_spec = array_spec.BoundedArraySpec(shape=(1,), dtype=np.float32, minimum=-self._power_battery, maximum=self._power_battery, name='action')
        self._observation_spec = array_spec.BoundedArraySpec(shape=(4,), dtype=np.float32, name='observation')

        # Data
        self._load_data = load_data
        self._pv_data = pv_data
        self._electricity_prices = price_data

    def action_spec(self):
        """Return the spec of the single bounded battery action."""
        return self._action_spec

    def observation_spec(self):
        """Return the spec of the 4-element observation vector."""
        return self._observation_spec

    def _reset(self):
        """Start a new episode and return the first time step.

        Resets the time index, the state of energy and the accumulated bill.
        The initial observation is built from row 0 of each data series.
        """
        self._current_timestep = -1  # The next step moves to slot 0.
        self._soe = self._init_charge
        self._episode_ended = False
        self._total_electricity_bill = 0.0  # Accumulated per episode.
        observation = np.array([self._soe, self._load_data.iloc[0,0], self._pv_data.iloc[0,0], self._electricity_prices.iloc[0,0]], dtype=np.float32)

        return ts.restart(observation)

    def _step(self, action):
        """Apply one battery action and advance to the next time slot.

        Args:
            action: Array-like whose first element is the requested change of
                the state of energy (positive charges, negative discharges).

        Returns:
            A transition time step, or a termination time step once the time
            index reaches ``max_timesteps - end_margin``. If the previous step
            ended the episode, the environment is reset instead.
        """
        # A finished episode must be restarted before stepping again.
        if self._episode_ended:
            return self.reset()

        # Timing
        self._current_timestep += 1
        t = self._current_timestep

        # Load the data of the current slot
        electricity_price = self._electricity_prices.iloc[t, 0]
        load = self._load_data.iloc[t, 0]
        pv = self._pv_data.iloc[t, 0]
        net_load = load - pv  # PV is used for the load first.

        # Balance energy: the battery can only absorb/deliver what fits into [0, capacity].
        requested = action[0]
        old_soe = self._soe
        new_soe = np.clip(old_soe + requested, a_min=0.0, a_max=self._capacity, dtype=np.float32)
        amount_charged_discharged = new_soe - old_soe
        # Part of the requested action the battery could not realise.
        energy_leftover_missing = np.abs(requested - amount_charged_discharged)

        energy_from_grid = 0.0
        energy_feed_in = 0.0
        energy_management = net_load + amount_charged_discharged
        if energy_management < 0:  # Surplus: electricity can be sold.
            energy_feed_in = np.abs(energy_management)
        elif energy_management > 0:  # Deficit: electricity still has to be bought.
            energy_from_grid = energy_management
        self._soe = new_soe

        # Calculate costs and profits
        cost = energy_from_grid * electricity_price
        profit = energy_feed_in * electricity_price * self._feed_in_factor
        self._total_electricity_bill += profit - cost

        # The unrealised part of the action is subtracted as a penalty.
        current_reward = profit - cost - energy_leftover_missing

        # Create observation: SoE, load, PV, price
        observation = np.array([new_soe, load, pv, electricity_price], dtype=np.float32)

        # Log test
        if self._test:
            wandb.log({'action': requested, 'soe': new_soe, 'energy delta': energy_leftover_missing, 'Total bill': self._total_electricity_bill})

        # Check for episode end
        if self._current_timestep >= self._max_timesteps - self._end_margin:
            self._episode_ended = True
            if self._test:
                wandb.log({'profit': self._total_electricity_bill})

        if self._episode_ended:
            return ts.termination(observation=observation, reward=current_reward)
        return ts.transition(observation=observation, reward=current_reward)

In [15]:
# Initiate env
# `env` selects the environment: 0 -> the Battery class defined in this notebook,
# any other value -> household_env.Household.
# NOTE(review): test=True makes Battery call wandb.log on every step, including
# the steps collected for training -- confirm this is intended.
env = 0
if env == 0:
    tf_env_train = tf_py_environment.TFPyEnvironment(Battery(load_data=data_train[0], pv_data=data_train[1], price_data=data_train[2], test=True))
    # Evaluate on the evaluation split (was data_train), matching the household branch.
    tf_env_eval = tf_py_environment.TFPyEnvironment(Battery(load_data=data_eval[0], pv_data=data_eval[1], price_data=data_eval[2], test=True))
else:
    tf_env_train = tf_py_environment.TFPyEnvironment(household_env.Household(init_charge=0.0, data=data_train, test=True))
    tf_env_eval = tf_py_environment.TFPyEnvironment(household_env.Household(init_charge=0.0, data=data_eval, test=True))

# For the notebook's Battery the observation is [SoE, load, PV, price].
num_states = tf_env_train.observation_spec().shape[0]
print("Size of State Space ->  {}".format(num_states))
num_actions = tf_env_train.action_spec().shape[0]
print("Size of Action Space ->  {}".format(num_actions))

# .item() converts the one-element bound arrays into plain Python floats.
upper_bound = tf_env_train.action_spec().maximum.item()
lower_bound = tf_env_train.action_spec().minimum.item()

print("Max Value of Action ->  {}".format(upper_bound))
print("Min Value of Action ->  {}".format(lower_bound))

Size of State Space ->  4
Size of Action Space ->  1
Max Value of Action ->  2.299999952316284
Min Value of Action ->  -2.299999952316284


In [16]:
# Prepare runner
global_step = tf.compat.v1.train.get_or_create_global_step()

actor_net = ddpg.actor_network.ActorNetwork(
    input_tensor_spec=tf_env_train.observation_spec(),
    output_tensor_spec=tf_env_train.action_spec(), fc_layer_params=(400, 300),
    activation_fn=tf.keras.activations.relu)

critic_net = ddpg.critic_network.CriticNetwork(
    input_tensor_spec=(tf_env_train.observation_spec(), tf_env_train.action_spec()),
    joint_fc_layer_params=(400, 300),
    activation_fn=tf.keras.activations.relu)

tf_agent = ddpg_agent.DdpgAgent(
    tf_env_train.time_step_spec(),
    tf_env_train.action_spec(),
    actor_network=actor_net,
    critic_network=critic_net,
    actor_optimizer=tf.compat.v1.train.AdamOptimizer(
        learning_rate=actor_learning_rate
    ),
    critic_optimizer=tf.compat.v1.train.AdamOptimizer(
        learning_rate=critic_learning_rate
    ),
    ou_stddev=ou_stddev,
    ou_damping=ou_damping,
    target_update_tau=target_update_tau,
    target_update_period=target_update_period,
    dqda_clipping=dqda_clipping,
    td_errors_loss_fn=td_errors_loss_fn,
    gamma=gamma,
    reward_scale_factor=reward_scale_factor,
    gradient_clipping=gradient_clipping,
    debug_summaries=False,
    summarize_grads_and_vars=False,
    train_step_counter=global_step,
)

tf_agent.initialize()

eval_policy = tf_agent.policy
collect_policy = tf_agent.collect_policy

In [17]:
# Replay buffer that stores the trajectories produced by the collect policy.
replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
    data_spec=tf_agent.collect_data_spec,
    batch_size=tf_env_train.batch_size,
    max_length=replay_buffer_capacity,
)


def _make_collect_driver(num_steps):
    """Build a step driver that runs the collect policy on the training env.

    Args:
        num_steps: Number of environment steps performed per ``run()`` call.

    Returns:
        A ``DynamicStepDriver`` that adds every collected batch to ``replay_buffer``.
    """
    return dynamic_step_driver.DynamicStepDriver(
        env=tf_env_train,
        policy=collect_policy,
        observers=[replay_buffer.add_batch],
        num_steps=num_steps,
    )


# Driver used once to pre-fill the buffer, and driver used in every training iteration.
initial_collect_driver = _make_collect_driver(initial_collect_steps)
collect_driver = _make_collect_driver(collect_steps_per_iteration)

In [18]:
# Start the wandb run that all later wandb.log calls report to.
wandb.login()
wandb.init(
    project="DDPG_battery_testing",
    job_type="train_eval_test",
    name="3_ex_09",
    config={
        "train_steps": num_iterations,
        "batch_size": batch_size,
        "actor_learning_rate": actor_learning_rate,
        "critic_learning_rate": critic_learning_rate}
)

artifact = wandb.Artifact(name='save', type="checkpoint")

eval_metrics = [
    tf_metrics.AverageReturnMetric(name="AverageReturnEvaluation", buffer_size=num_eval_episodes)
]

# The test metric is sized by the number of test episodes (was num_eval_episodes).
test_metrics = [
    tf_metrics.AverageReturnMetric(name="AverageReturnTest", buffer_size=num_test_episodes)
]

# Checkpointing is currently disabled; kept here for reference.
# train_checkpointer = common.Checkpointer(
#     ckpt_dir='checkpoints/ddpg/',
#     max_to_keep=1,
#     agent=tf_agent,
#     policy=tf_agent.policy,
#     replay_buffer=replay_buffer,
#     global_step=global_step
# )
#
# train_checkpointer.initialize_or_restore()

global_step = tf.compat.v1.train.get_global_step()

# For better performance
initial_collect_driver.run = common.function(initial_collect_driver.run)
collect_driver.run = common.function(collect_driver.run)
tf_agent.train = common.function(tf_agent.train)

# Collect initial replay data
initial_collect_driver.run()

# Starting state for the per-iteration collect driver.
time_step = tf_env_train.reset()
policy_state = collect_policy.get_initial_state(tf_env_train.batch_size)

# Dataset generates trajectories with shape [Bx2x...]
# pipeline which will feed data to the agent
dataset = replay_buffer.as_dataset(
    num_parallel_calls=3, sample_batch_size=batch_size, num_steps=2
).prefetch(3)
iterator = iter(dataset)



Instructions for updating:
Use `tf.data.Dataset.counter(...)` instead.
Instructions for updating:
Use `as_dataset(..., single_deterministic_pass=False) instead.


In [19]:
# Train and evaluate
print("Start training ...")
while global_step.numpy() < num_iterations:
    # Collect new experience, continuing from where the previous iteration stopped.
    time_step, policy_state = collect_driver.run(
        time_step=time_step,
        policy_state=policy_state,
    )
    # One gradient update on a batch sampled from the replay buffer.
    experience, _ = next(iterator)
    train_loss = tf_agent.train(experience)
    metrics = {}
    if global_step.numpy() % eval_interval == 0:
        #train_checkpointer.save(global_step)
        metrics = metric_utils.eager_compute(
            eval_metrics,
            tf_env_eval,
            eval_policy,
            num_episodes=num_eval_episodes,
            train_step=global_step,
            summary_writer=None,
            summary_prefix='',
            use_function=True)

    # The loss is logged every iteration; evaluation metrics only when computed.
    metrics["loss"] = train_loss.loss
    wandb.log(metrics)

# Initiate test env
if env == 0:
    # Use the same Battery class the agent was trained on so that the test
    # environment's specs match the policy.
    # NOTE(review): this previously used battery_env.Battery(init_charge=..., data=...),
    # which has a different constructor and may expose different specs -- confirm.
    tf_env_test = tf_py_environment.TFPyEnvironment(Battery(load_data=data_test[0], pv_data=data_test[1], price_data=data_test[2], test=True))
else:
    tf_env_test = tf_py_environment.TFPyEnvironment(household_env.Household(init_charge=0.0, data=data_test, test=True))

print("Start testing ...")
metrics = metric_utils.eager_compute(
    test_metrics,
    tf_env_test,
    eval_policy,
    num_episodes=num_test_episodes,
    train_step=None,
    summary_writer=None,
    summary_prefix='',
    use_function=True)
wandb.log(metrics)

# Upload checkpoints only if they exist: the checkpointer is disabled in this
# notebook, and add_dir fails on a path that is not a directory.
checkpoint_dir = 'checkpoints/ddpg/'
if os.path.isdir(checkpoint_dir):
    artifact.add_dir(local_path=checkpoint_dir)
    wandb.log_artifact(artifact)
else:
    print("No checkpoint directory at {!r}; skipping artifact upload.".format(checkpoint_dir))
wandb.finish()

Start training ...


KeyboardInterrupt: 