### Define Env

In [132]:
import time
import gymnasium as gym
from gymnasium import spaces
import numpy as np
from PIL import ImageGrab, Image
import pyautogui
import win32gui
import cv2
import win32con
from torchvision import transforms
from collections import deque
import keyboard
import pyautogui
import ctypes
from modules.read_memory import get_process_id, read_memory, PROCESS_ALL_ACCESS

# Screen region passed as `bbox` to ImageGrab.grab when capturing the state image.
# NOTE(review): presumably player 1's cell area in the Dolphin window at a fixed
# window position/resolution — confirm before running on another setup.
P1_CELL_BBOX_DOLPHIN = (390, 160, 845, 960)

class PPLEnv(gym.Env):
    """Screen- and memory-driven game environment with a Gym-like interface.

    Observations combine a preprocessed screen capture with the cursor position
    read from process memory. Actions are sent as simulated key presses via
    ``do_action``. Rewards are the change in the score read from memory.

    NOTE: ``step`` returns ``(observation, reward, done)`` and ``reset`` returns
    only the observation. These are not the Gymnasium 5-tuple / ``(obs, info)``
    shapes; the return shapes are kept because existing callers unpack them.
    """

    def __init__(self, base_address, color_mode='grey', cursor_logging=False, screenshot_saving=False, load_state_key='f2'):
        """Set up capture regions, the game-over template and memory pointers.

        Args:
            base_address: Base address the pointer chains are resolved from.
            color_mode: ``'grey'`` or ``'color'``; selects the preprocessing function.
            cursor_logging: If true, ``get_cursor_pos`` prints every cursor read.
            screenshot_saving: If true, ``get_state`` stores raw and preprocessed frames.
            load_state_key: Key pressed by ``reset`` to load the emulator save state.

        Raises:
            FileNotFoundError: If the game-over template image cannot be read.
            ValueError: If ``color_mode`` is neither ``'grey'`` nor ``'color'``.
            OSError: If the target process cannot be opened.
        """
        super().__init__()
        # Action Space: 5 discrete actions, mapped by do_action to the keys W, A, S, D, H
        self.action_space = spaces.Discrete(5)

        # Image size handed to the preprocessing function
        self.img_size_color = (32, 32)

        # Logging
        self.cursor_logging = cursor_logging
        self.save_screenshots = screenshot_saving
        self.screenshot_history = []
        self.preprocessed_screenshot_history = []

        # Bounding boxes
        self.state_bbox = P1_CELL_BBOX_DOLPHIN  # bounding box for state
        self.bbox_bottom_left_cell_spotlight = (400, 965, 440, 1005)  # bounding box for check game over

        # Keys
        self.load_state_key = load_state_key

        # Template image used by is_game_over. cv2.imread signals failure by
        # returning None instead of raising, so check it explicitly.
        template_path = "images/temp_img_cell.png"
        template_bgr = cv2.imread(template_path)
        if template_bgr is None:
            raise FileNotFoundError(f"Could not read template image: {template_path}")
        self.template_cell_gray = cv2.cvtColor(template_bgr, cv2.COLOR_BGR2GRAY)

        # Track episode score for calc reward
        self.episode_score = 0
        self.punishment = -250
        self.no_reward_streak = 0

        # Select preprocessing function based on the mode
        if color_mode == 'grey':
            self.preprocess_frame = preprocess_frame_grey
        elif color_mode == 'color':
            self.preprocess_frame = preprocess_frame_color
        else:
            raise ValueError("Invalid mode specified. Use 'grey' or 'color'.")

        # Find pointers (use QtGui.dll as base address)
        process_name = "DolphinMemoryEngine.exe"
        process_id = get_process_id(process_name)
        self.process_handle = ctypes.windll.kernel32.OpenProcess(PROCESS_ALL_ACCESS, False, process_id)
        if not self.process_handle:
            # OpenProcess returns NULL on failure; every later memory read would be meaningless.
            raise OSError(f"Could not open process {process_name!r} (pid={process_id!r})")

        score_base_offset = 0x006EA800
        score_offsets = [0x130, 0x218, 0x18, 0x40, 0x220, 0xAD0]
        self.score_pointer = get_pointer(self, base_address, score_base_offset, score_offsets)

        cursor_horizontal_base_offset = 0x006EB0F8
        cursor_horizontal_offsets = [0x98, 0x8, 0x1C8, 0x128, 0x8, 0x0, 0x8C0]
        self.cursor_horizontal_pointer = get_pointer(self, base_address, cursor_horizontal_base_offset, offsets=cursor_horizontal_offsets)

        cursor_vertical_base_offset = 0x006EB2A8
        cursor_vertical_offsets = [0x78, 0x1C0, 0x188, 0xE0, 0x40]
        self.cursor_vertical_pointer = get_pointer(self, base_address, cursor_vertical_base_offset, offsets=cursor_vertical_offsets)

        # Test memory pointers
        cursor_x, cursor_y = get_cursor_pos(self)
        score = get_score(self)
        print(f"Environment succesfully initialized \n" + f"- cursor: ({cursor_x}, {cursor_y})\n" + f"- score: {score}")

    # FUNCTIONS
    def step(self, action):
        """Send ``action`` to the game and return ``(observation, reward, done)``."""
        do_action(action=action)
        observation = self.get_state()
        done = self.is_game_over()
        reward = self.calculate_reward(done)
        return observation, reward, done

    def reset(self, seed=None, options=None):
        """Reload the save state, clear per-episode bookkeeping and return the first observation.

        ``seed`` and ``options`` are accepted for signature compatibility and are not used.
        """
        # Restart from save state
        keyboard.press(self.load_state_key)  # load save state
        time.sleep(.025)  # wait to release key
        keyboard.release(self.load_state_key)
        time.sleep(2.2)  # wait for dolphin to update

        # Reset Variables
        self.episode_score = 0  # reset total episode reward
        # Drop both frame histories so they cannot grow across episodes; the raw
        # screenshots are by far the larger of the two.
        self.screenshot_history = []
        self.preprocessed_screenshot_history = []
        self.no_reward_streak = 0

        # Return State
        return self.get_state()

    def calculate_reward(self, done):
        """Return the reward for the step that just finished.

        * game over: the fixed ``punishment``;
        * score unchanged: an escalating penalty (-1, -2, ... per consecutive idle step);
        * otherwise: the score gained since the previous step (idle streak is reset).
        """
        if done:
            return self.punishment

        # Get score from the game by reading memory
        total_score = get_score(self)
        # Delta score (score gain) since the last recorded score
        reward = total_score - self.episode_score
        # Keep track of total game score
        self.episode_score = total_score
        if reward == 0:
            self.no_reward_streak += 1
            return reward - self.no_reward_streak  # every idle step is punished 1 more
        self.no_reward_streak = 0  # reset the no reward streak
        return reward

    # Original author's note: should take 0.003s
    def is_game_over(self):
        """Return True when the cell template is no longer found in the check region.

        The region ``bbox_bottom_left_cell_spotlight`` is captured, converted to
        grayscale and matched against the template with ``TM_CCOEFF_NORMED``;
        a best score above 0.8 means the game is still running.
        """
        captured = np.array(ImageGrab.grab(bbox=self.bbox_bottom_left_cell_spotlight))
        captured_gray = cv2.cvtColor(captured, cv2.COLOR_RGB2GRAY)
        match_scores = cv2.matchTemplate(captured_gray, self.template_cell_gray, cv2.TM_CCOEFF_NORMED)
        _, best_score, _, _ = cv2.minMaxLoc(match_scores)
        # Written as a negation so a NaN score still counts as "game over".
        return not (best_score > 0.8)

    def reset_game(self):
        """Return the current observation without touching the emulator."""
        return self.get_state()

    def get_state(self):
        """Capture the state region and return ``[frame, cursor_horizontal, cursor_vertical]``.

        ``frame`` is the output of the selected preprocessing function. When
        screenshot saving is enabled, the raw PIL image and the preprocessed
        frame are appended to their respective histories.
        """
        # Grab Screenshot
        screenshot = ImageGrab.grab(bbox=self.state_bbox)  # PIL
        screenshot_np = np.array(screenshot)
        screenshot_np = self.preprocess_frame(screenshot_np, self.img_size_color)
        cursor_position_horizontal, cursor_position_vertical = get_cursor_pos(self)
        if self.save_screenshots:
            self.screenshot_history.append(screenshot)
            self.preprocessed_screenshot_history.append(screenshot_np)
        return [screenshot_np, cursor_position_horizontal, cursor_position_vertical]

    def close(self):
        """Release the process handle opened in ``__init__`` (safe to call repeatedly)."""
        handle = getattr(self, "process_handle", None)
        if handle:
            ctypes.windll.kernel32.CloseHandle(handle)
            self.process_handle = None


