In [2]:
import cv2
import time
import math
import asyncio
import pyautogui
import numpy as np

from mss import mss
from enum import Enum

In [3]:
%run ../src/astar.ipynb
%run ../src/pathfinding.ipynb

# Custom Classes

In [4]:
class Direction(Enum):
    TOP = 1
    LEFT = 2
    BOTTOM = 3
    RIGHT = 4

#  Helper functions

In [5]:
def countdown():
    time.sleep(1)
    print('3')
    time.sleep(1)
    print('2')
    time.sleep(1)
    print('1')

In [6]:
def screenshot():
    field_width = 688
    field_height = 608
    field_left = 607
    field_top = 302
    
    with mss() as sct:        
        monitor_number = 1
        mon = sct.monitors[monitor_number]
        monitor = {'mon': monitor_number, 'width': field_width, 'height': field_height, 'left': mon["left"] + field_left, 'top': mon["top"] + field_top}

        sct_img = sct.grab(monitor)
        img = np.array(sct_img.pixels).astype(np.uint8)[::,::,::-1]
        
        return img

In [7]:
# detect if the game is over yet
def game_over(img):
    (left, top, right, bottom) = (453, 436, 533, 515)
    img_crp = img[top:bottom, left:right]
    
    img_score_path = '../img_raw/gameover.png'
    img_score_full = cv2.imread(img_score_path)
    img_score = img_score_full[top:bottom, left:right]
    
    hist_crp = calc_hist(img_crp)
    hist_score = calc_hist(img_score)
    
    sum_diff = 0
    for chan in range(3):
        diff = cv2.compareHist(hist_crp[chan], hist_score[chan], cv2.HISTCMP_CORREL)
        sum_diff += diff
    
    avg_diff = sum_diff/3
    if(avg_diff > 0.7):
        return True
    
    return False

In [8]:
def calc_direction(current_pos, old_pos):
#     start_time = time.perf_counter()
    
    current_pos_no = 10 * current_pos.x + current_pos.y
    old_pos_no = 10 * old_pos.x + old_pos.y
    pos_diff = current_pos_no - old_pos_no
    
    ret_direction = None
    if(pos_diff == -1):
        ret_direction = Direction.TOP
    elif(pos_diff == -10):
        ret_direction = Direction.LEFT
    elif(pos_diff == 1):
        ret_direction = Direction.BOTTOM
    elif(pos_diff == 10):
        ret_direction = Direction.RIGHT
    
#     duration = time.perf_counter() - start_time
    
    return ret_direction

In [9]:
def press_keyboard_button(direction):
    if(direction == Direction.TOP):
        # press up arrow
        pyautogui.press('up')
        print('up')
    elif(direction == Direction.LEFT):
        # press left arrow
        pyautogui.press('left')
        print('left')
    elif(direction == Direction.BOTTOM):
        # press down arrow
        pyautogui.press('down')
        print('down')
    elif(direction == Direction.RIGHT):
        # press right arrow
        pyautogui.press('right')
        print('right')

In [10]:
def out_of_bounds(pos):
    if(pos.x < 0 or pos.x >= ncols or pos.y < 0 or pos.y >= nrows):
        return True
    return False

In [11]:
def apple_overlap_augmented_body(cells, head_pos, direction, n_augment):
    if(direction == Direction.TOP):
        for i in range(1, n_augment+1):
            tmp_pos = Position(head_pos.y - i, head_pos.x)
            if(not out_of_bounds(tmp_pos) and cells[head_pos.y - i, head_pos.x] == Label.APPL.value):
                return True
        
    elif(direction == Direction.BOTTOM):
        for i in range(1, n_augment+1):
            tmp_pos = Position(head_pos.y + i, head_pos.x)
            if(not out_of_bounds(tmp_pos) and cells[head_pos.y + i, head_pos.x] == Label.APPL.value):
                return True
            
    elif(direction == Direction.LEFT):
        for i in range(1, n_augment+1):
            tmp_pos = Position(head_pos.y, head_pos.x - i)
            if(not out_of_bounds(tmp_pos) and cells[head_pos.y, head_pos.x - i] == Label.APPL.value):
                return True
                
    elif(direction == Direction.RIGHT):
        for i in range(1, n_augment+1):
            tmp_pos = Position(head_pos.y, head_pos.x + i)
            if(not out_of_bounds(tmp_pos) and cells[head_pos.y, head_pos.x + i] == Label.APPL.value):
                return True
            
    return False

