In [25]:
# Импорт библиотек
from numba import njit, prange
from typing import Literal
from typing_extensions import Self
from numpy import empty, argsort, append, array
from numpy.typing import NDArray
import numpy as np
from random import choice, random
import os
import time

In [26]:
# Analyse
def encode_endings(endings):
	'''
	Функция кодировки данных из дата-сета числами
	'b' = 0
	'x' = 1
	'o' = 2
	'neutral' = 3
	'positive' = 4
	'negative' = 5
	'''
	encoded_type = '10u1'
	endings_count = len(endings)
	result = empty(endings_count, dtype=encoded_type)
	for i in range(endings_count):
		ending = endings[i]
		temp = []
		for figure in ending[:9]:
			if figure == 'b':
				temp.append(0)
			elif figure == 'x':
				temp.append(1)
			elif figure == 'o':
				temp.append(2)

		if ending[9] == 'neutral':
			temp.append(3)
		elif ending[9] == 'positive':
			temp.append(4)
		elif ending[9] == 'negative':
			temp.append(5)
		result[i] = temp
	return result

def read_endings():
	'''
	Функция чтения дата-сета
	'''
	source = np.genfromtxt('tic-tac-toe.data', delimiter=',', dtype='U30')
	for ending in source:
		if ending[9] == 'negative' and 'b' not in ending:
			ending[9] = 'neutral'
	return encode_endings(source)

@njit()
def get_situation(x_moves, o_moves):
	'''
	Функция преобразующая историю ходов в текущую позицию на карте
	'''
	return np.array([1 if (i in x_moves) else 2 if (i in o_moves) else 0 for i in range(9)])

@njit()
def get_ending_distance(situation, ending, player_is_x):
	'''
	Функция вычисляющая число ходов необходимых игроку для достижения концовки из текущей позиции.
	Если концовка не достижима возвращает -1
	'''
	ending_distance = 0
	for i in range(9):
		if situation[i] != ending[i]:
			if situation[i] == 0:
				if player_is_x and ending[i] == 1:
					ending_distance += 1
				if not player_is_x and ending[i] == 2:
					ending_distance += 1
			else:
				return -1
	return ending_distance

@njit()
def get_situation_endings(endings, situation, player_is_x):
	'''
	Отсеивает недостижимые концовки. 
	Адаптирует результат под то "кем" играет игрок.
	Сортирует концовки по расстоянию до них.
	'''
	endings_result_count = 0
	endings_count = len(endings)
	result_endings = np.empty((endings_count, 11), dtype='u1')
	for i in range(endings_count):
		ending = endings[i]
		result = ending[9]
		ending_distance = get_ending_distance(situation, ending, player_is_x)
		if ending_distance <= 0:
			continue
		# Инвертируем результат если играет "нолик"
		if player_is_x == False:
			if result == 5:
				result = 3
			elif result == 3:
				result = 5
		result_endings[endings_result_count, :] = [ending_distance, result, *ending[:9]]
		endings_result_count += 1
		
	return result_endings[argsort(result_endings[:endings_result_count,0], kind='mergesort')]

@njit()
def get_move_endings(result_endings, move, situation, player_is_x):
	'''
	Отсеивает недостижимые концовки. 
	Из информации по каждой концовке оставляет только расстояние и результат. 
	'''
	move_situation = [situation[i] if i != move else 1 if player_is_x else 2 for i in range(9)]
	endings_result_count = len(result_endings)
	move_endings_count = 0
	move_endings = empty((endings_result_count, 2), dtype='u1')
	for e_i in range(endings_result_count):
		ending = result_endings[e_i]
		ending_situation = ending[2:11]
		dist = get_ending_distance(move_situation, ending_situation, player_is_x)
		if dist == -1:
			continue
		result = ending[1]
		move_endings[move_endings_count, :] = [dist, result]
		move_endings_count += 1
	return move_endings[:move_endings_count]

