In [49]:
import sys
sys.path.append('../common')
import utils
import importlib
importlib.reload(utils)
from utils import IOHandler as IO

import helper
importlib.reload(helper)

import time
import sys
import json

import numpy as np
import pynput.keyboard as kb

from enum import Enum
from PIL import Image

# Shared handle used by the parse_* helpers (capture_portion) and the click/typing helpers
# (click_mouse, click_key) throughout this notebook.
# NOTE(review): offset and game_dims look like the game window's on-screen position and size
# in pixels — confirm against utils.IOHandler.
io = utils.IOHandler(offset=(1081, 675), game_dims=(1200,900), verbose=True)

In [50]:
# Gates debug(): when falsy, debug() prints nothing.
verbose = True
# Stop flag polled by the bot loops; on_press() sets it to True when Escape is pressed.
break_program = False

# Scan bounds for the board: digit slots read per row (parse_attempt) and rows read per
# board (parse_attempts / parse_responses).
MAX_DIGITS = 6
MAX_CHANCES = 15

def _load_mask(path):
    """Read the image at `path` into an int32 array indexed as [x, y, channel].

    np.array(img) is laid out rows-first, so the first two axes are swapped to allow
    [x, y] indexing. Only the first three channels are kept, which drops an alpha
    channel when the file has one. The image file is closed before returning.
    """
    with Image.open(path) as img:
        return np.array(img).swapaxes(0, 1).astype(np.int32)[:, :, :3]


# TODO: use load_images_from_directory instead
# Level badges are stored 1-based on disk (mask_1.png .. mask_31.png).
level_masks = [_load_mask(f'levels/mask_{i + 1}.png') for i in range(31)]

# blank (10), none (11)
digit_masks = [_load_mask(f'digits/mask_{i}.png') for i in range(12)]

# green (0), red (1), blank (2)
ball_masks = [_load_mask(f'balls/mask_{i}.png') for i in range(3)]

# One mask per clock digit 0-9.
time_masks = [_load_mask(f'time/mask_{i}_o.png') for i in range(10)]

time_blank_mask = _load_mask('time/mask_blank.png')

# Load the strategy files
# FULL_STRAT[num_digits][has_repeats] holds the strategy tree read from the matching JSON file;
# get_best_next_guess_from_full_strat() walks it through its 'guess' and 'scores' keys.
FULL_STRAT = {}
for num_digits in [3, 4, 5, 6]:
    for has_repeats in [False, True]:
        strategy_path = f"strategies/output-{num_digits}-{has_repeats}-100-entropy.json"
        # Use a context manager so each file handle is closed as soon as it has been parsed.
        with open(strategy_path) as strategy_file:
            substrat = json.load(strategy_file)
        if num_digits not in FULL_STRAT:
            FULL_STRAT[num_digits] = {}
        # Inside the file the tree is nested under the stringified digit count and repeat flag.
        FULL_STRAT[num_digits][has_repeats] = substrat[str(num_digits)][str(has_repeats)]

In [51]:
def debug(*args, **kwargs):
    """Forward all arguments to print() when the module-level `verbose` flag is truthy.

    Does nothing when `verbose` is falsy.
    """
    if not verbose:
        return
    print(*args, **kwargs)

class Screen(Enum):
    """Screens and overlays the bot distinguishes; each value is a plain string tag.

    The members are used as the keys of SCREEN_TYPES and are tested with
    io.is_screen_type() / io.calc_screen_types().
    """
    HOME = 'home'
    GAME_OVER = 'game_over'
    END_GAME = 'end_game'
    NEW_LEVEL = 'new_level'
    GREAT_JOB = 'great_job'
    ARCADE = 'arcade'
    MIDGAME = 'midgame'
    EASY_MODE = 'easy_mode'
    GREAT_PLAY = "great_play"

# Probe registered for each Screen member.
# NOTE(review): each entry looks like [x, y, [r, g, b]] — a pixel position plus the colour
# expected there — confirm against utils.IOHandler.set_screen_types.
SCREEN_TYPES = {
    Screen.HOME: [1066, 684, [229, 99, 42]], # orange gumball
    Screen.GAME_OVER: [680, 380, [246, 196, 195]], # piggy bank
    Screen.END_GAME: [420, 860, [247, 207, 78]], # "end game" button, either ran out of guesses or time
    Screen.NEW_LEVEL: [850, 460, [245, 199, 69]], # "Start!", "Level: X", or congrats pop-up
    Screen.GREAT_JOB: [230, 664, [3, 14, 147]], # "great job!" pop-up
    Screen.ARCADE: [170, 852, [145, 61, 148]], # magenta arcade cabinet
    Screen.MIDGAME: [1095, 60, [92, 29, 196]], # purple music note
    Screen.EASY_MODE: [485, 44, [208, 126, 48]], # orange outline in level area
    Screen.GREAT_PLAY: [880, 600, [122, 26, 19]], # red gift box
}

