diff --git a/src/chess_zero/agent/api_chess.py b/src/chess_zero/agent/api_chess.py index 45ef02d..549ba9e 100644 --- a/src/chess_zero/agent/api_chess.py +++ b/src/chess_zero/agent/api_chess.py @@ -1,23 +1,39 @@ from chess_zero.config import Config - +from threading import Thread +import numpy as np +import multiprocessing as mp +from multiprocessing import connection +import time class ChessModelAPI: - def __init__(self, config: Config, model): - self.config = config + def __init__(self, model): self.model = model + self.pipes = [] - def predict(self, x): - assert x.ndim in (3, 4) - input_stack_height = self.config.model.input_stack_height - assert x.shape == (input_stack_height, 8, 8) or x.shape[1:] == (input_stack_height, 8, 8) # should I get rid of these assertions...? they will change. - orig_x = x - if x.ndim == 3: - x = x.reshape(1, input_stack_height, 8, 8) + def start(self): + prediction_worker = Thread(target=self.predict_batch_worker, name="prediction_worker") + prediction_worker.daemon = True + prediction_worker.start() - with self.model.graph.as_default(): - policy, value = self.model.model.predict_on_batch(x) + def get_pipe(self): + me, you = mp.Pipe() + self.pipes.append(me) + return you - if orig_x.ndim == 3: - return policy[0], value[0] - else: - return policy, value + def predict_batch_worker(self): + while True: + ready = mp.connection.wait(self.pipes, timeout=0.001) + if not ready: + continue + data, result_pipes = [], [] + for pipe in ready: + while pipe.poll(): + data.append(pipe.recv()) + result_pipes.append(pipe) + if not data: + continue + data = np.asarray(data, dtype=np.float32) + with self.model.graph.as_default(): + policy_ary, value_ary = self.model.model.predict_on_batch(data) + for pipe, p, v in zip(result_pipes, policy_ary, value_ary): + pipe.send((p, float(v))) diff --git a/src/chess_zero/agent/model_chess.py b/src/chess_zero/agent/model_chess.py index bce4fa2..4bcbcd6 100644 --- a/src/chess_zero/agent/model_chess.py +++ b/src/chess_zero/agent/model_chess.py @@ -4,15 +4,15 @@ from logging import getLogger # noinspection PyPep8Naming -import tensorflow as tf +from tensorflow import get_default_graph from keras.engine.topology import Input from keras.engine.training import Model from keras.layers.convolutional import Conv2D from keras.layers.core import Activation, Dense, Flatten from keras.layers.merge import Add from keras.layers.normalization import BatchNormalization -from keras.losses import categorical_crossentropy, mean_squared_error from keras.regularizers import l2 +from chess_zero.agent.api_chess import ChessModelAPI from chess_zero.config import Config @@ -23,14 +23,21 @@ class ChessModel: def __init__(self, config: Config): self.config = config self.model = None # type: Model - self.digest = None self.graph = None + self.digest = None + self.api = None + + def get_pipes(self, num=1): + if self.api is None: + self.api = ChessModelAPI(self) + self.api.start() + return [self.api.get_pipe() for _ in range(num)] def build(self): mc = self.config.model in_x = x = Input((mc.input_stack_height, 8, 8)) # (batch, channels, height, width) - x = Conv2D(filters=mc.cnn_filter_num, kernel_size=mc.cnn_filter_size, padding="same", data_format="channels_first", kernel_initializer='glorot_normal', bias_initializer='zeros', kernel_regularizer=l2(mc.l2_reg), input_shape=(mc.input_stack_height, 8, 8))(x) + x = Conv2D(filters=mc.cnn_filter_num, kernel_size=mc.cnn_filter_size, padding="same", data_format="channels_first", use_bias=False, 
kernel_initializer='glorot_normal', bias_initializer='zeros', kernel_regularizer=l2(mc.l2_reg), input_shape=(mc.input_stack_height, 8, 8))(x) x = BatchNormalization(axis=1)(x) x = Activation("relu")(x) @@ -39,7 +46,7 @@ def build(self): res_out = x # for policy output - x = Conv2D(filters=2, kernel_size=1, data_format="channels_first", kernel_initializer='glorot_normal', bias_initializer='zeros', kernel_regularizer=l2(mc.l2_reg))(res_out) + x = Conv2D(filters=2, kernel_size=1, data_format="channels_first", use_bias=False, kernel_initializer='glorot_normal', bias_initializer='zeros', kernel_regularizer=l2(mc.l2_reg))(res_out) x = BatchNormalization(axis=1)(x) x = Activation("relu")(x) x = Flatten()(x) @@ -47,7 +54,7 @@ def build(self): policy_out = Dense(self.config.n_labels, kernel_initializer='glorot_normal', bias_initializer='zeros', kernel_regularizer=l2(mc.l2_reg), activation="softmax", name="policy_out")(x) # for value output - x = Conv2D(filters=1, kernel_size=1, data_format="channels_first", kernel_initializer='glorot_normal', bias_initializer='zeros', kernel_regularizer=l2(mc.l2_reg))(res_out) + x = Conv2D(filters=1, kernel_size=1, data_format="channels_first", use_bias=False, kernel_initializer='glorot_normal', bias_initializer='zeros', kernel_regularizer=l2(mc.l2_reg))(res_out) x = BatchNormalization(axis=1)(x) x = Activation("relu")(x) x = Flatten()(x) @@ -55,14 +62,15 @@ def build(self): value_out = Dense(1, kernel_initializer='glorot_normal', bias_initializer='zeros', kernel_regularizer=l2(mc.l2_reg), activation="tanh", name="value_out")(x) self.model = Model(in_x, [policy_out, value_out], name="chess_model") + self.graph = get_default_graph() def _build_residual_block(self, x): mc = self.config.model in_x = x - x = Conv2D(filters=mc.cnn_filter_num, kernel_size=mc.cnn_filter_size, padding="same", data_format="channels_first", kernel_initializer='glorot_normal', bias_initializer='zeros', kernel_regularizer=l2(mc.l2_reg))(x) + x = Conv2D(filters=mc.cnn_filter_num, kernel_size=mc.cnn_filter_size, padding="same", data_format="channels_first", use_bias=False, kernel_initializer='glorot_normal', bias_initializer='zeros', kernel_regularizer=l2(mc.l2_reg))(x) x = BatchNormalization(axis=1)(x) x = Activation("relu")(x) - x = Conv2D(filters=mc.cnn_filter_num, kernel_size=mc.cnn_filter_size, padding="same", data_format="channels_first", kernel_initializer='glorot_normal', bias_initializer='zeros', kernel_regularizer=l2(mc.l2_reg))(x) + x = Conv2D(filters=mc.cnn_filter_num, kernel_size=mc.cnn_filter_size, padding="same", data_format="channels_first", use_bias=False, kernel_initializer='glorot_normal', bias_initializer='zeros', kernel_regularizer=l2(mc.l2_reg))(x) x = BatchNormalization(axis=1)(x) x = Add()([in_x, x]) x = Activation("relu")(x) @@ -82,6 +90,8 @@ def load(self, config_path, weight_path): with open(config_path, "rt") as f: self.model = Model.from_config(json.load(f)) self.model.load_weights(weight_path) + self.graph = get_default_graph() + # self.model._make_predict_function() self.digest = self.fetch_digest(weight_path) logger.debug(f"loaded model digest = {self.digest}") return True @@ -96,11 +106,3 @@ def save(self, config_path, weight_path): self.model.save_weights(weight_path) self.digest = self.fetch_digest(weight_path) logger.debug(f"saved model digest {self.digest}") - - -def loss_function_for_policy(y_true, y_pred): - return categorical_crossentropy(y_true, y_pred) - - -def loss_function_for_value(y_true, y_pred): - return mean_squared_error(y_true, y_pred) diff 
--git a/src/chess_zero/agent/player_chess.py b/src/chess_zero/agent/player_chess.py index 0c355a0..fce62f6 100644 --- a/src/chess_zero/agent/player_chess.py +++ b/src/chess_zero/agent/player_chess.py @@ -1,18 +1,15 @@ from collections import defaultdict, namedtuple from logging import getLogger -from concurrent.futures import Future, ThreadPoolExecutor -from threading import Thread, Lock +from concurrent.futures import ThreadPoolExecutor +from threading import Lock -from profilehooks import profile - -import time import numpy as np import chess -from chess_zero.agent.api_chess import ChessModelAPI from chess_zero.config import Config from chess_zero.env.chess_env import ChessEnv, Winner # import chess.gaviota +# import chess.syzygy QueueItem = namedtuple("QueueItem", "state future") HistoryItem = namedtuple("HistoryItem", "action policy values visit") @@ -25,7 +22,6 @@ class VisitStats: def __init__(self): self.a = defaultdict(ActionStats) # (key, value) of type (Move, ActionStats)? self.sum_n = 0 - self.selected_yet = False class ActionStats: @@ -36,57 +32,44 @@ def __init__(self): class ChessPlayer: - def __init__(self, config: Config, model=None, play_config=None): + def __init__(self, config: Config, pipes=None, dummy=False, play_config=None): self.config = config - self.model = model self.play_config = play_config or self.config.play - self.api = ChessModelAPI(self.config, self.model) self.labels = self.config.labels self.n_labels = config.n_labels # self.tablebases = chess.gaviota.open_tablebases(self.config.resource.tablebase_dir) # self.tablebases = chess.syzygy.open_tablebases(self.config.resource.tablebase_dir) - self.prediction_queue_lock = Lock() - self.is_thinking = False - self.moves = [] + if dummy: + return + self.pipe_pool = pipes + self.node_lock = defaultdict(Lock) self.reset() def reset(self): self.tree = defaultdict(VisitStats) - self.node_lock = defaultdict(Lock) - self.prediction_queue = [] def action(self, env): self.reset() - key = env.transposition_key() - - self.is_thinking = True - prediction_worker = Thread(target=self.predict_batch_worker, name="prediction_worker") - prediction_worker.daemon = True - prediction_worker.start() - - try: # what exceptions do you think will be thrown here? - for tl in range(self.play_config.thinking_loop): - if self.play_config.tablebase_access and env.board.num_pieces() <= 5: # tablebase takes over - policy = self.tablebase_policy(env) # note: returns an "all or nothing" policy, regardless of tau, etc. - else: - self.search_moves(env) # this should leave env invariant! - policy = self.calc_policy(env) - action = int(np.random.choice(range(self.n_labels), p=policy)) - finally: - self.is_thinking = False + if self.play_config.tablebase_access and env.board.num_pieces() <= 5: # tablebase takes over + root_value = self.tablebase_and_evaluate(env) + policy = self.tablebase_policy(env) # note: returns an "all or nothing" policy, regardless of tau, etc. + else: + root_value = self.search_moves(env) # this should leave env invariant! + policy = self.calc_policy(env) + action = int(np.random.choice(range(self.n_labels), p=policy)) if self.play_config.resign_threshold is not None and \ self.play_config.min_resign_turn < env.fullmove_number and \ - np.max([a_s.q for a_s in self.tree[key].a.values()]) <= self.play_config.resign_threshold: # technically, resigning should be determined by leaf_v. + root_value <= self.play_config.resign_threshold: # technically, resigning should be determined by leaf_v? 
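The ChessModelAPI rewrite above replaces the old per-player prediction queue with multiprocessing pipes drained by a single batching thread. Below is a minimal, self-contained sketch of that pattern with a dummy NumPy "model" standing in for the Keras network; all class and function names here are illustrative, not the project's.

```python
# Sketch of the pipe-batched prediction pattern from api_chess.py (dummy model).
import multiprocessing as mp
from multiprocessing import connection
from threading import Thread
import numpy as np

class DummyPredictionServer:
    def __init__(self, n_labels=10):
        self.n_labels = n_labels
        self.pipes = []          # server-side ends of the pipes

    def get_pipe(self):
        me, you = mp.Pipe()
        self.pipes.append(me)
        return you               # client keeps the other end

    def start(self):
        Thread(target=self._worker, daemon=True).start()

    def _worker(self):
        while True:
            ready = connection.wait(self.pipes, timeout=0.001)
            if not ready:
                continue
            data, result_pipes = [], []
            for pipe in ready:
                while pipe.poll():
                    data.append(pipe.recv())
                    result_pipes.append(pipe)   # one reply per queued state
            if not data:
                continue
            batch = np.asarray(data, dtype=np.float32)
            # stand-in for model.predict_on_batch(batch)
            policy = np.full((len(batch), self.n_labels), 1.0 / self.n_labels)
            value = np.zeros(len(batch))
            for pipe, p, v in zip(result_pipes, policy, value):
                pipe.send((p, float(v)))

if __name__ == "__main__":
    server = DummyPredictionServer()
    server.start()
    pipe = server.get_pipe()
    pipe.send(np.zeros((18, 8, 8), dtype=np.float32))
    print(pipe.recv())   # (policy, value) for the single queued state
```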
return chess.Move.null() # means resign else: self.moves.append([env.fen, list(policy)]) - move = next(move for move in self.tree[key].a.keys() if self.labels[move] == action) + move = next(move for move in self.tree[env.transposition_key()].a.keys() if self.labels[move] == action) return move def sl_action(self, env, move): @@ -97,14 +80,14 @@ def sl_action(self, env, move): self.moves.append([env.fen, list(ret)]) return move - def search_moves(self, env): - futures = [] - with ThreadPoolExecutor(max_workers=self.play_config.parallel_search_num) as executor: - for _ in range(self.play_config.simulation_num_per_move): - futures.append(executor.submit(self.search_my_move, env=env.copy(), is_root_node=True)) - [f.result() for f in futures] + def search_moves(self, env) -> (float, float): + num_sims = self.play_config.simulation_num_per_move + with ThreadPoolExecutor(max_workers=self.play_config.search_threads) as executor: + vals = executor.map(self.search_my_move, [env.copy() for _ in range(num_sims)], [True for _ in range(num_sims)]) + + return np.max(vals) - def search_my_move(self, env: ChessEnv, is_root_node=False): + def search_my_move(self, env: ChessEnv, is_root_node): """ Q, V is value for this Player (always white). @@ -113,7 +96,7 @@ def search_my_move(self, env: ChessEnv, is_root_node=False): :param is_root_node: :return: leaf value """ - if env.done: # should an MCTS worker even have access to mate info...? + if env.done: if env.winner == Winner.DRAW: return 0 else: @@ -121,39 +104,36 @@ def search_my_move(self, env: ChessEnv, is_root_node=False): key = env.transposition_key() - my_lock = self.node_lock[key] - - with my_lock: + with self.node_lock[key]: if key not in self.tree: - leaf_v = self.expand_and_evaluate(env) - return leaf_v # I'm returning everything from the POV of side to move + leaf_p, leaf_v = self.expand_and_evaluate(env) + self.tree[key].p = leaf_p + return leaf_v # returning everything from the POV of side to move # keep the same lock open? move_t, action_t = self.select_action_q_and_u(env, is_root_node) - env.step(move_t) - - virtual_loss = self.play_config.virtual_loss - with my_lock: - my_visitstats = self.tree[key] - my_actionstats = my_visitstats.a[move_t] + virtual_loss = self.play_config.virtual_loss + my_visit_stats = self.tree[key] + my_action_stats = my_visit_stats.a[move_t] + my_visit_stats.sum_n += virtual_loss + my_action_stats.n += virtual_loss + my_action_stats.w += -virtual_loss + my_action_stats.q = my_action_stats.w / my_action_stats.n # fixed a bug: must update q here... - my_visitstats.sum_n += virtual_loss - my_actionstats.n += virtual_loss - my_actionstats.w += -virtual_loss - my_actionstats.q = my_actionstats.w / my_actionstats.n # fixed a bug: must update q here... 
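The virtual-loss bookkeeping above inflates N and deflates W before the recursive call, so concurrent search threads are steered toward different branches, and the provisional loss is swapped for the real leaf value on the way back up. A toy apply/revert cycle with assumed names:

```python
# Virtual loss around one simulated rollout (names assumed, single node shown).
from threading import Lock

class ActionStats:
    def __init__(self):
        self.n, self.w, self.q, self.p = 0, 0.0, 0.0, 0.0

class VisitStats:
    def __init__(self):
        self.sum_n = 0

lock = Lock()

def simulate(visit_stats, action_stats, rollout, virtual_loss=3):
    with lock:
        # pretend the move already lost: other threads now prefer siblings
        visit_stats.sum_n += virtual_loss
        action_stats.n += virtual_loss
        action_stats.w += -virtual_loss
        action_stats.q = action_stats.w / action_stats.n
    leaf_v = rollout()           # expensive search / NN evaluation, done unlocked
    with lock:
        # replace the virtual loss with the real outcome of this single visit
        visit_stats.sum_n += -virtual_loss + 1
        action_stats.n += -virtual_loss + 1
        action_stats.w += virtual_loss + leaf_v
        action_stats.q = action_stats.w / action_stats.n
    return leaf_v

vs, a = VisitStats(), ActionStats()
print(simulate(vs, a, rollout=lambda: 1.0), a.n, a.q)   # 1.0 1 1.0
```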
- leaf_v = -self.search_my_move(env) # next move + env.step(move_t) + leaf_v = -self.search_my_move(env, False) # next move # on returning search path, update: N, W, Q - with my_lock: - my_visitstats.sum_n += -virtual_loss + 1 - my_actionstats.n += -virtual_loss + 1 - my_actionstats.w += virtual_loss + leaf_v - my_actionstats.q = my_actionstats.w / my_actionstats.n + with self.node_lock[key]: + my_visit_stats.sum_n += -virtual_loss + 1 + my_action_stats.n += -virtual_loss + 1 + my_action_stats.w += virtual_loss + leaf_v + my_action_stats.q = my_action_stats.w / my_action_stats.n return leaf_v - def expand_and_evaluate(self, env) -> float: + def expand_and_evaluate(self, env) -> (np.ndarray, float): """expand new leaf this is called with state locked @@ -171,9 +151,7 @@ def expand_and_evaluate(self, env) -> float: if env.board.turn == chess.BLACK: leaf_p = Config.flip_policy(leaf_p) - self.tree[env.transposition_key()].temp_p = leaf_p - - return float(leaf_v) + return leaf_p, leaf_v def tablebase_and_evaluate(self, env): wdl = self.tablebases.probe_wdl(env.board) @@ -187,27 +165,12 @@ def tablebase_and_evaluate(self, env): return float(leaf_v) - def predict_batch_worker(self): - while self.is_thinking: - if self.prediction_queue: - with self.prediction_queue_lock: - item_list = self.prediction_queue # doesn't this just copy the reference? - self.prediction_queue = [] - - # logger.debug(f"predicting {len(item_list)} items") - data = np.array([x.state for x in item_list]) - policy_ary, value_ary = self.api.predict(data) - for item, p, v in zip(item_list, policy_ary, value_ary): - item.future.set_result((p, v)) - else: - time.sleep(self.play_config.prediction_worker_sleep_sec) - def predict(self, state): - future = Future() - item = QueueItem(state, future) - with self.prediction_queue_lock: # lists are atomic anyway though - self.prediction_queue.append(item) - return future.result() + pipe = self.pipe_pool.pop() + pipe.send(state) + ret = pipe.recv() + self.pipe_pool.append(pipe) + return ret def finish_game(self, z): """ @@ -224,14 +187,14 @@ def calc_policy(self, env): """ pc = self.play_config - my_visitstats = self.tree[env.transposition_key()] - var_n = np.zeros(self.n_labels) - var_n[[self.labels[move] for move in my_visitstats.a.keys()]] = [a_s.n for a_s in my_visitstats.a.values()] # too 'pythonic'? a.keys() and a.values() are guaranteed to be in the same order. + my_visit_stats = self.tree[env.transposition_key()] + policy = np.zeros(self.n_labels) + policy[[self.labels[move] for move in my_visit_stats.a.keys()]] = [a_s.n for a_s in my_visit_stats.a.values()] # too 'pythonic'? a.keys() and a.values() are guaranteed to be in the same order. 
if env.fullmove_number < pc.change_tau_turn: - return var_n / my_visitstats.sum_n # should never be dividing by 0 + return policy / my_visit_stats.sum_n # should never be dividing by 0 else: - action = np.argmax(var_n) # tau = 0 + action = np.argmax(policy) # tau = 0 ret = np.zeros(self.n_labels) ret[action] = 1 return ret @@ -247,25 +210,24 @@ def select_action_q_and_u(self, env, is_root_node): if self.play_config.tablebase_access and env.board.num_pieces() <= 5: return self.select_action_tablebase(env) - my_visitstats = self.tree[env.transposition_key()] - if not my_visitstats.selected_yet: - my_visitstats.selected_yet = True + my_visit_stats = self.tree[env.transposition_key()] + if my_visit_stats.p is not None: tot_p = 0 for move in env.board.legal_moves: - move_p = my_visitstats.temp_p[self.labels[move]] - my_visitstats.a[move].p = move_p # defaultdict is key here. + move_p = my_visit_stats.p[self.labels[move]] + my_visit_stats.a[move].p = move_p # defaultdict is key here. tot_p += move_p - for a_s in my_visitstats.a.values(): + for a_s in my_visit_stats.a.values(): a_s.p /= tot_p + my_visit_stats.p = None - # noinspection PyUnresolvedReferences - xx_ = np.sqrt(my_visitstats.sum_n + 1) # SQRT of sum(N(s, b); for all b) + xx_ = np.sqrt(my_visit_stats.sum_n + 1) # SQRT of sum(N(s, b); for all b) e = self.play_config.noise_eps c_puct = self.play_config.c_puct dirichlet_alpha = self.play_config.dirichlet_alpha - v_ = {move:(a_s.q + c_puct * (a_s.p if not is_root_node else (1 - e) * a_s.p + e * np.random.dirichlet([dirichlet_alpha])) * xx_ / (1 + a_s.n)) for move, a_s in my_visitstats.a.items()} # too much on one line...? + v_ = {move:(a_s.q + c_puct * (a_s.p if not is_root_node else (1 - e) * a_s.p + e * np.random.dirichlet([dirichlet_alpha])) * xx_ / (1 + a_s.n)) for move, a_s in my_visit_stats.a.items()} # too much on one line...? move_t = max(v_, key=v_.get) action_t = self.labels[move_t] @@ -283,28 +245,28 @@ def select_action_tablebase(self, env): action_t = self.labels[move_t] return move_t, action_t - # Uncomment the below code if using the SYZYGY TABLEBASES. NOTE: syzygy only provides _distance to zero_, which in general does not coincide with optimal _distance to mate_. - # violent_wins = {} - # quiets_and_draws = {} - # violent_losses = {} - # for move in env.board.legal_moves: # note: not worrying about hashing this. - # is_zeroing = env.board.is_zeroing(move) - # env.board.push(move) # note: minimizes distance to _zero_. distance to mate is not available through the tablebase bases. but gaviota are much larger... - # dtz = self.tablebases.probe_dtz(env.board) # casting to float isn't necessary; is coerced below upon comparison to 0.0 - # value = 1/dtz if dtz != 0.0 else 0.0 # a trick: fast mated < slow mated < draw < slow mate < fast mate - # if is_zeroing and value < 0: - # violent_wins[move] = value - # elif not is_zeroing or value == 0: - # quiets_and_draws[move] = value - # elif is_zeroing and value > 0: - # violent_losses[move] = value - # env.board.pop() - # if violent_wins: - # move_t = min(violent_wins, key=violent_wins.get) - # elif quiets_and_draws: - # move_t = min(quiets_and_draws, key=quiets_and_draws.get) - # elif violent_losses: - # move_t = min(violent_losses, key=violent_losses.get) - action_t = self.labels[move_t] - return move_t, action_t - + # Uncomment the below code if using the SYZYGY TABLEBASES. NOTE: syzygy only provides _distance to zero_, which in general does not coincide with optimal _distance to mate_. 
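For reference, here is a standalone version of the selection rule in select_action_q_and_u, written in the standard AlphaZero form where one Dirichlet noise vector is drawn across the root's legal moves (the hunk above samples a one-element Dirichlet per move); all names are illustrative.

```python
# PUCT selection: argmax over Q(s,a) + c_puct * P(s,a) * sqrt(sum_N) / (1 + N(s,a)).
import numpy as np

def select_puct(stats, sum_n, c_puct=3, noise_eps=0.25, dirichlet_alpha=0.3, is_root=False):
    """stats maps move -> (q, p, n); returns the move maximising Q + U."""
    sqrt_sum = np.sqrt(sum_n + 1)
    noise = np.random.dirichlet([dirichlet_alpha] * len(stats)) if is_root else None
    scores = {}
    for i, (move, (q, p, n)) in enumerate(stats.items()):
        prior = (1 - noise_eps) * p + noise_eps * noise[i] if is_root else p
        scores[move] = q + c_puct * prior * sqrt_sum / (1 + n)
    return max(scores, key=scores.get)

stats = {"e2e4": (0.10, 0.5, 10), "d2d4": (0.05, 0.3, 4), "g1f3": (0.00, 0.2, 1)}
print(select_puct(stats, sum_n=15))                  # inner node: priors used as-is
print(select_puct(stats, sum_n=15, is_root=True))    # root: priors mixed with noise
```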
+ # def select_action_tablebase(self, env): + # violent_wins = {} + # quiets_and_draws = {} + # violent_losses = {} + # for move in env.board.legal_moves: # note: not worrying about hashing this. + # is_zeroing = env.board.is_zeroing(move) + # env.board.push(move) # note: minimizes distance to _zero_. distance to mate is not available through the tablebase bases. but gaviota are much larger... + # dtz = self.tablebases.probe_dtz(env.board) # casting to float isn't necessary; is coerced below upon comparison to 0.0 + # value = 1/dtz if dtz != 0.0 else 0.0 # a trick: fast mated < slow mated < draw < slow mate < fast mate + # if is_zeroing and value < 0: + # violent_wins[move] = value + # elif not is_zeroing or value == 0: + # quiets_and_draws[move] = value + # elif is_zeroing and value > 0: + # violent_losses[move] = value + # env.board.pop() + # if violent_wins: + # move_t = min(violent_wins, key=violent_wins.get) + # elif quiets_and_draws: + # move_t = min(quiets_and_draws, key=quiets_and_draws.get) + # elif violent_losses: + # move_t = min(violent_losses, key=violent_losses.get) + # action_t = self.labels[move_t] + # return move_t, action_t diff --git a/src/chess_zero/configs/mini.py b/src/chess_zero/configs/mini.py index 6615a35..4303855 100644 --- a/src/chess_zero/configs/mini.py +++ b/src/chess_zero/configs/mini.py @@ -7,8 +7,10 @@ def __init__(self): class PlayConfig: def __init__(self): + self.max_processes = 3 + self.search_threads = 16 + self.vram_frac = 0.4 self.simulation_num_per_move = 10 - self.thinking_loop = 1 self.c_puct = 3 self.noise_eps = 0.25 self.dirichlet_alpha = 0.3 @@ -37,13 +39,14 @@ def __init__(self): class PlayWithHumanConfig: def __init__(self): self.play_config = PlayConfig() - self.play_config.thinking_loop = 5 self.play_config.tablebase_access = False class TrainerConfig: def __init__(self): self.batch_size = 32 # 2048 + self.cleaning_processes = 8 # RAM explosion... + self.vram_frac = 0.2 self.epoch_to_checkpoint = 1 self.start_total_steps = 0 self.save_model_steps = 1000 diff --git a/src/chess_zero/configs/normal.py b/src/chess_zero/configs/normal.py index fcbfb63..31be639 100644 --- a/src/chess_zero/configs/normal.py +++ b/src/chess_zero/configs/normal.py @@ -7,8 +7,10 @@ def __init__(self): class PlayConfig: def __init__(self): + self.max_processes = 8 + self.search_threads = 16 + self.vram_frac = 1.0 self.simulation_num_per_move = 800 - self.thinking_loop = 1 self.c_puct = 3 self.noise_eps = 0.25 self.dirichlet_alpha = 0.3 @@ -37,13 +39,14 @@ def __init__(self): class PlayWithHumanConfig: def __init__(self): self.play_config = PlayConfig() - self.play_config.thinking_loop = 5 self.play_config.tablebase_access = False class TrainerConfig: def __init__(self): self.batch_size = 32 # 2048 + self.cleaning_processes = 8 # RAM explosion... 
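The new vram_frac settings are handed to tf_util.set_session_config so several self-play, evaluation, and training processes can share one GPU. The sketch below is an assumption about how such a helper typically looks with TF1-era APIs; the project's actual tf_util may differ.

```python
# Plausible TF1-era GPU-memory helper (assumed implementation of tf_util.set_session_config).
import tensorflow as tf
from keras.backend import set_session

def set_session_config(per_process_gpu_memory_fraction=1.0, allow_growth=True):
    # cap this process' share of GPU memory so multiple workers can coexist
    gpu_options = tf.GPUOptions(allow_growth=allow_growth,
                                per_process_gpu_memory_fraction=per_process_gpu_memory_fraction)
    set_session(tf.Session(config=tf.ConfigProto(gpu_options=gpu_options)))

# e.g. set_session_config(0.4), matching vram_frac = 0.4 in the mini config
```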
+ self.vram_frac = 1.0 self.epoch_to_checkpoint = 1 self.start_total_steps = 0 self.save_model_steps = 2000 diff --git a/src/chess_zero/configs/small.py b/src/chess_zero/configs/small.py index dd0ffb3..c2431cd 100644 --- a/src/chess_zero/configs/small.py +++ b/src/chess_zero/configs/small.py @@ -1,14 +1,16 @@ class PlayDataConfig: def __init__(self): - self.nb_game_in_file = 100 # 100 + self.nb_game_in_file = 20 # 100 self.sl_nb_game_in_file = 100 self.max_file_num = 100 # 10 class PlayConfig: def __init__(self): + self.max_processes = 8 + self.search_threads = 16 + self.vram_frac = 0.4 self.simulation_num_per_move = 200 # 10 - self.thinking_loop = 1 self.c_puct = 3 self.noise_eps = .25 self.dirichlet_alpha = 0.3 @@ -37,19 +39,20 @@ def __init__(self): class PlayWithHumanConfig: def __init__(self): self.play_config = PlayConfig() - self.play_config.thinking_loop = 5 self.play_config.tablebase_access = False class TrainerConfig: def __init__(self): + self.cleaning_processes = 8 # RAM explosion... + self.vram_frac = 0.4 self.batch_size = 32 # 2048 self.epoch_to_checkpoint = 1 self.start_total_steps = 0 - self.save_model_steps = 10000 # 2000 + self.save_model_steps = 2000 # 2000 self.load_data_steps = 1000 - self.min_data_size_to_learn = 1000 - self.max_num_files_in_memory = 20 + self.min_data_size_to_learn = 10000 + self.max_num_files_in_memory = 60 class ModelConfig: diff --git a/src/chess_zero/env/chess_env.py b/src/chess_zero/env/chess_env.py index f80ea1a..bb09c11 100644 --- a/src/chess_zero/env/chess_env.py +++ b/src/chess_zero/env/chess_env.py @@ -22,27 +22,23 @@ class ChessEnv: def __init__(self, config: Config): self.config = config self.board = None - self.done = False self.winner = None # type: Winner self.resigned = False def reset(self): self.board = MyBoard() - self.done = False self.winner = None self.resigned = False return self def update(self, fen): self.board = MyBoard(fen) - self.done = False self.winner = None self.resigned = False return self def randomize(self, num): # generates a random position with _num_ pieces on the board. used to generate training data (with tablebase) self.board = MyBoard(None) - self.done = False self.winner = None self.resigned = False @@ -68,23 +64,15 @@ def step(self, move): self.board.push(move) - if self._is_game_over(): - self._conclude_game() - - def _is_game_over(self): - return self.board.is_game_over() or self.board.can_claim_draw() or self.fullmove_number >= self.config.play.automatic_draw_turn - - def _conclude_game(self): - self.done = True - result = self.board.result() - if result == '1/2-1/2' or self.board.can_claim_draw() or self.fullmove_number >= self.config.play.automatic_draw_turn: - self.winner = Winner.DRAW - else: - self.winner = Winner.WHITE if result == '1-0' else Winner.BLACK + if self.board.is_game_over() or self.board.can_claim_draw() or self.fullmove_number >= self.config.play.automatic_draw_turn: + result = self.board.result() + if result == '1/2-1/2' or self.board.can_claim_draw() or self.fullmove_number >= self.config.play.automatic_draw_turn: + self.winner = Winner.DRAW + else: + self.winner = Winner.WHITE if result == '1-0' else Winner.BLACK def _resign(self): self.winner = Winner.BLACK if self.board.turn == chess.WHITE else Winner.WHITE - self.done = True self.resigned = True def copy(self): @@ -95,6 +83,11 @@ def copy(self): def transposition_key(self): # used to be a @property, but that might be slower...? 
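The rewritten step() above folds _is_game_over/_conclude_game into a single branch and reads the outcome straight from python-chess, where board.result() returns '1-0', '0-1', '1/2-1/2' or '*'. A quick standalone check of that behaviour, using the position reached by fool's mate:

```python
# board.is_game_over() / board.result() as used by the new ChessEnv.step.
import chess

# 1. f3 e5 2. g4 Qh4# -- checkmate, Black wins
board = chess.Board("rnb1kbnr/pppp1ppp/8/4p3/6Pq/5P2/PPPPP2P/RNBQKBNR w KQkq - 1 3")
print(board.is_game_over(), board.result())   # True 0-1
```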
return self.board.transposition_key() + @property + def done(self): + return self.winner is not None + + @property def fen(self): return self.board.fen() @@ -175,10 +168,10 @@ def gather_features(self, t_history): # t_history = T denotes the number of hal stack = [] stack.append(np.full((1, 8, 8), self.halfmove_clock)) # np.int64's will later be coerced into np.float64's. - stack.append(np.full((1, 8, 8), self.has_queenside_castling_rights(False), dtype=np.float64)) - stack.append(np.full((1, 8, 8), self.has_kingside_castling_rights(False), dtype=np.float64)) - stack.append(np.full((1, 8, 8), self.has_queenside_castling_rights(True), dtype=np.float64)) - stack.append(np.full((1, 8, 8), self.has_kingside_castling_rights(True), dtype=np.float64)) + stack.append(np.full((1, 8, 8), self.has_queenside_castling_rights(chess.BLACK), dtype=np.float64)) + stack.append(np.full((1, 8, 8), self.has_kingside_castling_rights(chess.BLACK), dtype=np.float64)) + stack.append(np.full((1, 8, 8), self.has_queenside_castling_rights(chess.WHITE), dtype=np.float64)) + stack.append(np.full((1, 8, 8), self.has_kingside_castling_rights(chess.WHITE), dtype=np.float64)) stack.append(np.full((1, 8, 8), self.fullmove_number)) stack.append(np.full((1, 8, 8), self.turn, dtype=np.float64)) self._recursive_append(stack, t_history - 1, self.turn) diff --git a/src/chess_zero/gui/game_model.py b/src/chess_zero/gui/game_model.py index 200e72f..55057da 100644 --- a/src/chess_zero/gui/game_model.py +++ b/src/chess_zero/gui/game_model.py @@ -29,7 +29,6 @@ def _load_model(self): model = ChessModel(self.config) if not load_newest_model_weight(self.config.resource, model): raise RuntimeError("newest model not found!") - model.graph = tf.get_default_graph() return model def move_by_ai(self, env): @@ -49,4 +48,4 @@ def move_by_human(self, env): else: print("That is NOT a valid move :(.") # how will parse_san ever return a null move...? except: - print("That is NOT a valid move :(.") \ No newline at end of file + print("That is NOT a valid move :(.") diff --git a/src/chess_zero/gui/gui.py b/src/chess_zero/gui/gui.py index ea79c2e..e8b2865 100644 --- a/src/chess_zero/gui/gui.py +++ b/src/chess_zero/gui/gui.py @@ -33,7 +33,7 @@ def start(config: Config): print(f"Board FEN = {env.fen}") game = chess.pgn.Game.from_board(env.board) - game.headers['White'] = "Human" if human_is_white else f"AI {chess_model.model.digest[:10]}" - game.headers['Black'] = f"AI {chess_model.model.digest[:10]}" if human_is_white else "Human" + game.headers['White'] = "Human" if human_is_white else f"AI {chess_model.model.digest[:10]}..." + game.headers['Black'] = f"AI {chess_model.model.digest[:10]}..." if human_is_white else "Human" logger.debug("\n"+str(game)) print(f"\nEnd of the game. Game result: {env.board.result()}") diff --git a/src/chess_zero/lib/data_helper.py b/src/chess_zero/lib/data_helper.py index cd6aa85..631e4b9 100644 --- a/src/chess_zero/lib/data_helper.py +++ b/src/chess_zero/lib/data_helper.py @@ -1,4 +1,4 @@ -import json +import ujson import os from glob import glob import fnmatch @@ -16,25 +16,30 @@ def find_pgn_files(directory, pattern='*.pgn'): files.append(os.path.join(root, filename)) return files + def get_game_data_filenames(rc: ResourceConfig): pattern = os.path.join(rc.play_data_dir, rc.play_data_filename_tmpl % "*") files = list(sorted(glob(pattern))) return files + def get_newest_model_dirs(rc: ResourceConfig): # should be only one of these! 
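The gather_features hunk above spells out chess.BLACK / chess.WHITE instead of bare booleans (equivalent, since chess.BLACK == False and chess.WHITE == True in python-chess). A small standalone sketch of how those constant scalar planes are assembled; only the planes visible in the hunk are shown, and the plane order is assumed:

```python
# Constant feature planes as built in MyBoard.gather_features (scalar planes only).
import numpy as np
import chess

board = chess.Board()
planes = [
    np.full((1, 8, 8), board.halfmove_clock, dtype=np.float64),
    np.full((1, 8, 8), board.has_queenside_castling_rights(chess.BLACK), dtype=np.float64),
    np.full((1, 8, 8), board.has_kingside_castling_rights(chess.BLACK), dtype=np.float64),
    np.full((1, 8, 8), board.has_queenside_castling_rights(chess.WHITE), dtype=np.float64),
    np.full((1, 8, 8), board.has_kingside_castling_rights(chess.WHITE), dtype=np.float64),
    np.full((1, 8, 8), board.fullmove_number, dtype=np.float64),
    np.full((1, 8, 8), board.turn, dtype=np.float64),
]
print(np.concatenate(planes, axis=0).shape)   # (7, 8, 8); piece-history planes follow
```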
dir_pattern = os.path.join(rc.model_dir, rc.model_dirname_tmpl % "*") dirs = list(sorted(glob(dir_pattern))) return dirs + def get_old_model_dirs(rc: ResourceConfig): dir_pattern = os.path.join(rc.old_model_dir, rc.model_dirname_tmpl % "*") dirs = list(sorted(glob(dir_pattern))) return dirs + def write_game_data_to_file(path, data): with open(path, "wt") as f: - json.dump(data, f) + ujson.dump(data, f) + def read_game_data_from_file(path): with open(path, "rt") as f: - return json.load(f) + return ujson.load(f) diff --git a/src/chess_zero/lib/model_helper.py b/src/chess_zero/lib/model_helper.py index b866884..9089a5a 100644 --- a/src/chess_zero/lib/model_helper.py +++ b/src/chess_zero/lib/model_helper.py @@ -37,6 +37,7 @@ def save_as_newest_model(rc: ResourceConfig, model): weight_path = os.path.join(model_dir, rc.model_weight_filename) model.save(config_path, weight_path) + def clear_old_models(rc: ResourceConfig): dirs = get_newest_model_dirs(rc)[:-1] for dir_ in dirs: diff --git a/src/chess_zero/run.py b/src/chess_zero/run.py index 8f43160..9c063a0 100644 --- a/src/chess_zero/run.py +++ b/src/chess_zero/run.py @@ -1,6 +1,6 @@ - import os import sys +import multiprocessing as mp from dotenv import load_dotenv, find_dotenv if find_dotenv(): @@ -14,5 +14,7 @@ if __name__ == "__main__": + mp.set_start_method('spawn') + sys.setrecursionlimit(10000) from chess_zero import manager manager.start() diff --git a/src/chess_zero/worker/evaluate.py b/src/chess_zero/worker/evaluate.py index 945c81d..83b1864 100644 --- a/src/chess_zero/worker/evaluate.py +++ b/src/chess_zero/worker/evaluate.py @@ -1,41 +1,45 @@ import os +from datetime import datetime from logging import getLogger from random import random from time import sleep -import tensorflow as tf import chess +import chess.pgn from chess_zero.agent.model_chess import ChessModel from chess_zero.agent.player_chess import ChessPlayer from chess_zero.config import Config from chess_zero.env.chess_env import ChessEnv, Winner from chess_zero.lib import tf_util -from chess_zero.lib.data_helper import get_newest_model_dirs, get_old_model_dirs +from chess_zero.lib.data_helper import get_old_model_dirs from chess_zero.lib.model_helper import load_newest_model_weight +from multiprocessing import Manager +from concurrent.futures import ProcessPoolExecutor, as_completed + logger = getLogger(__name__) def start(config: Config): - tf_util.set_session_config(per_process_gpu_memory_fraction=0.2) - return EvaluateWorker(config, env=ChessEnv(config)).start() + # tf_util.set_session_config(config.play.vram_frac) + return EvaluateWorker(config).start() class EvaluateWorker: - def __init__(self, config: Config, env=None): + def __init__(self, config: Config): """ :param config: """ self.config = config - self.eval_config = self.config.eval - self.env = env - self.newest_model = None + self.play_config = self.config.eval.play_config # don't need other fields in self.eval...? + self.current_model = ChessModel(self.config) + self.m = Manager() + self.current_pipes = self.m.list([self.current_model.get_pipes(self.config.play.search_threads) for _ in range(self.config.play.max_processes)]) def start(self): - self.newest_model = self.load_newest_model() while True: - self.refresh_newest_model() + load_newest_model_weight(self.config.resource, self.current_model) age = 0 old_model, model_dir = self.load_old_model(age) # how many models ago should we load? 
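The run.py change above forces the 'spawn' start method before any worker pools are created; forked children do not inherit CUDA/TensorFlow state cleanly, so every process must build its own session. The basic pattern, as a tiny standalone example:

```python
# Force 'spawn' once, at program start, before creating any pools or processes.
import multiprocessing as mp

def job(x):
    return x * x

if __name__ == "__main__":
    mp.set_start_method('spawn')
    with mp.Pool(2) as pool:
        print(pool.map(job, range(4)))   # [0, 1, 4, 9]
```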
logger.debug(f"starting to evaluate newest model against model {model_dir}") @@ -46,55 +50,27 @@ def start(self): logger.debug(f"the newest model lost to the {age}th archived model ({model_dir})") def evaluate_model(self, old_model): - results = [] - winning_rate = 0 - for game_idx in range(self.eval_config.game_num): - # ng_win := if ng_model win -> 1, lose -> 0, draw -> None - newest_win, newest_is_white = self.play_game(self.newest_model, old_model) - if newest_win is not None: - results.append(newest_win) - winning_rate = sum(results) / len(results) - logger.debug(f"game {game_idx}: newest won = {newest_win}, newest played white = {newest_is_white}, winning rate = {winning_rate*100:.1f}, {self.env.fen}%") - if results.count(0) >= self.eval_config.game_num * (1-self.eval_config.replace_rate): - logger.debug(f"loss count has reached {results.count(0)}, so give up challenge") - break - if results.count(1) >= self.eval_config.game_num * self.eval_config.replace_rate: - logger.debug(f"win count has reached {results.count(1)}, current model wins") - break - - winning_rate = sum(results) / len(results) if len(results) != 0 else 0 - logger.debug(f"winning rate {winning_rate*100:.1f}%") - return winning_rate >= self.eval_config.replace_rate - - def play_game(self, newest_model, old_model): - random_endgame = self.eval_config.play_config.random_endgame - if random_endgame == -1: - self.env.reset() - else: - self.env.randomize(random_endgame) - - newest_player = ChessPlayer(self.config, newest_model, play_config=self.eval_config.play_config) - old_player = ChessPlayer(self.config, old_model, play_config=self.eval_config.play_config) - newest_is_white = random() < 0.5 - - while not self.env.done: - ai = newest_player if newest_is_white == (self.env.board.turn == chess.WHITE) else old_player - action = ai.action(self.env) - self.env.step(action) - - newest_win = None - if self.env.winner != Winner.DRAW: - newest_win = newest_is_white == (self.env.winner == Winner.WHITE) - return newest_win, newest_is_white - - def load_newest_model(self): - model = ChessModel(self.config) - load_newest_model_weight(self.config.resource, model) - model.graph = tf.get_default_graph() - return model - - def refresh_newest_model(self): - load_newest_model_weight(self.config.resource, self.newest_model) + old_pipes = self.m.list([old_model.get_pipes(self.play_config.search_threads) for _ in range(self.play_config.max_processes)]) + with ProcessPoolExecutor(max_workers=self.play_config.max_processes) as executor: + futures = [executor.submit(evaluate_buffer, self.config, self.current_pipes, old_pipes) for _ in range(self.config.eval.game_num)] + results = [] + game_idx = 0 + for future in as_completed(futures): + game_idx += 1 + current_win, env, current_is_white = future.result() # why .get() as opposed to .result()? + results.append(current_win) + w = results.count(True) + d = results.count(None) + l = results.count(False) + logger.debug(f"game {game_idx}: current won = {current_win} as {'White' if current_is_white else 'Black'}, W/D/L = {w}/{d}/{l}, {env.fen()}") + + game = chess.pgn.Game.from_board(env.board) # PGN dump + game.headers['White'] = f"AI {self.current_model.digest[:10]}..." if current_is_white else f"AI {old_model.digest[:10]}..." + game.headers['Black'] = f"AI {old_model.digest[:10]}..." if current_is_white else f"AI {self.current_model.digest[:10]}..." 
+ game.headers["Date"] = datetime.now().strftime("%Y.%m.%d") + logger.debug("\n" + str(game)) + + return w / (w + l) >= self.config.eval.replace_rate def load_old_model(self, age): rc = self.config.resource @@ -109,13 +85,33 @@ def load_old_model(self, age): weight_path = os.path.join(model_dir, rc.model_weight_filename) model = ChessModel(self.config) model.load(config_path, weight_path) - model.graph = tf.get_default_graph() return model, model_dir - def remove_model(self, model_dir): - rc = self.config.resource - config_path = os.path.join(model_dir, rc.next_generation_model_config_filename) - weight_path = os.path.join(model_dir, rc.next_generation_model_weight_filename) - os.remove(config_path) - os.remove(weight_path) - os.rmdir(model_dir) + +def evaluate_buffer(config, current, old) -> (float, ChessEnv, bool): + current_pipes = current.pop() + old_pipes = old.pop() + + random_endgame = config.eval.play_config.random_endgame + if random_endgame == -1: + env = ChessEnv(config).reset() + else: + env = ChessEnv(config).randomize(random_endgame) + + current_is_white = random() < 0.5 + + current_player = ChessPlayer(config, pipes=current_pipes, play_config=config.eval.play_config) + old_player = ChessPlayer(config, pipes=old_pipes, play_config=config.eval.play_config) + + while not env.done: + ai = current_player if current_is_white == (env.board.turn == chess.WHITE) else old_player + action = ai.action(env) + env.step(action) + + current_win = None + if env.winner != Winner.DRAW: + current_win = current_is_white == (env.winner == Winner.WHITE) + + current.append(current_pipes) + old.append(old_pipes) + return current_win, env, current_is_white diff --git a/src/chess_zero/worker/optimize.py b/src/chess_zero/worker/optimize.py index 781b6ec..70b8b28 100644 --- a/src/chess_zero/worker/optimize.py +++ b/src/chess_zero/worker/optimize.py @@ -4,22 +4,23 @@ import numpy as np import os -from keras.optimizers import Adam from keras.callbacks import TensorBoard +from keras.optimizers import Adam import chess -from chess_zero.agent.model_chess import ChessModel, loss_function_for_policy, loss_function_for_value from chess_zero.config import Config from chess_zero.lib import tf_util from chess_zero.lib.data_helper import get_game_data_filenames, read_game_data_from_file from chess_zero.lib.model_helper import load_newest_model_weight, save_as_newest_model, clear_old_models from chess_zero.env.chess_env import MyBoard +from chess_zero.agent.model_chess import ChessModel +from concurrent.futures import ProcessPoolExecutor, as_completed logger = getLogger(__name__) def start(config: Config): - tf_util.set_session_config(per_process_gpu_memory_fraction=0.5) + tf_util.set_session_config(config.trainer.vram_frac) return OptimizeWorker(config).start() @@ -39,13 +40,12 @@ def start(self): def training(self): self.compile_model() tc = self.config.trainer - last_load_data_step = last_save_step = total_steps = tc.start_total_steps - min_data_size_to_learn = tc.min_data_size_to_learn + last_load_data_step = last_save_step = total_steps = 0 self.load_play_data() while True: - if self.dataset_size < min_data_size_to_learn: - logger.info(f"dataset_size={self.dataset_size} is less than {min_data_size_to_learn}") + if self.dataset_size < tc.min_data_size_to_learn: + logger.info(f"dataset_size={self.dataset_size} is less than {tc.min_data_size_to_learn}") sleep(60) self.load_play_data() continue @@ -61,103 +61,86 @@ def training(self): def train_epoch(self, epochs): tc = self.config.trainer - state_ary, policy_ary, 
value_ary = self.dataset - # tensorboard = TensorBoard(log_dir=os.path.join(self.config.resource.log_dir, str(time())), histogram_freq=0, batch_size=32, write_graph=True, write_grads=True, write_images=True, embeddings_freq=0, embeddings_layer_names=None, embeddings_metadata=None) - self.model.model.fit(state_ary, [policy_ary, value_ary], batch_size=tc.batch_size, epochs=epochs) # ..., callbacks=[tensorboard]) + state_deque, policy_deque, value_deque = self.dataset + state_ary, policy_ary, value_ary = np.asarray(state_deque), np.asarray(policy_deque), np.asarray(value_deque) + tensorboard_cb = TensorBoard(log_dir=self.config.resource.log_dir, batch_size=tc.batch_size, histogram_freq=1) + self.model.model.fit(state_ary, [policy_ary, value_ary], batch_size=tc.batch_size, epochs=epochs, shuffle=True, validation_split=0.05, callbacks=[tensorboard_cb]) steps = (state_ary.shape[0] // tc.batch_size) * epochs return steps def compile_model(self): self.optimizer = Adam(lr=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08, decay=0.0) - losses = [loss_function_for_policy, loss_function_for_value] + losses = ['categorical_crossentropy', 'mean_squared_error'] self.model.model.compile(optimizer=self.optimizer, loss=losses) def replace_current_model(self): save_as_newest_model(self.config.resource, self.model) clear_old_models(self.config.resource) - def collect_all_loaded_data(self): - state_ary_list, policy_ary_list, value_ary_list = [], [], [] - for s_ary, p_ary, v_ary in self.loaded_data.values(): - state_ary_list.append(s_ary) - policy_ary_list.append(p_ary) - value_ary_list.append(v_ary) - state_ary = np.concatenate(state_ary_list) - policy_ary = np.concatenate(policy_ary_list) - value_ary = np.concatenate(value_ary_list) - return state_ary, policy_ary, value_ary - - @property - def dataset_size(self): - if self.dataset is None: - return 0 - return len(self.dataset[0]) - def load_model(self): - from chess_zero.agent.model_chess import ChessModel model = ChessModel(self.config) if self.config.opts.new or not load_newest_model_weight(self.config.resource, model): model.build() # optimize will now _also_ build a new model from scratch if none exists. save_as_newest_model(self.config.resource, model) return model + @property + def dataset_size(self): + if self.dataset is None: + return 0 + return len(self.dataset[0]) + def load_play_data(self): - filenames = get_game_data_filenames(self.config.resource) - filenames = filenames[-self.config.trainer.max_num_files_in_memory:] - updated = False - for filename in (self.loaded_filenames - set(filenames)): # unload first...! memory consumption - self.unload_data_of_file(filename) - updated = True - - for filename in filenames: - if filename in self.loaded_filenames: - continue - self.load_data_from_file(filename) - updated = True - - if updated: - logger.debug("updating training dataset") - try: - self.dataset = self.collect_all_loaded_data() - except Exception as e: - logger.warning(str(e)) - - def load_data_from_file(self, filename): - try: # necessary to catch an exception here: if the play data file isn't completely written yet, then some error will be thrown about a "missing delimiter", etc. 
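train_epoch above now builds the arrays on the fly and fits with a TensorBoard callback and a small validation split, while compile_model switches to Keras' built-in string losses. A toy two-headed model showing the same compile/fit shape (old multi-backend Keras as imported in this patch, toy dimensions; callback kwargs vary slightly by Keras version):

```python
# Two-headed policy/value compile and fit, mirroring compile_model/train_epoch.
import numpy as np
from keras.layers import Input, Flatten, Dense
from keras.models import Model
from keras.optimizers import Adam
from keras.callbacks import TensorBoard

inp = Input((18, 8, 8))                       # toy input stack
x = Flatten()(inp)
policy_out = Dense(64, activation="softmax", name="policy_out")(x)
value_out = Dense(1, activation="tanh", name="value_out")(x)
model = Model(inp, [policy_out, value_out])
model.compile(optimizer=Adam(lr=0.001),
              loss=['categorical_crossentropy', 'mean_squared_error'])

states = np.random.rand(40, 18, 8, 8)
policies = np.eye(64)[np.random.randint(0, 64, size=40)]
values = np.random.uniform(-1, 1, size=(40, 1))
tensorboard_cb = TensorBoard(log_dir="./logs")
model.fit(states, [policies, values], batch_size=8, epochs=1,
          shuffle=True, validation_split=0.05, callbacks=[tensorboard_cb])
```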
- logger.debug(f"loading data from {filename}") - data = read_game_data_from_file(filename) - self.loaded_data[filename] = self.convert_to_training_data(data) - self.loaded_filenames.add(filename) - except Exception as e: - logger.warning(str(e)) - - def unload_data_of_file(self, filename): - logger.debug(f"removing data {filename} from training set") - self.loaded_filenames.remove(filename) - if filename in self.loaded_data: + new_filenames = set(get_game_data_filenames(self.config.resource)[-self.config.trainer.max_num_files_in_memory:]) + + for filename in self.loaded_filenames - new_filenames: + logger.debug(f"removing data {filename} from training set") + self.loaded_filenames.remove(filename) del self.loaded_data[filename] - def convert_to_training_data(self, data): - """ + with ProcessPoolExecutor(max_workers=self.config.trainer.cleaning_processes) as executor: + futures = {executor.submit(load_data_from_file, filename, self.config.model.t_history):filename for filename in new_filenames - self.loaded_filenames} + for future in as_completed(futures): + filename = futures[future] + logger.debug(f"loading data from {filename}") + self.loaded_filenames.add(filename) + self.loaded_data[filename] = future.result() + + self.dataset = self.collect_all_loaded_data() + + def collect_all_loaded_data(self): + if not self.loaded_data: + return + state_ary_list, policy_ary_list, value_ary_list = [], [], [] + for s_ary, p_ary, v_ary in self.loaded_data.values(): + state_ary_list.extend(s_ary) + policy_ary_list.extend(p_ary) + value_ary_list.extend(v_ary) + state_ary = np.stack(state_ary_list) + policy_ary = np.stack(policy_ary_list) + value_ary = np.expand_dims(np.stack(value_ary_list), axis=1) + return state_ary, policy_ary, value_ary + + +def load_data_from_file(filename, t_history): + # necessary to catch an exception here...? if the play data file isn't completely written yet, then some error will be thrown about a "missing delimiter", etc. + data = read_game_data_from_file(filename) - :param data: format is SelfPlayWorker.buffer - :return: - """ - state_list = [] - policy_list = [] - value_list = [] + state_list = [] + policy_list = [] + value_list = [] - board = MyBoard(None) - board.fullmove_number = 1000 # an arbitrary large value. + board = MyBoard(None) + board.fullmove_number = 1000 # an arbitrary large value. 
- for state, policy, value in data: - board.push_fen(state) - state = board.gather_features(self.config.model.t_history) - if board.turn == chess.BLACK: - policy = Config.flip_policy(policy) + for state, policy, value in data: + board.push_fen(state) + state = board.gather_features(t_history) + if board.turn == chess.BLACK: + policy = Config.flip_policy(policy) - state_list.append(state) - policy_list.append(policy) - value_list.append(value) + state_list.append(state) + policy_list.append(policy) + value_list.append(value) - return np.array(state_list), np.array(policy_list), np.array(value_list) + return state_list, policy_list, value_list diff --git a/src/chess_zero/worker/self_play.py b/src/chess_zero/worker/self_play.py index b073435..9db28dc 100644 --- a/src/chess_zero/worker/self_play.py +++ b/src/chess_zero/worker/self_play.py @@ -1,27 +1,29 @@ import os from datetime import datetime from logging import getLogger -from time import time +from time import time, sleep import chess -import tensorflow as tf +from concurrent.futures import ProcessPoolExecutor, as_completed from chess_zero.agent.player_chess import ChessPlayer +from chess_zero.agent.model_chess import ChessModel from chess_zero.config import Config from chess_zero.env.chess_env import ChessEnv, Winner from chess_zero.lib import tf_util from chess_zero.lib.data_helper import get_game_data_filenames, write_game_data_to_file from chess_zero.lib.model_helper import load_newest_model_weight, save_as_newest_model +from multiprocessing import Manager +from collections import deque from threading import Thread logger = getLogger(__name__) def start(config: Config): - tf_util.set_session_config(per_process_gpu_memory_fraction=0.4) - return SelfPlayWorker(config, env=ChessEnv(config)).start() - + tf_util.set_session_config(config.play.vram_frac) + return SelfPlayWorker(config).start() class SelfPlayWorker: - def __init__(self, config: Config, env=None, model=None): + def __init__(self, config: Config): """ :param config: @@ -29,87 +31,89 @@ def __init__(self, config: Config, env=None, model=None): :param chess_zero.agent.model_chess.ChessModel|None model: """ self.config = config - self.model = model - self.env = env # type: ChessEnv - self.white = None # type: ChessPlayer - self.black = None # type: ChessPlayer - self.buffer = [] - self.idx = 1 + self.current_model = self.load_model() + self.m = Manager() + self.current_pipes = self.m.list([self.current_model.get_pipes(self.config.play.search_threads) for _ in range(self.config.play.max_processes)]) def start(self): - if self.model is None: - self.model = self.load_model() + self.buffer = [] + load_newest_model_weight(self.config.resource, self.current_model) + game_idx = 0 while True: - start_time = time() - env = self.start_game() - end_time = time() - logger.debug(f"game {self.idx} time={(end_time - start_time):.3f}s, turn={int(env.fullmove_number)}. 
{env.winner}, resigned: {env.resigned}, {env.fen}") - if (self.idx % self.config.play_data.nb_game_in_file) == 0: - load_newest_model_weight(self.config.resource, self.model) - self.idx += 1 - - def start_game(self): - random_endgame = self.config.play.random_endgame - if random_endgame == -1: - self.env.reset() - else: - self.env.randomize(random_endgame) - self.white = ChessPlayer(self.config, self.model) - self.black = ChessPlayer(self.config, self.model) - while not self.env.done: - ai = self.white if self.env.board.turn == chess.WHITE else self.black - move = ai.action(self.env) - self.env.step(move) - self.finish_game() - game = chess.pgn.Game.from_board(self.env.board) - game.headers['Event'] = f"Game {self.idx}" - game.headers['White'] = game.headers['Black'] = f"AI {self.model.digest[:10]}..." - logger.debug("\n"+str(game)) - self.save_play_data() - self.remove_play_data() - return self.env - - def save_play_data(self): - data = [move for pair in zip(self.white.moves, self.black.moves) for move in pair] # interleave the two lists - if len(self.white.moves) > len(self.black.moves): - data += [self.white.moves[-1]] # tack on final move if white moved last - self.buffer += data - - if self.idx % self.config.play_data.nb_game_in_file == 0: + with ProcessPoolExecutor(max_workers=self.config.play.max_processes) as executor: + futures = [executor.submit(self_play_buffer, self.config, self.current_pipes) for _ in range(self.config.play_data.nb_game_in_file)] + start_time = time() + for future in as_completed(futures): + game_idx += 1 + env, data = future.result() + logger.debug(f"game {game_idx} time={(time() - start_time):.3f}s, turn={int(env.fullmove_number)}. {env.winner}, resigned: {env.resigned}, {env.fen}") + start_time = time() + self.buffer += data self.flush_buffer() + load_newest_model_weight(self.config.resource, self.current_model) + self.remove_play_data() + + def load_model(self): + model = ChessModel(self.config) + if self.config.opts.new or not load_newest_model_weight(self.config.resource, model): + model.build() + save_as_newest_model(self.config.resource, model) + return model def flush_buffer(self): rc = self.config.resource game_id = datetime.now().strftime("%Y%m%d-%H%M%S.%f") path = os.path.join(rc.play_data_dir, rc.play_data_filename_tmpl % game_id) logger.info(f"save play data to {path}") - #print(self.buffer) - thread = Thread(target = write_game_data_to_file, args=(path, (self.buffer))) + thread = Thread(target=write_game_data_to_file, args=(path, self.buffer)) thread.start() self.buffer = [] def remove_play_data(self): - files = get_game_data_filenames(self.config.resource) - for i in range(len(files) - self.config.play_data.max_file_num): - os.remove(files[i]) + filenames = get_game_data_filenames(self.config.resource)[:-self.config.play_data.max_file_num] + for filename in filenames: + os.remove(filename) - def finish_game(self): - if self.env.winner == Winner.WHITE: - white_win = 1 - elif self.env.winner == Winner.BLACK: - white_win = -1 - else: - white_win = 0 - self.white.finish_game(white_win) - self.black.finish_game(-white_win) +def self_play_buffer(config, current) -> (ChessEnv, list): + pipes = current.pop() # borrow - def load_model(self): - from chess_zero.agent.model_chess import ChessModel - model = ChessModel(self.config) - if self.config.opts.new or not load_newest_model_weight(self.config.resource, model): - model.build() - save_as_newest_model(self.config.resource, model) - model.graph = tf.get_default_graph() - return model + random_endgame = 
config.eval.play_config.random_endgame + if random_endgame == -1: + env = ChessEnv(config).reset() + else: + env = ChessEnv(config).randomize(random_endgame) + + white = ChessPlayer(config, pipes=pipes) + black = ChessPlayer(config, pipes=pipes) + + while not env.done: + ai = white if env.board.turn == chess.WHITE else black + action = ai.action(env) + env.step(action) + + if env.winner == Winner.WHITE: + white_win = 1 + elif env.winner == Winner.BLACK: + white_win = -1 + else: + white_win = 0 + + white.finish_game(white_win) + black.finish_game(-white_win) + + # game = chess.pgn.Game.from_board(env.board) # optional PGN dump + # game.headers["Date"] = datetime.now().strftime("%Y.%m.%d") + # game.headers['White'] = game.headers['Black'] = f"AI {self.model.digest[:10]}..." + + current.append(pipes) + return env, merge_data(white, black) + + +def merge_data(white, black): + data = [move for pair in zip(white.moves, black.moves) for move in pair] # interleave the two lists + if len(white.moves) > len(black.moves): + data += [white.moves[-1]] # tack on final move if white moved last + + return data diff --git a/src/chess_zero/worker/sl.py b/src/chess_zero/worker/sl.py index 15c487e..bb17482 100644 --- a/src/chess_zero/worker/sl.py +++ b/src/chess_zero/worker/sl.py @@ -8,10 +8,8 @@ from chess_zero.config import Config from chess_zero.env.chess_env import ChessEnv, Winner from chess_zero.lib import tf_util -from chess_zero.lib.data_helper import get_game_data_filenames, write_game_data_to_file, find_pgn_files -# from threading import Thread - -import random +from chess_zero.lib.data_helper import write_game_data_to_file, find_pgn_files +from concurrent.futures import ProcessPoolExecutor, as_completed logger = getLogger(__name__) @@ -19,7 +17,6 @@ def start(config: Config): - # tf_util.set_session_config(per_process_gpu_memory_fraction=0.01) return SupervisedLearningWorker(config, env=ChessEnv(config)).start() @@ -31,87 +28,83 @@ def __init__(self, config: Config, env=None): :param chess_zero.agent.model_chess.ChessModel|None model: """ self.config = config - self.env = env # type: ChessEnv - self.black = None # type: ChessPlayer - self.white = None # type: ChessPlayer - self.buffer = [] - self.idx = 1 def start(self): - start_time = time() - - for env in self.read_all_files(): - end_time = time() - logger.debug(f"game {self.idx} time={(end_time - start_time):.3f}s, turn={int(env.fullmove_number)}. {env.winner}, resigned: {env.resigned}, {env.fen}") - start_time = end_time - self.idx += 1 self.buffer = [] + start_time = time() - def read_all_files(self): + with ProcessPoolExecutor(max_workers=7) as executor: + games = self.get_games_from_all_files() + game_idx = 0 + for future in as_completed([executor.submit(supervised_buffer, self.config, game) for game in games]): + game_idx += 1 + env, data = future.result() + self.buffer += data + if game_idx % self.config.play_data.sl_nb_game_in_file == 0: + self.flush_buffer() + end_time = time() + logger.debug(f"game {game_idx} time={(end_time - start_time):.3f}s, turn={int(env.fullmove_number)}. {env.winner}, resigned: {env.resigned}, {env.fen}") + start_time = end_time + + # self.flush_buffer() # uneven number of games in file. 
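merge_data above interleaves the two players' move records and keeps White's trailing record when White moved last. A two-line check with placeholder records:

```python
# Interleaving of white/black move records, as in merge_data.
white_moves = ["w1", "w2", "w3"]
black_moves = ["b1", "b2"]
data = [move for pair in zip(white_moves, black_moves) for move in pair]
if len(white_moves) > len(black_moves):
    data += [white_moves[-1]]   # tack on the final move if White moved last
print(data)   # ['w1', 'b1', 'w2', 'b2', 'w3']
```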
+ + def get_games_from_all_files(self): files = find_pgn_files(self.config.resource.play_data_dir) - print (files) - from itertools import chain - return chain.from_iterable(self.read_file(filename) for filename in files) + games = [] + for filename in files: + games.extend(self.get_games_from_file(filename)) + return games - def read_file(self,filename): + def get_games_from_file(self,filename): pgn = open(filename, errors='ignore') - for offset, header in chess.pgn.scan_headers(pgn): + games = [] + for offset in chess.pgn.scan_offsets(pgn): pgn.seek(offset) - game = chess.pgn.read_game(pgn) - yield self.add_to_buffer(game) - - def add_to_buffer(self,game): - self.env.reset() - self.white = ChessPlayer(self.config) - self.black = ChessPlayer(self.config) - result = game.headers["Result"] - self.env.board = game.board() - for move in game.main_line(): - ai = self.white if self.env.board.turn == chess.WHITE else self.black - ai.sl_action(self.env, move) - self.env.step(move) - - self.env.done = True - if not self.env.board.is_game_over() and result != '1/2-1/2': - self.env.resigned = True - if result == '1-0': - self.env.winner = Winner.WHITE - elif result == '0-1': - self.env.winner = Winner.BLACK - else: - self.env.winner = Winner.DRAW - - self.finish_game() - self.save_play_data() - return self.env - - def save_play_data(self): - data = [move for pair in zip(self.white.moves, self.black.moves) for move in pair] # interleave the two lists - if len(self.white.moves) > len(self.black.moves): - data += [self.white.moves[-1]] # tack on final move if white moved last - self.buffer += data - - if self.idx % self.config.play_data.sl_nb_game_in_file == 0: - self.flush_buffer() + games.append(chess.pgn.read_game(pgn)) + return games def flush_buffer(self): rc = self.config.resource game_id = datetime.now().strftime("%Y%m%d-%H%M%S.%f") path = os.path.join(rc.play_data_dir, rc.play_data_filename_tmpl % game_id) - logger.info(f"save play data to {path}") - #print(self.buffer) - write_game_data_to_file(path, self.buffer) # was having problems with multi-threading: file-write not failing to complete quickly and failing to be loaded by the trainer. - # thread = Thread(target = write_game_data_to_file, args=(path, (self.buffer))) + logger.info(f"saving play data to {path}") + # thread = Thread(target = write_game_data_to_file, args=(path, self.buffer)) # thread.start() + write_game_data_to_file(path, self.buffer) # was having problems with multi-threading: file-write not failing to complete quickly and failing to be loaded by the trainer. 
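get_games_from_file above relies on chess.pgn.scan_offsets, the offset-based API of the python-chess versions this patch targets (later releases dropped it in favour of looping read_game directly). The same pattern as a standalone helper:

```python
# Read every game from a PGN file via offsets, as done in SupervisedLearningWorker.
import chess.pgn

def games_from_pgn(path):
    games = []
    with open(path, errors='ignore') as pgn:
        for offset in chess.pgn.scan_offsets(pgn):
            pgn.seek(offset)
            games.append(chess.pgn.read_game(pgn))
    return games

# usage: games = games_from_pgn("data/play_data/my_games.pgn")
```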
self.buffer = [] - def finish_game(self): - if self.env.winner == Winner.WHITE: - white_win = 1 - elif self.env.winner == Winner.BLACK: - white_win = -1 - else: - white_win = 0 - self.white.finish_game(white_win) - self.black.finish_game(-white_win) +def supervised_buffer(config, game) -> (ChessEnv, list): + env = ChessEnv(config).reset() + white = ChessPlayer(config, dummy=True) + black = ChessPlayer(config, dummy=True) + result = game.headers["Result"] + env.board = game.board() + for move in game.main_line(): + ai = white if env.board.turn == chess.WHITE else black + ai.sl_action(env, move) + env.step(move) + + if not env.board.is_game_over() and result != '1/2-1/2': + env.resigned = True + if result == '1-0': + env.winner = Winner.WHITE + white_win = 1 + elif result == '0-1': + env.winner = Winner.BLACK + white_win = -1 + else: + env.winner = Winner.DRAW + white_win = 0 + + white.finish_game(white_win) + black.finish_game(-white_win) + return env, merge_data(white, black) + + +def merge_data(white, black): + data = [move for pair in zip(white.moves, black.moves) for move in pair] # interleave the two lists + if len(white.moves) > len(black.moves): + data += [white.moves[-1]] # tack on final move if white moved last + + return data