@njit()
def analyse(endings, player_is_x, x_moves, o_moves, allowed_moves):
	'''
	Производит анализ доступных ходов вычисляя для них следующие метрики:

	0. move
	1. nearest_draw
	2. nearest_win
	3. nearest_loose
	4. max_draw_rate
	5. max_win_rate
	6. max_loose_rate
	7. avg_draw_rate
	8. avg_win_rate
	9. avg_loose_rate
	10. min_draw_rate
	11. min_win_rate
	12. min_loose_rate
	13. total_draw_rate
	14. total_win_rate
	15. total_loose_rate
	'''
	situation = get_situation(x_moves, o_moves)
	result_endings = get_situation_endings(endings, situation, player_is_x)
	moves_count = len(allowed_moves)
	analyzed_moves = empty((moves_count, 13), dtype='f4')
	# Перебираем доступные ходы и доступные для них концовки
	for m_i in range(moves_count):
		move = allowed_moves[m_i]
		# Отфильтровываем недостижимые концовки
		# [dist, result]
		move_endings = get_move_endings(result_endings, move, situation, player_is_x)
		move_endings_count = len(move_endings)
		# Возможна ситуация когда концовка уже достигнута, тогда возвращаем пустой массив
		if move_endings_count == 0:
			return empty((0, 16), dtype='f4')
		# Вычисляем число групп концовок по расстоянию
		min_dist = min(move_endings[:, 0])
		dist_count = max(move_endings[:, 0]) - min_dist + 1
		dist_metrics = np.zeros((dist_count, 7), dtype='f4')
		# Инициализируем счётчики расстояния максимально возможным значением
		nearest_draw = 4
		nearest_win = 4
		nearest_loose = 4
		# Группируем концовки по расстоянию до них, вычисляя статистику
		for e_i in range(move_endings_count):
			dist, result = move_endings[e_i]
			d_i = dist - min_dist
			# Ничья - 3
			if result == 3 and nearest_draw > dist:
				nearest_draw = dist
			# Победа - 4
			elif result == 4 and nearest_win > dist:
				nearest_win = dist
			# Поражение - 5
			elif result == 5 and nearest_loose > dist:
				nearest_loose = dist
			
			# Вычисляем статистику
			# 3 - 3 = 0; draw_count
			# 4 - 3 = 1; win_count
			# 5 - 3 = 2; loose_count
			#		  3; total_count
			#		  4; draw_rate
			#		  5; win_rate
			#		  6; lose_rate
			metric_i = result - 3
			dist_metrics[d_i, metric_i] += 1
			dist_metrics[d_i, 3] += 1

		# Считаем проценты
		#		  3; total_count
		#		  4; draw_rate
		#		  5; win_rate
		#		  6; lose_rate
		for d_i in range(dist_count):
			for metric_i in range(3):
				dist_metrics[d_i, 4 + metric_i] = dist_metrics[d_i, metric_i] / dist_metrics[d_i, 3] * 100
		
		# Считаем метрики
		# move			  0
		#
		# nearest_draw	  1
		# nearest_win	  2
		# nearest_loose	  3
		#
		# 3 * 0 + 0 + 4 = 4; max_draw_rate
		# 3 * 0 + 1 + 4 = 5; max_win_rate
		# 3 * 0 + 2 + 4 = 6; max_loose_rate
		#
		# 3 * 1 + 0 + 4 = 7; avg_draw_rate
		# 3 * 1 + 1 + 4 = 8; avg_win_rate
		# 3 * 1 + 2 + 4 = 9; avg_loose_rate
		#
		# 3 * 2 + 0 + 4 = 10; min_draw_rate
		# 3 * 2 + 1 + 4 = 11; min_win_rate
		# 3 * 2 + 2 + 4 = 12; min_loose_rate
		#
		# 3 * 3 + 0 + 4 = 13; total_draw_rate
		# 3 * 3 + 1 + 4 = 14; total_win_rate
		# 3 * 3 + 2 + 4 = 15; total_loose_rate
		analyzed_moves[m_i, 0:4] = [move, nearest_draw, nearest_loose, nearest_win]
		for i in range(3):
			percents = dist_metrics[:, i + 4]
			absolute = dist_metrics[:, i]
			total = dist_metrics[:, 3]
			analyzed_moves[m_i, 3 * 0 + i + 4] = max(percents)
			analyzed_moves[m_i, 3 * 1 + i + 4] = sum(percents) / len(percents)
			analyzed_moves[m_i, 3 * 2 + i + 4] = min(percents)
			analyzed_moves[m_i, 3 * 3 + i + 4] = sum(absolute) / sum(total) * 100
	return analyzed_moves

endings = read_endings()

In [27]:
# Types
PlayerId = str
PlayerMove = Literal[0, 1, 2, 3, 4, 5, 6, 7, 8]
MovesList = NDArray[np.int8]

