-
Notifications
You must be signed in to change notification settings - Fork 3
/
utils.py
137 lines (111 loc) · 4.68 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""Generic utility functions
"""
from __future__ import print_function
from threading import Thread
from Queue import Queue
import time
import copy
INFINITY = float('inf')
class ExceededTimeError(RuntimeError):
"""Thrown when the given function exceeded its runtime.
"""
pass
def function_wrapper(func, args, kwargs, result_queue):
"""Runs the given function and measures its runtime.
:param func: The function to run.
:param args: The function arguments as tuple.
:param kwargs: The function kwargs as dict.
:param result_queue: The inter-process queue to communicate with the parent.
:return: A tuple: The function return value, and its runtime.
"""
start = time.clock()
try:
result = func(*args, **kwargs)
except MemoryError as e:
result_queue.put(e)
return
runtime = time.clock() - start
result_queue.put((result, runtime))
def run_with_limited_time(func, args, kwargs, time_limit):
"""Runs a function with time limit
:param func: The function to run.
:param args: The functions args, given as tuple.
:param kwargs: The functions keywords, given as dict.
:param time_limit: The time limit in seconds (can be float).
:return: A tuple: The function's return value unchanged, and the running time for the function.
:raises PlayerExceededTimeError: If player exceeded its given time.
"""
q = Queue()
t = Thread(target=function_wrapper, args=(func, args, kwargs, q))
t.start()
# This is just for limiting the runtime of the other thread, so we stop eventually.
# It doesn't really measure the runtime.
t.join(time_limit * 1.1)
if t.is_alive():
raise ExceededTimeError
q_get = q.get()
if isinstance(q_get, MemoryError):
raise q_get
return q_get
class MiniMaxWithAlphaBetaPruning:
def __init__(self, utility, my_color, no_more_time):
"""Initialize a MiniMax algorithms with alpha-beta pruning.
:param utility: The utility function. Should have state as parameter.
:param my_color: The color of the player who runs this MiniMax search.
:param no_more_time: A function that returns true if there is no more time to run this search, or false if
there is still time left.
"""
self.utility = utility
self.my_color = my_color
self.no_more_time = no_more_time
def search(self, state, depth, alpha, beta, maximizing_player):
"""Start the MiniMax algorithm.
:param state: The state to start from.
:param depth: The maximum allowed depth for the algorithm.
:param alpha: The alpha of the alpha-beta pruning.
:param alpha: The beta of the alpha-beta pruning.
:param maximizing_player: Whether this is a max node (True) or a min node (False).
:return: A tuple: (The alpha-beta algorithm value, The move in case of max node or None in min mode)
"""
if depth == 0 or self.no_more_time():
return self.utility(state), None
next_moves = state.get_possible_moves()
if not next_moves:
# This player has no moves. So the previous player is the winner.
return INFINITY if state.curr_player != self.my_color else -INFINITY, None
if maximizing_player:
selected_move = next_moves[0]
best_move_utility = -INFINITY
for move in next_moves:
new_state = copy.deepcopy(state)
new_state.perform_move(move)
minimax_value, _ = self.search(new_state, depth - 1, alpha, beta, False)
alpha = max(alpha, minimax_value)
if minimax_value > best_move_utility:
best_move_utility = minimax_value
selected_move = move
if beta <= alpha or self.no_more_time():
break
return alpha, selected_move
else:
for move in next_moves:
new_state = copy.deepcopy(state)
new_state.perform_move(move)
beta = min(beta, self.search(new_state, depth - 1, alpha, beta, True)[0])
if beta <= alpha or self.no_more_time():
break
return beta, None
if __name__ == '__main__':
# Testing stuff
def f(t, s):
start_time = time.clock()
while time.clock() - start_time < t:
for i in range(10**3):
pass
return s * 2
print(run_with_limited_time(f, (1.5, 'a'), {}, 2.5))
try:
print(run_with_limited_time(f, (3.5, 'b'), {}, 2.5))
except ExceededTimeError:
print('OK')
print(run_with_limited_time(f, (1.5, 4), {}, float('inf')))