# Hand the table to the shared IO handle so is_screen_type() / calc_screen_types() can use it.
io.set_screen_types(SCREEN_TYPES)

# Code length per level; get_level_digits() indexes each list with (level-1) % 10.
# NOTE(review): the 'easy' list has only 5 entries, so an easy level above 5 would raise
# IndexError in get_level_digits — confirm easy mode never goes past level 5.
LEVEL_DIGITS = {
    'normal': [3, 3, 3, 4, 4, 4, 5, 5, 5, 6], 
    'easy': [3, 3, 3, 3, 4]
}
# Number of guesses allowed per level; get_level_chances() indexes with level-1
# (30 entries for 'normal', 5 for 'easy').
LEVEL_CHANCES = {
    'normal': [12,11,10]*3 + [11] + [11,10,9]*3 + [9] + [9,8,7]*3 + [8],
    'easy': [15]*5,
}
# Time limit per level; get_level_time() indexes with level-1.
# NOTE(review): presumably seconds, matching what parse_time() returns — confirm.
LEVEL_TIME = [120,140,160,190,210,230,260,280,300,330,90,110,130,160,180,200,230,250,270,300,60,80,100,130,150,170,200,220,240,270]

def get_level_digits(level, difficulty):
    """Return the code length for 1-based `level` under the given `difficulty` key."""
    digits_for_difficulty = LEVEL_DIGITS[difficulty]
    slot = (level - 1) % 10  # the table is looked up modulo ten levels
    return digits_for_difficulty[slot]

def get_level_chances(level, difficulty):
    """Return the number of guesses allowed on 1-based `level` for `difficulty`."""
    chances_for_difficulty = LEVEL_CHANCES[difficulty]
    return chances_for_difficulty[level - 1]

def get_level_has_repeats(level):
    """Return True when `level` is past level 10, i.e. when codes may repeat digits."""
    last_level_without_repeats = 10
    return level > last_level_without_repeats

def get_level_time(level):
    """Return the LEVEL_TIME entry for 1-based `level`."""
    index = level - 1
    return LEVEL_TIME[index]

def parse_level():
    """Read the current level number off the screen and return it as a string.

    A 45x45 region is captured (its first coordinate is 615 when the easy-mode marker is
    detected, 454 otherwise) and matched against every level mask except the last one.
    The best-matching mask index is zero-based, so 1 is added.
    """
    badge_x = 615 if io.is_screen_type(Screen.EASY_MODE) else 454
    badge_img = io.capture_portion(badge_x, 23, 45, 45)
    best_index = IO.get_best_mask(badge_img, level_masks[:-1])[0]
    return str(best_index + 1)

def parse_time():
    """Read the on-screen clock and return the remaining time as minutes*60 + tens*10 + ones.

    Returns -1 without capturing anything when the easy-mode marker is detected.
    """
    if io.is_screen_type(Screen.EASY_MODE):
        return -1

    clock_img = io.capture_portion(751, 30, 68, 30)
    # Start offsets along the first axis of the three 20-wide digit slices:
    # minutes, tens of seconds, single seconds.
    minutes, tens, ones = (
        IO.get_best_mask(clock_img[start:start + 20, :30], time_masks)[0]
        for start in (0, 28, 48)
    )
    return minutes*60 + tens*10 + ones

def parse_digit(i, j):
    """Recognise the digit in attempt row `i`, slot `j`.

    Returns '-' when the best mask is index 10 (blank), '' when it is index 11 (none),
    otherwise the mask index converted to a string.
    """
    origin_x, origin_y = 687, 164   # capture origin of row 0, slot 0
    pitch_x, pitch_y = 32, 44       # step between neighbouring slots / rows
    cell_w, cell_h = 30, 30

    cell_img = io.capture_portion(origin_x + pitch_x * j, origin_y + pitch_y * i, cell_w, cell_h)
    best_index = IO.get_best_mask(cell_img, digit_masks)[0]

    if best_index == 10:
        return '-'
    if best_index == 11:
        return ''
    return str(best_index)

