In [27]:
import numpy as np
import gym
from gym import spaces
from surrogate import SurrogateNet_multiMLP, geometric_position, geometric_reshape
from calc_init_param import calc_next_param
from core.param_util.param_tools import gen_param_csv
from automation import run_cmd
import os
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.distributions import Categorical
import shutil
from surrogate_springback import SurrogateNet_springback



In [28]:
class bending_env(gym.Env):
    def __init__(self, n_actions, episode=0):

        # Define the state space size
        self.state_space = spaces.Box(low=0, high=200, shape=(3, 7, 72), dtype=np.double)

        # Define the action space size
        self.action_space = spaces.Discrete(n=n_actions, start=1)

        # Initialize the current state with the stress distribution after pre-stretch
        self.state = None

        self.pre_idx = None
        self.pre_param = [321.1,0.0,0.0,0,-0.0,0.0]  # Pre-stretch length

        # Surrogate model
        self.model = SurrogateNet_multiMLP(1512, 1512)
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)
        checkpoint = torch.load("/Users/zhanghantao/Desktop/拉弯参数/bending_parameter/Optimizing_bending_parameter/Surrogate_model.pth")
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        
        # Bending Parameter list
        self.param_list = []  # To be considered: add the pre-stretch parameter to it

        self.action_list = []  # Record series of actions for each episode for future use

        self.max_step = 10  # Max number of bending steps

        self.num_episode = episode

        self.mould_name = "test" + str(self.num_episode)

        self.rec = geometric_reshape()
        
        # Some useful data path
        self.data_path_2 = "./data/mould_output/" + self.mould_name
        self.data_path_1 = "./data/model/" + self.mould_name

    def reset(self):
        # Reset the environment
        self.num_episode += 1
        self.mould_name = "test" + str(self.num_episode)
        self.data_path_2 = "./data/mould_output/" + self.mould_name
        self.data_path_1 = "./data/model/" + self.mould_name
        # Generate curve and mould for this episode
        print(self.mould_name)
        if not os.path.exists(self.data_path_2):
            os.makedirs(self.data_path_2)
        if not os.path.exists(self.data_path_1):
            os.makedirs(self.data_path_1)
        cmd = ['python ', 'gen_curve_and_mould.py', self.mould_name]
        # print(cmd)
        
        run_cmd(cmd)
        shutil.copy(self.data_path_2 + '/mould.stp', self.data_path_1)

        '''
            Initialize the state with the stress distribution after pre-stretch.
            Since the pre-strech steps are all the same for each test, we simply used the one of test 0.
        '''
        
        csv_path = "/Optimizing_bending_parameter/data/model/test0/simulation/strip_mises_Step-0.csv"
        if os.path.exists(csv_path):
            df = pd.read_csv(csv_path)
            x = df["S_Mises"]
            x = torch.tensor(x, dtype=torch.float32)
        self.state = x
        # geometric_position(self.rec, x)

        self.action_list = []  # Empty the action series
        self.param_list = []  # Empty the param list
        self.pre_idx = None  # Reset pre_idx
        return self.state

    def step(self, action):
        self.action_list.append(action)
        # print(self.state)
        strip_length = 40
        pre_length = 0.1
        k = 0.05 
        # Adding the next parameter to the list
        next_param, self.pre_idx = calc_next_param("./data/mould_output/" + self.mould_name, action, strip_length, pre_length, k, self.pre_idx)
        self.param_list.append(next_param)
        t = (np.array(next_param - np.array(self.pre_idx))).tolist()
        t = torch.tensor(t[:2] + [t[5]], dtype=torch.float32)
        # Execute the given action and return the next state, reward, and whether the episode is done
        self.model.eval()

        self.state = self.model(self.state, t)  # Surrogate model as transition function
        
        # Check if the episode is done and calculate the reward
        if self.pre_idx == 1999 or len(self.action_list) == self.max_step:
            reward = - max(self.state) - self.reward_model(self.state)
            done = True
        else:
            reward = - max(self.state)
            done = False  
        return self.state, reward, done, {}

    def reward_surrogate(self, final_state):
        self.reward_model = SurrogateNet_springback(self.rec)
        checkpoint = torch.load("/Users/zhanghantao/Desktop/拉弯参数/bending_parameter/Optimizing_bending_parameter/Surrogate_springback_model.pth")
        self.reward_model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

        reward = self.reward_model(final_state)
        return reward


