In [1]:
import os
import numpy as np
import random
import gym
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.logger import configure


In [2]:

data = np.load('C:\\Users\\Efe\\Desktop\\custom_dataset.npz') #Load the custom image dataset created with RawDatato28x28 script 
X_train = data['X_train']  # Shape should be (num_samples, 28, 28, 1) 
X_test = data['X_test']
y_train = data['y_train']  # One-hot encoded labels
y_test = data['y_test']

print("Custom image dataset loaded successfully.")
print(f"Training set shape: {X_train.shape}")
print(f"Test set shape: {X_test.shape}")


Custom image dataset loaded successfully.
Training set shape: (109708, 28, 28, 1)
Test set shape: (27427, 28, 28, 1)


In [3]:
# Convert datasets to PyTorch tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_train_tensor = torch.tensor(np.argmax(y_train, axis=1), dtype=torch.long)  # Convert from one-hot to class indices
y_test_tensor = torch.tensor(np.argmax(y_test, axis=1), dtype=torch.long)


In [4]:
class CustomEnv(gym.Env):
    def __init__(self, images_per_episode=1, dataset=(X_train_tensor, y_train_tensor), random=True):
        super().__init__()

        self.action_space = gym.spaces.Discrete(8)  # Assuming 8 unique labels
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=(28, 28, 1), dtype=np.float32)

        self.images_per_episode = images_per_episode
        self.step_count = 0

        self.x, self.y = dataset
        self.random = random
        self.dataset_idx = 0

    def step(self, action):
        done = False
        reward = int(action == self.expected_action)

        obs = self._next_obs()

        self.step_count += 1
        if self.step_count >= self.images_per_episode:
            done = True

        return obs, reward, done, {}

    def reset(self):
        self.step_count = 0
        obs = self._next_obs()
        return obs

    def _next_obs(self):
        if self.random:
            next_obs_idx = np.random.randint(0, len(self.x))
            self.expected_action = int(self.y[next_obs_idx])  # Get the label from the tensor
            obs = self.x[next_obs_idx].numpy()
        else:
            obs = self.x[self.dataset_idx].numpy()
            self.expected_action = int(self.y[self.dataset_idx])  # Get the label from the tensor

            self.dataset_idx += 1
            if self.dataset_idx >= len(self.x):
                self.dataset_idx = 0

        return obs


In [5]:
class SimpleNN(nn.Module):
    def __init__(self, input_shape=(28, 28, 1), num_actions=8):
        super(SimpleNN, self).__init__()
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(28 * 28 * 1, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, num_actions)

    def forward(self, x):
        x = self.flatten(x)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x


In [6]:
from tqdm import tqdm
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.logger import configure

# Wrapping the environment for DQN training using Stable-Baselines3
env = DummyVecEnv([lambda: CustomEnv(images_per_episode=1)])

def custom_dqn():
    # Configure the logger for stable-baselines3
    new_logger = configure('./logs/custom_dqn', ["tensorboard"])  # Keep only tensorboard logging to avoid excessive console output

    # Create DQN model with Stable Baselines3 using MlpPolicy
    model = DQN('MlpPolicy', env, verbose=0, tensorboard_log="./custom_dqn_tensorboard/", learning_rate=1e-4)

    # Set the logger for the model
    model.set_logger(new_logger)

    start_time = time.time()

    # Initialize tqdm progress bar
    total_timesteps = int(219416)  # Adjust the total timesteps as needed, this one goes through each training image twice as there were initially 109708 training images
    with tqdm(total=total_timesteps, desc="Training DQN", unit="step") as pbar:
        # Override the model.learn() method to manually update the progress bar
        for _ in range(total_timesteps):
            model.learn(total_timesteps=1, reset_num_timesteps=False)
            pbar.update(1)  # Update progress bar by 1 step

    print("DQN Training Time:", time.time() - start_time)

    # Save the model’s policy and parameters
    model.policy.save('custom_dqn_policy')
    model.save_replay_buffer('custom_dqn_replay_buffer')

    # Close the environment
    env.close()

    return model

# Train the DQN model with the custom dataset
dqn_model = custom_dqn()


Training DQN: 100%|██████████| 219416/219416 [31:37<00:00, 115.65step/s]


DQN Training Time: 1897.2116219997406


Took me around 30 minutes to train and save the DQN. I have no idea how to reduce the training time.

I initially tested this with MNIST handwritten numbers dataset. It has 60000 training samples in the same format as this one. It needed 4 times less epochs as this to reach an accuracy above 90% and plateau (which took like 6 mins.) 

In [7]:
def custom_dqn_eval(dqn_model):
    eval_env = DummyVecEnv([lambda: CustomEnv(images_per_episode=1, dataset=(X_test_tensor, y_test_tensor), random=False)])

    attempts, correct = 0, 0

    for _ in range(100):  # Evaluate on 100 episodes
        obs = eval_env.reset()
        done = False
        while not done:
            action, _states = dqn_model.predict(obs)
            obs, reward, done, _ = eval_env.step(action)
            attempts += 1
            if reward > 0:
                correct += 1

    accuracy = (float(correct) / attempts) * 100
    print(f'Validation done...\nAccuracy: {accuracy}%')

# Evaluate the DQN model on the test set
custom_dqn_eval(dqn_model)


Validation done...
Accuracy: 94.0%