def parse_attempt(i):
    """Return row `i` as a string by concatenating parse_digit(i, j) for the MAX_DIGITS slots."""
    return ''.join(parse_digit(i, slot) for slot in range(MAX_DIGITS))

def parse_attempts():
    """Return the parsed contents of every attempt row, rows 0 .. MAX_CHANCES-1, in order."""
    return [parse_attempt(row) for row in range(MAX_CHANCES)]

def capture_response_img(i):
    """Capture the 184x13 response strip belonging to attempt row `i` and return it."""
    row_pitch = 44
    strip_y = 165 + i*row_pitch
    return io.capture_portion(918, strip_y, 184, 13)

def parse_response(i):
    """Count the feedback balls for attempt row `i`.

    Returns a two-element list [greens, reds]: how many of the MAX_DIGITS ball slots best
    match ball mask 0 (green) and ball mask 1 (red). Any other mask index is ignored.
    """
    ball_w, ball_h = 22, 13
    ball_pitch = 30
    strip = capture_response_img(i)

    greens = 0
    reds = 0
    for slot in range(MAX_DIGITS):
        left = ball_pitch * slot
        ball_img = strip[left:left + ball_w, :ball_h, :]
        best_index = IO.get_best_mask(ball_img, ball_masks)[0]
        if best_index == 0:
            greens += 1
        elif best_index == 1:
            reds += 1
    return [greens, reds]

def parse_responses():
    """Return parse_response(i) for every row 0 .. MAX_CHANCES-1, in order."""
    responses = []
    for row in range(MAX_CHANCES):
        responses.append(parse_response(row))
    return responses

# TODO: move break/kill program to utils and make names/keys consistent in bots
def on_press(key):
    """Keyboard callback: Escape stops the listener and raises the stop flag; Tab saves a capture.

    Any other key is ignored.
    """
    global break_program, listener
    if key == kb.Key.esc:
        print("Escape pressed to quit")
        listener.stop()
        break_program = True
        return
    if key == kb.Key.tab:
        print("Tab pressed to take screenshot")
        io.save_game_capture()

# TODO: move to utils?
def get_into_game(difficulty='normal'):
    """Click through menus and pop-ups until the MIDGAME screen is the first match.

    Each pass reads the set of detected screens, prints it, and reacts to the first screen
    in this priority order: ARCADE, GREAT_PLAY, GAME_OVER, HOME, END_GAME, NEW_LEVEL,
    GREAT_JOB, MIDGAME. Returns None once MIDGAME is reached, or when break_program is set.

    Args:
        difficulty: 'easy' clicks the easy button on the home screen; anything else
            clicks the normal button.
    """
    def click(x, y):
        # Every click in this function uses the same delay triple.
        io.click_mouse(x, y, delays=[0.03, 0.03, 0])

    while not break_program:
        screen_types = io.calc_screen_types()
        print(screen_types)

        pause = 0.5  # default wait before polling the screen again
        if Screen.ARCADE in screen_types:
            click(970, 760) # "PLAY GAME" button
        elif Screen.GREAT_PLAY in screen_types:
            click(590, 1000) # "Collect" button
        elif Screen.GAME_OVER in screen_types:
            click(220, 703) # Replay button
        elif Screen.HOME in screen_types:
            debug(f"Start game - difficulty {difficulty}")
            # easy button vs. normal button
            mode_button = (450, 745) if difficulty == 'easy' else (690, 745)
            click(*mode_button)
            click(185, 845) # 1 player game button
            pause = 2
        elif Screen.END_GAME in screen_types:
            click(320, 860) # End Game button
        elif Screen.NEW_LEVEL in screen_types:
            # Nothing to click; checked before MIDGAME, so keep polling while it is detected.
            pass
        elif Screen.GREAT_JOB in screen_types:
            pause = 5
        elif Screen.MIDGAME in screen_types:
            return

        time.sleep(pause)

def get_initial_level_info():
    """Read the level from the screen and derive its settings.

    Returns:
        A tuple (level, is_easy_mode, difficulty, num_digits, chances, has_repeats), where
        difficulty is 'easy' when the easy-mode marker is detected and 'normal' otherwise.
    """
    level = int(parse_level())
    is_easy_mode = io.is_screen_type(Screen.EASY_MODE)
    if is_easy_mode:
        difficulty = 'easy'
    else:
        difficulty = 'normal'

    return (
        level,
        is_easy_mode,
        difficulty,
        get_level_digits(level, difficulty),
        get_level_chances(level, difficulty),
        get_level_has_repeats(level),
    )

