In [33]:
import numpy as np
import pygame
from scipy.integrate import solve_bvp
import multiprocessing
multiprocessing.set_start_method('spawn' , force = True)
import os
from stable_baselines3 import PPO ,A2C , DDPG ,TD3 , SAC
from stable_baselines3.common.vec_env import DummyVecEnv , SubprocVecEnv
from stable_baselines3.common.utils import set_random_seed
from stable_baselines3.common.evaluation import evaluate_policy
import time
import gymnasium as gym
from gymnasium.spaces import Discrete, Box, Dict
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.monitor import Monitor
import math
from stable_baselines3.common.callbacks import EvalCallback
import optuna

In [34]:
class Elastica_env(gym.Env):
    def __init__(self):
        super(Elastica_env, self).__init__()
        self.action_space = Box(low=np.array([-0.2, -0.2], dtype=np.float32), high=np.array([0.1 , 0.2], dtype=np.float64))
        self.observation_space = Box(low=np.float32(-50), high=np.float32(50), shape=(13,), dtype=np.float64)
        self.target = Box(low=np.array([4.5, -1], dtype=np.float32), high=np.array([5.57, 1], dtype=np.float64))
        self.num_timestep = 0
        self.reward = 0
        self.x = []
        self.y = []
        self.screen_width = 800.0
        self.screen_height = 600.0
        self.zoom_factor = 60.0
        self.enable_render = False
        self.h = -0.4
        self.v = 0.15
        self.screen = None  # Add this to manage the screen as a persistent object
        self.clock = None    # Add this to manage the clock as a persistent object
        self.initialized_render = False

    # Add the seed() method
    def seed(self, seed=None):
        self.np_random, seed = gym.utils.seeding.np_random(seed)
        return seed
    def step(self, action):
        self.num_timestep += 1
        self.h += action[0]
        self.v += action[1]
        self.X, self.Y, self.theta_dash_0  ,self.theta_dash_l, self.theta_l , self.E = self.elastica(self.h, self.v)

        new_observation = self.get_observation()
        self.render(self.enable_render)
        self.reward = self.score()
        done = self.get_done()
        truncation = self.get_truncation()
        info = {}
        return new_observation, self.reward, done, truncation, info

    def elastica(self, h, v):
        l = 6
        s = np.linspace(0, l, 500)
        def f(s, y):
            return np.vstack((y[1], h * np.sin(y[0]) - v * np.cos(y[0])))
        def bc(ya, yb):
            return np.array([ya[0] - 0, yb[1] - 0])
        y0 = np.zeros((2, s.size))
        sol = solve_bvp(f, bc, s, y0)
        theta = sol.sol(s)[0]
        dtheta_ds = sol.sol(s)[1]
        y1 = np.cos(theta)
        y2 = np.sin(theta)
        y3 = (dtheta_ds)**2
        self.x = []
        self.y = []
        for i in range(len(s)):
            self.x.append(np.trapz(y1[:i+1], x=s[:i+1]))
            self.y.append(np.trapz(y2[:i+1], x=s[:i+1]))

        e = 0.5*(np.trapz(y3 , s))-v*(np.trapz(y2 , s)) + h*(np.trapz(1-y1 , s))

        return self.x, self.y, dtheta_ds[0] ,dtheta_ds[-1] , theta[-1] , e 

    def get_observation(self):
        self.x_tip = self.X[-1]
        self.y_tip = self.Y[-1]
        d = ((self.x_tip - self.x_target)**2 + (self.y_tip - self.y_target)**2)**0.5
        return np.array([self.x_tip, self.y_tip, self.X[200], self.Y[200], self.X[400], self.Y[400], 
                         self.theta_l , self.theta_dash_0 , self.theta_dash_l ,self.E  ,
                         self.x_target, self.y_target ,d] ,  dtype=np.float64)

    def score(self):
        d = ((self.x_tip - self.x_target)**2 + (self.y_tip - self.y_target)**2)**0.5
        #d_score = -(d)**2
        d_score = math.exp(-d)
        total_score = d_score
        return total_score

    def get_done(self):
        done = False
        d = ((self.x_tip - self.x_target)**2 + (self.y_tip - self.y_target)**2)**0.5
        if d < 0.002:
            done = True
        return done

    def get_truncation(self):
        truncation = False
        if self.num_timestep > 19 :
            truncation = True
        return truncation

    def reset(self, seed=None):
        if seed is not None:
            self.np_random, seed = gym.utils.seeding.np_random(seed)
        targ = self.target.sample()
        self.x_target = targ[0]
        self.y_target = targ[1]
        if self.y_target<=0:
            self.h = -0.4
            self.v = 0.15
        if self.y_target>0:
            self.h = -0.4
            self.v = -0.15

        self.X, self.Y, self.theta_dash_0 ,self.theta_dash_l , self.theta_l , self.E  = self.elastica(self.h, self.v)
        self.num_timestep = 0
        self.reward = 0
        observation = self.get_observation()
        info = {}
        return observation, info
  
    def render(self, enable_render):
        if not enable_render:
            return

        if not self.initialized_render:
            # Initialize pygame components only once
            pygame.init()
            self.screen = pygame.display.set_mode((int(self.screen_width), int(self.screen_height)))
            pygame.display.set_caption("Elastica")
            self.clock = pygame.time.Clock()
            self.initialized_render = True

    # Clear screen for new frame
        self.screen.fill((255, 255, 255))
        
        offset_x = (self.screen_width - 10 * self.zoom_factor) / 2
        offset_y = (self.screen_height - 1.5 * self.zoom_factor) / 2
        points = [(self.X[i], self.Y[i]) for i in range(len(self.X))]

    # Draw the elastica and its components
        pygame.draw.lines(self.screen,(0, 0, 0),False,[(point[0] * self.zoom_factor + offset_x, point[1] * self.zoom_factor + offset_y)
                                                       for point in points],)
        pygame.draw.line(self.screen,(255, 0, 0),((self.X[-1]) * self.zoom_factor + offset_x,(self.Y[-1]) * self.zoom_factor + offset_y,),
            ((self.X[-1]) * self.zoom_factor + offset_x + 50 * np.sign(self.h),(self.Y[-1]) * self.zoom_factor + offset_y,),3,)
        
        pygame.draw.line(self.screen,(0, 255, 0),((self.X[-1]) * self.zoom_factor + offset_x,(self.Y[-1]) * self.zoom_factor + offset_y,),
            ((self.X[-1]) * self.zoom_factor + offset_x,(self.Y[-1]) * self.zoom_factor + offset_y + 50 * np.sign(self.v),),3,)
        
        pygame.draw.line(self.screen,(0, 0, 0),((self.X[0]) * self.zoom_factor + offset_x,(self.Y[0]) * self.zoom_factor + offset_y,),
               ((self.X[0]) * self.zoom_factor + offset_x,(self.Y[0]) * self.zoom_factor + offset_y + 25,),3,)
        
        pygame.draw.line(self.screen,(0, 0, 0),((self.X[0]) * self.zoom_factor + offset_x,(self.Y[0]) * self.zoom_factor + offset_y,),
            ((self.X[0]) * self.zoom_factor + offset_x,(self.Y[0]) * self.zoom_factor + offset_y - 25,),3,)
        
        pygame.draw.circle(self.screen,(255, 0, 0),(int(self.x_target * self.zoom_factor + offset_x),int(self.y_target * self.zoom_factor + offset_y),),5,)

        # Display text information
        font = pygame.font.Font(None, 36)
        score_text = font.render(f"Timesteps: {self.num_timestep}", True, (0, 0, 0))
        self.screen.blit(score_text,(int(self.screen_width - score_text.get_width() - 30), 120),)
        

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                self.close()
                return
            
            elif event.type == pygame.VIDEORESIZE:
                print("Window resized to:", event.size)
                self.screen = pygame.display.set_mode(event.size, pygame.RESIZABLE)
                self.render(self.enable_render)  # Redraw everything after resizing
            
            elif event.type == pygame.ACTIVEEVENT:
                if event.state == 1:  # Input focus (keyboard/mouse focus)
                    if event.gain == 0:
                        print("Window lost input focus")
                    elif event.gain == 1:
                        print("Window gained input focus")
                elif event.state == 4:  # Active state (minimized/restored)
                    if event.gain == 0:
                        print("Window minimized or deactivated")
                        # Optional: Pause the game or rendering
                    elif event.gain == 1:
                        print("Window restored or activated")
                        self.render(self.enable_render)  # Redraw everything after restoration

        # Update the display
        pygame.display.flip()
        self.clock.tick(30)  # Limit frame rate to 30 FPS

    def close(self):
        if self.initialized_render:
            pygame.quit()
            self.initialized_render = False