In [12]:
def augment_snake_body(cells, head_pos, direction, n_augment):
    if(direction == Direction.TOP):
        for i in range(n_augment):
            cells[head_pos.y - i, head_pos.x] = Label.BODY.value
        new_head_pos = Position(head_pos.y - n_augment, head_pos.x)
        
    elif(direction == Direction.BOTTOM):
        for i in range(n_augment):
            cells[head_pos.y + i, head_pos.x] = Label.BODY.value
        new_head_pos = Position(head_pos.y + n_augment, head_pos.x)
            
    elif(direction == Direction.LEFT):
        for i in range(n_augment):
            cells[head_pos.y, head_pos.x - i] = Label.BODY.value
        new_head_pos = Position(head_pos.y, head_pos.x - n_augment)
            
    elif(direction == Direction.RIGHT):
        for i in range(n_augment):
            cells[head_pos.y, head_pos.x + i] = Label.BODY.value
        new_head_pos = Position(head_pos.y, head_pos.x + n_augment)
        
    cells[new_head_pos.y, new_head_pos.x] = Label.HEAD.value
    print(cells)
    print("++++++++++++++++++")
            
    return cells, new_head_pos

# Snake pathing & controlling functions

In [13]:
async def augment_and_pathfinding(cells, head_pos, apple_pos, cached_direction, first_move, start_time, offset_time, snake_speed, snake_turn_speed):    
    if(first_move):
        first_move = False
    else:
        if(apple_overlap_augmented_body(cells, head_pos, cached_direction, 3)):
            print('apple overlaps augmented body')
            return cached_direction, apple_pos, True, first_move
        
        exec_time = time.perf_counter() - start_time + offset_time
        n_augment = exec_time / snake_speed
        print(n_augment, exec_time)
        n_augment = round(n_augment)
        print(n_augment)

        cells, head_pos = augment_snake_body(cells, head_pos, cached_direction, n_augment)

    can_pass, new_direction, path_completed = await pathfinding(cells, head_pos, apple_pos, snake_speed, snake_turn_speed)
    cached_apple_pos = apple_pos

    if(new_direction is not None):
        cached_direction = new_direction
        
    return cached_direction, cached_apple_pos, path_completed, first_move

In [14]:
# find the optimal path and control the snake to move along the path
async def pathfinding(cells, head_pos, apple_pos, snake_speed, snake_turn_speed):    
    if(head_pos is None):
        print('Null head_pos')
        return None, None, False
    
    cm_pos = center_of_mass(cells, nrows, ncols)

    new_cells, path = find_optimal_path(cells, head_pos, apple_pos, [cm_pos], [1])
    if(new_cells is None):
        print('Null new_cells')
        return None, None, False
    
    print(new_cells)        
    print("------------------------")
    
    task = asyncio.create_task(control_snake(new_cells, path, head_pos, apple_pos, snake_speed, snake_turn_speed))
    await task
    
    return task.result()[0], task.result()[1], task.done()

In [15]:
async def control_snake(cells, path, head_pos, apple_pos, snake_speed, snake_turn_speed):    
#     current_direction = calc_direction(path[1], path[0])
    old_pos = path[0]
    last_fn_exec_time = 0.045
    
    for i, pos in enumerate(path[1:]):
        direction = calc_direction(pos, old_pos)    # snake's turn direction
        next_direction = calc_direction(path[i+1], pos)    # snake's next turn direction
        old_pos = pos
        
        if(not direction is None):
            press_keyboard_button(direction)
            current_direction = direction
            
            if(i==len(path)-2):
                await asyncio.sleep(snake_speed - last_fn_exec_time)
            elif(current_direction == next_direction):
                await asyncio.sleep(snake_speed)
            else:
                await asyncio.sleep(snake_turn_speed)
    
    near_wall, direction_wall = turn_if_near_wall(current_direction, apple_pos, snake_turn_speed)
    can_pass, direction_steps = check_forward_turns(cells, current_direction, apple_pos, snake_speed, snake_turn_speed)
        
    new_direction = current_direction
    has_turned = False
    
    if(near_wall and can_pass):
        if(direction_wall != direction_steps[0] and 
            not check_has_obstacle(cells, apple_pos, direction_wall, 3)):
            press_keyboard_button(direction_wall)
            await asyncio.sleep(snake_turn_speed)
            new_direction = direction_wall
            has_turned = True
        
    if(not has_turned):
        if(len(direction_steps) > 0 and direction_steps != [current_direction, current_direction, current_direction]):
            cached_direction = direction_steps[0]
            new_direction = direction_steps[2]
            for tmp_direction in direction_steps:
                press_keyboard_button(tmp_direction)
                if(tmp_direction == cached_direction):
                    await asyncio.sleep(snake_speed)
                else:
                    await asyncio.sleep(snake_turn_speed)
    
    return can_pass, new_direction

