In [5]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from gym import Env
from gym.spaces import Discrete, Box
import numpy as np

In [7]:
# Define activations separately for indexing and instantiation
activation_names = ['sigmoid', 'tanh', 'relu', 'elu', 'selu', 'swish']
activation_map = {
    'sigmoid': nn.Sigmoid,
    'tanh': nn.Tanh,
    'relu': nn.ReLU,
    'elu': nn.ELU,
    'selu': nn.SELU,
    'swish': lambda: nn.SiLU(),  # swish ≈ SiLU in PyTorch
}

def decode_action(action_id):
    if 0 <= action_id <= 41:  # Dense Layer with some number of units
        units_list = [8, 16, 32, 64, 128, 256, 512]
        units_idx = action_id // len(activation_names)
        act_idx = action_id % len(activation_names)
        units = units_list[units_idx]
        activation_name = activation_names[act_idx]
        activation_fn = activation_map[activation_name]
        return ('dense', units, activation_name)  

    elif 42 <= action_id <= 44:  # Dropout layer
        dropout_rates = [0.0, 0.2, 0.5]
        return ('dropout', dropout_rates[action_id - 42])

    elif action_id == 45:  # BatchNorm layer
        return ('batchnorm',)

    elif action_id == 46:  # Stop building layers
        return ('stop',)

    else:
        raise ValueError(f"Invalid action id: {action_id}")


In [9]:
# look into changing this next time, add data specific features and start finding new datasets

In [15]:
class NASMLPEnv(Env):
    def __init__(self, dataset, max_layers=10):
        self.dataset = dataset  
        self.max_layers = max_layers
        self.dataset_features = self._compute_dataset_features(dataset) 
        self.action_space = Discrete(47)
        num_dataset_features = 4  
        max_possible_layers = max_layers + 10  
        self.observation_space = Box(
            low=0, high=1,
            shape=(max_possible_layers * 3 + num_dataset_features,),
            dtype=np.float32
        )
        self.architecture = []
        self.done = False

        self.best_architecture = None
        self.best_reward = -float('inf')

        self.architecture_log = []

        self.max_parameters = self._estimate_max_parameters()

    def _compute_dataset_features(self, dataset):
        X_train, y_train, X_val, y_val = dataset
        n_rows, n_features = X_train.shape
        feature_std = X_train.std(dim=0).mean().item()
        class_balance = y_train.sum(dim=0) / y_train.shape[0]
        class_balance = class_balance.max().item()  

        return np.array([
            n_rows / 1e5,            
            n_features / 1e3,        
            feature_std / 10.0,       
            class_balance             
        ])

    def reset(self):
        self.architecture = []
        self.done = False
        return self._get_obs()

    def _estimate_max_parameters(self):
        input_dim = self.dataset[0].shape[1]  
        max_units = 512
        total_params = 0
    
        for _ in range(self.max_layers):
            total_params += input_dim * max_units + max_units  
            input_dim = max_units
    
        output_dim = self.dataset[1].shape[1]
        total_params += max_units * output_dim + output_dim
    
        return total_params


    def step(self, action_id):
        decoded = decode_action(action_id)

        dense_count = sum(1 for layer in self.architecture if layer[0] == 'dense')
        if decoded[0] == 'stop' or (decoded[0] == 'dense' and dense_count >= self.max_layers):
            self.done = True
            reward, acc, complexity = self._evaluate_model()  

            # Log architecture, reward, accuracy, and complexity
            self.architecture_log.append({
                'architecture': list(self.architecture),
                'reward': reward,
                'accuracy': acc,
                'complexity': complexity
            })
    
            print(f"\n🎯 Final Architecture: {self.architecture}")
            print(f"🏆 Validation Accuracy (acc): {acc:.2f}%")
            print(f"⚙️  Complexity (number of layers): {complexity}")
            print(f"🏅 Reward (acc - penalty): {reward:.2f}%\n")

            # Track best
            if reward > self.best_reward:
                self.best_reward = reward
                self.best_architecture = list(self.architecture)
                self.best_accuracy = acc  
                self.best_complexity = complexity 
                print(f"🌟 New Best Architecture Found with Reward: {reward:.2f}%")
        else:
            self.architecture.append(decoded)
            reward = 0

        return self._get_obs(), reward, self.done, {}

    def _get_obs(self):
        obs = np.zeros(self.observation_space.shape[0])
        for i, layer in enumerate(self.architecture):
            base = i * 3
            if layer[0] == 'dense':
                obs[base] = layer[1] / 512  # normalized units
                obs[base + 1] = activation_names.index(layer[2]) / (len(activation_names) - 1)
                obs[base + 2] = 0
            elif layer[0] == 'dropout':
                obs[base] = layer[1]
                obs[base + 1] = -1
                obs[base + 2] = 1
            elif layer[0] == 'batchnorm':
                obs[base] = -1
                obs[base + 1] = -1
                obs[base + 2] = 2
        obs[-len(self.dataset_features):] = self.dataset_features
        return obs

    def _evaluate_model(self):
        X_train, y_train, X_val, y_val = self.dataset

        model = nn.Sequential()
        input_dim = X_train.shape[1]

        for i, layer in enumerate(self.architecture):
            if layer[0] == 'dense':
                model.add_module(f"fc{i}", nn.Linear(input_dim, layer[1]))
                model.add_module(f"act{i}", activation_map[layer[2]]())
                input_dim = layer[1]
            elif layer[0] == 'dropout':
                model.add_module(f"dropout{i}", nn.Dropout(p=layer[1]))
            elif layer[0] == 'batchnorm':
                model.add_module(f"bn{i}", nn.BatchNorm1d(input_dim))

        model.add_module("output", nn.Linear(input_dim, y_train.shape[1]))

        loss_fn = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.01) # can make the RL choose optimizer and LR also here
        train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=64, shuffle=True)

        model.train()
        for _ in range(10):  
            for xb, yb in train_loader:
                optimizer.zero_grad()
                output = model(xb)
                loss = loss_fn(output, yb)
                loss.backward()
                optimizer.step()

        model.eval()
        with torch.no_grad():
            preds = model(X_val)
            acc = (preds.argmax(dim=1) == y_val.argmax(dim=1)).float().mean().item()

        no_of_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        max_params = self._estimate_max_parameters()
        alpha = 0.8 

        reward = (alpha * acc * 100) - ((1 - alpha) * (no_of_params*100/max_params))

        return reward, acc * 100, (no_of_params*100/max_params)

