In [40]:
import gymnasium as gym
from gymnasium.wrappers import FrameStackObservation
import numpy as np
import ale_py
from stable_baselines3 import DQN
from stable_baselines3.common.env_util import make_vec_env
import gymnasium as gym
from gymnasium.wrappers import AtariPreprocessing
import numpy as np
import ale_py



### Make Environment

In [150]:
# Rewards and penalties
class LifePenaltyWrapper(gym.RewardWrapper):
    """Reward-shaping wrapper that rescales score rewards and punishes losing a life.

    Shaping applied on every step, in this order:
      1. A raw reward of 10, 20, 30, 40 or 50 is replaced by ``point_reward``;
         a raw reward of exactly 60 is replaced by ``moving_target_reward``
         (presumably the score of a moving target - confirm against the game).
         Any other raw reward passes through unchanged.
      2. If the ALE life counter dropped since the previous step, the reward is
         overwritten (not added to) with ``death_penalty``.
      3. ``move_penalty`` is added when the action is 2 or 3.

    Args:
        env: Environment to wrap; must expose ``unwrapped.ale.lives()``.
        death_penalty: Reward assigned on the step where a life is lost.
        move_penalty: Amount added to the reward for actions 2 and 3.
        moving_target_reward: Replacement for a raw reward of 60.
        influence_reward: Stored on the instance; not used by this class.
        point_reward: Replacement for raw rewards of 10-50.
    """

    def __init__(self, env, death_penalty=-10, move_penalty=0, moving_target_reward=-5, influence_reward=0, point_reward=1):
        super().__init__(env)
        # Shaping coefficients
        self.death_penalty = death_penalty
        self.move_penalty = move_penalty
        self.moving_target_reward = moving_target_reward
        self.influence_reward = influence_reward
        self.point_reward = point_reward
        # Per-episode bookkeeping, refreshed in reset()
        self.prev_lives = 0
        self.last_action = None  # most recent action passed to step()

    def reset(self, **kwargs):
        """Reset the wrapped env, re-sync the life counter, and return the env's reset result."""
        reset_result = self.env.reset(**kwargs)
        self.last_action = None
        self.prev_lives = self.env.unwrapped.ale.lives()
        return reset_result

    def step(self, action):
        """Step the wrapped env and return the transition with the shaped reward."""
        self.last_action = action
        obs, raw_reward, terminated, truncated, info = self.env.step(action)

        # 1. Rescale score rewards
        if raw_reward in (10, 20, 30, 40, 50):
            shaped_reward = self.point_reward
        elif raw_reward == 60:
            shaped_reward = self.moving_target_reward
        else:
            shaped_reward = raw_reward

        # 2. A lost life overrides whatever was earned on this step
        #    (when the player dies the alien still gives points for blowing up)
        lives_now = self.env.unwrapped.ale.lives()
        life_lost = lives_now < self.prev_lives
        self.prev_lives = lives_now
        if life_lost:
            shaped_reward = self.death_penalty

        # 3. Penalize moving left or right; the idle action 0 gets no adjustment
        if action in (2, 3):
            shaped_reward += self.move_penalty

        return obs, shaped_reward, terminated, truncated, info


In [151]:
def make_env(seed, stack_frames=4, mode=None):
    """Return a zero-argument factory that builds one wrapped Galaxian environment.

    The factory creates "ALE/Galaxian-v5" with the emulator's own frame skipping
    turned off (``frameskip=1``), then applies, innermost first:
    ``AtariPreprocessing`` (``frame_skip=3``), ``LifePenaltyWrapper`` and
    ``FrameStackObservation``.

    Args:
        seed: Seed applied to the action-space and observation-space samplers.
            Only the spaces are seeded here; this function does not seed the
            environment itself.
        stack_frames: Number of consecutive observations to stack.
        mode: ``render_mode`` forwarded to ``gym.make``.

    Returns:
        A callable taking no arguments that returns the wrapped environment.
    """

    def _init():
        # Frame skipping is delegated to AtariPreprocessing, hence frameskip=1 here
        base_env = gym.make("ALE/Galaxian-v5", frameskip=1, render_mode=mode)
        shaped_env = LifePenaltyWrapper(AtariPreprocessing(base_env, frame_skip=3))
        stacked_env = FrameStackObservation(shaped_env, stack_size=stack_frames)

        # Seed the space samplers of the outermost wrapper
        for space in (stacked_env.action_space, stacked_env.observation_space):
            space.seed(seed)

        return stacked_env

    return _init