In [None]:
def augment_and_pathfinding2(cells, head_pos, apple_pos, cached_direction, first_move, start_time, offset_time, snake_speed, snake_turn_speed):    
    if(first_move):
        first_move = False
    else:
        if(apple_overlap_augmented_body(cells, head_pos, cached_direction, 3)):
            print('apple overlaps augmented body')
            return cached_direction, apple_pos, True, first_move
        
        exec_time = time.perf_counter() - start_time + offset_time
        n_augment = exec_time / snake_speed
        print(n_augment, exec_time)
        n_augment = round(n_augment)
        print(n_augment)

        cells, head_pos = augment_snake_body(cells, head_pos, cached_direction, n_augment)

    can_pass, new_direction, path_completed = pathfinding2(cells, head_pos, apple_pos, snake_speed, snake_turn_speed)
    cached_apple_pos = apple_pos

    if(new_direction is not None):
        cached_direction = new_direction
        
    return cached_direction, cached_apple_pos, path_completed, first_move

In [16]:
# find the optimal path and control the snake to move along the path
def pathfinding2(cells, head_pos, apple_pos, snake_speed, snake_turn_speed):    
    if(head_pos is None):
        print('Null head_pos')
        return None, None, False
    
    cm_pos = center_of_mass(cells, nrows, ncols)

    new_cells, path = find_optimal_path(cells, head_pos, apple_pos, [cm_pos], [1])
    if(new_cells is None):
        print('Null new_cells')
        return None, None, False
    
    print(new_cells)        
    print("------------------------")
    
    re1, re2 = control_snake2(new_cells, path, head_pos, apple_pos, snake_speed, snake_turn_speed)
    
    return re1, re2, True

In [17]:
def control_snake2(cells, path, head_pos, apple_pos, snake_speed, snake_turn_speed):    
#     current_direction = calc_direction(path[1], path[0])
    old_pos = path[0]
    last_fn_exec_time = 0.045
    
    for i, pos in enumerate(path[1:]):
        direction = calc_direction(pos, old_pos)    # snake's turn direction
        next_direction = calc_direction(path[i+1], pos)    # snake's next turn direction
        old_pos = pos
        
        if(not direction is None):
            press_keyboard_button(direction)
            current_direction = direction
            
            if(i==len(path)-2):
                time.sleep(snake_speed - last_fn_exec_time)
            elif(current_direction == next_direction):
                time.sleep(snake_speed)
            else:
                time.sleep(snake_turn_speed)
    
    near_wall, direction_wall = turn_if_near_wall(current_direction, apple_pos, snake_turn_speed)
    can_pass, direction_steps = check_forward_turns(cells, current_direction, apple_pos, snake_speed, snake_turn_speed)
        
    new_direction = current_direction
    has_turned = False
    
    if(near_wall and can_pass):
        if(direction_wall != direction_steps[0] and 
            not check_has_obstacle(cells, apple_pos, direction_wall, 3)):
            press_keyboard_button(direction_wall)
            time.sleep(snake_turn_speed)
            new_direction = direction_wall
            has_turned = True
        
    if(not has_turned):
        if(len(direction_steps) > 0 and direction_steps != [current_direction, current_direction, current_direction]):
            cached_direction = direction_steps[0]
            new_direction = direction_steps[2]
            for tmp_direction in direction_steps:
                press_keyboard_button(tmp_direction)
                if(tmp_direction == cached_direction):
                    time.sleep(snake_speed)
                else:
                    time.sleep(snake_turn_speed)
    
    return can_pass, new_direction

