<a href="https://colab.research.google.com/github/anant1525/BucketPick/blob/main/Spaceship_survival_problem_v5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from gym import Env # the Gym environment class
from gym import spaces # predefined spaces from Gym
import random # used to randomize starting positions
import numpy as np # used for integer datatypes
from collections import namedtuple, deque

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchsummary import summary

import os

In [2]:
class SpaceshipEnvironment(Env):
    """Grid world with one spaceship and several asteroids on a wrap-around board.

    The board is a ``BOARD_LENGTH`` x ``BOARD_LENGTH`` ``np.int16`` array whose
    cells hold ``SPACE``, ``SPACESHIP`` or ``ASTEROID``.  Actions are ``UP``,
    ``DOWN``, ``LEFT`` and ``RIGHT``; moving off one edge re-enters on the
    opposite edge.

    ``reset`` returns only the board and ``step`` returns the 4-tuple
    ``(state, reward, done, info)``.
    """

    def __init__(self):
        super().__init__()

        # observation space (valid ranges for observations in the state)
        self.observation_space = spaces.Box(0, 2, [BOARD_LENGTH, BOARD_LENGTH], dtype=np.int16)

        # valid actions:
        #   0 = up
        #   1 = down
        #   2 = left
        #   3 = right
        self.action_space = spaces.Discrete(4)

        # Build the initial board through reset() so construction and reset
        # cannot drift apart.
        self.reset()
        print(self.state.shape)

    @staticmethod
    def _random_empty_cell(board):
        """Return ``(x, y)`` of a uniformly chosen cell of ``board`` holding ``SPACE``."""
        while True:
            x = random.randrange(0, BOARD_LENGTH)
            y = random.randrange(0, BOARD_LENGTH)
            if board[y][x] == SPACE:
                return x, y

    def step(self, action):
        """Move the spaceship one cell and score the move.

        Args:
            action: one of ``UP``, ``DOWN``, ``LEFT``, ``RIGHT``.

        Returns:
            ``(state, reward, done, info)`` where ``state`` is the board array
            and ``info`` is an empty dict.

        Raises:
            ValueError: if ``action`` is not one of the four valid actions.
        """
        # placeholder for debugging information
        info = {}

        previous_position_x = self.spaceship_x
        previous_position_y = self.spaceship_y

        # take the action by moving the player; modulo gives the wrap-around
        if action == UP:
            self.spaceship_y = (self.spaceship_y - 1) % BOARD_LENGTH
        elif action == DOWN:
            self.spaceship_y = (self.spaceship_y + 1) % BOARD_LENGTH
        elif action == LEFT:
            self.spaceship_x = (self.spaceship_x - 1) % BOARD_LENGTH
        elif action == RIGHT:
            self.spaceship_x = (self.spaceship_x + 1) % BOARD_LENGTH
        else:
            raise ValueError(f"invalid action: {action!r}")

        # check for win/lose conditions and set reward
        # NOTE(review): landing on an empty cell ends the episode as a win, so
        # the non-terminal branch below is only reached when the target cell
        # already holds SPACESHIP. Confirm this is the intended game rule.
        target = self.state[self.spaceship_y][self.spaceship_x]
        if target == SPACE:
            reward, done, banner = 1.0, True, 'YOU WIN!!!!'
        elif target == ASTEROID:
            reward, done, banner = -1.0, True, 'YOU LOSE'
        else:
            reward, done, banner = -0.01, False, None

        # Count each step's reward exactly once (terminal steps used to be
        # added twice).
        self.cumulative_reward += reward

        if done:
            clear_screen()
            print(f'Cumulative Reward: {self.cumulative_reward}')
            print(banner)
        else:
            # update the player position on the board
            self.state[previous_position_y][previous_position_x] = SPACE
            self.state[self.spaceship_y][self.spaceship_x] = SPACESHIP

        return self.state, reward, done, info

    def render(self):
        """Print the board and the cumulative reward."""
        pretty_print(self.state, self.cumulative_reward)

    def reset(self):
        """Start a new episode on a freshly randomized board.

        Places the spaceship on a random cell and ``ASTEROID_COUNT`` asteroids
        on distinct empty cells, and zeroes the cumulative reward.

        Returns:
            The new board as a ``(BOARD_LENGTH, BOARD_LENGTH)`` ``np.int16`` array.

        Raises:
            ValueError: if the asteroids cannot all fit next to the spaceship.
        """
        if ASTEROID_COUNT >= BOARD_LENGTH * BOARD_LENGTH:
            raise ValueError("ASTEROID_COUNT must be smaller than the number of board cells")

        self.cumulative_reward = 0

        board = [[SPACE] * BOARD_LENGTH for _ in range(BOARD_LENGTH)]

        self.spaceship_x = random.randrange(0, BOARD_LENGTH)
        self.spaceship_y = random.randrange(0, BOARD_LENGTH)
        board[self.spaceship_y][self.spaceship_x] = SPACESHIP

        # initialize asteroid positions; each asteroid takes a cell that is
        # still empty, so it overlaps neither the spaceship nor another asteroid
        self.asteroid_x = []
        self.asteroid_y = []
        for _ in range(ASTEROID_COUNT):
            x, y = self._random_empty_cell(board)
            self.asteroid_x.append(x)
            self.asteroid_y.append(y)
            board[y][x] = ASTEROID

        # convert the python array into a numpy array (needed since Gym expects the state to be this way)
        self.state = np.array(board, dtype=np.int16)

        return self.state