In [35]:
env = Elastica_env()

In [36]:
check_env(env)
env.close()

In [38]:
env.enable_render = True
episodes = 10
for episode in range(1, episodes+1):
    state , info = env.reset()
    done = False
    score = 0
    truncation=False
    while not (done or truncation):
        #env.render(True )
        action = env.action_space.sample()
        #print(action)
        n_state, reward, done , truncation ,info = env.step(action)
        score+=reward
    print('Episode:{} Score:{}'.format(episode, score))
env.close()

Episode:1 Score:4.825024292335784
Episode:2 Score:4.087163133816204
Episode:3 Score:6.587745018432867
Episode:4 Score:6.415459938447593
Episode:5 Score:3.4319545520862706
Episode:6 Score:4.817397620026562
Episode:7 Score:4.109335225720665
Episode:8 Score:5.377665455719757
Episode:9 Score:6.116440766251155
Episode:10 Score:1.5439731305725963


In [None]:
env.enable_render = False
log_path = os.path.join('Training' , 'Logs')
os.makedirs(os.path.dirname(log_path) , exist_ok = True)
eval_env = Elastica_env()
eval_env = Monitor(eval_env , log_path)
#policy_kwargs = dict(net_arch=dict(pi=[256, 256 , 256], qf=[500, 400 , 400]) )  # by default tanh activation function
#policy_kwargs = dict(activation_fn=th.nn.ReLU,net_arch=dict(pi=[32, 32], qf=[32, 32]))    # ReLu activation function