# TODO: Experiment with how long to spend searching
def calc_max_search_time():
    """Return the number of seconds the next-guess search may run.

    parse_time() returns -1 when the easy-mode marker is detected; that case gets a fixed
    60-second budget. Otherwise the budget is half of the remaining time minus 3 seconds,
    floored at zero so it is never negative.

    A parsed time of exactly 0 is treated as "no time left" (budget 0), not as easy mode.
    """
    time_remaining = parse_time()
    if time_remaining < 0: # easy mode sentinel from parse_time()
        return 60
    return max(0.0, time_remaining / 2 - 3)

# TODO: determine whether to search possible or unused
# TODO: stuff gets very messed up if I used unused. The expected info goes to zero...
def determine_search_set(possible, unused, num_digits):
    """Return the collection of codes the next-guess search should consider.

    Currently this is always `possible`; `unused` and `num_digits` are accepted but ignored.

    A disabled heuristic used to sit unreachable below the return statement. It is kept
    here as a note only:
        search `unused` when num_digits < 5, otherwise search `possible`.
    """
    return possible

def submit_suggestion(suggestion, attempt_num, num_digits):
    """Type `suggestion` into attempt row `attempt_num`, correcting whatever is already there.

    The row is re-read with parse_attempt() after every key press. The leading digits that
    already match `suggestion` are kept, every slot after that prefix is backspaced until it
    shows '-', and the remaining digits are typed one at a time.

    Args:
        suggestion: the code to enter, as a string of digits.
        attempt_num: index of the attempt row being filled.
        num_digits: expected length of a complete row.

    Returns:
        The row as last parsed once it has `num_digits` characters and no '-' (or once the
        GREAT_JOB screen is detected). None when break_program is set, when a time limit is
        exceeded, or when an index falls outside the parsed row or the suggestion.
    """
    MAX_DELETE_TIME = 3  # seconds allowed for clearing a single slot
    MAX_TYPE_TIME = 3    # seconds allowed for entering a single digit
    MAX_TOTAL_TIME = 8   # seconds allowed before another correction pass may start
    attempt = parse_attempt(attempt_num)
    main_start = time.time()
    while not break_program and (len(attempt) != num_digits or '-' in attempt) and not io.is_screen_type(Screen.GREAT_JOB):
        if time.time() - main_start > MAX_TOTAL_TIME:
            debug(f'Failed to submit suggestion. Spent >{MAX_TOTAL_TIME} seconds trying in total. Assume game over.')
            return None
        time.sleep(0.1)

        # Length of the leading run that already matches; zip() stops at the shorter string,
        # so a length mismatch cannot raise here.
        correct_digits = 0
        for typed, wanted in zip(attempt, suggestion):
            if typed != wanted:
                break
            correct_digits += 1

        # Clear every slot after the matching prefix, right to left.
        for i in range(len(attempt)-1, correct_digits-1, -1):
            start = time.time()
            # The row is re-parsed inside the loop, so re-check the index before using it.
            while not break_program and i < len(attempt) and attempt[i] != '-':
                if time.time() - start > MAX_DELETE_TIME:
                    debug(f'Failed to submit suggestion. Spent >{MAX_DELETE_TIME} seconds trying to backspace digits. Assume game over.')
                    return None
                time.sleep(0.1)
                debug("press backspace")
                io.click_key(kb.Key.backspace)
                attempt = parse_attempt(attempt_num)

        # Type the remaining digits, left to right.
        for i in range(correct_digits, len(attempt)):
            # Restart the timer for every digit, as the delete loop does; reusing the delete
            # loop's timestamp made long codes time out while typing was still progressing.
            start = time.time()
            while not break_program:
                if len(attempt) <= i or len(suggestion) <= i:
                    debug('Trying to check an index for attempt/suggestion that is out of bounds (tsk tsk). Assume game over.')
                    return None
                if attempt[i] == suggestion[i]:
                    break
                if time.time() - start > MAX_TYPE_TIME:
                    debug(f'Failed to submit suggestion. Spent >{MAX_TYPE_TIME} seconds trying to type digits. Assume game over.')
                    return None
                time.sleep(0.1)
                print(attempt, suggestion)
                debug("press {}".format(suggestion[i]))
                io.click_key(suggestion[i])
                attempt = parse_attempt(attempt_num)
    if break_program:
        return None
    return attempt