def clear_screen():
    """Clear the terminal using the platform's own command.

    Runs ``cls`` on Windows and ``clear`` elsewhere.
    NOTE(review): inside a notebook this presumably does not clear the cell
    output; confirm whether ``IPython.display.clear_output`` is wanted instead.
    """
    os.system("cls" if os.name == "nt" else "clear")


def pretty_print(state_array, cumulative_reward):
    """Print the cumulative reward followed by the board, one row per line.

    Args:
        state_array: the board, either 2-D (rows of cells) or a flat sequence
            whose length is a perfect square; a flat board is reshaped to a
            square before printing.
        cumulative_reward: value shown in the header line.

    Raises:
        ValueError: if a flat ``state_array`` cannot be reshaped to a square.
    """
    clear_screen()
    print(f'Cumulative Reward: {cumulative_reward}')
    print()

    grid = np.asarray(state_array)
    if grid.ndim == 1:
        side = int(round(grid.size ** 0.5))
        grid = grid.reshape(side, side)

    # Each cell is right-aligned in a 4-character column.
    for row in grid:
        print("".join('{:4}'.format(cell) for cell in row))


  and should_run_async(code)


In [3]:
class ReplayBuffer:
    """Fixed-size buffer of experience tuples with uniform random sampling."""

    def __init__(self, action_size, buffer_size, batch_size, seed):
        """Create an empty buffer.

        Args:
            action_size: number of actions; stored but not otherwise used here.
            buffer_size: maximum number of experiences kept; the oldest entry
                is dropped once the buffer is full.
            batch_size: number of experiences returned by ``sample``.
            seed: seed applied to the global ``random`` module.
        """
        self.action_size = action_size
        self.memory = deque(maxlen=buffer_size)
        self.batch_size = batch_size
        self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])
        # random.seed() returns None, so seed first and keep the value itself.
        random.seed(seed)
        self.seed = seed

    def add(self, state, action, reward, next_state, done):
        """Add a new experience to memory."""
        e = self.experience(state, action, reward, next_state, done)
        self.memory.append(e)

    def sample(self):
        """Randomly sample a batch of experiences from memory.

        Returns:
            ``(states, actions, rewards, next_states, dones)`` as tensors on
            the module-level ``device``.  Each field is stacked with
            ``np.vstack``; ``actions`` is a long tensor and the others are
            float tensors.

        Raises:
            ValueError: from ``random.sample`` if fewer than ``batch_size``
                experiences are stored.
        """
        picked = random.sample(self.memory, k=self.batch_size)
        batch = [e for e in picked if e is not None]

        def stacked(field):
            # One stacked array per experience field.
            return np.vstack([getattr(e, field) for e in batch])

        states = torch.from_numpy(stacked("state")).float().to(device)
        actions = torch.from_numpy(stacked("action")).long().to(device)
        rewards = torch.from_numpy(stacked("reward")).float().to(device)
        next_states = torch.from_numpy(stacked("next_state")).float().to(device)
        # Booleans go through uint8 before becoming a float mask.
        dones = torch.from_numpy(stacked("done").astype(np.uint8)).float().to(device)

        return (states, actions, rewards, next_states, dones)

    def __len__(self):
        """Return the current size of internal memory."""
        return len(self.memory)


In [4]:
class CNNQNetwork(nn.Module):
    """Small convolutional Q-network: two conv/pool stages and two linear layers.

    The first convolution expects a single input channel.  The flattening step
    assumes 32 features per sample, which is what a 6x6 input produces
    (6x6 -> conv 3x3 -> 4x4 -> pool -> 2x2 -> padded conv -> 2x2 -> pool -> 1x1
    with 32 channels).
    """

    def __init__(self, state_dim, action_dim, seed):
        """Build the layers.

        Args:
            state_dim: accepted for interface symmetry; not used by the layers.
            action_dim: number of outputs, one Q-value per action.
            seed: passed to ``torch.manual_seed`` before the layers are created.
        """
        super().__init__()
        self.seed = torch.manual_seed(seed)

        # Convolutional feature extractor.
        self.conv1 = nn.Conv2d(1, 16, kernel_size=3, stride=1)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1)

        # Fully connected head mapping the 32 features to Q-values.
        self.fc1 = nn.Linear(in_features=32, out_features=256)
        self.fc2 = nn.Linear(in_features=256, out_features=action_dim)

    def forward(self, x):
        """Return the Q-values for the input board(s) ``x``."""
        features = F.max_pool2d(F.relu(self.conv1(x)), 2)
        features = F.max_pool2d(F.relu(self.conv2(features)), 2)

        # Rows of 32 features; the leading dimension is inferred.
        flat = features.view(-1, 32)

        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)