def get_cursor_pos(self):
    """Read the cursor position from process memory.

    The horizontal value is requested as ``ctypes.c_uint8`` and the vertical
    value as ``ctypes.c_uint32``. When ``self.cursor_logging`` is set, the
    position is printed as well.

    Returns:
        Tuple ``(cursor_horizontal, cursor_vertical)``.
    """
    handle = self.process_handle
    position = (
        read_memory(handle, self.cursor_horizontal_pointer, data_type=ctypes.c_uint8),
        read_memory(handle, self.cursor_vertical_pointer, data_type=ctypes.c_uint32),
    )
    if self.cursor_logging:
        print(f'Cursor pos: ({position[0]}, {position[1]})')
    return position

def get_score(self):
    """Read the score at ``self.score_pointer``, requested as ``ctypes.c_uint32``."""
    handle, address = self.process_handle, self.score_pointer
    score = read_memory(handle, address, data_type=ctypes.c_uint32)
    return score

def get_score_img():
    """Grab the fixed screen region holding the score number and return it as a NumPy array."""
    score_region = (440, 50, 510, 160)  # capture the score number
    return np.array(ImageGrab.grab(bbox=score_region))

def do_action(action:int):
    """Press and release the key mapped to ``action``.

    Mapping: 0 -> 'w', 1 -> 'a', 2 -> 's', 3 -> 'd', 4 -> 'h'. Any other
    value is ignored. (A sixth action on 'k' is currently disabled.)
    """
    action_keys = ('w', 'a', 's', 'd', 'h')
    for index, key in enumerate(action_keys):
        if action == index:
            press_and_release(key)
            break