def wait_and_parse_response(num_digits, attempt_num):
    """Wait for the feedback on row `attempt_num`, then parse and return it.

    While the next row still reads as all '-', the "0" key is pressed every 0.1 s; a digit
    showing up there is taken to mean the response has fully rolled in.

    Returns:
        parse_response(attempt_num), or None after waiting more than MAX_RESPONSE_TIME seconds.
    """
    debug("response:\t", end='')
    MAX_RESPONSE_TIME = 5
    untouched_row = '-'*num_digits
    next_row = attempt_num+1
    started_at = time.time()
    while True:
        # Same short-circuit as a combined loop condition: the row is not parsed once the
        # stop flag is set.
        if break_program or parse_attempt(next_row) != untouched_row:
            break
        if time.time() - started_at > MAX_RESPONSE_TIME:
            debug(f"WARNING: Waited too long (>{MAX_RESPONSE_TIME} seconds) for a response. Assume game over.")
            return None
        time.sleep(0.1)
        io.click_key("0")
    response = parse_response(attempt_num)
    debug(response)
    return response

def test_dedupe_codes_to_patterns():
    """Check helper.dedupe_codes_to_patterns against a table of known results.

    Each case is (codes, patterns, expected); fresh list literals are used per case so the
    inputs are never shared between calls.
    """
    cases = [
        (["708"], ['117', '210', '345', '510', '558', '654'], ['117', '210', '345', '558']),
        (["012"], ['117', '210', '345', '510', '558', '654'], ['117', '210', '345', '510', '558']),
        (["708", "059"], ['117', '210', '345', '510', '558', '654'], ['117', '210', '345', '510', '558', '654']),
        (["001234"], ["05678", "08765"], ['05678']),
        (["53052"], ["97237"], ['97237']),
    ]
    for codes, patterns, expected in cases:
        assert helper.dedupe_codes_to_patterns(codes, patterns) == expected

# test_dedupe_codes_to_patterns()

def get_best_next_guess_from_full_strat(num_digits, has_repeats, attempts, responses, full_strat):
    """Look up the next guess by replaying the game so far through the strategy tree.

    Starting at full_strat[num_digits][has_repeats], each (attempt, response) pair must
    match the current node's 'guess'; the walk then descends into
    node['scores'][str(tuple(response))].

    Returns:
        The 'guess' of the node reached, or None when an attempt deviates from the tree or
        a required key is missing.
    """
    try:
        node = full_strat[num_digits][has_repeats]
        for played, feedback in zip(attempts, responses):
            if played != node['guess']:
                return None
            node = node['scores'][str(tuple(feedback))]
        return node['guess']
    except KeyError:
        return None
    
# print(get_best_next_guess_from_full_strat(6, False, ["001234", "056789"], [(0, 3)], FULL_STRAT))

