In [3]:
import os
from stable_baselines3.ppo import CnnPolicy
from stable_baselines3 import PPO
from pettingzoo.butterfly import pistonball_v5
import supersuit as ss
from pettingzoo.utils.conversions import aec_to_parallel
from stable_baselines3.common.callbacks import CheckpointCallback
!dir


 Volume in drive D is New Volume
 Volume Serial Number is 86B6-5744

 Directory of D:\aa\Fauzan\Tutorial\CollegeStudy\Semester 6\SMA\tubessma

12/06/2024  15:05    <DIR>          .
10/06/2024  20:22    <DIR>          ..
10/06/2024  20:31    <DIR>          .ipynb_checkpoints
12/06/2024  04:03    <DIR>          model
11/06/2024  20:18         9.941.743 rl_model_piston_480000_steps.zip
12/06/2024  15:05           298.923 sma_tubes.ipynb
               2 File(s)     10.240.666 bytes
               4 Dir(s)  319.292.198.912 bytes free


In [None]:
import numpy as np
from pettingzoo.butterfly.pistonball.pistonball import raw_env
from pettingzoo import AECEnv
import pygame
import pymunk
import gymnasium
from gymnasium.utils import EzPickle
from pettingzoo.utils import agent_selector, wrappers

class CustomPistonball(raw_env):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        
        self.team_1_agents = [agent for i, agent in enumerate(self.agents) if i % 2 == 0]
        self.team_2_agents = [agent for i, agent in enumerate(self.agents) if i % 2 != 0]

    def draw_pistons(self):
        piston_color_blue = (65, 159, 221)
        piston_color_red = (255, 0, 0)
        x_pos = self.wall_width
        for i, piston in enumerate(self.pistonList):
            self.screen.blit(
                self.piston_body_sprite,
                (x_pos, self.screen_height - self.wall_width - self.piston_body_height),
            )
            height = (
                self.screen_height
                - self.wall_width
                - self.piston_body_height
                - (piston.position[1] + self.piston_radius)
                + (self.piston_body_height - 6)
            )
            body_rect = pygame.Rect(
                piston.position[0] + self.piston_radius + 1,  # +1 to match up to piston graphics
                piston.position[1] + self.piston_radius + 1,
                18,
                height,
            )
            piston_color = piston_color_blue if i % 2 == 1 else piston_color_red
            pygame.draw.rect(self.screen, piston_color, body_rect)
            x_pos += self.piston_width

    def reset(self, *args, **kwargs):
        super().reset(*args, **kwargs)
        horizontal_offset_range = 10  
        vertical_offset_range = 10  
        
        ball_x = (
            self.screen_width 
            - self.wall_width
            - self.ball_radius / 2
            - horizontal_offset_range
            + self.horizontal_offset
        )
        ball_y = (
            self.screen_height
            - self.wall_width
            - self.piston_body_height
            - self.ball_radius
            - (0.5 * self.pixels_per_position * self.n_piston_positions)
            - vertical_offset_range
            + self.vertical_offset
        )
        self.ball.position = (ball_x, ball_y)
        self.ball.angle = 0
        self.ball.velocity = (0, 0)
        # Ensure ball starts somewhere middle of the environtment
        self.ball.position = (max(ball_x, self.wall_width + self.ball_radius + 1), ball_y)
    def enable_render(self):
        self.screen = pygame.display.set_mode((self.screen_width, self.screen_height))
        pygame.display.set_caption("Pistonball")

        self.renderOn = True
        # self.screen.blit(self.background, (0, 0))
        self.draw_background()
        self.draw()

    def render(self):
        super().render()
        if self.render_mode =="human":
            for event in pygame.event.get():
                if event.type == pygame.QUIT:     
                    pygame.quit()
                pygame.display.flip()
        return (np.transpose(observation, axes=(1, 0, 2))
                if self.render_mode == "rgb_array"
                else None
            )
    def step(self, action):
        if (
            self.terminations[self.agent_selection]
            or self.truncations[self.agent_selection]
        ):
            self._was_dead_step(action)
            return

        action = np.asarray(action)
        agent = self.agent_selection
        if self.continuous:
            self.move_piston(self.pistonList[self.agent_name_mapping[agent]], action)
        else:
            self.move_piston(
                self.pistonList[self.agent_name_mapping[agent]], action - 1
            )

        self.space.step(self.dt)
        if self._agent_selector.is_last():
            ball_min_x = int(self.ball.position[0] - self.ball_radius)
            ball_next_x = (
                self.ball.position[0]
                - self.ball_radius
                + self.ball.velocity[0] * self.dt
            )

            # Check for termination conditions
            if ball_next_x <= self.wall_width + 1:
                self.terminate = True
                reward_team = self.team_2_agents
            if ball_next_x >= ((self.piston_width*self.n_pistons)-(self.wall_width)-1):
                self.terminate = True
                print(self.screen_width-self.wall_width)
                reward_team = self.team_1_agents
            else:
                reward_team = []

            # Ensure that the ball can't pass through the wall
            ball_min_x = max(self.wall_width, ball_min_x)
            self.draw()
            local_reward = self.get_local_reward(self.lastX, ball_min_x)
            global_reward = (100 / self.distance) * (self.lastX - ball_min_x)
            if not self.terminate:
                global_reward += self.time_penalty

            # Initialize total rewards for all agents
            total_reward = {agent: 0 for agent in self.agents}

            # Distribute rewards
            if reward_team:
                for agent in reward_team:
                    total_reward[agent] = global_reward

            # If not terminating, distribute rewards normally
            if not self.terminate:
                for index in self.get_nearby_pistons():
                    total_reward[self.agents[index]] += local_reward * self.local_ratio

            self.rewards = total_reward
            self.lastX = ball_min_x
            self.frames += 1
        else:
            self._clear_rewards()

        self.truncate = self.frames >= self.max_cycles

        # Clear the list of recent pistons for the next reward cycle
        if self.frames % self.recentFrameLimit == 0:
            self.recentPistons = set()
        if self._agent_selector.is_last():
            self.terminations = dict(
                zip(self.agents, [self.terminate for _ in self.agents])
            )
            self.truncations = dict(
                zip(self.agents, [self.truncate for _ in self.agents])
            )

        self.agent_selection = self._agent_selector.next()
        self._cumulative_rewards[agent] = 0
        self._accumulate_rewards()
        
        if self.render_mode == "human":
            self.render()

    # Create an instance of the custom environment