def press_and_release(key=None):
    """Hold ``key`` down for 25 ms, then release it.

    The release happens in a ``finally`` block so the key is not left stuck in
    the pressed state if the wait is interrupted (for example by Ctrl+C).
    """
    keyboard.press(key)
    try:
        time.sleep(.025)
    finally:
        keyboard.release(key)

# Preprocess the frames (resize only, colour channels are kept)
def preprocess_frame_color(frame, img_size_color):
    """Resize ``frame`` to ``img_size_color`` and convert it to a tensor.

    No grayscale conversion is applied. The trailing ``squeeze(0)`` only drops
    a leading dimension of size 1, so a multi-channel result keeps its
    channel dimension.
    """
    steps = [
        transforms.ToPILImage(),
        transforms.Resize(img_size_color),  # resize to the requested size
        transforms.ToTensor(),  # PIL image -> tensor
    ]
    pipeline = transforms.Compose(steps)
    tensor = pipeline(frame)
    return tensor.squeeze(0)

# Preprocess the frames (convert to grayscale and resize)
def preprocess_frame_grey(frame, img_size_color=(256, 256)):
    """Convert ``frame`` to grayscale, resize it and return it as a tensor.

    Args:
        frame: Image array accepted by ``transforms.ToPILImage``.
        img_size_color: Target size passed to ``transforms.Resize``. Optional so
            this function can be called with the same ``(frame, size)``
            arguments as ``preprocess_frame_color``; the default keeps the
            previous fixed 256x256 output for one-argument callers.

    Returns:
        The transformed tensor with the leading single-channel dimension removed.
    """
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Grayscale(),  # Convert to grayscale
        transforms.Resize(img_size_color),  # Resize to the requested size
        transforms.ToTensor(),  # Convert to tensor
    ])
    return transform(frame).squeeze(0)  # Remove the single channel dimension

def get_pointer(self, base_address, base_offset, offsets: list[int]):
    """Resolve a multi-level pointer chain and return the final address.

    The value stored at ``base_address + base_offset`` is read as a 64-bit
    address; each offset is then added to the most recently read address, and
    the memory at the resulting location is read to continue the chain.

    Args:
        self: Object providing ``process_handle``.
        base_address: Base address the chain starts from.
        base_offset: Offset added to ``base_address`` for the first read.
        offsets: Offsets applied at each following level of the chain.

    Returns:
        The last address computed, i.e. the location whose content is the
        target value. With no offsets this is ``base_address + base_offset``.

    Raises:
        TypeError: If a memory read does not yield an integer address.
    """
    def _checked(value):
        # Fail loudly if a read does not produce something usable as an address.
        if not hasattr(value, "__index__"):
            raise TypeError(f"read_memory returned a non-integer value: {value!r}")
        return value

    # Start at the static pointer so `pointer` is defined even for an empty chain.
    pointer = base_address + base_offset
    dereferenced_address = _checked(read_memory(self.process_handle, pointer, data_type=ctypes.c_uint64))

    # Navigate through the other addresses to find the final pointer.
    # NOTE(review): these reads rely on read_memory's default data_type — confirm
    # it is wide enough for 64-bit addresses.
    for offset in offsets:
        pointer = dereferenced_address + offset
        dereferenced_address = _checked(read_memory(self.process_handle, pointer))
    return pointer
    

