forked from DorinK/AI-Checkers
-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
executable file
·123 lines (101 loc) · 4.58 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
"""Generic utility functions
"""
# from __future__ import print_function
from threading import Thread
from queue import Queue
import time
import copy
# Score magnitude the minimax search in this module uses for a decided game
# (a player with no moves) and as the starting value when looking for the
# best move.  Note that it is a finite float, not ``math.inf``.
INFINITY = 6000.0
class ExceededTimeError(RuntimeError):
    """Signals that a function did not finish within its allowed runtime."""
def function_wrapper(func, args, kwargs, result_queue):
    """Call ``func`` and report its outcome and CPU time through a queue.

    Nothing is returned to the caller; the outcome is handed over with
    ``result_queue.put``.

    :param func: The function to run.
    :param args: Positional arguments for ``func``, as a tuple.
    :param kwargs: Keyword arguments for ``func``, as a dict.
    :param result_queue: Queue that receives the outcome: on success a tuple
                         ``(return value, runtime)``, where runtime is the
                         difference of two ``time.process_time()`` readings;
                         if ``func`` raised ``MemoryError``, the exception
                         object itself.
    :return: None.  Any exception other than ``MemoryError`` propagates to the
             caller and nothing is put on the queue.
    """
    began = time.process_time()
    try:
        outcome = func(*args, **kwargs)
    except MemoryError as err:
        # Forward the error object so the receiving side can re-raise it.
        result_queue.put(err)
    else:
        elapsed = time.process_time() - began
        result_queue.put((outcome, elapsed))
def run_with_limited_time(func, args, kwargs, time_limit):
    """Run a function in a worker thread, waiting at most ``time_limit`` seconds.

    :param func: The function to run.
    :param args: The function's positional arguments, given as a tuple.
    :param kwargs: The function's keyword arguments, given as a dict.
    :param time_limit: The time limit in seconds (can be float).
    :return: A tuple: the function's return value unchanged, and the running
             time measured by ``function_wrapper``.
    :raises ExceededTimeError: If the function is still running once
                               ``time_limit`` has elapsed.
    :raises MemoryError: If the function raised ``MemoryError``.
    :raises RuntimeError: If the worker thread ended without delivering a
                          result, i.e. the function raised some other
                          exception inside the thread.
    """
    # Imported locally so this function does not depend on the module's
    # import block being changed.
    from queue import Empty

    q = Queue()
    t = Thread(target=function_wrapper, args=(func, args, kwargs, q))
    t.start()

    # This is just for limiting the runtime of the other thread, so we stop
    # waiting eventually.  It doesn't really measure the runtime.
    t.join(time_limit)
    if t.is_alive():
        # NOTE: the worker thread is not stopped here; we only stop waiting.
        raise ExceededTimeError

    # The worker has finished, so its outcome (if any) is already queued.
    # A blocking get() would hang forever when the worker died from an
    # exception that function_wrapper does not forward.
    try:
        outcome = q.get_nowait()
    except Empty:
        raise RuntimeError(
            "the function terminated without producing a result "
            "(it raised an exception in the worker thread)"
        ) from None

    if isinstance(outcome, MemoryError):
        raise outcome
    return outcome
class MiniMaxWithAlphaBetaPruning:
    """MiniMax search with alpha-beta pruning over copyable game states.

    A state object must offer ``get_possible_moves()``, ``perform_move(move)``
    and a ``curr_player`` attribute, and must be deep-copyable.
    """

    def __init__(self, utility, my_color, no_more_time, selective_deepening):
        """Initialize a MiniMax algorithm with alpha-beta pruning.

        :param utility: The utility function. Should have state as parameter.
        :param my_color: The color of the player who runs this MiniMax search.
        :param no_more_time: A function that returns true if there is no more
                             time to run this search, or false if there is
                             still time left.
        :param selective_deepening: A function that gets the current state,
                                    and returns True when the algorithm should
                                    continue the search for the minimax value
                                    recursively from this state.
        """
        self.utility = utility
        self.my_color = my_color
        self.no_more_time = no_more_time
        self.selective_deepening = selective_deepening

    @staticmethod
    def _successor(state, move):
        """Return a deep copy of ``state`` with ``move`` applied to the copy."""
        child = copy.deepcopy(state)
        child.perform_move(move)
        return child

    def search(self, state, depth, alpha, beta, maximizing_player):
        """Start the MiniMax algorithm.

        :param state: The state to start from.
        :param depth: The maximum allowed depth for the algorithm.
        :param alpha: The alpha of the alpha-beta pruning.
        :param beta: The beta of the alpha-beta pruning.
        :param maximizing_player: Whether this is a max node (True) or a min
                                  node (False).
        :return: A tuple: (the alpha-beta algorithm value, the move in case
                 of max node or None in min node).
        """
        # Cut-offs: out of time, or depth exhausted with no request to deepen.
        if self.no_more_time():
            return self.utility(state), None
        if depth <= 0 and not self.selective_deepening(state):
            return self.utility(state), None

        candidates = state.get_possible_moves()
        if not candidates:
            # This player has no moves. So the previous player is the winner.
            if state.curr_player != self.my_color:
                return INFINITY, None
            return -INFINITY, None

        if not maximizing_player:
            # Min node: tighten beta; no move is reported.
            for option in candidates:
                reply = self.search(
                    self._successor(state, option), depth - 1, alpha, beta, True
                )
                beta = min(beta, reply[0])
                if beta <= alpha or self.no_more_time():
                    break
            return beta, None

        # Max node: tighten alpha and remember the move with the best value.
        chosen = candidates[0]
        top_value = -INFINITY
        for option in candidates:
            value, _ = self.search(
                self._successor(state, option), depth - 1, alpha, beta, False
            )
            alpha = max(alpha, value)
            if value > top_value:
                top_value, chosen = value, option
            if beta <= alpha or self.no_more_time():
                break
        return alpha, chosen