In [7]:
%%capture
!apt install python-opengl
!apt install ffmpeg
!apt install xvfb
!pip install pyvirtualdisplay
!pip install pyglet==1.5.1
!pip install torch

In [8]:
# Virtual display
from pyvirtualdisplay import Display

virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

FileNotFoundError: [Errno 2] No such file or directory: 'Xvfb'

In [9]:
!pip install gymnasium



In [10]:
from typing import Optional
import numpy as np
if not hasattr(np, "bool8"):
  np.bool8 = np.bool_

from collections import deque

import matplotlib.pyplot as plt
%matplotlib inline

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# Gym
import gymnasium as gym
#import gym_pygame

# Hugging Face Hub
# from huggingface_hub import notebook_login # To log to our Hugging Face account to be able to upload models to the Hub.
# import imageio

  if not hasattr(np, "bool8"):


In [6]:
class StardewMineEv(gym.Env):
    def __init__(self, size: int = 10, max_floor: int = 10, max_energy: int = 100, local_view_size: int = 5, seed: Optional[int] = None):
        # The size of the square grid (10x10 by default)
        self.SIZE = size
        self.MAX_FLOOR = max_floor
        self.MAX_ENERGY = max_energy
        self.LOCAL_VIEW_SIZE = local_view_size  # Agent can see everything in the surrounding nxn area; n odd
        
        # Initialize state variables
        self.agent_location = np.array([-1, -1], dtype=np.int32)
        self.grid = None  # (size x size) grid
        self.energy = None
        self.floor = None
        self._ladder_location = None  # (y, x)

        np.random.seed(seed)

        # Initialize tile types
        self.EMPTY = 0
        self.LADDER = 1
        self.WEED = 2
        self.ROCK = 3
        self.ORE = 4  # TODO: add more ores
        self.OUT_OF_BOUND = -1
        self.MAX_TILE_TYPE = 4

        self.AGENT = 9  # Render ONLY

        # Initialize action space
        self.action_space = gym.spaces.Discrete(17)
        self.ACTION_MOVE_RIGHT = 0
        self.ACTION_MOVE_UP_RIGHT = 1
        self.ACTION_MOVE_UP = 2
        self.ACTION_MOVE_UP_LEFT = 3
        self.ACTION_MOVE_LEFT = 4
        self.ACTION_MOVE_DOWN_LEFT = 5
        self.ACTION_MOVE_DOWN = 6
        self.ACTION_MOVE_DOWN_RIGHT = 7
        self.ACTION_MINE_RIGHT = 8
        self.ACTION_MINE_UP_RIGHT = 9
        self.ACTION_MINE_UP = 10
        self.ACTION_MINE_UP_LEFT = 11
        self.ACTION_MINE_LEFT = 12
        self.ACTION_MINE_DOWN_LEFT = 13
        self.ACTION_MINE_DOWN = 14
        self.ACTION_MINE_DOWN_RIGHT = 15
        self.ACTION_DESCEND = 16
        

        # Map action numbers to actual movements on the grid
        # This makes the code more readable than using raw numbers
        self._action_to_direction = {
            self.ACTION_MOVE_RIGHT: np.array([1, 0]),
            self.ACTION_MOVE_UP_RIGHT: np.array([1, -1]),
            self.ACTION_MOVE_UP: np.array([0, -1]),
            self.ACTION_MOVE_UP_LEFT: np.array([-1, -1]),
            self.ACTION_MOVE_LEFT: np.array([-1, 0]),
            self.ACTION_MOVE_DOWN_LEFT: np.array([-1, 1]),
            self.ACTION_MOVE_DOWN: np.array([0, 1]),
            self.ACTION_MOVE_DOWN_RIGHT: np.array([1, 1]),
        }

        # Initialize observation space
        self.observation_space = gym.spaces.Dict(
            {
                "agent_location": gym.spaces.Box(0, self.SIZE - 1, shape=(2,), dtype=np.int32),
                "energy": gym.spaces.Box(0, self.MAX_ENERGY, shape=(1,), dtype=np.int32),
                "floor": gym.spaces.Box(0, self.MAX_FLOOR - 1, shape=(1,), dtype=np.int32),
                "local_view": gym.spaces.Box(self.OUT_OF_BOUND, self.MAX_TILE_TYPE, shape=(self.LOCAL_VIEW_SIZE, self.LOCAL_VIEW_SIZE), dtype=np.int32),
            }
        )

    
    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
        """Start a new episode.

        Args:
            seed: Random seed for reproducible episodes
            options: Additional configuration (unused in this example)

        Returns:
            tuple: (observation, info) for the initial state
        """
        # IMPORTANT: Must call this first to seed the random number generator
        super().reset(seed=seed)

        # Randomly place the agent anywhere on the top row of the grid
        self.agent_location = self.np_random.integers(0, self.SIZE, size=2, dtype=np.int32)
        self.energy = self.MAX_ENERGY
        self.floor = 0

        # Generate floor rewards and obstacles
        self._generate_floor()  # TODO: generate floor with weed vs. no weed

        observation = self._get_obs()
        info = self._get_info()

        return observation, info


    def step(self, action: int):
        """Execute one timestep within the environment.

        Args:
            action: The action to take (0-3 for directions)

        Returns:
            tuple: (observation, reward, terminated, truncated, info)
        """

        reward = 0.0
        terminated = False
        truncated = False

        # DESCEND (16)
        if action == self.ACTION_DESCEND:
            if self.floor < self.MAX_FLOOR-1 and self._ladder_location is not None:
                ax, ay = self.agent_location
                lx, ly = self._ladder_location
                # Go to next floor if ladder is within 3x3 area
                if abs(lx - ax) <= 1 and abs(ly - ay) <= 1:
                    self.floor += 1
                    self._generate_floor()
                    reward += 0.5  # TODO: adjust

        # MINE (8-15): mine 1 tile in a direction
        elif 8 <= action <= 15:
            reward += self._mine_tile(action)

        # MOVE (0-7)
        else:
            direction = self._action_to_direction[action]

            # Update agent position, ensuring it stays within grid bounds
            # np.clip prevents the agent from walking off the edge
            agent_location = np.clip(
                self.agent_location + direction, 0, self.SIZE - 1
            )

            # If tile is empty, move agent
            if self.grid[agent_location[1], agent_location[0]] == 0:
                self.agent_location = agent_location
                reward -= 0.05
            else:
                reward -= 0.5  # Penalize agent for wasting movement

        # End episode when all rocks/ores in the last floor are collected OR when agent collapses
        if self._is_grid_empty():
            terminated = True
            reward += 5  # TODO: adjust
        elif self.energy <= 0:
            terminated = True
            reward -= 10  # TODO: adjust

        observation = self._get_obs()
        info = self._get_info()

        return observation, reward, terminated, truncated, info

    
    def _generate_floor(self):
        self.grid = np.full((self.SIZE, self.SIZE), self.EMPTY, dtype=np.int32)

        # Get environment spawn probability
        prob_weed, prob_rock, prob_ore = 0.1, 0.3, 0.05

        possible_ladder_locations = []

        for y in range(self.SIZE):
            for x in range(self.SIZE):
                if (x,y) == tuple(self.agent_location):
                    continue
                i = np.random.random()
                if i < prob_weed:
                    self.grid[y,x] = self.WEED
                elif i < prob_weed + prob_rock:
                    self.grid[y,x] = self.ROCK
                    possible_ladder_locations.append((y,x))
                elif i < prob_weed + prob_rock + prob_ore:
                    self.grid[y,x] = self.ORE
                    possible_ladder_locations.append((y,x))

        # Ensure that there's at least 1 rock in each floor to place the ladder
        # and also avoid placing rock under agent
        if not possible_ladder_locations:
            while True:
                x = np.random.randint(0, self.SIZE)
                y = np.random.randint(0, self.SIZE)
                if (x,y) != tuple(self.agent_location) and self.grid[y,x] == self.EMPTY:
                    self.grid[y,x] = self.ROCK
                    possible_ladder_locations.append((x,y))
                    break

        # Place ladder under a randomly chosen rock/ore if not at last floor
        if self.floor < self.MAX_FLOOR - 1:
            self._ladder_location = np.random.choice(possible_ladder_locations)
        else:
            self._ladder_location = None


    def _mine_tile(self, action: int):
        reward = 0.0
        self.energy -= 1

        direction = self._action_to_direction[action-8]
        x, y = self.agent_location + direction

        # If mine out of bounds, do nothing, and penalize for wasting movement
        if not (0 <= x < self.SIZE and 0 <= y < self.SIZE):
            return reward - 1

        tile = self.grid[y,x]

        match tile:
            case self.ORE:  # TODO: add more ore values
                reward += 1
            case self.EMPTY:  # Penalize agent for mining empty space
                reward -= 1
            case _:
                reward -= 0.01  # TODO: consider different rewards/penalty for rocks vs. weeds
        # Tile is empty after being mined
        self.grid[y,x] = self.EMPTY

        if self._ladder_location is not None and (y,x) == self._ladder_location:
            self.grid[y,x] = self.LADDER

        return reward


    def _is_grid_empty(self):
        return np.sum(self.grid) == 0


    def _get_obs(self):
        half = self.LOCAL_VIEW_SIZE // 2
        ax = int(self.agent_location[0])
        ay = int(self.agent_location[1])

        x0 = ax - half
        y0 = ay - half
        x1 = ax + half + 1
        y1 = ay + half + 1

        local_view = np.full(
            (self.LOCAL_VIEW_SIZE, self.LOCAL_VIEW_SIZE),
            self.OUT_OF_BOUND,
            dtype=np.float32,
        )

        gx0 = max(0, x0)
        gy0 = max(0, y0)
        gx1 = min(self.SIZE, x1)
        gy1 = min(self.SIZE, y1)

        patch = self.grid[gy0:gy1, gx0:gx1].astype(np.float32)

        px0 = gx0 - x0
        py0 = gy0 - y0
        local_view[py0:py0 + patch.shape[0], px0:px0 + patch.shape[1]] = patch

        obs = {
            'agent_location': self.agent_location.astype(np.float32),
            'energy': np.array([self.energy], dtype=np.float32),  # shape (1,)
            'floor': np.array([self.floor], dtype=np.float32),    # shape (1,)
            'local_view': local_view,
        }
        return obs



    def _get_info(self):
        return {}


    def render(self, render_mode: str = "human"):
        grid_copy = self.grid.copy()
        x,y = self.agent_location
        grid_copy[y,x] = 9  # Agent
        print("Floor:", self.floor, "Energy:", self.energy)
        print(grid_copy)