def objective(trial):
    learning_rate = trial.suggest_float('learning_rate' , 1e-5 , 1e-3 ,log = True)
    entropy_coef = trial.suggest_float ('entropy_coef' , 1e-4 , 1e-2  , log = True)
    gamma = trial.suggest_float('gamma' ,0.9 ,0.9999 , log = True)
    tau = trial.suggest_float('tau' , 0.005 , 0.05 , log = True)
    batch_size = trial.suggest_categorical('batch_size' , [64 , 128 , 256 , 1000 , 2000 , 4000 , 8000 , 16000])
    target_update_interval = trial.suggest_categorical('target_update_interval' , [1000 , 5000 , 10000])
    gradient_steps = trial.suggest_categorical('gradient_steps' , [1 , 2, 4])
    buffer_size = trial.suggest_categorical('buffer_size' , [100000 , 200000 , 500000 , 1000000 , 2000000])
    
    model = SAC('MlpPolicy', env, verbose=0 , learning_rate = learning_rate ,
                ent_coef = entropy_coef , gamma = gamma , tau = tau , batch_size = batch_size , 
                target_update_interval = target_update_interval , gradient_steps= gradient_steps ,
                buffer_size = buffer_size )
    
    eval_callback = EvalCallback(eval_env ,  eval_freq = 1000 ,deterministic = True )
    model.learn(total_timesteps = 10000 , callback = eval_callback)

    mean_reward , _ = evaluate_policy(model , eval_env , n_eval_episodes = 10 ,deterministic=True)
    return mean_reward

study = optuna.create_study (direction = 'maximize')
study.optimize(objective , n_trials = 50 , n_jobs = 1)
best_parameters = study.best_params
print(best_parameters)

In [43]:
#log_path = os.path.join('Training', 'Logs')
#PPO_path = os.path.join('Training', 'Saved Models', 'DDPG')
#print(PPO_path)
#env = DummyVecEnv([lambda: env])
# model = SAC( 'MlpPolicy', env, verbose=0 ,  learning_rate=best_parameters['learning_rate'],
#             ent_coef=best_parameters['entropy_coef'], gamma=best_parameters['gamma'], tau=best_parameters['tau'], 
#            batch_size=best_parameters['batch_size'], target_update_interval = best_parameters['target_update_interval'],
#            gradient_steps=best_parameters['gradient_steps'],buffer_size=best_parameters['buffer_size'])  
model = SAC( 'MlpPolicy', env, verbose=0 ,  learning_rate=0.000985293296715492,
            ent_coef=0.0030394039632778013, gamma=0.9007530746938588, tau=0.03547973047760216, 
           batch_size=1000, target_update_interval = 5000 ,
           gradient_steps=1,buffer_size=100000)