### Define Replay-Buffer & Agent

In [133]:
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np



class ReplayBuffer:
    """Bounded FIFO store of transitions for experience replay."""

    def __init__(self, buffer_size=10000):
        # A deque with maxlen discards the oldest transition once full.
        self.memory = deque(maxlen=buffer_size)

    def add(self, experience):
        """Store one transition.

        ``experience`` is ``(state, action, reward, next_state, done)`` where
        both states are ``(image, cursor_h, cursor_v)``. Each state is stored as
        ``(image_array, cursor_array)`` with the two cursor values packed into
        one NumPy array.
        """
        state, action, reward, next_state, done = experience

        packed_states = []
        for image, cursor_h, cursor_v in (state, next_state):
            packed_states.append((np.array(image), np.array([cursor_h, cursor_v])))
        current, following = packed_states

        self.memory.append((current, action, reward, following, done))

    def sample(self, batch_size):
        """Return ``batch_size`` transitions drawn at random without replacement."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)



class DQNAgent2:
    """DQN agent: epsilon-greedy policy, replay buffer and a target network.

    The networks are called as ``network(images, cursors)`` and must return one
    value per action for every item in the batch.
    """

    def __init__(self, q_network: nn.Module, action_size: int):
        """Create the agent around ``q_network``.

        Args:
            q_network: Online network that is trained by ``replay``.
            action_size: Number of discrete actions.
        """
        import copy  # local import: only needed here to clone the target network

        self.action_size = action_size
        self.memory = ReplayBuffer(buffer_size=10000)
        self.gamma = 0.9  # Discount factor: weight of future rewards relative to immediate ones
        self.epsilon = 1.0  # Exploration rate
        self.epsilon_min = 0.01
        # epsilon_decay and update_frequency are stored for callers; no method of this class reads them.
        self.epsilon_decay = 0.995
        self.batch_size = 64
        self.update_frequency = 4

        # Two networks: one for the current Q-function and one for the target Q-function.
        self.q_network = q_network
        # Deep-copy instead of re-instantiating the class: this does not depend on
        # the network's constructor signature and starts with identical weights.
        self.target_network = copy.deepcopy(q_network)
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=0.00001)
        # Define the loss function
        self.loss_fn = nn.MSELoss()

    def select_action(self, state, explore=True):
        """Pick an action for ``state`` (``[image, cursor_h, cursor_v]``).

        With ``explore=True`` a uniformly random action is returned with
        probability ``epsilon``; otherwise the action with the highest network
        output is returned.
        """
        # Decide on exploration first so no tensors are built just to be discarded.
        if explore and np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)

        image, cursor_h, cursor_v = state
        # Add a batch dimension to both inputs.
        image_tensor = torch.as_tensor(image, dtype=torch.float32).unsqueeze(0)
        cursor_positions = torch.tensor([[cursor_h, cursor_v]], dtype=torch.float32)  # Shape: [1, 2]

        with torch.no_grad():  # Turn off gradients during evaluation
            q_values = self.q_network(image_tensor, cursor_positions)
        return torch.argmax(q_values).item()

    def random_action(self):
        """Return a uniformly random action index."""
        return random.randrange(self.action_size)

    def replay(self):
        """Run one optimisation step on a random batch from the replay buffer.

        Does nothing until the buffer holds at least ``batch_size`` transitions.
        """
        if len(self.memory) < self.batch_size:
            return

        # Sample a batch and split it into per-field tuples.
        batch = self.memory.sample(self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        # Each stored state is (image, cursor); stack every component into one tensor.
        state_images = torch.tensor(np.array([s[0] for s in states]), dtype=torch.float32)
        state_cursors = torch.tensor(np.array([s[1] for s in states]), dtype=torch.float32)
        next_state_images = torch.tensor(np.array([ns[0] for ns in next_states]), dtype=torch.float32)
        next_state_cursors = torch.tensor(np.array([ns[1] for ns in next_states]), dtype=torch.float32)
        actions = torch.tensor(actions, dtype=torch.long)
        rewards = torch.tensor(rewards, dtype=torch.float32)
        dones = torch.tensor(dones, dtype=torch.float32)

        # Q-values of the actions that were actually taken. Shape: [batch_size]
        q_values = self.q_network(state_images, state_cursors)
        q_values_for_actions = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)

        # Targets are constants for the loss, so compute them without building an
        # autograd graph through the target network.
        with torch.no_grad():
            next_q_values = self.target_network(next_state_images, next_state_cursors).max(1)[0]
            # (1 - dones) removes the bootstrap term for terminal transitions.
            target_q_values = rewards + (1 - dones) * self.gamma * next_q_values

        # Compute the loss between predicted Q values and target Q values
        loss = self.loss_fn(q_values_for_actions, target_q_values)

        # Backpropagate and update the Q-network
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def update_target_network(self):
        """Copy the online network's weights into the target network."""
        self.target_network.load_state_dict(self.q_network.state_dict())