In [29]:
class Actor(nn.Module):
    def __init__(self, rec, n_actions, alpha):
        super(Actor, self).__init__()
        self.conv_module = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
       
        # Define the final fully connected layer for generating the action
        self.fc1 = nn.Linear(32 * 18 * 1 * 1, 64)  
        self.fc2 = nn.Linear(64, n_actions)
        self.rec = rec
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax()

        self.optimizer = optim.Adam(self.parameters(), lr=alpha)

    def forward(self, stress):
        x = torch.tensor(geometric_position(self.rec, stress), dtype=torch.double)
        
        # print(stress)
        x = self.conv_module(x)
            
        x = x.view(-1, 32 * 18 * 1 * 1)  # Flatten the output
            
        # Fully connected layers with ReLU activation
        x = self.relu(self.fc1(x))
            
        probs = self.softmax(self.fc2(x))
        
        return probs

# Define the Critic network
class Critic(nn.Module):
    def __init__(self, rec, alpha):
        super(Critic, self).__init__()
        self.conv_module = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
       
        # Define the final fully connected layer for generating the action
        self.fc1 = nn.Linear(32 * 18 * 1 * 1, 64)  
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, 1)
        self.rec = rec
        self.relu = nn.ReLU()

        self.optimizer = optim.Adam(self.parameters(), lr=alpha)

    def forward(self, stress):
        x = torch.tensor(geometric_position(self.rec, stress), dtype=torch.double)
        # print(stress)
        x = self.conv_module(x)
        x = x.view(-1, 32 * 18 * 1 * 1)  # Flatten the output
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return x


In [30]:
class Agent():
    def __init__(self, n_actions, env, gamma=0.99, lr=1e-4):
        super(Agent, self).__init__()
        self.env = env
        self.actor = Actor(env.rec, n_actions, alpha=lr)
        self.critic = Critic(env.rec, alpha=lr)
        self.gamma = gamma
        self.log_probs = None

    def choose_action(self, observation):
        probs = Actor.forward(observation)
        dist = Categorical(probs)
        self.log_probs = dist.sample()
        action = dist.sample().numpy()[0]
        return action
        
    def learn(self, state, new_state, reward, done):
        self.actor.optimizer.zero_grad()
        self.critic.optimizer.zero_grad()

        value_1 = self.critic.forward(state)
        value_2 = self.critic.forward(new_state)

        delta = reward + self.gamma * value_2 * (1-int(done)) - value_1

        critic_loss = delta ** 2
        actor_loss = -self.log_probs * delta

        (actor_loss + critic_loss).backward()
        self.actor.optimizer.step()
        self.critic.optimizer.step()
            
            


In [31]:
N_games = 100
bending_env = bending_env(n_actions=20)
print(bending_env.rec)
agent = Agent(n_actions=20, env=bending_env)
score_history = []
while bending_env.num_episode < N_games:
    t_step = 1
    state = bending_env.reset()
    reward = 0
    while t_step < bending_env.max_step:
        action = agent.choose_action(state)
        new_state, reward, done, _ = bending_env.step(action)
        if done:
            break
        agent.learn(state, new_state, reward, done)
        state = new_state
        t_step += 1
    score_history.append(reward)

    print('episode: ', bending_env.num_episode,'score: %.2f' % reward)


[[[  36.   34.   32. ... 1436. 1429. 1422.]
  [  35.   33.   31. ... 1435. 1428. 1421.]
  [ 144.  142.  140. ... 1434. 1427. 1420.]
  ...
  [ 245.  242.  239. ... 1432. 1425. 1418.]
  [ 244.  241.  238. ... 1431. 1424. 1417.]
  [ 243.  240.  237. ... 1430. 1423. 1416.]]

 [[  72.   70.   68. ... 1471. 1464. 1457.]
  [  71.   69.   67. ... 1470. 1463. 1456.]
  [ 180.  178.  176. ... 1469. 1462. 1455.]
  ...
  [ 275.  272.  269. ... 1467. 1460. 1453.]
  [ 274.  271.  268. ... 1466. 1459. 1452.]
  [ 273.  270.  267. ... 1465. 1458. 1451.]]

 [[ 108.  106.  104. ... 1502. 1495. 1488.]
  [ 107.  105.  103. ... 1501. 1494. 1487.]
  [ 216.  214.  212. ... 1500. 1493. 1486.]
  ...
  [ 305.  302.  299. ... 1498. 1491. 1484.]
  [ 304.  301.  298. ... 1497. 1490. 1483.]
  [ 303.  300.  297. ... 1496. 1489. 1482.]]]
test1


FileNotFoundError: [Errno 2] No such file or directory: 'python '