In [28]:
class Player:
	_name: PlayerId
	_draw_counter: int
	_win_counter: int
	_loose_counter: int
	_moves: MovesList
	_original: Self
	_opponent_name: str
	_copies: list[tuple[Self, Self]]
	_is_x_player: bool
	_is_original: bool

	def __init__(self, name: PlayerId):
		self._name = name
		self._draw_counter = None
		self._win_counter = None
		self._loose_counter = None
		self._total_counter = None
		self._moves = None
		self._original = None
		self._opponent_name = None
		self._copies = None
		self._is_x_player = None
		self._is_original = True

	def __getitem__(self, index) -> None|Self:
		if self._copies is None:
			return None
		return self._copies[index]
	
	def __setitem__(self, index, value):
		if self._copies is None:
			return None
		self._copies[index] = value
	
	def __make_copy(self, opponent: Self, is_x_player: bool):
		copy = type(self)(self._name)
		copy._opponent_name = opponent._name
		copy._is_x_player = is_x_player
		copy._copy_handler(self)
		return copy
	
	def _copy_handler(self, original: Self):
		self._original = original
		self._draw_counter = 0
		self._win_counter = 0
		self._loose_counter = 0
		self._total_counter = 0
		self._is_original = False
	
	def prepare_for_games(self, opponents: list[Self]):
		opponents_count = len(opponents)
		self._copies = list([(None, None)] * opponents_count)
		for i in prange(opponents_count):
			opponent = opponents[i]
			self[i] = (
				self.__make_copy(opponent, True), 
				self.__make_copy(opponent, False),
			)

	def clear_score(self, id_range):
		os.makedirs(f'scores_{id_range}', exist_ok=True)
		with open(f'scores_{id_range}/{self._name}.log', 'w') as file:
			# title = ','.join(map(
			# 	lambda copies: f'X vs {copies[0]._opponent_name},O vs {copies[1]._opponent_name}', 
			# 	self._copies
			# ))
			# file.write(f'{title},')
			file.write('total\n')
			
	def save_score(self, id_range):
		score_lines = []
		draw_counter = 0
		win_counter = 0
		loose_counter = 0
		total_counter = 0
		for copies in self._copies:
			draw_counter += copies[0]._draw_counter + copies[1]._draw_counter
			win_counter += copies[0]._win_counter + copies[1]._win_counter
			loose_counter += copies[0]._loose_counter + copies[1]._loose_counter
			total_counter += copies[0]._total_counter + copies[1]._total_counter
			# score_lines.append(f'{copies[0]._draw_counter}|{copies[0]._win_counter}|{copies[0]._loose_counter}|{copies[0]._total_counter}')
			# score_lines.append(f'{copies[1]._draw_counter}|{copies[1]._win_counter}|{copies[1]._loose_counter}|{copies[1]._total_counter}')

		with open(f'scores_{id_range}/{self._name}.log', 'a') as file:
			# file.write(','.join(score_lines))
			file.write(f'{draw_counter}|{win_counter}|{loose_counter}|{total_counter}')
			file.write('\n')

	def new_game_handler(self):
		self._moves = empty(0, dtype='u1')

	def make_move(self, opponent_moves: MovesList, allowed_moves: MovesList) -> PlayerMove:
		assert type(self) != Player, 'Player не может ходить!'
		assert self._is_original == False, 'Оригинал не может ходить!'

	def move_handler(self, move: PlayerMove):
		self._moves = append(self._moves, array([move], dtype='u1'))

	def draw_handler(self):
		assert type(self) != Player, 'Player не может выйти в ничью!'
		assert self._is_original == False, 'Оригинал не может выйти в ничью!'
		self._draw_counter += 1
		self._total_counter += 1

	def win_handler(self):
		assert type(self) != Player, 'Player не может победить!'
		assert self._is_original == False, 'Оригинал не может победить!'
		self._win_counter += 1
		self._total_counter += 1

	def loose_handler(self):
		assert type(self) != Player, 'Player не может проиграть!'
		assert self._is_original == False, 'Оригинал не может проиграть!'
		self._loose_counter += 1
		self._total_counter += 1

	def __str__(self):
		result = f'{self.__class__.__name__} {self._name}'
		if self._is_original:
			result += ' Origin'
		else:
			result += f'[{"X" if self._is_x_player else "O"}]'
			result += f' vs {self._opponent_name}'
			result += f'[{"O" if self._is_x_player else "X"}]'
		return f'{{{result}}}' 