### CNN Model

In [134]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class TestCNN(nn.Module):
    """Q-network mapping a 32x32 RGB frame plus cursor position to one value per action."""

    def __init__(self, num_actions=5):
        super(TestCNN, self).__init__()

        conv_channels = 16
        image_side = 32
        cursor_features = 32
        conv_features = conv_channels * image_side * image_side  # 16 * 32 * 32 = 16384

        # Convolutional layer for the 32x32 RGB image (padding keeps 32x32)
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=conv_channels, kernel_size=3, stride=1, padding=1)

        # Not used in forward(); kept so the parameter set (and therefore the
        # state_dict keys of previously saved checkpoints) stays the same.
        self.fc_conv = nn.Linear(conv_features, 128)

        # Dense layer for the cursor position input
        self.cursor_fc = nn.Linear(2, cursor_features)  # Processing (x, y) coordinates

        # Combined layers: flattened conv output (16384) + cursor features (32) = 16416
        self.fc_combined1 = nn.Linear(conv_features + cursor_features, 64)
        self.fc_combined2 = nn.Linear(64, num_actions)  # One output per action

    def forward(self, image, cursor):
        """
        Args:
            image: Tensor of shape (batch_size, 3, 32, 32) representing the game board.
            cursor: Tensor of shape (batch_size, 2) representing the cursor position (x, y).

        Returns:
            Tensor of shape (batch_size, num_actions) with one unnormalised value
            per action. No softmax is applied: the outputs are regressed against
            reward-scale targets, which a probability distribution cannot
            represent. The argmax over actions is the same either way.
        """
        # Process the image through the convolutional layer and flatten it
        x = F.relu(self.conv1(image))
        x = x.view(x.size(0), -1)
        # Process the cursor position through a dense layer
        y = F.relu(self.cursor_fc(cursor))
        # Combine the features from the image and cursor
        combined = torch.cat((x, y), dim=1)
        # Fully connected layers for the combined features
        z = F.relu(self.fc_combined1(combined))
        return self.fc_combined2(z)



### Create everything 

In [121]:
# Build the environment (colour frames, screenshot histories enabled), the
# Q-network sized to the environment's action space, and the agent around it.
# NOTE(review): base_address is hard-coded; presumably a module load address in
# the target process that can differ between runs — confirm before reuse.
env = PPLEnv(base_address=0x7FFD2EC20000, color_mode='color', screenshot_saving=True)
deep_q_learning_model = TestCNN(num_actions=env.action_space.n)
agent = DQNAgent2(deep_q_learning_model, action_size=env.action_space.n)

Environment succesfully initialized 
- cursor: (3, 0)
- score: 30


In [135]:
# Rebind `env` to a fresh environment with screenshot saving left at its default (off).
# `agent` and `deep_q_learning_model` are not touched by this cell.
env = PPLEnv(base_address=0x7FFD2EC20000, color_mode='color')

Environment succesfully initialized 
- cursor: (4, 10)
- score: 0


### Training Loop

In [136]:
# Training script: runs episodes against the live game, stores every transition
# in the agent's replay buffer and performs one replay step per environment step.
from modules.helper import alt_tab

