In [10]:
import gym
import numpy as np
from gym import spaces
from collections import deque
import random

class AdaptiveEnv(gym.Env):
    """Toy signal-classification environment whose target class rotates over time.

    Each observation is a noisy sine wave whose frequency is set by the current
    class id; the agent earns +1 for choosing that class and -1 otherwise.
    Every ``drift_interval`` steps the current class advances to the next one
    (wrapping around), and an episode ends after ``window_size`` steps.

    Args:
        n_classes: Number of distinct classes (size of the discrete action space).
        signal_length: Number of samples in each observation vector.
        window_size: Episode length in steps.
        drift_interval: Number of steps between class changes.
    """

    def __init__(self, n_classes=4, signal_length=128, window_size=1000, drift_interval=100):
        super(AdaptiveEnv, self).__init__()

        # Observation and action spaces
        self.observation_space = spaces.Box(low=-1.0, high=1.0, shape=(signal_length,), dtype=np.float32)
        self.action_space = spaces.Discrete(n_classes)

        # Simulated data generation settings
        self.n_classes = n_classes
        self.signal_length = signal_length
        self.window_size = window_size
        self.drift_interval = drift_interval
        self.current_step = 0
        # Defined here so the attribute exists even if step() is called before
        # reset(); reset() overwrites it with a randomly chosen class.
        self.current_class = 0

    def _generate_signal(self, class_id):
        """Generate a noisy signal for ``class_id`` with a slow additive drift.

        The raw signal is a sine wave with ``class_id + 1`` cycles across the
        window, plus Gaussian noise (std 0.1) and a step-dependent offset.
        The result is clipped to the bounds of ``observation_space`` and cast
        to its dtype, so every returned observation is a valid member of the
        declared space (the raw sum is float64 and can leave [-1, 1]).

        Returns:
            A 1-D array of length ``signal_length`` in the observation dtype.
        """
        base_signal = np.sin(2 * np.pi * np.linspace(0, 1, self.signal_length) * (class_id + 1))
        noise = np.random.normal(0, 0.1, self.signal_length)
        drift = 0.05 * np.sin(0.1 * self.current_step)  # A small drift effect
        raw_signal = base_signal + noise + drift
        bounded = np.clip(raw_signal, self.observation_space.low, self.observation_space.high)
        return bounded.astype(self.observation_space.dtype)

    def reset(self):
        """Start a new episode at step 0 with a randomly chosen class.

        Returns:
            The first observation of the episode.
        """
        self.current_step = 0
        self.current_class = random.randint(0, self.n_classes - 1)
        return self._generate_signal(self.current_class)

    def step(self, action):
        """Score ``action`` against the current class and advance one step.

        The reward is computed against the class that was active when the
        action was chosen, before any class change for the new step.

        Returns:
            A tuple ``(next_state, reward, done, info)`` where ``reward`` is
            1.0 for a correct class and -1.0 otherwise, ``done`` becomes True
            once ``window_size`` steps have elapsed, and ``info`` is empty.
        """
        reward = 1.0 if action == self.current_class else -1.0
        self.current_step += 1

        # Introduce a new class (signal change) every `drift_interval` steps
        if self.current_step % self.drift_interval == 0:
            self.current_class = (self.current_class + 1) % self.n_classes

        # Generate the next signal
        next_state = self._generate_signal(self.current_class)
        done = self.current_step >= self.window_size
        return next_state, reward, done, {}


In [17]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