# Run the environment


In [5]:
env = CustomPistonball(n_pistons=20, time_penalty=-0.1, random_rotate=True, ball_mass=0.75, ball_friction=0.3, ball_elasticity=1.5, max_cycles=125)
env = aec_to_parallel(env)

In [None]:
env = ss.color_reduction_v0(env, mode='B')

In [17]:
env = ss.resize_v1(env, x_size=64, y_size=64)

In [36]:
env = ss.frame_stack_v1(env, 3)

In [37]:
env = ss.pettingzoo_env_to_vec_env_v1(env)

In [10]:
env = ss.concat_vec_envs_v1(env, 8, num_cpus=4, base_class='stable_baselines3')

In [None]:
model = PPO(CnnPolicy, env, verbose=3, gamma=0.95, n_steps=256, ent_coef=0.0905168, learning_rate=0.00062211, vf_coef=0.042202, max_grad_norm=0.9, gae_lambda=0.99, n_epochs=5, clip_range=0.3, batch_size=256)
PPO_path = os.path.join('./model')
checkpoint_callback = CheckpointCallback(
  save_freq=1000,
  save_path=PPO_path,
  name_prefix="rl_model_piston",
  save_replay_buffer=True,
  save_vecnormalize=True,
)
model.learn(total_timesteps=2000000, callback=checkpoint_callback)
print('running')
final_path = os.path.join('./Model/final_model')
model.save(final_path)


In [None]:
env = CustomPistonball(n_pistons=20, time_penalty=-0.1, random_rotate=True, ball_mass=0.75, ball_friction=0.3, ball_elasticity=1.5, max_cycles=125, render_mode='human')

env = ss.color_reduction_v0(env, mode='B')
env = ss.resize_v1(env, x_size=64, y_size=64)
env = ss.frame_stack_v1(env, 3)

In [24]:
model = PPO.load('./model/rl_model_piston_1280000_steps.zip')

In [None]:

for i in range (10):
    env = CustomPistonball(n_pistons=20, render_mode='human')
    print(env.screen_width-env.wall_width)
    env = ss.color_reduction_v0(env, mode='B')
    env = ss.resize_v1(env, x_size=64, y_size=64)
    env = ss.frame_stack_v1(env, 3)
    model = PPO.load('model/model-fix__/final.zip')
    env.reset()
    for agent in env.agent_iter():
        observation, reward, termination, truncation, info = env.last()
    
        if termination or truncation:
            action = None
        else:
            # this is where you would insert your policy
            print(reward)
            action = model.predict(observation, deterministic=False)[0]
        
        env.step(action)
        print(
                env.ball.position[0]
                - env.ball_radius
                + env.ball.velocity[0] * env.dt)
        
        print(f'screen width = {(env.piston_width*env.n_pistons)-(env.wall_width/2)}')
    print(i)
    env.close()


840
0
41.0
screen width = 780.0
0
41.0
screen width = 780.0
0
41.0
screen width = 780.0
0
41.0
screen width = 780.0
0
41.0
screen width = 780.0
0
41.0
screen width = 780.0
0
42.18302365214308
screen width = 780.0
0
43.39735942221451
screen width = 780.0
0
44.99384955927042
screen width = 780.0
0
46.99228139659718
screen width = 780.0
0
49.37885951848576
screen width = 780.0
0
52.125798307342315
screen width = 780.0
0
55.232737096198875
screen width = 780.0
0
58.69967588505543
screen width = 780.0
0
62.52661467391198
screen width = 780.0
0
65.07023447254464
screen width = 780.0
0
66.84954314549694
screen width = 780.0
0
68.4187379341062
screen width = 780.0
0
69.74815056027322
screen width = 780.0
0
70.84444866805704
screen width = 780.0
0.0
71.72828065007899
screen width = 780.0
0.0
72.42361514078192
screen width = 780.0
0.0
73.12458544668144
screen width = 780.0
0
73.83096881840443
screen width = 780.0
0
74.14750628357888
screen width = 780.0
0
74.35870962413924
screen width = 780.0
0

In [7]:
import gym
from stable_baselines3 import SAC
from stable_baselines3.common.env_util import DummyVecEnv, make_vec_env
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.callbacks import CheckpointCallback
from stable_baselines3.common.utils import set_random_seed

In [10]:
for i in range (10):
    env = CustomPistonball(n_pistons=20, render_mode='human')
    env = ss.color_reduction_v0(env, mode='B')
    env = ss.resize_v1(env, x_size=64, y_size=64)
    env = ss.frame_stack_v1(env, 3)
    model = SAC.load('model/sac_pistonball_320000_steps.zip')
    env.reset()
    for agent in env.agent_iter():
        observation, reward, termination, truncation, info = env.last()
    
        if termination or truncation:
            action = None
        else:
            # this is where you would insert your policy
            action = model.predict(observation, deterministic=False)[0]
            print(action)
    
        env.step(action)
        print(termination, truncation)
    print(i)
    env.close()


MemoryError: Unable to allocate 11.4 GiB for an array with shape (6250, 160, 3, 64, 64) and data type uint8