# Training parameters
num_episodes = 1            # Total number of episodes to train
max_steps_per_episode = 500    # Maximum steps per episode
update_target_frequency = 10   # Update target network every 10 episodes
epsilon_decay_factor = 0.995   # Decay rate for epsilon
agent.q_network.train()
agent.target_network.train()

# NOTE(review): presumably switches window focus to the game so the simulated
# key presses reach it — confirm against modules.helper.
alt_tab()
for episode in range(num_episodes):
    state = env.reset()  # Reset the environment at the start of each episode
    state_image, cursor_h, cursor_v = state

    for step in range(max_steps_per_episode):

        # Select an action (explore or exploit)
        action = agent.select_action((state_image, cursor_h, cursor_v), explore=True)

        # Take the action in the environment
        next_state, reward, done = env.step(action)
        next_image, next_cursor_h, next_cursor_v = next_state
        print(reward)

        # Store experience in the replay buffer
        agent.memory.add(((state_image, cursor_h, cursor_v), action, reward, 
                          (next_image, next_cursor_h, next_cursor_v), done))

        # Update the current state
        state_image, cursor_h, cursor_v = next_image, next_cursor_h, next_cursor_v

        # Perform replay and train the Q-network
        agent.replay()

        # If the episode ends, break the loop
        if done:
            break

    # Decay epsilon once per episode, never below agent.epsilon_min
    agent.epsilon = max(agent.epsilon_min, agent.epsilon * epsilon_decay_factor)

    # Update the target network periodically (episode 0 qualifies, since 0 % n == 0)
    if episode % update_target_frequency == 0:
        agent.update_target_network()

    # env.episode_score is the last game score recorded by the environment,
    # not the sum of the per-step rewards printed above.
    print(f"Episode {episode}/{num_episodes}, Reward: {env.episode_score}, Epsilon: {agent.epsilon:.4f}")

# Switch windows again, then save the trained model
alt_tab()
torch.save(agent.q_network.state_dict(), "trained_model_newest.pth")
print("Training complete and model saved!")


-1
-2
-3
-4
-5
-6
-7
-8
-9
-10
-11
-12
-13
-14
-15
-16
-17
-18
-19
-20
-21
-22
-23
-24
-25
-26
-27
-28
-29
-30
-31
-32
-33
-34
-35
-36
-37
-38
-39
-40
-41
-42
-43
-44
-45
-46
-47
-48
-49
-50
-51
-52
-53
-54
-55
-56
-57
-58
-59
-60
-61
-62
-63
-64
-65
-66
-67
-68
-69
-70
-71
-72
-73
-74
-75
-76
-77
-78
-79
-80
-81
-82
-83
-84
-85
-86
-87
-88
-89
-90
-91
-92
-93
-94
-95
-96
-97
-98
-99
-100
-101
-102
-103
-104
-105
-106
-107
-108
-109
-110
-111
-112
-113
-114
-115
-116
-117
-118
-119
-120
-121
-122
-123
-124
-125
-126
-127
-128
-129
-130
-131
-250
Episode 0/1, Reward: 0, Epsilon: 0.4598
Training complete and model saved!


### Test Loop

In [None]:
# Evaluation script: plays episodes greedily (no exploration) with the current
# Q-network. Nothing is trained here, so no replay step is run and the target
# network is left untouched.
from modules.helper import alt_tab

# Evaluation parameters
num_episodes = 1            # Total number of episodes to play
max_steps_per_episode = 500    # Maximum steps per episode
agent.q_network.eval()
agent.target_network.eval()

# NOTE(review): presumably switches window focus to the game so the simulated
# key presses reach it — confirm against modules.helper.
alt_tab()
for episode in range(num_episodes):
    state = env.reset()  # Reset the environment at the start of each episode
    state_image, cursor_h, cursor_v = state

    for step in range(max_steps_per_episode):

        # Always take the greedy action
        action = agent.select_action((state_image, cursor_h, cursor_v), explore=False)

        # Take the action in the environment
        next_state, reward, done = env.step(action)
        next_image, next_cursor_h, next_cursor_v = next_state

        # Update the current state
        state_image, cursor_h, cursor_v = next_image, next_cursor_h, next_cursor_v

        # If the episode ends, break the loop
        if done:
            break

    print(f"Episode {episode}/{num_episodes}, Reward: {env.episode_score}, Epsilon: {agent.epsilon:.4f}")

# Switch windows again once evaluation is finished
alt_tab()
print("Testing completed!")



Episode 0/1, Reward: 0, Epsilon: 0.5967
Testing completed!