class AdaptiveFeatureExtractor(BaseFeaturesExtractor):
    """1-D convolutional feature extractor for flat signal observations.

    The observation vector is treated as a single-channel sequence, passed
    through two length-preserving convolutions (kernel 3, padding 1), flattened,
    regularised with dropout and projected to ``features_dim`` outputs.

    Args:
        observation_space: Observation space; ``shape[0]`` is used as the
            signal length.
        features_dim: Size of the returned feature vector (default 128).
        dropout_p: Dropout probability applied before the final linear layer
            (default 0.1). Dropout is only active while the module is in
            training mode.
    """

    def __init__(self, observation_space, features_dim=128, dropout_p=0.1):
        super(AdaptiveFeatureExtractor, self).__init__(observation_space, features_dim)

        input_dim = observation_space.shape[0]
        self.conv1 = nn.Conv1d(1, 16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv1d(16, 32, kernel_size=3, stride=1, padding=1)
        # Both convolutions keep the sequence length, so the flattened size
        # is (channels of conv2) * input_dim.
        self.fc = nn.Linear(32 * input_dim, features_dim)
        self.dropout = nn.Dropout(p=dropout_p)

    def forward(self, x):
        """Map a batch of observations to a batch of feature vectors.

        Args:
            x: Tensor indexed as (batch, signal_length), as implied by the
                channel dimension inserted at position 1 below.

        Returns:
            Tensor with one row of ``features_dim`` values per batch element.
        """
        x = x.unsqueeze(1)  # Add channel dimension
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = x.flatten(start_dim=1)  # Collapse channels and positions per sample
        # The dropout layer was previously constructed but never applied.
        x = self.dropout(x)
        return self.fc(x)


In [18]:
from stable_baselines3 import DQN

def setup_adaptive_dqn(env):
    """Build a DQN agent for ``env`` that uses ``AdaptiveFeatureExtractor``.

    Args:
        env: Environment handed straight to the ``DQN`` constructor.

    Returns:
        The newly constructed ``DQN`` model (not yet trained).
    """
    extractor_config = {
        "features_extractor_class": AdaptiveFeatureExtractor,
        # Empty on purpose: the extractor is built with its own defaults.
        "features_extractor_kwargs": {},
    }
    return DQN("MlpPolicy", env, policy_kwargs=extractor_config, verbose=1)


In [19]:
from stable_baselines3.common.vec_env import DummyVecEnv

# Seed the random number generators this experiment draws from, so that a
# fresh Restart & Run All is repeatable instead of giving a different run
# every time.
SEED = 42
random.seed(SEED)        # AdaptiveEnv.reset picks the starting class via `random`
np.random.seed(SEED)     # AdaptiveEnv._generate_signal draws noise via `np.random`
torch.manual_seed(SEED)  # torch's global RNG (used for weight initialisation)

# Create the environment and wrap it
env = DummyVecEnv([lambda: AdaptiveEnv()])
env.action_space.seed(SEED)  # makes action_space.sample() repeatable too

# Instantiate the adaptive DQN model
model = setup_adaptive_dqn(env)

# Train the model
model.learn(total_timesteps=20000)


Using cpu device
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.05     |
| time/               |          |
|    episodes         | 4        |
|    fps              | 54       |
|    time_elapsed     | 73       |
|    total_timesteps  | 4000     |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 8.96e-05 |
|    n_updates        | 974      |
----------------------------------
----------------------------------
| rollout/            |          |
|    exploration_rate | 0.05     |
| time/               |          |
|    episodes         | 8        |
|    fps              | 47       |
|    time_elapsed     | 167      |
|    total_timesteps  | 8000     |
| train/              |          |
|    learning_rate    | 0.0001   |
|    loss             | 6.35e-05 |
|    n_updates        | 1974     |
----------------------------------
----------------------------------
| rollout/            |          |
|  

<stable_baselines3.dqn.dqn.DQN at 0x149e2586030>

In [20]:
# Set parameters
num_episodes = 10  # Define the number of episodes you want to test the model for

# Initialize metrics: one plain Python float per evaluated episode
total_rewards = []

for episode in range(num_episodes):
    obs = env.reset()
    done = False
    episode_reward = 0.0

    while not done:
        # Predict the action based on the current observation
        action, _ = model.predict(obs, deterministic=True)  # deterministic=True for evaluation

        # Take action in the environment
        obs, reward, done, info = env.step(action)

        # `env` is a vectorised wrapper around a single environment, so
        # `reward` and `done` come back as length-1 arrays. Unpack element 0
        # so the return is accumulated (and printed) as a scalar and the loop
        # condition is a real bool rather than an array.
        episode_reward += float(reward[0])
        done = bool(done[0])

    total_rewards.append(episode_reward)
    print(f"Episode {episode + 1}: Reward = {episode_reward}")

# Calculate average reward over episodes (NaN if no episode was run)
average_reward = sum(total_rewards) / len(total_rewards) if total_rewards else float("nan")
print(f"Average reward over {num_episodes} episodes: {average_reward}")


Episode 1: Reward = [1000.]
Episode 2: Reward = [1000.]
Episode 3: Reward = [1000.]
Episode 4: Reward = [1000.]
Episode 5: Reward = [1000.]
Episode 6: Reward = [1000.]
Episode 7: Reward = [1000.]
Episode 8: Reward = [1000.]
Episode 9: Reward = [1000.]
Episode 10: Reward = [1000.]
Average reward over 10 episodes: [1000.]