In [5]:
class Agent():
    """DQN agent with a local and a target Q-network and a replay buffer."""

    def __init__(self, state_size, action_size, seed):
        """Create the networks, optimizer and replay memory.

        Args:
            state_size: forwarded to ``CNNQNetwork`` as its state dimension.
            action_size: number of discrete actions.
            seed: seed for the global ``random`` module, the networks and the
                replay buffer.
        """
        self.state_size = state_size
        self.action_size = action_size
        # random.seed() returns None, so seed first and keep the value itself.
        random.seed(seed)
        self.seed = seed

        # Q-Network
        self.qnetwork_local = CNNQNetwork(state_size, action_size, seed).to(device)
        self.qnetwork_target = CNNQNetwork(state_size, action_size, seed).to(device)
        self.optimizer = optim.Adam(self.qnetwork_local.parameters(), lr=LR)

        # Replay memory
        self.memory = ReplayBuffer(action_size, BUFFER_SIZE, BATCH_SIZE, seed)
        # Initialize time step (for updating every UPDATE_EVERY steps)
        self.t_step = 0

    def step(self, state, action, reward, next_state, done):
        """Store one transition and learn every ``UPDATE_EVERY`` calls."""
        # Save experience in replay memory
        self.memory.add(state, action, reward, next_state, done)

        # Learn every UPDATE_EVERY time steps.
        self.t_step = (self.t_step + 1) % UPDATE_EVERY
        if self.t_step == 0:
            # If enough samples are available in memory, get random subset and learn
            if len(self.memory) > BATCH_SIZE:
                experiences = self.memory.sample()
                self.learn(experiences, GAMMA)

    def act(self, state, eps=0.):
        """Choose an action for ``state`` with an epsilon-greedy policy.

        Args:
            state: the current board; converted to a float tensor with one
                extra leading dimension.
            eps: probability of picking a uniformly random action.

        Returns:
            The chosen action as an int.
        """
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)  # Convert state to tensor and add a leading dimension
        print("state_tensor : ", state_tensor.shape)
        self.qnetwork_local.eval()

        with torch.no_grad():
            action_values = self.qnetwork_local(state_tensor)

        self.qnetwork_local.train()

        # Epsilon-greedy action selection
        if random.random() > eps:
            action = torch.argmax(action_values).item()
            msg = "Best action "
        else:
            # Choose a random action
            action = random.randint(0, self.action_size - 1)
            msg = "Random action "

        print("{0} : {1}".format( msg, action))
        return action

    def learn(self, experiences, gamma):
        """Run one gradient step on a sampled batch and soft-update the target.

        Args:
            experiences: ``(states, actions, rewards, next_states, dones)``
                tensors as returned by the replay buffer.
            gamma: discount factor applied to the next-state value.
        """
        states, actions, rewards, next_states, dones = experiences

        # Reshape states and next_states to have the correct shape for CNN
        states = states.view(-1, 1, BOARD_LENGTH, BOARD_LENGTH)
        next_states = next_states.view(-1, 1, BOARD_LENGTH, BOARD_LENGTH)

        # TD target: reward plus discounted best next-state value from the
        # target network, zeroed for terminal transitions.
        q_targets_next = self.qnetwork_target(next_states).detach().max(1)[0].unsqueeze(1)
        q_targets = rewards + gamma * q_targets_next * (1 - dones)
        q_expected = self.qnetwork_local(states).gather(1, actions)

        loss = F.mse_loss(q_expected, q_targets)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Update the target network
        self.soft_update(self.qnetwork_local, self.qnetwork_target, TAU)

    def soft_update(self, local_model, target_model, tau):
        """Blend parameters: ``target = tau * local + (1 - tau) * target``."""
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(tau*local_param.data + (1.0-tau)*target_param.data)

    def locate_spaceship(self, state):
        """Return ``(row, column)`` of the first cell equal to 1, or ``(0, 0)``.

        Accepts a 2-D board given either as nested lists or as a NumPy array;
        cells are scanned row by row.  1 is the literal cell value searched for.
        """
        hits = np.argwhere(np.asarray(state) == 1)
        if hits.size == 0:
            return 0, 0
        row, col = hits[0]
        return int(row), int(col)

    def find_best_action(self, state, action_values):
        """Return the action whose neighbouring cell has the highest value.

        Args:
            state: 2-D board containing a cell equal to 1 (the current position).
            action_values: 2-D grid of values indexed ``[row][column]``;
                assumed to have the same shape as ``state``.

        Returns:
            Index into ``[up, down, left, right]`` of the best neighbour, with
            neighbours wrapping around the board edges; ties go to the first.

        Raises:
            IndexError: if ``state`` contains no cell equal to 1.
        """
        grid = np.asarray(state)
        rows, cols = grid.shape
        ys, xs = np.where(grid == 1)
        y, x = ys[0], xs[0]

        # Wrap every neighbour the same way, using the board's own shape.
        neighbour_action_values = [
            action_values[(y - 1) % rows][x],  # up
            action_values[(y + 1) % rows][x],  # down
            action_values[y][(x - 1) % cols],  # left
            action_values[y][(x + 1) % cols],  # right
        ]
        return neighbour_action_values.index(max(neighbour_action_values))