### Feature Extractor CNN

In [143]:
import torch
import torch.nn as nn
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

class Custom3DCNN(BaseFeaturesExtractor):
    """Convolutional feature extractor for stacked-frame observations.

    Despite the name, the network uses 2-D convolutions: the first axis of the
    observation shape is fed in as the input channels of the first layer.
    Three conv+ReLU stages are followed by a flatten and a single
    linear+ReLU layer that outputs ``features_dim`` values.

    Args:
        observation_space: Space whose ``shape`` has exactly three entries,
            unpacked as (frames, height, width).
        features_dim: Size of the feature vector returned by ``forward``.

    Note:
        The submodule names ``cnn`` and ``fc`` and the layer order determine the
        parameter names in the state dict, so they are kept fixed.
    """

    def __init__(self, observation_space, features_dim=256):
        super().__init__(observation_space, features_dim)

        n_stacked, obs_height, obs_width = observation_space.shape

        conv_stack = [
            nn.Conv2d(n_stacked, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
        ]
        self.cnn = nn.Sequential(*conv_stack)

        # Push one all-zero observation through the conv stack to learn the
        # flattened width instead of computing it by hand
        with torch.no_grad():
            probe = torch.zeros(1, n_stacked, obs_height, obs_width)
            flat_width = self.cnn(probe).shape[1]

        self.fc = nn.Sequential(nn.Linear(flat_width, features_dim), nn.ReLU())

    def forward(self, observations):
        """Map a batch of observations to a batch of ``features_dim`` features."""
        conv_features = self.cnn(observations)
        return self.fc(conv_features)


### RL Network

In [152]:
from stable_baselines3 import DQN
from stable_baselines3.common.env_util import make_vec_env
import gymnasium as gym
from gymnasium.wrappers import AtariPreprocessing
import numpy as np
import ale_py

gym.register_envs(ale_py)

# Vectorised training environment: four copies, each built by make_env with a
# space seed drawn from [20, 60)
vec_env = make_vec_env(lambda: make_env(seed=np.random.randint(20, 60))(), n_envs=4)
print(vec_env.observation_space)

# Plug the custom CNN into the policy as its feature extractor
policy_kwargs = {
    "features_extractor_class": Custom3DCNN,
    "features_extractor_kwargs": {"features_dim": 256},  # extractor output size
}

# DQN hyperparameters, gathered in one place
dqn_settings = {
    "policy_kwargs": policy_kwargs,
    "learning_rate": 1e-4,
    "buffer_size": 20000,
    "learning_starts": 20000,
    "batch_size": 128,
    "tau": 1.0,
    "gamma": 0.99,
    "train_freq": (16, 'step'),
    "target_update_interval": 1000,
    "exploration_fraction": 0.5,
    "exploration_final_eps": 0.1,
    "verbose": 1,
    "device": "cuda",
}

# Create the DQN model with a CNN-based policy
model = DQN("CnnPolicy", vec_env, **dqn_settings)


Box(0, 255, (4, 84, 84), uint8)
Using cuda device


### Training

In [94]:
from stable_baselines3.common.callbacks import CheckpointCallback
# CheckpointCallback counts callback invocations, not timesteps, and each
# invocation advances every parallel environment by one step. Dividing by the
# number of environments makes a checkpoint land roughly every 10,000 timesteps
# (a bare save_freq=10000 would save only every 10,000 * num_envs timesteps).
checkpoint_callback = CheckpointCallback(
    save_freq=max(10000 // vec_env.num_envs, 1),
    save_path='./checkpoints/',  # Directory to save the model
    name_prefix='new_V1_train'  # Prefix for the checkpoint filenames
)

# Train the model
model.learn(total_timesteps=200000, callback=checkpoint_callback)

# Save the model
model.save("new_v1_dqn_galaxian")

----------------------------------
| rollout/            |          |
|    ep_len_mean      | 665      |
|    ep_rew_mean      | 14.5     |
|    exploration_rate | 0.965    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 400      |
|    time_elapsed     | 9        |
|    total_timesteps  | 3936     |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 671      |
|    ep_rew_mean      | 8.5      |
|    exploration_rate | 0.933    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 422      |
|    time_elapsed     | 17       |
|    total_timesteps  | 7412     |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 787      |
|    ep_rew_mean      | 34.7     |
|    exploration_rate | 0.898    |
| time/               |          |
|    episodes       

In [153]:
# Continue training

from stable_baselines3 import DQN
from stable_baselines3.common.callbacks import CheckpointCallback


# Fresh vectorised environment for the continued run: eight copies, each with a
# space seed drawn from [0, 5000)
vec_env = make_vec_env(
    lambda: make_env(seed=np.random.randint(0, 5000))(), 
    n_envs=8
)

# Reload the saved model, overriding the exploration schedule and training cadence
model = DQN.load(
    "new_v15_freq64_dqn_galaxian_long.zip", 
    env=vec_env, 
    exploration_initial_eps=0.3,
    exploration_final_eps=0.1, 
    exploration_fraction = 0.3, 
    learning_starts=20000, 
    train_freq=(128, 'step')
    )  # the agent behaves differently with a smaller train_freq


# Define checkpoint callback.
# save_freq is measured in callback invocations, and each invocation advances
# all parallel environments by one step. With a bare save_freq=1000000 the
# threshold would never be reached during a 1,000,000-timestep run on several
# environments, so no checkpoint would be written; scale it by the env count.
checkpoint_callback = CheckpointCallback(
    save_freq=max(1000000 // vec_env.num_envs, 1), 
    save_path="./checkpoints/",
    name_prefix="new"
)

# Continue training
model.learn(total_timesteps=1000000, callback=checkpoint_callback)

# Save the retrained model
model.save("new_v16_dqn_galaxian_long.zip")




----------------------------------
| rollout/            |          |
|    ep_len_mean      | 437      |
|    ep_rew_mean      | -48.8    |
|    exploration_rate | 0.297    |
| time/               |          |
|    episodes         | 4        |
|    fps              | 856      |
|    time_elapsed     | 5        |
|    total_timesteps  | 4592     |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 603      |
|    ep_rew_mean      | -29.8    |
|    exploration_rate | 0.295    |
| time/               |          |
|    episodes         | 8        |
|    fps              | 956      |
|    time_elapsed     | 7        |
|    total_timesteps  | 7504     |
----------------------------------
----------------------------------
| rollout/            |          |
|    ep_len_mean      | 744      |
|    ep_rew_mean      | -19.6    |
|    exploration_rate | 0.292    |
| time/               |          |
|    episodes       

### View trained model

In [155]:
# Load the saved model
model = DQN.load("new_v16_dqn_galaxian_long.zip")

# Create a single environment with on-screen rendering
vec_env = make_vec_env(
    lambda: make_env(seed=np.random.randint(0, 50), mode='human')(), 
    n_envs=1
)

# Play the trained policy. The try/finally guarantees the environment (and its
# render window) is closed even when the run is interrupted from the keyboard.
try:
    obs = vec_env.reset()
    for _ in range(10000):
        action, _states = model.predict(obs, deterministic=True)  # Use deterministic=True for evaluation
        # The vectorised env resets a finished episode on its own and already
        # returns the first observation of the next one, so no manual reset is
        # needed (a second reset would throw that observation away).
        obs, reward, done, info = vec_env.step(action)
        print(f"Reward: {reward}, info: {info}")
        vec_env.render()  # Optional: Visualize the gameplay
finally:
    vec_env.close()



Reward: [0.], info: [{'lives': 4, 'episode_frame_number': 23, 'frame_number': 23, 'TimeLimit.truncated': False}]
Reward: [0.], info: [{'lives': 4, 'episode_frame_number': 26, 'frame_number': 26, 'TimeLimit.truncated': False}]
Reward: [0.], info: [{'lives': 4, 'episode_frame_number': 29, 'frame_number': 29, 'TimeLimit.truncated': False}]
Reward: [0.], info: [{'lives': 4, 'episode_frame_number': 32, 'frame_number': 32, 'TimeLimit.truncated': False}]
Reward: [0.], info: [{'lives': 4, 'episode_frame_number': 35, 'frame_number': 35, 'TimeLimit.truncated': False}]
Reward: [0.], info: [{'lives': 4, 'episode_frame_number': 38, 'frame_number': 38, 'TimeLimit.truncated': False}]
Reward: [0.], info: [{'lives': 4, 'episode_frame_number': 41, 'frame_number': 41, 'TimeLimit.truncated': False}]
Reward: [0.], info: [{'lives': 4, 'episode_frame_number': 44, 'frame_number': 44, 'TimeLimit.truncated': False}]
Reward: [0.], info: [{'lives': 4, 'episode_frame_number': 47, 'frame_number': 47, 'TimeLimit.tru

KeyboardInterrupt: 