In [29]:
@njit
def propagate(weights: NDArray, player_is_x: bool, 
	x_situation: NDArray, o_situation: NDArray):
	weights_count = 2*9 + 1 # 2 поля 9 клеток (по 1 полю для каждого игрока) + 1 под знак (Х или О)
	input_signals = np.concatenate((
		x_situation, 
		o_situation, 
		np.array([.99 if player_is_x else .01])
	))
	result = 0
	for i in prange(weights_count):
		result += input_signals[i]*weights[i]
	return result, input_signals

In [30]:
@njit
def use_perceptron(weights, player_is_x: bool, 
	x_moves: MovesList, o_moves: MovesList, 
	allowed_moves: list[PlayerMove]):
	count = len(allowed_moves)
	weights_count = 2*9 + 1 # 2 поля 9 клеток (по 1 полю для каждого игрока) + 1 под знак (Х или О)
	result_signals = np.zeros((count, 4+weights_count), dtype='f4')
	if count == 0:
		return result_signals
	
	x_situation = np.empty(9)
	o_situation = np.empty(9)
	for i in prange(9):
		x_situation[i] = .01
		o_situation[i] = .01
	for i in prange(len(x_moves)):
		x_situation[x_moves[i]] = .99
	for i in prange(len(o_moves)):
		o_situation[o_moves[i]] = .99

	for m_i in prange(count):
		_x_situation = x_situation.copy()
		_o_situation = o_situation.copy()
		if player_is_x:
			_x_situation[m_i] = .99
		else:
			_o_situation[m_i] = .99
		move_signal, input_signals = propagate(weights, player_is_x, _x_situation, _o_situation)
		result_signals[m_i, :] = [
			allowed_moves[m_i], 
			move_signal, 
			move_signal >= .8, 
			move_signal >= .6, 
			*input_signals
		]
	return result_signals

In [31]:
class PerceptronPlayer(Player):
	_weights: list[float]
	_history = list[NDArray]
	_training: bool
	_x_deltas: list
	_o_deltas: list
	def __init__(self, name: str, 
		weights: list[float]=None, training: bool=False):
		super().__init__(name)
		self._weights = weights
		self._training = training

	def _copy_handler(self, original: Self):
		super()._copy_handler(original)
		self._weights = [w for w in original._weights]
		self._history = []
		self._training = original._training

	def make_move(self, opponent_moves: MovesList, allowed_moves: MovesList) -> PlayerMove:
		super().make_move(opponent_moves, allowed_moves)
		assert len(allowed_moves) != 0, 'Игра уже окончена!'
		x_moves = None
		o_moves = None
		if self._is_x_player:
			x_moves = self._moves
			o_moves = opponent_moves
		else:
			o_moves = self._moves
			x_moves = opponent_moves
		analyse_result = use_perceptron(
			self._weights, self._is_x_player, 
			x_moves, o_moves, allowed_moves
		)
		if self._training:
			self._history.append(analyse_result)
		result = [m[0] for m in analyse_result if m[2]]
		if len(result) > 0:
			int(choice(result))
		result = [m[0] for m in analyse_result if m[3]]
		if len(result) > 0:
			return int(choice(result))
		return int(choice(analyse_result)[0])
	
	def train(self, endings):
		for ending in endings:
			'''
			'b' = 0
			'x' = 1
			'o' = 2
			'neutral' = 3
			'positive' = 4
			'negative' = 5
			'''
			x_situation = np.empty(9)
			o_situation = np.empty(9)
			for i in prange(9):
				x_situation[i] = .01
				o_situation[i] = .01
			for i in prange(9):
				if ending[i] == 1:
					x_situation[i] = .99
				elif ending[i] == 2:
					o_situation[i] = .99
			x_signal, x_input = propagate(self._weights, True, x_situation.copy(), o_situation.copy())
			o_signal, o_input = propagate(self._weights, False, x_situation.copy(), o_situation.copy())
			weights_count = 2*9 + 1 # 2 поля 9 клеток (по 1 полю для каждого игрока) + 1 под знак (Х или О)
			x_delta = 0
			o_delta = 0
			if ending[9] == 3:		# Ничья
				x_delta = .5 - x_signal
				o_delta = .5 - o_signal
			elif ending[9] == 4:	# Х - победил, О - проиграл
				x_delta = .99 - x_signal
				o_delta = .01 - o_signal
			elif ending[9] == 5:	# Х - проиграл, О - победил
				x_delta = .01 - x_signal
				o_delta = .99 - o_signal

			self._x_deltas.append(x_delta)
			self._o_deltas.append(o_delta)
			max_step = .9
			x_step = min([x_delta, max_step])
			o_step = min([o_delta, max_step])
			x_step = max([x_step, -max_step])
			o_step = max([o_step, -max_step])
			x_step = x_step / 10000
			o_step = o_step / 10000
			for i in prange(weights_count):
				if x_input[i] == .99:
					self._weights[i] += x_step * random()
				if o_input[i] == .99:
					self._weights[i] += o_step * random()
			for i in prange(weights_count):
				if random() < .03:
					self._weights[i] += (random() - .5) / 10000



