In [4]:
import gym
import numpy as np
from gym import spaces
import joblib
import torch
import torch.nn as nn
import torch.optim as optim
from collections import deque
import random
import warnings
warnings.filterwarnings("ignore")
# -----------------------
# Custom Gym Environment
# -----------------------
class DLS_Environment(gym.Env):
    def __init__(self):
        super().__init__()
        self.model = joblib.load("random_forest_dls_model.pkl")
        self.action_space = spaces.Box(low=np.array([2, 3000, 1.83]),
                                       high=np.array([25, 3500, 1.91]),
                                       dtype=np.float32)
        self.observation_space = self.action_space
        self.target_dls = 150.0
        self.max_steps = 10
        self.current_step = 0

    def reset(self, target_dls=None):
        self.current_step = 0
        if target_dls is not None:
            self.target_dls = target_dls
        self.state = self.action_space.sample()
        return self.state

    def step(self, action):
        self.current_step += 1
        self.state = np.clip(action, self.action_space.low, self.action_space.high)
        predicted_dls = self.model.predict([self.state])[0]
        reward = -abs(predicted_dls - self.target_dls) / (203 - 83)
        done = self.current_step >= self.max_steps or abs(predicted_dls - self.target_dls) < 1.0
        return self.state, reward, done, {"Predicted_DLS": predicted_dls}

# -----------------------
# Replay Buffer
# -----------------------
class ReplayBuffer:
    def __init__(self, capacity=100000):
        self.buffer = deque(maxlen=capacity)

    def push(self, *args):
        self.buffer.append(args)

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        return map(np.array, zip(*batch))

    def __len__(self):
        return len(self.buffer)

# -----------------------
# MLP Network
# -----------------------
class MLP(nn.Module):
    def __init__(self, input_dim, output_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, output_dim)
        )

    def forward(self, x):
        return self.net(x)

# -----------------------
# DDPG Agent
# -----------------------
class DDPG:
    def __init__(self, state_dim, action_dim, action_bounds):
        self.actor = MLP(state_dim, action_dim)
        self.actor_target = MLP(state_dim, action_dim)
        self.critic = MLP(state_dim + action_dim, 1)
        self.critic_target = MLP(state_dim + action_dim, 1)

        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())

        self.actor_opt = optim.Adam(self.actor.parameters(), lr=1e-4)
        self.critic_opt = optim.Adam(self.critic.parameters(), lr=1e-3)

        self.replay_buffer = ReplayBuffer()
        self.action_bounds = action_bounds
        self.gamma = 0.99
        self.tau = 0.005

    def select_action(self, state, noise=0.1):
        state = torch.FloatTensor(state).unsqueeze(0)
        action = self.actor(state).detach().numpy()[0]
        action += noise * np.random.randn(*action.shape)
        return np.clip(action, self.action_bounds.low, self.action_bounds.high)

    def train(self, batch_size=64):
        if len(self.replay_buffer) < batch_size:
            return

        state, action, reward, next_state, done = self.replay_buffer.sample(batch_size)
        state = torch.FloatTensor(state)
        action = torch.FloatTensor(action)
        reward = torch.FloatTensor(reward).unsqueeze(1)
        next_state = torch.FloatTensor(next_state)
        done = torch.FloatTensor(done).unsqueeze(1)

        with torch.no_grad():
            next_action = self.actor_target(next_state)
            target_q = self.critic_target(torch.cat([next_state, next_action], 1))
            y = reward + (1 - done) * self.gamma * target_q

        q_val = self.critic(torch.cat([state, action], 1))
        critic_loss = nn.MSELoss()(q_val, y)
        self.critic_opt.zero_grad()
        critic_loss.backward()
        self.critic_opt.step()

        actor_loss = -self.critic(torch.cat([state, self.actor(state)], 1)).mean()
        self.actor_opt.zero_grad()
        actor_loss.backward()
        self.actor_opt.step()

        for target, param in zip(self.actor_target.parameters(), self.actor.parameters()):
            target.data.copy_(self.tau * param.data + (1.0 - self.tau) * target.data)

        for target, param in zip(self.critic_target.parameters(), self.critic.parameters()):
            target.data.copy_(self.tau * param.data + (1.0 - self.tau) * target.data)