In [6]:
def dqn(n_episodes=500, max_t=1000, eps_start=1.0, eps_end=0.01, eps_decay=0.995,
        solved_score=200.0, checkpoint_path='checkpoint.pth'):
    """Train the module-level ``agent`` on the module-level ``env`` with DQN.

    Args:
        n_episodes: maximum number of training episodes.
        max_t: maximum number of steps per episode.
        eps_start: initial epsilon for epsilon-greedy action selection.
        eps_end: lower bound for epsilon.
        eps_decay: multiplicative factor applied to epsilon after each episode.
        solved_score: training stops early once the mean score over the last
            100 episodes reaches this value.
        checkpoint_path: file the local network's weights are saved to when
            the environment is considered solved.

    Returns:
        List with the total reward of every episode that was run.
    """
    # NOTE(review): SpaceshipEnvironment.step in this file rewards at most 1.0
    # on the final step and -0.01 otherwise, so the default solved_score of
    # 200.0 looks unreachable there; pass a threshold matching the reward scale.
    scores = []                        # list containing scores from each episode
    scores_window = deque(maxlen=100)  # last 100 scores
    eps = eps_start                    # initialize epsilon
    for i_episode in range(1, n_episodes+1):
        state = env.reset()
        score = 0
        for _ in range(max_t):
            action = agent.act(state, eps)
            next_state, reward, done, _ = env.step(action)
            agent.step(state, action, reward, next_state, done)
            state = next_state
            score += reward
            if done:
                break
        scores_window.append(score)       # save most recent score
        scores.append(score)              # save most recent score
        eps = max(eps_end, eps_decay*eps) # decrease epsilon

        # The window only changes once per episode, so average it once.
        average_score = np.mean(scores_window)
        print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, average_score), end="")
        if i_episode % 100 == 0:
            print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, average_score))
        if average_score >= solved_score:
            print('\nEnvironment solved in {:d} episodes!\tAverage Score: {:.2f}'.format(i_episode-100, average_score))
            torch.save(agent.qnetwork_local.state_dict(), checkpoint_path)
            break
    return scores

In [7]:
# Shared configuration for the board, its cell encoding and the action set.

# The board is square: BOARD_LENGTH cells per side, BOARD_SIZE cells in total.
BOARD_LENGTH = 6
BOARD_SIZE = BOARD_LENGTH ** 2

# How many asteroids are placed on the board.
ASTEROID_COUNT = 4

# Integer codes stored in the board cells.
SPACE, SPACESHIP, ASTEROID = 0, 1, 2

# Actions are numbered consecutively from 0 in the order below.
NUM_ACTIONS = 4
UP, DOWN, LEFT, RIGHT = range(NUM_ACTIONS)

In [8]:
# Training hyperparameters read by the agent and the replay buffer.
BUFFER_SIZE = 100_000   # capacity of the replay buffer
BATCH_SIZE = 64         # experiences per learning batch
GAMMA = 0.99            # discount factor
TAU = 0.001             # interpolation factor for the target-network soft update
LR = 0.0005             # optimizer learning rate
UPDATE_EVERY = 4        # learn once every this many agent steps

# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# The environment instance used by the training loop.
env = SpaceshipEnvironment()

(6, 6)


In [None]:
# Build the agent for a single-channel BOARD_LENGTH x BOARD_LENGTH board,
# then run training with the default settings and keep the per-episode scores.
agent = Agent(
    seed=0,
    action_size=NUM_ACTIONS,
    state_size=(1, BOARD_LENGTH, BOARD_LENGTH),
)
scores = dqn()

  and should_run_async(code)