In [None]:
eval_callback = EvalCallback(eval_env,  eval_freq = 1000 ,deterministic = True )
model.learn(total_timesteps= 1000000 , callback = eval_callback )

Eval num_timesteps=1000, episode_reward=11.82 +/- 2.53
Episode length: 20.00 +/- 0.00
New best mean reward!
Eval num_timesteps=2000, episode_reward=12.91 +/- 1.86
Episode length: 20.00 +/- 0.00
New best mean reward!
Eval num_timesteps=3000, episode_reward=15.17 +/- 0.46
Episode length: 20.00 +/- 0.00
New best mean reward!
Eval num_timesteps=4000, episode_reward=17.49 +/- 1.02
Episode length: 20.00 +/- 0.00
New best mean reward!
Eval num_timesteps=5000, episode_reward=17.84 +/- 1.20
Episode length: 20.00 +/- 0.00
New best mean reward!
Eval num_timesteps=6000, episode_reward=16.39 +/- 1.75
Episode length: 20.00 +/- 0.00
Eval num_timesteps=7000, episode_reward=18.59 +/- 0.35
Episode length: 20.00 +/- 0.00
New best mean reward!
Eval num_timesteps=8000, episode_reward=18.77 +/- 0.93
Episode length: 20.00 +/- 0.00
New best mean reward!
Eval num_timesteps=9000, episode_reward=18.78 +/- 0.95
Episode length: 20.00 +/- 0.00
New best mean reward!
Eval num_timesteps=10000, episode_reward=19.02 +/-

In [44]:
SAC_path = os.path.join('Training', 'Saved Models', 'ElasticadeskGPU1_SAC')   # Keep all the files in same directory
os.makedirs(os.path.dirname(SAC_path), exist_ok=True)

In [None]:
# model.save(SAC_path)
# print(f"Model saved at: {SAC_path}"

In [45]:
model=SAC.load(SAC_path)
print(f"Model loaded from: {SAC_path}")

Model loaded from: Training\Saved Models\ElasticadeskGPU1_SAC


In [46]:
env.enable_render = True
episodes = 10
for episode in range(1, episodes+1):
    state  , info = env.reset()
    done = False
    score = 0 
    truncation=False
    test=[]
    n_score=[]
    dis = []
    while not (done or truncation):
        action,_ = model.predict(state , deterministic=True)
        #print(action)
        state, reward, done, truncation ,info = env.step(action)
        n_score.append(reward)
        dis.append(state[-1])
        score+=reward
        test.append(done)
    #print(test)
    print(len(test))
    print(n_score)
    #print(np.min(dis))
    print('Episode:{} Score:{}'.format(episode, score))
env.close()

Window gained input focus
20
[0.8892140579441836, 0.9543835886808281, 0.961565895983912, 0.9612192572563479, 0.9611199714996557, 0.9613644263474418, 0.9615340143645049, 0.9616439695686239, 0.9617157443529699, 0.9617634598881405, 0.9617949227347149, 0.9618161811583698, 0.9618302607546404, 0.9618400604273442, 0.9618463333619961, 0.961850802280918, 0.9618537986112494, 0.9618557655950593, 0.961856979180441, 0.9618580649226268]
Episode:1 Score:19.153927554913963
20
[0.982400707783382, 0.963628145498634, 0.9635373964860683, 0.9634507603575662, 0.9633775580668753, 0.9633159677886763, 0.96326383588384, 0.9632191224599704, 0.9631806523219998, 0.9631474689619339, 0.9631187062147025, 0.9630936531835493, 0.9630717103138116, 0.9630529337435195, 0.9630362438642563, 0.9630216206911368, 0.9630090402371986, 0.9629979421808642, 0.962988101047379, 0.9629798540559237]
Episode:2 Score:19.282891421141287
20
[0.926488118437212, 0.9546635541571887, 0.9659520631084862, 0.967859679234739, 0.9680274712563756, 0.

In [18]:
env.close()