# -----------------------
# Training Loop
# -----------------------
env = DLS_Environment()
agent = DDPG(state_dim=3, action_dim=3, action_bounds=env.action_space)

episodes = 1000
for ep in range(episodes):
    state = env.reset(target_dls=150)  # Set your desired DLS here
    total_reward = 0
    for t in range(50):
        action = agent.select_action(state)
        next_state, reward, done, info = env.step(action)
        agent.replay_buffer.push(state, action, reward, next_state, float(done))
        agent.train()
        state = next_state
        total_reward += reward
        if done:
            break
    print(f"Episode {ep + 1}, Reward: {total_reward:.2f}, Final DLS: {info['Predicted_DLS']:.2f}")


Episode 1, Reward: -1.26, Final DLS: 134.90
Episode 2, Reward: -1.26, Final DLS: 134.90
Episode 3, Reward: -1.26, Final DLS: 134.90
Episode 4, Reward: -1.26, Final DLS: 134.90
Episode 5, Reward: -1.26, Final DLS: 134.90
Episode 6, Reward: -1.26, Final DLS: 134.90
Episode 7, Reward: -1.26, Final DLS: 134.90
Episode 8, Reward: -1.26, Final DLS: 134.90
Episode 9, Reward: -3.55, Final DLS: 199.48
Episode 10, Reward: -4.12, Final DLS: 199.48
Episode 11, Reward: -4.12, Final DLS: 199.48
Episode 12, Reward: -4.12, Final DLS: 199.48
Episode 13, Reward: -4.12, Final DLS: 199.48
Episode 14, Reward: -4.12, Final DLS: 199.48
Episode 15, Reward: -4.12, Final DLS: 199.48
Episode 16, Reward: -4.12, Final DLS: 199.48
Episode 17, Reward: -4.12, Final DLS: 199.48
Episode 18, Reward: -2.41, Final DLS: 134.90
Episode 19, Reward: -1.26, Final DLS: 134.90
Episode 20, Reward: -1.26, Final DLS: 134.90
Episode 21, Reward: -1.26, Final DLS: 134.90
Episode 22, Reward: -1.26, Final DLS: 134.90
Episode 23, Reward:

In [6]:
# ----------------------------
# Inference: Find inputs for desired DLS
# ----------------------------
def infer_optimal_inputs(agent, target_dls, trials=10):
    env = DLS_Environment()
    best_action = None
    best_error = float('inf')
    best_dls = None

    for _ in range(trials):
        state = env.reset(target_dls=target_dls)
        action = agent.select_action(state, noise=0.0)  # No noise for clean inference
        _, _, _, info = env.step(action)
        error = abs(info["Predicted_DLS"] - target_dls)
        if error < best_error:
            best_action = action
            best_dls = info["Predicted_DLS"]
            best_error = error

    return best_action, best_dls, best_error

# Example: Find inputs for target DLS = 150 nm
desired_dls = 150
inputs, predicted_dls, error = infer_optimal_inputs(agent, desired_dls)

print(f"Target DLS: {desired_dls} nm")
print(f"Predicted Input Parameters:")
print(f"  Time(min):       {inputs[0]:.2f}")
print(f"  Scanspeed(mm/s): {inputs[1]:.2f}")
print(f"  Fluence(J/cm²):  {inputs[2]:.3f}")
print(f"Predicted DLS:     {predicted_dls:.2f} nm")
print(f"Error:             {error:.2f} nm")


Target DLS: 150 nm
Predicted Input Parameters:
  Time(min):       25.00
  Scanspeed(mm/s): 3500.00
  Fluence(J/cm²):  1.910
Predicted DLS:     105.40 nm
Error:             44.60 nm