In [None]:
def run_bot():
    """Run the bot until break_program becomes True.

    Outer loop (one pass per level): get_into_game() is called to reach the board, the level
    settings are read, and the candidate sets are initialised through
    helper.calc_initial_possible_unused().

    Inner loop (one pass per attempt row): the row is parsed; if it still contains '-', a
    suggestion is chosen (first from FULL_STRAT, otherwise from the helper fallbacks) and
    typed in. Then the GREAT_JOB / END_GAME screens are checked, the response is read, and
    the candidate sets are filtered. Any failure along the way breaks out of the inner loop.
    """
    global break_program, listener
    break_program = False
    listener = kb.Listener(on_press=on_press)
    # NOTE(review): the listener is created but never started (the call below is commented
    # out), so this function does not cause on_press() to run — confirm whether intentional.
    # listener.start()

    # TODO: fix-up debug & print statements so more helpful & clear

    # NOTE(review): presumably a click at (0, 0) to give the game window focus — confirm.
    io.click_mouse(0, 0, delays=[0.03, 0.03, 0])

    while not break_program:
        desired_difficulty = 'normal'
        get_into_game(desired_difficulty)
        
        time.sleep(0.1)

        # NOTE(review): is_easy_mode and chances are unpacked but not read again in this function.
        level, is_easy_mode, difficulty, num_digits, chances, has_repeats = get_initial_level_info()
        print(f"---LEVEL {level}: ({num_digits} digits, {'has' if has_repeats else 'no'} repeats, difficuly {difficulty})---")

        possible, unused = helper.calc_initial_possible_unused(num_digits, has_repeats)
        # NOTE(review): sub_strategy is assigned but not read again in this function.
        sub_strategy = FULL_STRAT[num_digits][has_repeats]
        # History of this level, passed to the FULL_STRAT lookup on every attempt.
        attempts = []
        responses = []
        attempt_num = 0

        while not break_program: # This loop completes a level
            time.sleep(0.1)
            debug("len(possible):\t", len(possible))
            
            attempt = parse_attempt(attempt_num)
            print (f"Attempt #{attempt_num}:\t{attempt}")
            # A '-' means the row is not fully entered yet, so a guess has to be typed.
            if '-' in attempt:
                suggestion = get_best_next_guess_from_full_strat(num_digits, has_repeats, attempts, responses, full_strat=FULL_STRAT)

                # Fall back to the helper when the lookup returned nothing.
                if suggestion is None:
                    debug(f"Warning: unable to find next guess in FULL_STRAT: {(num_digits, has_repeats, attempts, responses)}")
                    if attempt_num == 0:
                        suggestion = helper.get_best_first_guess(num_digits, has_repeats, full_strat=FULL_STRAT)
                    else:
                        max_search_time = calc_max_search_time()
                        search_set = determine_search_set(possible, unused, num_digits)
                        suggestion = helper.calc_best_next_guess(search_set, max_search_time, possible)
                        if suggestion is None:
                            debug("Failed to calculate the next suggestion; assume game over.")
                            break

                print(f"Suggestion #{attempt_num}:\t{suggestion}")

                # From here on `attempt` is the row as parsed after typing.
                attempt = submit_suggestion(suggestion, attempt_num, num_digits)
                if attempt is None:
                    debug(f"Submit {suggestion} was unsuccessful; assume game over.")
                    break

            # If finished the level, wait for the next one
            if io.is_screen_type(Screen.GREAT_JOB):
                debug("Passed the level; wait for the next.")
                break
            # If failed the level, restart the game
            elif io.is_screen_type(Screen.END_GAME):
                debug("Failed the level; restart the game.")
                break
            
            response = wait_and_parse_response(num_digits, attempt_num)
            if response is None:
                debug("Unable to parse response; assume game over.")
                break
            num_greens, num_reds = int(response[0]), int(response[1])

            # If all green gumballs, we won; move to next level
            if num_greens == num_digits:
                debug(f"Level success! ({attempt_num + 1} attempts). Move to the next.")
                break
        
            # Narrow the candidate sets with this attempt's feedback and record the history.
            possible, unused = helper.filter_possible_unused(possible, unused, attempt, (num_greens, num_reds))
            attempts.append(attempt)
            responses.append(response)
            
            attempt_num += 1

# Start the bot when this cell is run; it keeps going until break_program is True or the
# kernel is interrupted.
run_bot()

{<Screen.HOME: 'home'>}
Start game - difficulty normal
{<Screen.MIDGAME: 'midgame'>, <Screen.NEW_LEVEL: 'new_level'>}
{<Screen.MIDGAME: 'midgame'>, <Screen.NEW_LEVEL: 'new_level'>}
{<Screen.MIDGAME: 'midgame'>}
---LEVEL 1: (3 digits, no repeats, difficuly normal)---
len(possible):	 720
Attempt #0:	---
Suggestion #0:	012
--- 012
press 0
0-- 012
press 1
01- 012
press 2
response:	[0, 1]
len(possible):	 252
Attempt #1:	0--
Suggestion #1:	134
press backspace
--- 134
press 1
1-- 134
press 3
13- 134
press 4
response:	[0, 1]
len(possible):	 75
Attempt #2:	0--
Suggestion #2:	350
press backspace
--- 350
press 3
3-- 350
press 5
35- 350
press 0
response:	[1, 0]
len(possible):	 16
Attempt #3:	0--
Suggestion #3:	640
press backspace
--- 640
press 6
6-- 640
press 4
64- 640
press 0
response:	[1, 1]
len(possible):	 3
Attempt #4:	0--
Suggestion #4:	078
0-- 078
press 7
07- 078
press 8
response:	[0, 2]
len(possible):	 1
Attempt #5:	0--
Suggestion #5:	480
press backspace
--- 480
press 4
4-- 480
press 8
48- 

KeyboardInterrupt: 