In [18]:
def turn_if_near_wall(direction, head_pos, snake_turn_speed):
    left_wall_x = 0
    right_wall_x = 16
    top_wall_y = 0
    bottom_wall_y = 14
    
    dx1 = head_pos.x - left_wall_x
    dx2 = right_wall_x - head_pos.x
    dy1 = head_pos.y - top_wall_y
    dy2 = bottom_wall_y - head_pos.y
    
    delta_dx = dx1 - dx2
    delta_dy = dy1 - dy2
    
    k = 5    # space from head to wall threshold
    
    ret_direction = None
    if((direction == Direction.LEFT and dx1 <= k) or
       (direction == Direction.RIGHT and dx2 <= k)):
        if(delta_dy > 0):          # dy1 > dy2
            ret_direction = Direction.TOP
        else:
            ret_direction = Direction.BOTTOM
        
    elif((direction == Direction.TOP and dy1 <= k) or
       (direction == Direction.BOTTOM and dy2 <= k)):
        if(delta_dx > 0):          # dx1 > dx2
            ret_direction = Direction.LEFT
        else:
            ret_direction = Direction.RIGHT
            
    if(ret_direction is None):
        return False, direction
    else:
#         press_keyboard_button(ret_direction)
#         await asyncio.sleep(snake_turn_speed)
        return True, ret_direction

In [19]:
def check_forward_turns(cells, direction, head_pos, snake_speed, snake_turn_speed):
    possible_offsets = [
        [0,0,0],
        [0,0,1],
        [0,0,3],
        [0,1,1],
        [0,1,2],
        [0,3,2],
        [0,3,3],
        [1,1,1],
        [1,1,2],
        [1,2,2],
        [3,2,2],
        [3,3,2],
        [3,3,3]
    ]
    
    direction_steps_list = get_direction_steps_list(cells, head_pos, direction, possible_offsets)
    tmp_head_pos = head_pos
    tmp_direction = direction
    
    if(len(direction_steps_list) > 0):
        for direction_steps, final_pos in direction_steps_list:
            if(not check_has_obstacle(cells, final_pos, direction_steps[-1], 3)):
                return True, direction_steps

    return False, []

In [20]:
def get_direction_steps_list(cells, head_pos, direction, possible_offsets):
    direction_steps_list = []
    
    for offsets in possible_offsets:
        tmp_pos = head_pos
        directions = []
        out_bounds = False
        has_obstacle = False
        for direction_offset in offsets:
            new_direction_value = (direction.value + direction_offset) % len(Direction)
            if(new_direction_value == 0):
                new_direction_value = 4
            new_direction = Direction(new_direction_value)
            
            if(new_direction == Direction.TOP):
                tmp_pos = Position(tmp_pos.y - 1, tmp_pos.x)
            elif(new_direction == Direction.LEFT):
                tmp_pos = Position(tmp_pos.y, tmp_pos.x - 1)
            elif(new_direction == Direction.BOTTOM):
                tmp_pos = Position(tmp_pos.y + 1, tmp_pos.x)
            elif(new_direction == Direction.RIGHT):
                tmp_pos = Position(tmp_pos.y, tmp_pos.x + 1)
            
            out_bounds = out_of_bounds(tmp_pos)
            
            if(out_bounds):
                break
            else:
                has_obstacle = cells[tmp_pos.y, tmp_pos.x] != 0 and cells[tmp_pos.y, tmp_pos.x] != 1
                if(has_obstacle):
                    break
            
            directions.append(new_direction)
        if(not out_bounds and not has_obstacle):
            direction_steps_list.append((directions, tmp_pos))
    
    return direction_steps_list     