In [32]:
weights = np.random.random(2*9+1)
a = PerceptronPlayer('test', weights, True)
a.prepare_for_games([a])
a, b = a[0]
# o,o,x,x,x,o,o,x,x,neutral
# x,x,x,x,o,o,x,o,o,positive
# x,x,o,x,x,b,o,o,o,negative
neutral_game_board_1 = np.array([
	'o', 'o', 'b',
	'x', 'x', 'o',
	'o', 'x', 'x',
])
neutral_game_board_2 = np.array([
	'o', 'o', 'x',
	'b', 'x', 'o',
	'o', 'x', 'x',
])
positive_game_board_1 = np.array([
	'x', 'x', 'b',
	'o', 'o', 'b',
	'b', 'b', 'b',
])
positive_game_board_2 = np.array([
	'b', 'b', 'b',
	'x', 'o', 'b',
	'x', 'o', 'b',
])

boards = [neutral_game_board_1, neutral_game_board_2, positive_game_board_1, positive_game_board_2]
def test(_a: PerceptronPlayer, _boards):
	deltas = _a._o_deltas + _a._x_deltas
	count = len(deltas)
	count = count if count != 0 else 1
	mean = sum(deltas) / count
	dispersion = sum((d - mean) ** 2 for d in deltas) / count
	print(dispersion ** .5)
	for board in _boards:
		moves = []
		other_moves = []
		allowed_moves = []
		for i in prange(9):
			if board[i] == 'b':
				allowed_moves.append(i)
			elif board[i] == 'x':
				moves.append(i)
			elif board[i] == 'o':
				other_moves.append(i)
		_a._moves = np.array(moves)
		move = _a.make_move(np.array(other_moves), np.array(allowed_moves))
		print(_a._history[0][:, 1])
		_a._history = []
	_a._x_deltas = []
	_a._o_deltas = []

a.new_game_handler()
a._x_deltas = []
a._o_deltas = []
test(a, boards)

for j in prange(100):
	for i in prange(100):
		a.train(endings)
	print(j + 1, '--')
	test(a, boards)

0.0 [0.504715198499543, 0.8832760438482752, 0.8265427056785594, 0.764689067302073, 0.17939731859315122, 0.9080447885603312, 0.45516174556869116, 0.20220568263322602, 0.39337085773704605, 0.04577388549080941, 0.10838095752615595, 0.7774512943188406, 0.6596900046593897, 0.34081371417437445, 0.29376783106480553, 0.7983775855223523, 0.5608568774200905, 0.7163483587296545, 0.44083334259040297]
2 [3.7554784]
3 [3.816095]
8 [2.8713386 2.8713386 3.6813505 3.620734  3.047148 ]
1 [3.1043255 3.475315  3.4197164 2.6097047 2.7855139]
1 --
0.9000328047328215 [-0.05638032536153505, 0.08024633289837545, 0.17790986336775397, 0.13908015401090246, -0.09609985381194099, 0.11143543590303541, 0.12203073316078383, 0.0026298957426664376, 0.07861974847357167, -0.0925404062800302, 0.024077739687910108, 0.19285134373500534, 0.14025135610287853, -0.12028772230892255, 0.06518022290493829, 0.09323523028224164, -0.000560732346863356, 0.07216812346086517, 0.26575925407495044]
2 [0.4270865]
3 [0.4651396]
5 [0.31539318

KeyboardInterrupt: 