# Package installation + importing


In [1]:
!pip install ray

!pip install "ray[rllib]"

!pip install "ray[tune]"

!pip install tensorflow
!pip install tensorboard

!pip install -U hyperopt

!pip install --upgrade aiohttp

import psutil
import ray
ray._private.utils.get_system_memory = lambda: psutil.virtual_memory().total


Requirement already up-to-date: hyperopt in d:\pycharm 2020.3.3\anaconda\lib\site-packages (0.2.7)


In [12]:
import pandas as pd
import numpy as np

import gym
from gym import spaces
from gym.utils import seeding

import matplotlib.pyplot as plt

import ray
from ray import tune

from ray.tune.registry import register_env
from ray.tune.logger import pretty_print

from sklearn import preprocessing

import math
import os

# Preprocessing data

In [13]:
# Site whose CSV / metadata row is used (read by get_data and BatteryEnvTrain).
house_id = 32
# NOTE(review): absolute, machine-specific path; it is not referenced by the visible
# code below (get_data builds its path from TRAIN_DATA_DIR) - confirm it is still needed.
data_dir = 'D:/Jupyter/data/train/'

# Data layout selector; allowed values: 'days', 'periods'.
# NOTE(review): not referenced by the visible code - confirm.
data_by = 'days' # 'days', 'periods'
# Whether forecast columns are included. NOTE(review): not referenced by the visible code.
data_with_forecast = False

Columns:

data_by_periods (no forecast)
- idx | timestamp | actual_load | actual_pv | price_buy_00 | price_sell_00

data_by_periods (with forecast) 
- idx | timestamp | actual_load | actual_pv | load_00 | ... | pv_95 | price_buy_00 | ... | price_sell_95
    - actual_load and actual_pv refer to the current step (with shift '-1'), i.e. they take the last step into account

data_by_days

In [None]:
import psutil
ray._private.utils.get_system_memory = lambda: psutil.virtual_memory().total
import ray.rllib.agents.pg as pg
import ray.rllib.agents.ppo as ppo
import numpy as np

# Module-level defaults. NOTE(review): the visible callers pass `MODE` and a literal 0
# to get_data instead of these two names - confirm they are still used.
mode = 'train' # 'submit'
additions = 0  # additions [0 - no forecast, 1 - with forecast]

