In [5]:
import gymnasium as gym
import pickle
from VALIDATION.KeyRef_2.KeyRef_2_env import KeyRef2_Env
from stable_baselines3.common.vec_env     import DummyVecEnv
from stable_baselines3.common.callbacks   import BaseCallback, EvalCallback
from stable_baselines3.common.monitor     import Monitor
from stable_baselines3.common.env_checker import check_env
from stable_baselines3 import DQN
import os
import datetime
import pandas as pd
from stable_baselines3.common.callbacks import BaseCallback

K = 30
planning_horizon = 480*60
ReworkProbability = 0.03

with open('VALIDATION/SMALL/pickle_JA_valid_scenarios_480.pkl', 'rb') as f:
    valid_scenarios = pickle.load(f)

env = KeyRef2_Env(K, planning_horizon, ReworkProbability, valid_scenarios)

# check_env(env)
# obs = env.reset(seed=42)
# print("Observation:", obs)

# episodes = 2
# for episode in range(episodes):
# 	done = False
# 	obs = env.reset()
# 	while done == False:#not done:
# 		random_action = env.action_space.sample()
# 		obs, reward, done, truncated, info = env.step(random_action)
# 		print('reward', reward)

In [6]:
action_list = ["CDR1", "CDR2", "CDR3", "CDR4", "CDR5", "CDR6"]
                          
# Create directories for models and logs
current_time = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
models_dir = f"models/keyref2-{current_time}"
logdir = f"logs/keyref2-{current_time}"
log_training_txt_dir = "keyref2_log_training_txt"
log_training_excel_dir = "keyref2_log_training_excel"

if not os.path.exists(models_dir):
    os.makedirs(models_dir)
if not os.path.exists(logdir):
    os.makedirs(logdir)
if not os.path.exists(log_training_txt_dir):
    os.makedirs(log_training_txt_dir)
if not os.path.exists(log_training_excel_dir):
    os.makedirs(log_training_excel_dir)

# Generate unique file names based on current time
log_file          = os.path.join(log_training_txt_dir,   f"training_keyref2_{current_time}.txt")
excel_file        = os.path.join(log_training_excel_dir, f"training_keyref2_{current_time}.xlsx")
action_count_file = os.path.join(log_training_txt_dir,   f"action_count_keyref2_{current_time}.txt")
action_excel_file = os.path.join(log_training_excel_dir, f"action_count_keyref2_{current_time}.xlsx")

# Define the custom callback -------------------------------------------------------------
class CustomCallback(BaseCallback):
    def __init__(self, log_dir, excel_file, txt_file, action_count_file, action_excel_file, verbose=0):
        super(CustomCallback, self).__init__(verbose)
        self.log_dir = log_dir
        self.excel_file = excel_file
        self.txt_file = txt_file
        self.action_count_file = action_count_file
        self.action_excel_file = action_excel_file
        self.logs = []
        self.episode_rewards = []
        self.action_counts = {}
        self.episode_start = True

    def _on_training_start(self) -> None:
        # Initialize action counts
        self.action_counts = {action: 0 for action in action_list}

    def _on_step(self) -> bool:
        if self.episode_start:
            self.episode_rewards.append(0)
            self.episode_start = False

        # Record reward for the current step
        reward = self.locals['rewards'][0]
        self.episode_rewards[-1] += reward

        # Increment action count
        action = self.locals.get('actions', None)
        if action is not None:
            action_name = action_list[action[0]]
            self.action_counts[action_name] += 1
        
        return True

    def _on_rollout_end(self) -> None:
        # Called at the end of each episode
        sum_reward   = self.episode_rewards[-1] if self.episode_rewards else 0
        
        self.logger.record('train/episode_reward',   sum_reward)
        
        self.logs.append({
            'sum_reward': sum_reward,
        })

        self.episode_start = True

    
    def _on_training_end(self) -> None:
        # Save logs to Excel
        df = pd.DataFrame(self.logs)
        df.to_excel(self.excel_file, index=False)

        action_df = pd.DataFrame(list(self.action_counts.items()), columns=['Action', 'Count'])
        action_df.to_excel(self.action_excel_file, index=False)

        # Save logs to text file
        with open(self.txt_file, 'w') as f:
            f.write(df.to_string(index=False))
        with open(self.action_count_file, 'w') as f:
            f.write(action_df.to_string(index=False))

# Create the callback
callback = CustomCallback(log_dir=logdir, 
                          excel_file=excel_file,
                          txt_file=log_file,
                          action_count_file=action_count_file,
                          action_excel_file=action_excel_file,
                          verbose=1)

import torch
import torch.nn as nn
import torch.nn.functional as F
from gymnasium import spaces
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor


class CustomFeaturesExtractor(BaseFeaturesExtractor):
    """
    Custom feature extractor for DQN.
    
    :param observation_space: (spaces.Box)
    """

    def __init__(self, observation_space: spaces.Box):
        super(CustomFeaturesExtractor, self).__init__(observation_space, features_dim=6)  # Output features_dim matches last layer's output
        n_input_nodes = observation_space.shape[0]
        self.fc1 = nn.Linear(n_input_nodes, 30)
        self.fc2 = nn.Linear(30, 30)
        self.fc3 = nn.Linear(30, 30)
        self.fc4 = nn.Linear(30, 30)
        self.fc5 = nn.Linear(30, 30)
        self.fc6 = nn.Linear(30, 6)  # Output layer with 6 nodes

    def forward(self, observations: torch.Tensor) -> torch.Tensor:
        x = F.tanh(self.fc1(observations))
        x = F.tanh(self.fc2(x))
        x = F.tanh(self.fc3(x))
        x = F.tanh(self.fc4(x))
        x = F.tanh(self.fc5(x))
        x = self.fc6(x)  # Output layer
        return x

# Define policy_kwargs for DQN model
policy_kwargs = dict(
    features_extractor_class=CustomFeaturesExtractor,
)


model_path = os.path.join(models_dir, "DQN_.zip")
# Initialize DQN using the custom model
model = DQN(
    'MlpPolicy',  # Use a Multi-layer Perceptron (MLP) policy
    env,  # Your RL environment
    policy_kwargs=policy_kwargs,
    buffer_size=1000,  # Replay buffer size N
    batch_size=32,  # Batch size
    gamma=0.9,  # Discount factor
    tau=0.01,  # Soft target update strategy
    exploration_initial_eps=0.5,  # Initial epsilon
    exploration_final_eps=0.1,  # Final epsilon
    verbose=1
)
model.learn(total_timesteps=8000, 
            tb_log_name="KeyRef2",
            log_interval=10,
            reset_num_timesteps=False,
            callback=callback)
model.save(model_path)


Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [7]:
import numpy as np


model = DQN.load(model_path, env=env)
def softmax_action_selection(model, obs, mu):
    obs_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0).to(model.device)
    q_values = model.q_net(obs_tensor).detach().cpu().numpy().flatten()
    exp_q_values = np.exp(mu * q_values)
    probabilities = exp_q_values / np.sum(exp_q_values)
    action = np.random.choice(len(q_values), p=probabilities)
    return action

results = []
method = 'keyref2'
InstanceList = [f'valid{i+1}' for i in range(12)]
ScenarioList = ['A', 'B', 'C']

mu = 1.6 
for instance_id in InstanceList:
    print("-----------", instance_id)
    for scenario_id in ScenarioList:
        print("-----", scenario_id)
        # Reset the environment with the new dataset
        env.reset(test=True, 
                  datatest=instance_id, 
                  scenariotest=scenario_id)
        
        obs, info = env.reset()
        done = False
        
        while not done:
            action = softmax_action_selection(model, obs, mu)
            obs, reward, done, truncated, info = env.step(action)
        
        tardiness = env.calc_tardiness()
        print(tardiness)
        results.append({
                        'Method'    : method,
                        'InstanceID': instance_id,
                        'ScenarioID': scenario_id,
                        'Tardiness' : tardiness
                        })

df = pd.DataFrame(results)
file_name = f"VALIDATION/keyref2.xlsx"
df.to_excel(file_name, index=False)

Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
----------- valid1
----- A
1443890.0
----- B
1795000.0
----- C
3298930.0
----------- valid2
----- A
3689580.0
----- B
1373530.0
----- C
2319898.0
----------- valid3
----- A
1108820.0
----- B
1470690.0
----- C
3187460.0
----------- valid4
----- A
3310394.666666667
----- B
2887510.0
----- C
2418050.0
----------- valid5
----- A
1582290.0
----- B
8039950.0
----- C
831920.0
----------- valid6
----- A
5232024.166666667
----- B
5251580.0
----- C
3377210.0
----------- valid7
----- A
2341950.0
----- B
2080293.1428571427
----- C
6111294.666666667
----------- valid8
----- A
7956090.0
----- B
9151610.0
----- C
1691150.0
----------- valid9
----- A
1456290.0
----- B
8481880.0
----- C
3028400.0
----------- valid10
----- A
2819677.3333333335
----- B
3162220.0
----- C
9034940.0
----------- valid11
----- A
2070960.0
----- B
8999480.0
----- C
2932230.0
----------- valid12
----- A
980910.0
----- B
3729660.0
----- C
9754930.0