In [17]:
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from torch.nn.functional import one_hot

import openml
dataset = openml.datasets.get_dataset(23512)
X, y, _, _ = dataset.get_data(target=dataset.default_target_attribute)

df = pd.concat([X, y], axis=1)

df.dropna(inplace=True)

X = df.drop(columns=[dataset.default_target_attribute])
y = df[dataset.default_target_attribute]

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

le = LabelEncoder()
y_encoded = le.fit_transform(y)  

y_oh = one_hot(torch.tensor(y_encoded)).float()

X_train_np, X_val_np, y_train_tensor, y_val_tensor = train_test_split(
    X_scaled, y_oh, test_size=0.1, random_state=42)

X_train_tensor = torch.tensor(X_train_np).float()
X_val_tensor = torch.tensor(X_val_np).float()

In [18]:
dataset = (X_train_tensor, y_train_tensor, X_val_tensor, y_val_tensor)

In [21]:
print(X_train_tensor.shape)  # [n_train, n_features]
print(y_train_tensor.shape)  # [n_train, 2]  (one-hot binary)

torch.Size([88244, 28])
torch.Size([88244, 2])


In [23]:
# Pass into your NAS environment
env = NASMLPEnv(dataset=dataset, max_layers=10)

In [None]:
from stable_baselines3 import DQN

import time

start_time = time.time()

model = DQN(
    "MlpPolicy",
    env,
    gamma=0.99,                       
    exploration_initial_eps=1.0,     
    exploration_final_eps=0.05,      
    exploration_fraction=0.1,        
    verbose=1,
    tensorboard_log="./nas_logs/"
)

#model = DQN("MlpPolicy", env, verbose=1, tensorboard_log="./nas_logs/")
model.learn(total_timesteps=10000)

end_time = time.time()
elapsed_time = end_time - start_time

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.




Logging to ./nas_logs/DQN_23

🎯 Final Architecture: [('dense', 8, 'tanh'), ('dense', 8, 'sigmoid'), ('dense', 256, 'relu'), ('dense', 16, 'selu'), ('dense', 32, 'selu'), ('dense', 256, 'sigmoid'), ('dropout', 0.5), ('dense', 16, 'selu'), ('dense', 16, 'swish'), ('dense', 512, 'elu'), ('dense', 512, 'elu')]
🏆 Validation Accuracy (acc): 68.39%
⚙️  Complexity (number of layers): 12.290306070566245
🏅 Reward (acc - penalty): 52.26%

🌟 New Best Architecture Found with Reward: 52.26%

🎯 Final Architecture: [('dense', 32, 'relu'), ('dense', 256, 'sigmoid'), ('dense', 32, 'swish'), ('dense', 64, 'sigmoid'), ('dropout', 0.2), ('dense', 128, 'elu'), ('dense', 16, 'tanh'), ('dense', 512, 'swish'), ('dense', 64, 'sigmoid'), ('dense', 512, 'relu'), ('dropout', 0.0), ('dense', 128, 'tanh')]
🏆 Validation Accuracy (acc): 53.32%
⚙️  Complexity (number of layers): 7.178568757253828
🏅 Reward (acc - penalty): 41.22%


🎯 Final Architecture: [('dense', 32, 'tanh'), ('dense', 64, 'swish'), ('dense', 512, 'swi

In [None]:
model.save("nas_agent_checkpoint_data_specific_alpha_0.8_23512")

In [None]:
print(elapsed_time)

In [None]:
print(env.best_architecture)

In [None]:
print(f"\n Best Reward: {env.best_reward:.2f}%")

In [None]:
print(f"\n Best Accuracy: {env.best_accuracy:.2f}%")

In [None]:
print(f"\n Best Complexity: {env.best_complexity:.2f}%")