In [21]:
def check_has_obstacle(cells, head_pos, direction, n_cells):
    has_obstacle = False
    
    if(direction == Direction.TOP):
        for i in range(1, n_cells+1):
            tmp_pos = Position(head_pos.y - i, head_pos.x)
            if(out_of_bounds(tmp_pos)):
                has_obstacle = True
                break
            elif(cells[tmp_pos.y, tmp_pos.x] != 0 and cells[tmp_pos.y, tmp_pos.x] != 1):
                has_obstacle = True
                break
    elif(direction == Direction.LEFT):
        for i in range(1, n_cells+1):
            tmp_pos = Position(head_pos.y, head_pos.x - i)
            if(out_of_bounds(tmp_pos)):
                has_obstacle = True
                break
            elif(cells[tmp_pos.y, tmp_pos.x] != 0 and cells[tmp_pos.y, tmp_pos.x] != 1):
                has_obstacle = True
                break
    elif(direction == Direction.BOTTOM):
        for i in range(1, n_cells+1):
            tmp_pos = Position(head_pos.y + i, head_pos.x)
            if(out_of_bounds(tmp_pos)):
                has_obstacle = True
                break
            elif(cells[tmp_pos.y, tmp_pos.x] != 0 and cells[tmp_pos.y, tmp_pos.x] != 1):
                has_obstacle = True
                break
    elif(direction == Direction.RIGHT):
        for i in range(1, n_cells+1):
            tmp_pos = Position(head_pos.y, head_pos.x + i)
            if(out_of_bounds(tmp_pos)):
                has_obstacle = True
                break
            elif(cells[tmp_pos.y, tmp_pos.x] != 0 and cells[tmp_pos.y, tmp_pos.x] != 1):
                has_obstacle = True
                break
            
    return has_obstacle

#  Main function

In [22]:
async def main():    
    pyautogui.PAUSE = 0.0
    pyautogui.FAILSAFE = False
    threshold = 0
    snake_speed = 0.1713
    snake_turn_speed = 0.171
    offset_time = 0
    
    first_move = True
    path_completed = False
    
    cached_apple_pos = None
    cached_direction = Direction.RIGHT
    
    countdown()
    
    while(True):
        start_time = time.perf_counter()
        img = screenshot()
        if(game_over(img)):
            print('game over')
            break
            
        cells, head_pos, apple_pos = get_cells(img, Label, threshold)
        
        print(cells)
        print("=============================")        
            
        # Execute if the snake has not eaten the apple yet but finished moving along the path
        if(apple_pos is not None and path_completed and cached_apple_pos == apple_pos):
            cached_direction, cached_apple_pos, path_completed, first_move = await augment_and_pathfinding(
                cells, head_pos, apple_pos, cached_direction, first_move, 
                start_time, offset_time, snake_speed, snake_turn_speed)
                
        # Execute if the apple position has changed
        elif(apple_pos is not None and (cached_apple_pos is None or cached_apple_pos != apple_pos)):
            cached_direction, cached_apple_pos, path_completed, first_move = await augment_and_pathfinding(
                cells, head_pos, apple_pos, cached_direction, first_move, 
                start_time, offset_time, snake_speed, snake_turn_speed)

In [24]:
def main2():    
    pyautogui.PAUSE = 0.0
    pyautogui.FAILSAFE = True
    threshold = 0
    snake_speed = 0.1713
    snake_turn_speed = 0.171
    offset_time = 0
    
    first_move = True
    path_completed = False
    
    cached_apple_pos = None
    cached_direction = Direction.RIGHT
    
    countdown()
    
    while(True):
        start_time = time.perf_counter()
        img = screenshot()
        if(game_over(img)):
            print('game over')
            break
            
        cells, head_pos, apple_pos = get_cells(img, Label, threshold)
        
        print(cells)
        print("=============================")        
            
        # Execute if the snake has not eaten the apple yet but finished moving along the path
        if(apple_pos is not None and path_completed and cached_apple_pos == apple_pos):
            cached_direction, cached_apple_pos, path_completed, first_move = augment_and_pathfinding2(
                cells, head_pos, apple_pos, cached_direction, first_move, 
                start_time, offset_time, snake_speed, snake_turn_speed)
                
        # Execute if the apple position has changed
        elif(apple_pos is not None and (cached_apple_pos is None or cached_apple_pos != apple_pos)):
            cached_direction, cached_apple_pos, path_completed, first_move = augment_and_pathfinding2(
                cells, head_pos, apple_pos, cached_direction, first_move, 
                start_time, offset_time, snake_speed, snake_turn_speed)