def get_data(mode, additions):
    """Load one site's CSV and cut it into overlapping per-day frames.

    Reads ``{TRAIN_DATA_DIR}/{house_id}.csv`` (both names are notebook globals),
    keeps a subset of columns, splits the rows by ``period_id`` and slices every
    period into windows of 97 consecutive rows that start every 96 rows, so the
    last row of one window is also the first row of the next.

    Args:
        mode: accepted but not used by this function.
        additions: 0 keeps the first 5 columns plus the two ``*_00`` price
            columns; any other value keeps columns 0-9 and 101-105 plus the
            two price columns and drops trailing rows of every period.

    Returns:
        list of DataFrames (97 rows each, index 0..96) without the
        ``site_id`` / ``period_id`` columns.
    """
    raw = pd.read_csv(f'{TRAIN_DATA_DIR}/{house_id}.csv', parse_dates=['timestamp'], sep=";")
    # NOTE(review): masking with a boolean frame only re-inserts NaN where the value
    # is already null, so no rows are removed here - confirm whether dropna was meant.
    table = raw.where(raw.notnull())

    price_columns = [table['price_buy_00'], table['price_sell_00']]
    if additions == 0:
        leading_columns = [table.iloc[:, :5].copy()]
    else:
        leading_columns = [table.iloc[:, :10].copy(), table.iloc[:, 101:106].copy()]
    table = pd.concat(leading_columns + price_columns, axis=1)

    period_frames = []
    for period_id in np.sort(table['period_id'].unique()):
        chunk = (
            table[table['period_id'] == period_id]
            .drop(columns=['site_id', 'period_id'])
            .sort_values(by='timestamp')
            .reset_index(drop=True)
        )
        if additions:
            # Remove the trailing rows of the period (same slice bounds as before).
            n_rows = len(chunk)
            chunk = chunk.drop(chunk.index[(n_rows - 95):n_rows])
        period_frames.append(chunk)

    daily_frames = []
    for chunk in period_frames:
        # A window needs 97 rows: 96 steps plus the row that follows them.
        for day in range((len(chunk) - 1) // 96):
            window = chunk[day * 96:(day + 1) * 96 + 1]
            daily_frames.append(window.reset_index(drop=True))
    return daily_frames



# Environment class

In [16]:
class BatteryEnvTrain(gym.Env):
    """Gym environment in which an agent charges/discharges one battery.

    The environment walks through a list of per-day DataFrames (one day per
    episode). Each step reads the current row for prices and features and the
    *next* row for the actual consumption / PV production, applies the agent's
    action to the battery charge and tracks the money spent with and without
    the battery.

    Relies on the notebook globals ``house_id`` and ``INITIAL_CHARGE`` and on a
    ``metadata.csv`` file in the working directory.
    """

    def __init__(self, df, timestep_idx = 0, power = 0, capacity = 0, efficiency = 0):
        """Build the environment.

        Args:
            df: sequence of per-day DataFrames; the first column is expected to
                be ``timestamp`` (it is sliced off when the state is built).
            timestep_idx: row at which the first episode starts.
            power, capacity, efficiency: accepted but ignored; the values are
                always read from ``metadata.csv``.
        """
        # Battery parameters of the selected site.
        # NOTE(review): the * 1000 presumably converts kW(h) to W(h) - confirm units.
        metadata = pd.read_csv(f'metadata.csv', index_col=0, sep=";").loc[house_id]
        self.capacity = np.float32(metadata.Battery_1_Capacity * 1000)
        self.power = np.float32(metadata.Battery_1_Power * 1000)
        self.efficiency = np.float32(metadata.Battery_1_Charge_Efficiency)

        self.timestep_idx = timestep_idx    # row index inside the current day frame
        self.days_step = 0                  # index of the current day frame
        self.df_days = df                   # all day frames

        # Frame of the day currently being played.
        self.df = self.df_days[self.days_step]
        self.current_charge = INITIAL_CHARGE

        # One continuous action (a single battery), normalised to [-1, 1].
        self.action_space = spaces.Box(low = -1, high = 1, shape = (1,))

        # Observation = every frame column except the timestamp,
        # plus 2 time features, plus the current charge.
        observation_space_shape = (len(self.df.columns) - 1) + 2 + 1
        self.observation_space = spaces.Box(low = -np.inf, high=np.inf, shape = (observation_space_shape,))

        self._load_timestep()
        self.state = self._build_state(INITIAL_CHARGE)

        # Reward of the latest step.
        self.reward = 0

        self.scores_memory = []
        self.agent_outside_capacity_memory = []

        # True once the end of the current day frame is reached.
        self.terminal = False

        # Number of steps in the current episode that left the charge outside [0, capacity].
        self.checker_agent_outside_capacity = 0

        # Money totals over all days of a pass ...
        self.money_spent_cumulative = 0
        self.money_spent_without_battery_cumulative = 0

        # ... and over the current episode (day) only.
        self.money_spent_cumulative_episode = 0
        self.money_spent_without_battery_cumulative_episode = 0

        # Timestep of the latest out-of-capacity step; lowers the per-step reward afterwards.
        self.remember_step = 0

    def _load_timestep(self):
        """Cache the current row and the next row's actual PV / consumption."""
        self.data = self.df.loc[self.timestep_idx,:]
        next_row = self.df.loc[self.timestep_idx + 1,:]
        self.actual_pv = next_row.actual_pv
        self.actual_consumption = next_row.actual_consumption

    def _build_state(self, charge):
        """Return the observation list: row features, weekday, quarter, charge."""
        # [1:] drops the timestamp from the row values.
        features = [float(value) for value in self.data[1:].tolist()]
        # NOTE(review): pandas `Timestamp.quarter` is the calendar quarter of the year;
        # the original notes call this feature "day_quarter" - confirm which was intended.
        time_features = [self.data.timestamp.weekday(), self.data.timestamp.quarter]
        return features + time_features + [float(charge)]

    @staticmethod
    def _relative_score(money_spent, money_spent_without_battery):
        """Relative cost difference versus running without a battery.

        Returns 0.0 when the no-battery cost is exactly zero, so the terminal
        reward bonus cannot become NaN/inf through a division by zero.
        """
        baseline = np.abs(money_spent_without_battery)
        if baseline == 0:
            return 0.0
        return (money_spent - money_spent_without_battery) / baseline

    def step(self, action):
        """Apply one action and advance to the next row of the day frame.

        Args:
            action: value in [-1, 1]; positive charges, negative discharges.

        Returns:
            ``(state, reward, terminal, info)`` where ``info`` is always ``{}``.
        """
        # Scale the normalised action by power/capacity * (15/60) * capacity.
        action = (action * ((self.power / self.capacity) * (15. / 60.))) * self.capacity
        # Any overshoot left by the previous step is clipped before applying the new action.
        self.current_charge = np.clip(self.current_charge, 0.0, self.capacity)
        current_charge_old = self.current_charge

        if action >= 0:
            proposed_energy = current_charge_old + action * self.efficiency
        else:
            proposed_energy = current_charge_old + action * (1. / self.efficiency)

        actual_delta_charge_energy = proposed_energy - current_charge_old
        net_energy = (actual_delta_charge_energy + self.actual_consumption) - self.actual_pv
        self.current_charge = current_charge_old + actual_delta_charge_energy

        # Buying price applies when energy is drawn (net >= 0), selling price otherwise.
        price_buy = self.data.price_buy_00
        price_sell = self.data.price_sell_00
        price = price_buy if net_energy >= 0 else price_sell

        grid_energy_without_battery = self.actual_consumption - self.actual_pv
        price_without_battery = price_buy if grid_energy_without_battery >= 0 else price_sell

        money_spent_without_battery = grid_energy_without_battery * (price_without_battery / 1000.)
        money_spent = net_energy * (price / 1000.)

        self.money_spent_cumulative += money_spent
        self.money_spent_without_battery_cumulative += money_spent_without_battery

        self.money_spent_cumulative_episode += money_spent
        self.money_spent_without_battery_cumulative_episode += money_spent_without_battery

        if self.current_charge <= self.capacity and self.current_charge >= 0:
            self.reward = (self.timestep_idx + 1)/(self.remember_step + (self.timestep_idx + 1))
        else:
            # Out of capacity: zero reward now, and later step rewards shrink via remember_step.
            self.checker_agent_outside_capacity += 1
            self.reward = 0
            self.remember_step = self.timestep_idx

        self.timestep_idx += 1
        self._load_timestep()
        self.state = self._build_state(self.current_charge)

        # The next row is always read as well, hence the "- 2".
        self.terminal = (self.timestep_idx >= (len(self.df.index.unique()) - 2))

        if self.terminal:
            if psutil.virtual_memory().percent >= 80.0:
                # `gc` is not imported by the notebook's import cells, so import it here.
                import gc
                gc.collect()

            self.agent_outside_capacity_memory.append(self.checker_agent_outside_capacity)

            if self.days_step != (len(self.df_days) - 1):
                # End of an ordinary day: bonus from this episode's relative saving.
                score = self._relative_score(self.money_spent_cumulative_episode,
                                             self.money_spent_without_battery_cumulative_episode)
                self.reward += int(score < 0) * (-score) * 100 / ( 1 + self.checker_agent_outside_capacity)

                self.days_step += 1
                self.money_spent_cumulative_episode = 0
                self.money_spent_without_battery_cumulative_episode = 0
                self.checker_agent_outside_capacity = 0
                self.remember_step = 0
            else:
                # End of the last day: bonus from the saving over the whole pass, then full reset.
                score = self._relative_score(self.money_spent_cumulative,
                                             self.money_spent_without_battery_cumulative)
                self.reward += int(score < 0) * (-score) * 10000 / ( 1 + self.checker_agent_outside_capacity)

                print('site id score average = ', score)
                print('agent outside capacity average = ', np.sum(self.agent_outside_capacity_memory), "out of ", len(self.df_days[0]) * len(self.df_days))
                self.scores_memory.clear()
                self.agent_outside_capacity_memory.clear()
                self.days_step = 0

                self.money_spent_cumulative = 0
                self.money_spent_without_battery_cumulative = 0
                self.money_spent_cumulative_episode = 0
                self.money_spent_without_battery_cumulative_episode = 0

                self.checker_agent_outside_capacity = 0
                self.remember_step = 0

                self.current_charge = INITIAL_CHARGE

        return self.state, float(self.reward), self.terminal, {}

    def reset(self):
        """Start the episode for the day selected by ``days_step`` and return its first state.

        The battery charge is carried over from the previous episode; it is
        only reset in ``step`` after the last day of a pass.
        """
        self.df = self.df_days[self.days_step]

        self.timestep_idx = 0
        self._load_timestep()
        self.state = self._build_state(self.current_charge)

        self.terminal = False
        self.reward = 0

        return self.state

# Ray RLlib training

In [17]:
# ------- ENV CREATION
# Globals read by get_data and BatteryEnvTrain.
house_id = 32            # site id: selects `{house_id}.csv` and the metadata row
INITIAL_CHARGE = 0.0     # battery charge at construction and after the last day of a pass
MODE = 'submit'          # passed to get_data, which does not use it
data_dir = ''
# NOTE(review): METADATA_DIR is not referenced by the visible code
# (BatteryEnvTrain reads 'metadata.csv' by relative path) - confirm.
METADATA_DIR = data_dir
# NOTE(review): get_data reads f'{TRAIN_DATA_DIR}/{house_id}.csv'; with an empty
# string this resolves to '/32.csv' at the filesystem root - confirm this is intended.
TRAIN_DATA_DIR = data_dir

def env_creator(_):
    """Environment factory for ``register_env``; the config argument is ignored.

    Loads the no-forecast day frames via ``get_data(MODE, 0)`` and wraps a
    shallow copy of that list in a ``BatteryEnvTrain`` starting at row 0.
    """
    daily_frames = get_data(MODE, 0)
    env = BatteryEnvTrain(daily_frames.copy(), 0)
    return env

# Name under which the environment is registered and referenced in the trainer configs.
env_name = 'BatteryEnvTrain-v0'

# The env name should follow the pattern [something]-v[something], e.g. battery-v0.
register_env(env_name, env_creator)

# Expose GPUs 0 and 1 to this process.
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"

## Training (no hyperparameter optimization)

In [None]:
# Algorithms tested on MountainCarContinuous-v0 (continuous action space)
# [working with init() - BC, {DDPG, TD3}, MARWIL, PG, SAC]
# [working with init(cpus) - {A2C, A3C} (3), ARS (6), ES (11), APEX_DDPG (?), IMPALA(2 gpu), {PPO, APPO} (3)]
# [Discrete action-space - {DQN, Rainbow}, APEX_DQN, R2D2, SlateQ, {LinUCB, LinTS}, AlphaZero]
# [Different - Dreamer(images only)]

# ERROR:
# CQL on MountainCarContinuous-v0 - `ReplayBuffer(size)` has been deprecated. Use `ReplayBuffer(capacity)` instead. This will raise an error in the future
#                                    `execution_plan` functions should accept `trainer`, `workers`, and `config` as args!
# MAML (3) on MountainCarContinuous-v0 - execution_plan() takes 2 positional arguments but 3 were given
#                                         'Continuous_MountainCarEnv' object has no attribute 'sample_tasks'
# MBMPO (?) on MountainCarContinuous-v0 - env does not have a `reward()` method, needed for MB-MPO!

# Careful:
# if the env is broken -> ray.shutdown() -> re-register env -> execute again

#----------------------------
# DDPG gives accurate results
# PPO gives weird results (from -100 to 100 sometimes, but it converges to the same as DDPG)
#----------------------------


# Start a local Ray runtime and run PPO on the registered env, once per learning
# rate in the grid below, each until 10.5M environment timesteps.
ray.init(num_cpus=12, num_gpus=1, ignore_reinit_error=True)
analysis = tune.run(
              "PPO",
              stop={"timesteps_total": 10500000},
              name='Battery_experiment',
              #verbose=True,                                               # helps with clearing outputs, but not enough info (not very useful)
              #metric="task_score",
              mode="max",
              config={      # https://docs.ray.io/en/latest/rllib/rllib-training.html#common-parameters
                  "env": env_name,
                  #"num_workers": 2,                                      # some problems with those 2 for some reason
                  #"num_gpus": 2,                                         #
                  #"sgd_minibatch_size": tune.choice([128, 512, 2048]),   # not accepted
                  #"train_batch_size": tune.choice([int(len(train_data_by_periods[0])/2),len(train_data_by_periods[0]), len(train_data_by_periods[0])*2, len(train_data_by_periods[0])*10]),  # training restarts for each value
                  'vf_clip_param': math.inf,                               # infinite clip parameter for the value function
                  "framework": "tf2",
                  "eager_tracing": True,
                  "lr": tune.grid_search([1e-3, 1e-4, 1e-5]),              # one trial per value
                  #"sgd_minibatch_size": len(train_data_by_periods[0]),
                  #"train_batch_size": len(train_data_by_periods[0])*len(train_data_by_periods)*3,
                  #"evaluation_interval": 1,
                  #"evaluation_duration": 2,
                  #"evaluation_duration_unit": "episodes",

                  #"explore": True,
                  #"exploration_config": {
                  #  "type": "StochasticSampling",
                  #  "random_timesteps": 0,
                  #},

              },
              #restore='/content/root/ray_results/Battery_experiment/DDPG_BatteryEnvTrain-v0_560eb_00001_1_2021-11-23_16-56-51/checkpoint_000300/checkpoint-300',
              num_samples=1,
              checkpoint_at_end=True,                                     # doesn't matter (even if stopped manually - creates a checkpoint at the end)
              #checkpoint_freq = 30000,
          )

#print("best hyperparameters: ", analysis.best_config)

In [None]:
# Tear down the local Ray runtime; later cells call ray.init() again.
ray.shutdown()

## Training (hyperparameter optimization)


In [None]:
#from ray.rllib.agents.ddpg import DDPGTrainer
from ray.rllib.rollout import rollout
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.suggest.hyperopt import HyperOptSearch

from datetime import datetime

from hyperopt import hp
from hyperopt.pyll.base import scope

def on_train_result(info):
    """Training callback: mirror ``episode_reward_mean`` into the ``objective`` result key.

    Args:
        info: callback payload; ``info["result"]`` is the mutable result dict.
    """
    result = info["result"]
    result["objective"] = result["episode_reward_mean"]

def get_best_trials(trials, metric):
    """Return a new list of ``trials`` ordered from highest to lowest ``last_result[metric]``.

    The input is left untouched; trials with equal scores keep their input order.
    """
    def score(trial):
        return trial.last_result[metric]

    ranked = list(trials)
    ranked.sort(key=score, reverse=True)
    return ranked

def get_agent(trial):
    """Rebuild the DDPG trainer of a Tune trial and load the trial's checkpoint.

    Args:
        trial: Tune trial; its ``config`` configures the trainer and
            ``_checkpoint.value`` is the checkpoint path to restore.

    Returns:
        The restored trainer.
    """
    # `DDPGAgent` is not defined or imported anywhere in this notebook. The
    # commented-out import at the top of this cell names `DDPGTrainer`, the
    # trainer class of the `ray.rllib.agents` API the notebook uses elsewhere.
    from ray.rllib.agents.ddpg import DDPGTrainer

    agent = DDPGTrainer(config=trial.config)
    agent.restore(trial._checkpoint.value)
    return agent

def remove_checkpoints(trials):
    """Delete the checkpoint files of each trial and clear the trial's checkpoint record.

    For every trial, all files whose path starts with ``trial._checkpoint.value``
    are removed, then the (now expected to be empty) directory containing them,
    and finally ``trial.clear_checkpoint()`` is called. Trials without a
    checkpoint path are skipped.

    Raises:
        OSError: if the checkpoint directory still contains other files.
    """
    # `glob` is not imported by the notebook's import cells, so import it here.
    from glob import glob

    for trial in trials:
        checkpoint_path = trial._checkpoint.value
        if not checkpoint_path:
            # Nothing was ever saved for this trial.
            continue
        for path in glob(checkpoint_path + "*"):
            os.remove(path)
        os.rmdir(os.path.dirname(checkpoint_path))
        trial.clear_checkpoint()

Using the 'Population Based Training' scheduler

In [None]:
import random
from ray.tune.schedulers import PopulationBasedTraining

def explore(config):
    """Post-process a perturbed PBT config in place so it stays usable, then return it.

    Guarantees ``train_batch_size >= 2 * sgd_minibatch_size`` (enough timesteps
    to run SGD) and ``num_sgd_iter >= 1`` (at least one SGD iteration).
    """
    smallest_batch = config["sgd_minibatch_size"] * 2
    config["train_batch_size"] = max(config["train_batch_size"], smallest_batch)
    config["num_sgd_iter"] = max(config["num_sgd_iter"], 1)
    return config

# Population Based Training scheduler: measured in wall-clock seconds
# (`time_total_s`), perturbing every 120 units of that attribute.
pbt = PopulationBasedTraining(
    time_attr="time_total_s",
    perturbation_interval=120,
    resample_probability=0.25,
    # Specifies the mutations of these hyperparams:
    # callables are sampled, lists are picked from.
    hyperparam_mutations={
        "lambda": lambda: random.uniform(0.9, 1.0),
        "clip_param": lambda: random.uniform(0.01, 0.5),
        "lr": [1e-3, 5e-4, 1e-4, 5e-5, 1e-5],
        "num_sgd_iter": lambda: random.randint(1, 30),
        "sgd_minibatch_size": lambda: random.randint(128, 16384),
        "train_batch_size": lambda: random.randint(2000, 160000),
    },
    # `explore` repairs configs whose batch sizes / SGD iterations became invalid.
    custom_explore_fn=explore)

# custom parameters hyperoptimization - https://docs.ray.io/en/latest/tune/examples/ax_example.html
# give an env config (from there take out needed values)

# Run 15 PPO trials under the PBT scheduler defined above.
# NOTE(review): most keys in `hyperparam_mutations` (e.g. "train_batch_size",
# "sgd_minibatch_size", "num_sgd_iter") have no initial value in `config` here,
# and "lr" starts at 5e-6, which is not one of the listed mutation values - confirm.
ray.init(num_cpus=12, num_gpus=0)
analysis = tune.run(
              "PPO",
              stop={"timesteps_total": 10500000},
              name='Battery_experiment',
              scheduler=pbt,
              mode="max",
              config={                                                     # https://docs.ray.io/en/latest/rllib/rllib-training.html#common-parameters
                  "env": env_name,
                  #"sgd_minibatch_size": tune.choice([128, 512, 2048]),   # not accepted
                  #"train_batch_size": tune.choice([int(len(train_data_by_periods[0])/2),len(train_data_by_periods[0]), len(train_data_by_periods[0])*2, len(train_data_by_periods[0])*10]),  # training restarts for each value
                  'vf_clip_param': math.inf,
                  "framework": "tf2",
                  "eager_tracing": True,
                  "lr": 5e-6,
              },
              num_samples=15,
              checkpoint_at_end=True,                                     # doesn't matter (even if stopped manually - creates a checkpoint at the end)
          )

Using 'HyperBand' scheduler

In [None]:
# Number of usable timesteps in the first period frame.
# NOTE(review): `train_data_by_periods` is a local variable of get_data and is not
# defined at notebook level, so this line raises NameError on a fresh kernel - confirm
# where the period frames are supposed to come from.
one_part_of_df = len(train_data_by_periods[0]) - 2

ray.init(num_cpus=12, num_gpus=1)

# HyperOpt search space. Each hyperopt label now matches the config key it samples
# (the "lr" dimension was previously labelled "entropy_coeff").
hyperopt = HyperOptSearch({
    #"rollout_fragment_length": hp.choice("rollout_fragment_length", [256, 512]),
    #"entropy_coeff": hp.choice("entropy_coeff", [1e-2, 1e-3]),
    #"lambda": hp.choice("lambda", [1, 0.5]), # 1, 0.5
    "lr": hp.choice("lr", [1e-5]),   # 1e-3, 1e-4
    "vf_loss_coeff": hp.choice("vf_loss_coeff", [0.5]),  # 0.5, 0.25
    "timesteps_per_iteration": one_part_of_df
    },
    metric="episode_reward_mean",
    mode="max",
)

# ASHA scheduler: stops under-performing trials early, measured in training iterations.
hyperband = AsyncHyperBandScheduler(
time_attr="training_iteration",
metric="episode_reward_mean",
max_t=500,           # max time units per trial = TIMESTEPS OF TRAINING (3838 * max_t)
mode="max",
grace_period=50,      # only stop trials that are at least this old
)

# Experiment name = start time, so every run gets its own results directory.
now = datetime.now().strftime("%Y-%m-%d_%H-%M")

# NOTE(review): `search_alg` and `scheduler` are commented out, so the two objects
# built above are currently unused by this run.
# NOTE(review): dict-style callbacks with `tune.function` are an old Ray API -
# confirm it is supported by the installed Ray version.
analysis = tune.run(
              "A3C",
              name=now,
              num_samples=10,
              #checkpoint_freq = one_part_of_df,        # helps to draw graphics of 'reward_mean_episode'
              #search_alg=hyperopt,
              #scheduler=hyperband,
              stop={"timesteps_total": 10000000},
              config={
                  "env": env_name,
                  "callbacks": {"on_train_result": tune.function(on_train_result)},
              },
              #checkpoint_at_end=True,
          )

In [None]:
# Tear down the local Ray runtime; later cells call ray.init() again.
ray.shutdown()

## Iterative training

In [None]:
# Globals read by get_data and BatteryEnvTrain (re-declared for the iterative runs).
house_id = 32            # site id: selects `{house_id}.csv` and the metadata row
INITIAL_CHARGE = 0.0     # battery charge at construction and after the last day of a pass
MODE = 'train'           # passed to get_data (unused there) and used to build TRAIN_DATA_DIR
# NOTE(review): METADATA_DIR is not referenced by the visible code - confirm.
METADATA_DIR = ''
# get_data reads f'{TRAIN_DATA_DIR}/{house_id}.csv', i.e. '/train/32.csv' here.
TRAIN_DATA_DIR = f'/{MODE}'

def env_creator(_):
    """Environment factory for ``register_env``; the config argument is ignored.

    Loads the no-forecast day frames via ``get_data(MODE, 0)`` and wraps a
    shallow copy of that list in a ``BatteryEnvTrain`` starting at row 0.
    """
    daily_frames = get_data(MODE, 0)
    env = BatteryEnvTrain(daily_frames.copy(), 0)
    return env

# Register the factory under the name used in the trainer configs.
env_name = 'BatteryEnvTrain-v0' 
register_env(env_name, env_creator)
# Expose GPUs 0 and 1 to this process.
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1"

In [None]:
from ray.tune.schedulers import AsyncHyperBandScheduler

# First round of the iterative training: 3 PPO trials for 10 training iterations;
# the best trial's checkpoint seeds the loop that follows.
data_by_days = get_data(MODE, 0)
length_timesteps = len(data_by_days)*len(data_by_days[0])   # rows over all day frames
number_of_episodes = len(data_by_days)                      # one episode per day frame

# NOTE(review): this scheduler is not passed to tune.run below (the `scheduler`
# argument is commented out), and `time_attr` counts episodes while `max_t` /
# `grace_period` are given in timesteps - confirm before enabling it.
hyperband = AsyncHyperBandScheduler(
time_attr="episodes_total",
metric="episode_reward_mean",         
mode="max",
max_t= length_timesteps, 
grace_period = length_timesteps,
)

register_env(env_name, env_creator)
ray.init(num_cpus=9, num_gpus=0)
analysis = tune.run(
              "PPO",
              stop={"training_iteration": 10},
              name='Battery_experiment',
              #mode="max",
              #verbose=0,
              #scheduler=hyperband,
              config={      
                  "env": env_name,
                  "framework": "tf2",
                  "eager_tracing": True,
                  "lr": 5e-5,
                  'vf_clip_param': math.inf,
              },
              num_samples=3,
              checkpoint_at_end=True,                                     # doesn't matter (even if stopped manually - creates a checkpoint at the end)
              checkpoint_freq = 1000,                                  # 1 iter = 200 steps
              #reuse_actors=True,
          )

# Pick the trial with the highest episode_reward_mean and remember its best checkpoint.
trial_logdir = analysis.get_best_logdir(metric="episode_reward_mean", mode="max")  
best_checkpoint = analysis.get_best_checkpoint(trial_logdir, metric="episode_reward_mean", mode="max")

ray.shutdown()

# Iterative training: every round restores 3 PPO trials from the best checkpoint of
# the previous round, trains up to a higher iteration limit (+10 per round) and then
# selects the new best checkpoint. Requires `best_checkpoint` from the first run.
number_of_iterations = 200

for i in range(number_of_iterations):
    
    #hyperband = AsyncHyperBandScheduler(            # better sort it when it started training (to get rid of failed explorations because otherwise it'll take too long to learn)
    #time_attr="timesteps_total",
    #metric="episode_reward_mean",         
    #mode="max",
    #max_t= (length_timesteps*3)*(i+2), 
    #grace_period = length_timesteps + (length_timesteps*3)*(i+1)
    #)
    
    # Progress banner; the counter starts at 2 because the first run counts as round 1.
    for j in range(5): print('----------')
    print(f'starts loop {i + 2} / {number_of_iterations} ')
    for j in range(5): print('----------')

    # Ray was shut down at the end of the previous round, so register and init again.
    register_env(env_name, env_creator)
    ray.init(num_cpus=9, num_gpus=0)
    analysis = tune.run(
                  "PPO",
                  stop={"training_iteration": 10 * (i + 2)},
                  name='Battery_experiment',
                  #mode="max",
                  #verbose=0,
                  #scheduler=hyperband,
                  config={      
                      "env": env_name,
                      "framework": "tf2",
                      "eager_tracing": True,
                      "lr": 5e-5,
                      'vf_clip_param': math.inf,
                  },
                  restore=best_checkpoint,
                  num_samples=3,
                  checkpoint_at_end=True,                                     # doesn't matter (even if stopped manually - creates a checkpoint at the end)
                  checkpoint_freq = 1000,                                  # 1 iter = 200 steps
                  #reuse_actors=True,
              )

    # Best trial of this round -> checkpoint restored by the next round.
    trial_logdir = analysis.get_best_logdir(metric="episode_reward_mean", mode="max")  
    best_checkpoint = analysis.get_best_checkpoint(trial_logdir, metric="episode_reward_mean", mode="max")
    
    ray.shutdown()

from pathlib import Path
# Archive the directory two levels above the best checkpoint path.
# NOTE(review): the output path is absolute and machine-specific.
f = str(best_checkpoint)
p = str(Path(f).parents[1])
!zip -r /home/notebooks/Battery_experiment_best.zip {p}

In [None]:
# Continuation of the iterative training from iteration `stopped_iteration`:
# every round restores from `best_checkpoint` (must already exist in the kernel)
# and raises the training-iteration limit by 10.
number_of_iterations = 200
stopped_iteration = 550

for i in range(number_of_iterations):
    
    # NOTE(review): this scheduler is rebuilt every round but is not passed to
    # tune.run (the `scheduler` argument below is commented out).
    hyperband = AsyncHyperBandScheduler(
    time_attr="training_iteration",
    metric="episode_reward_mean",         
    mode="max",
    max_t= stopped_iteration + 5*(i+1), 
    grace_period = stopped_iteration + 5*(i)
    )
    
    # Progress banner.
    for j in range(5): print('----------')
    print(f'{i + 2} / {number_of_iterations}  loops')
    print(f'{(i + 2)*5} / {number_of_iterations * 5} iterations')
    for j in range(5): print('----------')

    # Ray was shut down at the end of the previous round, so register and init again.
    register_env(env_name, env_creator)
    
    ray.init(num_cpus=9, num_gpus=0)

    analysis = tune.run(
                  "PPO",
                  stop={"training_iteration": stopped_iteration + 10 * (i + 1)},
                  name='Battery_experiment',
                  #mode="max",
                  #verbose=0,
                  #scheduler=hyperband,
                  config={      
                      "env": env_name,
                      "framework": "tf2",
                      "eager_tracing": True,
                      "lr": 5e-5,
                      'vf_clip_param': math.inf,
                  },
                  restore=best_checkpoint,
                  num_samples=3,
                  checkpoint_at_end=True,                                     # doesn't matter (even if stopped manually - creates a checkpoint at the end)
                  checkpoint_freq = 1000,                                  # 1 iter = 200 steps
                  #reuse_actors=True,
              )

    # Best trial of this round -> checkpoint restored by the next round.
    trial_logdir = analysis.get_best_logdir(metric="episode_reward_mean", mode="max")  
    best_checkpoint = analysis.get_best_checkpoint(trial_logdir, metric="episode_reward_mean", mode="max")
    
    ray.shutdown()

from pathlib import Path
# Archive the directory two levels above the best checkpoint path.
# NOTE(review): the output path is absolute and machine-specific.
f = str(best_checkpoint)
p = str(Path(f).parents[1])
!zip -r /home/notebooks/Battery_experiment_best.zip {p}

## Some Training helping

Config

In [None]:
# PERFECT CONFIG SO FAR
# Requires `one_part_of_df`, `HyperOptSearch` and `AsyncHyperBandScheduler` from the
# HyperBand cell above.

# Search space holding fixed values only (all sampled dimensions are commented out).
# Fix: a stray ")" after 256 made this cell a syntax error.
hyperopt = HyperOptSearch({
    #"twin_q": hp.choice("twin_q", [False, True]),
    #"critic_lr": hp.choice("critic_lr", [1e-3, 1e-4]),
    #"actor_lr": hp.choice("actor_lr", [1e-3, 1e-4]),
    "train_batch_size": 256,        # by paralleling 1x time for 256, 2x for 512, 4x for 1024 (paralleling helps to train every sample at the same time) [take 256 to make it faster]
    "learning_starts": one_part_of_df * 2,        # per train session (meaning 1 time, not per episode) - x2 seems better
    "timesteps_per_iteration": one_part_of_df,
    #"actor_hidden_activation": hp.choice("actor_hidden_activation", ["relu", "tanh"]),      # unclear which is better (keep 'relu', the Ray default)
    #"critic_hidden_activation": hp.choice("critic_hidden_activation", ["relu", "tanh"]),
    },
    metric="episode_reward_mean",
    mode="max",
)


# ASHA scheduler measured in training iterations; max_t converts a budget of
# 1.5M timesteps into iterations of `one_part_of_df` timesteps each.
hyperband = AsyncHyperBandScheduler(
time_attr="training_iteration",
metric="episode_reward_mean",
max_t=1500000/one_part_of_df,           # max time units per trial = TIMESTEPS OF TRAINING (3838 * 5)
mode="max",
grace_period=2,      # take something like 5-10% of timesteps {to let it train for a while and then delete}
)

In [None]:
# Results
# 1) HyperOptSearch can't understand 'num_atoms', 'noisy', 'hiddens', "objective", 'max_concurrent'
#    Need to specify "objective" as "reward_episode_mean" and mode="max"
#-----------------------------------------------------------------
# Requires the imports and helper functions (get_best_trials, get_agent,
# remove_checkpoints, on_train_result) from the hyperparameter-optimization cell.


#dir_path = os.path.dirname(os.path.realpath(__file__))

ray.init(num_cpus=6, num_gpus=1)

# HyperOptSearch(space, metric, max_concurrent)
# Active search space: gamma = 1 - loguniform(1e-4, 1e-1), lr = loguniform(1e-6, 1e-3).
hyperopt = HyperOptSearch(
    {
        "gamma": (1 - hp.loguniform("_gamma", np.log(1e-4), np.log(1e-1))) / 1,
        "lr": hp.loguniform("lr", np.log(1e-6), np.log(1e-3)),
    },
    metric="episode_reward_mean",
    mode="max",
)
# Disabled, larger search space kept as an inert string literal.
'''
hyperopt = HyperOptSearch(
    {
        "gamma": (1 - hp.loguniform("_gamma", np.log(1e-4), np.log(1e-1))) / 1,
        "lr": hp.loguniform("lr", np.log(1e-6), np.log(1e-3)),

        "num_atoms": hp.choice("num_atoms", [1, 51]),
        "noisy": hp.choice("noisy", [False, True]),
        "hiddens": hp.choice(
            "hiddens",
            [
                [scope.int(64 * (2 ** hp.quniform("_layer_1_1", 0, 3, 1)))],
                [
                    scope.int(64 * (2 ** hp.quniform("_layer_2_1", 0, 3, 1))),
                    scope.int(64 * (2 ** hp.quniform("_layer_2_2", 0, 3, 1))),
                ],
            ],
        ),
    },
    metric="objective",
    mode="max",
    #max_concurrent=32,  # old (use tune.suggest.ConcurrencyLimiter() ???)
)
'''

# Scheduler 'sorting' = deleting 'bad' trials (number of 'time' that trial lives = max_t  {because time_attr = 'training_iteration'})
# NOTE(review): the note above expects time_attr = 'training_iteration', but the
# scheduler is built with "number_of_trials" - confirm which attribute is intended.
hyperband = AsyncHyperBandScheduler(
    time_attr="number_of_trials", metric="episode_reward_mean", mode="max", max_t=20
)

# Experiment name = start time, so every run gets its own results directory.
now = datetime.now().strftime("%Y-%m-%d_%H-%M")

# NOTE(review): dict-style callbacks with `tune.function` are an old Ray API -
# confirm it is supported by the installed Ray version.
analysis = tune.run(
              "DDPG",
              name=now,
              num_samples=2,
              search_alg=hyperopt,
              scheduler=hyperband,
              stop={"timesteps_total": 300000},
              config={
                  "env": env_name,
                  #"num_gpus": np.clip(num_gpus / num_cpus, 0, 1),
                  "callbacks": {"on_train_result": tune.function(on_train_result)},
              },
              checkpoint_at_end=True,                                     # doesn't matter (even if stopped manually - creates a checkpoint at the end)
              #local_dir=os.path.join(dir_path, "ray_results"),
          )


# Rank the trials of this run. Fix: `trials` was never defined; the trials of the
# run are taken from the analysis object returned by tune.run.
best_trials = get_best_trials(analysis.trials, "episode_reward_mean")
best_trial = best_trials[0]

agent = get_agent(best_trial)

print(
    "best score: {}, config: {}, checkpoint: {}".format(
        best_trial.last_result["episode_reward_mean"],
        best_trial.config,
        best_trial._checkpoint.value,
    )
)

# Keep only the best trial's checkpoint on disk.
remove_checkpoints(best_trials[1:])

# Fix: `env_id` was never defined; the environment is registered as `env_name`.
rollout(agent, env_name, 1000, no_render=True)



# Evaluation

Getting data

In [6]:
import psutil
ray._private.utils.get_system_memory = lambda: psutil.virtual_memory().total
import ray.rllib.agents.pg as pg
import ray.rllib.agents.ppo as ppo
import numpy as np

def get_data(mode, additions):
    """Load one house's training CSV and cut it into overlapping day-sized frames.

    The file ``{TRAIN_DATA_DIR}/{house_id}.csv`` (``;``-separated, ``timestamp``
    parsed as dates) is read using the notebook-level ``TRAIN_DATA_DIR`` and
    ``house_id`` variables.

    Parameters
    ----------
    mode
        Accepted for interface compatibility; not used by this function.
    additions
        ``0`` - no forecast: keep the first 5 columns plus ``price_buy_00`` and
        ``price_sell_00``.
        Anything else - with forecast: keep the first 10 columns, columns
        101..105, plus the two price columns; when truthy, the last 95 rows of
        every period are cut off as well.

    Returns
    -------
    list of pandas.DataFrame
        Frames of 97 consecutive rows each, taken per ``period_id`` in steps of
        96 rows (so neighbouring frames of one period share a single row).
        ``site_id`` and ``period_id`` are removed, rows are ordered by
        ``timestamp`` and every frame is re-indexed from 0.
    """
    raw = pd.read_csv(f'{TRAIN_DATA_DIR}/{house_id}.csv', parse_dates=['timestamp'], sep=";")

    # NOTE(review): masking a frame with a boolean frame keeps every row and only
    # blanks the masked cells, which are already null - so no rows are removed here.
    masked = raw[raw.notnull()]

    prices = [masked['price_buy_00'], masked['price_sell_00']]
    if additions == 0:
        leading = [masked.iloc[:, :5].copy()]
    else:
        leading = [masked.iloc[:, :10].copy(), masked.iloc[:, 101:106].copy()]
    selected = pd.concat(leading + prices, axis=1)

    # One chronologically ordered frame per period, without the id columns.
    period_frames = []
    for period_id in np.sort(selected['period_id'].unique()):
        frame = (
            selected[selected['period_id'] == period_id]
            .drop(columns='site_id')
            .drop(columns='period_id')
            .sort_values(by='timestamp')
            .reset_index(drop=True)
        )
        if additions:
            # With forecasts enabled the trailing 95 rows of the period are discarded.
            frame = frame.iloc[:len(frame) - 95]
        period_frames.append(frame)

    # Windows of 96 + 1 rows, advancing 96 rows at a time; an incomplete tail is skipped.
    day_frames = []
    for frame in period_frames:
        for start in range(0, len(frame) - 96, 96):
            day_frames.append(frame[start:start + 97].reset_index(drop=True))
    return day_frames

Evaluating

In [71]:
#checkpoint_path = '/home/ray_results/Battery_experiment/PPO_BatteryEnvTrain-v0_44ee4_00000_0_2022-05-01_13-54-50/checkpoint_000500/checkpoint-500'
checkpoint_path = best_checkpoint   # NOTE(review): `best_checkpoint` must be assigned by an earlier cell
RL_algorithm = 'ppo'                # supported values: 'ppo', 'pg'

env_name = 'BatteryEnvTrain-v0' 

scores = []             # one entry per completed run: cost relative to running without a battery
outside_capacity = []   # one entry per completed run: sum of env.agent_outside_capacity_memory

# Number of evaluation runs that are averaged at the end.
# NOTE(review): the original comment says "different seeds", but no seed is set in this
# cell - runs differ only if the environment or the agent is stochastic.
rang = 5    

# Per-algorithm setup: (default config, trainer class, algorithm-specific overrides).
trainer_specs = {
    'ppo': (ppo.DEFAULT_CONFIG, ppo.PPOTrainer, {'vf_clip_param': math.inf}),
    'pg': (pg.DEFAULT_CONFIG, pg.PGTrainer, {}),
}
if RL_algorithm not in trainer_specs:
    # Fail early instead of reaching agent.restore() with a missing (or stale) `agent`.
    raise ValueError(f"Unsupported RL_algorithm {RL_algorithm!r}; expected one of {sorted(trainer_specs)}")
default_config, trainer_cls, algorithm_overrides = trainer_specs[RL_algorithm]

print('------------')
print('house_id = ', house_id)
print('mode = ', MODE)
print('algorithm = ', RL_algorithm) 

for i in range(rang):
    print(i+1, f"/ {rang}")
    env = env_creator(house_id)

    # Start from the algorithm's defaults and apply the settings used for this experiment.
    config = default_config.copy()
    custom_config = {"env": env_name, "framework": "tf2", "eager_tracing": True, "lr": 5e-5, **algorithm_overrides}
    config.update(custom_config)
    agent = trainer_cls(config=config, env=env_name)

    try:
        agent.restore(checkpoint_path)                              # restore agent's weights (that were made during training)

        # ------ LOOP FOR EVALUATING
        # Step through the days; when an episode ends before the last day the environment
        # is reset and stepping continues, when it ends on the last day the run's results
        # are recorded.
        while (env.terminal != True) and (env.days_step != (len(env.df_days) - 1)):
            action = agent.compute_single_action(env.state)
            env.step(action)
            if (env.terminal == True) and (env.days_step != (len(env.df_days) - 1)):
                env.reset()
            elif (env.terminal == True) and (env.days_step == (len(env.df_days) - 1)):
                #print("score = ", (env.money_spent_cumulative - env.money_spent_without_battery_cumulative)/np.abs(env.money_spent_without_battery_cumulative))
                scores.append((env.money_spent_cumulative - env.money_spent_without_battery_cumulative)/np.abs(env.money_spent_without_battery_cumulative))
                outside_capacity.append(np.sum(env.agent_outside_capacity_memory))
    finally:
        # Every iteration builds a fresh trainer; stop it so its rollout workers are
        # released instead of piling up across runs.
        agent.stop()

print("scores average = ", np.average(scores))
print("outside capacity average = ", np.average(outside_capacity))
print('------------')
#Initialize env
#While 'episode'
#    action = agent(state)
#    info = env.step(action)
#
#The rest of the info is inside the 'env' class [score is there too]

------------
house_id =  32
mode =  submit
algorithm =  ppo
1 / 5


[2m[36m(RolloutWorker pid=3973486)[0m   logger.warn("Casting input x to numpy array.")
[2m[36m(RolloutWorker pid=3973484)[0m   logger.warn("Casting input x to numpy array.")
2022-05-02 16:30:32,697	INFO trainable.py:534 -- Restored on 10.3.0.4 from checkpoint: /home/dinveel/ray_results/Battery_experiment/PPO_BatteryEnvTrain-v0_71bdb_00001_1_2022-05-02_16-18-14/checkpoint_000550/checkpoint-550
2022-05-02 16:30:32,698	INFO trainable.py:543 -- Current state after restoring: {'_iteration': 550, '_timesteps_total': 2200000, '_time_total': 6483.102120399475, '_episodes_total': 23100}


2 / 5


[2m[36m(RolloutWorker pid=3974023)[0m   logger.warn("Casting input x to numpy array.")
[2m[36m(RolloutWorker pid=3974021)[0m   logger.warn("Casting input x to numpy array.")
2022-05-02 16:30:46,318	INFO trainable.py:534 -- Restored on 10.3.0.4 from checkpoint: /home/dinveel/ray_results/Battery_experiment/PPO_BatteryEnvTrain-v0_71bdb_00001_1_2022-05-02_16-18-14/checkpoint_000550/checkpoint-550
2022-05-02 16:30:46,318	INFO trainable.py:543 -- Current state after restoring: {'_iteration': 550, '_timesteps_total': 2200000, '_time_total': 6483.102120399475, '_episodes_total': 23100}


3 / 5


[2m[36m(RolloutWorker pid=3974591)[0m   logger.warn("Casting input x to numpy array.")
[2m[36m(RolloutWorker pid=3974589)[0m   logger.warn("Casting input x to numpy array.")
2022-05-02 16:30:59,687	INFO trainable.py:534 -- Restored on 10.3.0.4 from checkpoint: /home/dinveel/ray_results/Battery_experiment/PPO_BatteryEnvTrain-v0_71bdb_00001_1_2022-05-02_16-18-14/checkpoint_000550/checkpoint-550
2022-05-02 16:30:59,687	INFO trainable.py:543 -- Current state after restoring: {'_iteration': 550, '_timesteps_total': 2200000, '_time_total': 6483.102120399475, '_episodes_total': 23100}


4 / 5


[2m[36m(RolloutWorker pid=3975179)[0m   logger.warn("Casting input x to numpy array.")
[2m[36m(RolloutWorker pid=3975178)[0m   logger.warn("Casting input x to numpy array.")
2022-05-02 16:31:12,994	INFO trainable.py:534 -- Restored on 10.3.0.4 from checkpoint: /home/dinveel/ray_results/Battery_experiment/PPO_BatteryEnvTrain-v0_71bdb_00001_1_2022-05-02_16-18-14/checkpoint_000550/checkpoint-550
2022-05-02 16:31:12,995	INFO trainable.py:543 -- Current state after restoring: {'_iteration': 550, '_timesteps_total': 2200000, '_time_total': 6483.102120399475, '_episodes_total': 23100}


5 / 5


[2m[36m(RolloutWorker pid=3975751)[0m   logger.warn("Casting input x to numpy array.")
[2m[36m(RolloutWorker pid=3975754)[0m   logger.warn("Casting input x to numpy array.")
2022-05-02 16:31:26,341	INFO trainable.py:534 -- Restored on 10.3.0.4 from checkpoint: /home/dinveel/ray_results/Battery_experiment/PPO_BatteryEnvTrain-v0_71bdb_00001_1_2022-05-02_16-18-14/checkpoint_000550/checkpoint-550
2022-05-02 16:31:26,342	INFO trainable.py:543 -- Current state after restoring: {'_iteration': 550, '_timesteps_total': 2200000, '_time_total': 6483.102120399475, '_episodes_total': 23100}


scores average =  -0.05023388
outside capacity average =  0.0
------------


In [84]:
# Shut down the Ray runtime used by this notebook session.
ray.shutdown()

# Cleaning

In [78]:
# List the contents of the Ray results directory (the text after `#` is a leftover example sub-path and is not executed).
!ls ~/ray_results #/Battery_experiment/PG_BatteryEnvTrain-v0_70a76_00000_0_2022-04-30_11-55-33/checkpoint_007000

Battery_experiment


In [85]:
# WARNING: destructive - removes everything under ~/ray_results and then recreates the empty directory.
!rm -rf ~/ray_results
!mkdir ~/ray_results

In [None]:
# Archive the Ray results directory into ~/notebooks/Battery_experiment.zip.
# NOTE(review): this cell is placed after the cell that deletes ~/ray_results - run it before cleaning up.
!zip -r ~/notebooks/Battery_experiment.zip ~/ray_results/

In [None]:
# (Re)load the TensorBoard notebook extension and open it on the Ray results directory.
%reload_ext tensorboard
%tensorboard --logdir ~/ray_results