In [28]:
# @title **[1/5] Run project imports** { run: "auto", display-mode: "form" }
import copy
import json
import math
import matplotlib.cm as cm
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as nptp
import pathlib
import prettytable
import threading
import typing as tp
import warnings

from dataclasses import dataclass, field
from scipy.optimize import linprog
from sys import setrecursionlimit

setrecursionlimit(10 ** 9)
threading.stack_size(67108864)
warnings.filterwarnings('ignore')
%config InlineBackend.figure_format = 'retina'

# **Game Theory**

In [29]:
# @title  **[2/5] Initialize Simplex Method** { display-mode: "form" }


@dataclass()
class LinearConstraint:
    coefs: list[float] = field(default_factory=list)
    ctype: str = field(default_factory=lambda: "")
    b: float = 0.0
    is_term_added: bool = False

    def __post_init__(self) -> None:
        self.add_terms()

    def add_terms(self) -> None:
        '''
        Adds terms to the equation based on the value of self.ctype.
        '''
        match self.ctype:
            case 'eq':
                pass
            case 'gte':
                self.coefs = [-coef for coef in self.coefs]
                self.b *= -1
                self.is_term_added = True
            case 'lte':
                self.is_term_added = True
            case _:
                raise ValueError(f'Error: unable to parse mode {self.ctype}')


@dataclass()
class Simplex:
    matrix: nptp.NDArray
    basis: nptp.NDArray
    columns: nptp.NDArray

    def __init__(
        self,
        file_path: str | None = None,
        matrix: nptp.NDArray | None = None
    ):
        self.matrix = matrix if matrix is not None else self.parse(self.validate_path(file_path))
        shape = self.matrix.shape
        self.basis, self.columns = np.arange(shape[1] - 1, shape[1] + shape[0] - 2), np.arange(shape[1] - 1)

    @staticmethod
    def parse(file_path: str) -> nptp.NDArray:
        '''
        Parses a file containing linear programming data and returns the simplex matrix.
        :param file_path: The path to the file to be parsed.
        :return: The simplex matrix.
        '''
        f_coefs, constraints, goal = [], [], ""
        with open(file_path, "r") as file_:
            data = json.loads(file_.read())
            goal = Simplex.validate_goal(data['goal'])
            f_coefs = [-coef if goal == 'max' else coef for coef in data['f']]
            for constraint in data['constraints']:
                constraints.append(
                    LinearConstraint(
                        coefs=constraint['coefs'],
                        ctype=constraint['type'],
                        b=constraint['b']
                        )
                    )
        return Simplex.get_simplex_matrix(constraints, f_coefs)

    @staticmethod
    def get_simplex_matrix(
        constraints: list[LinearConstraint],
        f_coefs: list[float | int]
    ) -> nptp.NDArray:
        '''
        Generates the simplex matrix based on the given constraints and coefficients.
        :param constraints: A list of LinearConstraint objects representing the constraints.
        :param f_coefs: A list of coefficients for the objective function.
        :return: The simplex matrix.
        '''
        raw_matrix = []
        for constraint in constraints:
            if constraint.is_term_added:
                raw_matrix.append(constraint.coefs + [constraint.b])
            else:
                raw_matrix.append(constraint.coefs + [constraint.b])
                raw_matrix.append([-coef for coef in constraint.coefs] + [-constraint.b])
        raw_matrix.append(f_coefs + [0.0])
        return np.array(raw_matrix, dtype=np.float64)

    @staticmethod
    def validate_path(path: str | None) -> str:
        if (path is None) or (not pathlib.Path(path).is_file()):
            raise ValueError(f'Error: no file with path {path}')
        return path

    @staticmethod
    def validate_goal(goal: str) -> str:
        if goal not in ('max', 'min'):
            raise ValueError(f'Error: unable to parse mode {goal}')
        return goal

    def solve(self, *, verbose: bool = False, num_iter: int = 500) -> nptp.NDArray | str | None:
        '''
        Solves the linear programming problem using the simplex method.
        :param verbose: If True, display additional information during the solving process.
        :param num_iter: The maximum number of iterations to perform.
        :return: The optimal plan as an NDArray, a string indicating an unbounded linear function,
                or None if convergence was not achieved.
        '''
        self.get_initial_plan(num_iter=num_iter)
        return self.get_optimal_plan(verbose=verbose, num_iter=num_iter)

    def get_initial_plan(self, num_iter) -> None:
        '''
        Obtains the initial plan for the linear programming problem.
        :param num_iter: The maximum number of iterations to perform.
        :throws ValueError: If the initial plan cannot be obtained.
        :warns: If the initial plan cannot be obtained
            within the specified number of iterations.
        '''
        log_num_iter = num_iter
        while any(self.matrix[:-1, -1] < 0) and num_iter > 0:
            num_iter -= 1
            i = int(np.argmax(self.matrix[:-1, -1] < 0))
            if all(self.matrix[i, :-1] >= 0):
                raise ValueError(f'Error: could not get initial plan for {self.matrix}')
            l = int(np.argmax(self.matrix[i, :-1] < 0))
            old_matrix = self.matrix
            self.transform_jordan(
                *self.get_resolving(
                    initial_row=i,
                    initial_column=l,
                    initial=True,
                    )
                )
        if num_iter == 0:
            warnings.warn(f'Could not get initial plan in {log_num_iter} iterations')

    def get_optimal_plan(self, verbose: bool, num_iter: int) -> nptp.NDArray | str | None:
        '''
        Obtains the optimal plan for the linear programming problem.
        :param verbose: If True, display warnings for unbounded linear function.
        :param num_iter: The maximum number of iterations to perform.
        :return: The optimal plan as an NDArray.
        :warns: If the linear function is unbounded or if convergence
            is not achieved within the specified number of iterations.
        '''
        log_num_iter = num_iter
        while any(self.matrix[-1, :-1] < 0) and num_iter > 0:
            num_iter -= 1
            l = int(np.argmax(self.matrix[-1, :-1] < 0))
            if all(self.matrix[:-1, l] <= 0):
                if verbose:
                    warnings.warn('Linear function is unbound')
                return self.get_current_solution(verbose=verbose)
            i = int(np.argmax(self.matrix[:-1, l] > 0))
            self.transform_jordan(
                *self.get_resolving(
                    initial_row=i,
                    initial_column=l
                    )
                )
        if num_iter == 0:
             warnings.warn(f'Could not converge to optimal plan in {log_num_iter} iterations')

    def get_resolving(self, initial_row: int, initial_column: int, initial=False) -> tuple[int, int]:
        '''
        Determines the resolving element for the pivot operation.
        :param initial_row: The initial row index.
        :param initial_column: The initial column index.
        :return: A tuple containing the row and column indices of the resolving element.
        '''
        row, column = initial_row, initial_column
        min_ratio = float('inf')
        for r in range(len(self.matrix) - 1):
            if self.matrix[r][column] != 0:
                ratio = self.matrix[r][-1] / self.matrix[r][column]
                if 0 < ratio < min_ratio:
                    min_ratio = ratio
                    row = r
        return row, column

    def transform_jordan(self, row: int, column: int) -> None:
        new_matrix = copy.copy(self.matrix)
        kernel = self.matrix[row][column]
        # print(kernel)
        self.columns[column], self.basis[row] = self.basis[row], self.columns[column]
        shape_ = self.matrix.shape
        for i in range(shape_[0]):
            for j in range(shape_[1]):
                if i == row and j == column:
                    new_matrix[i][j] = 1 / kernel
                elif j == column:
                    new_matrix[i][j] = -self.matrix[i][j] / kernel
                elif i == row:
                    new_matrix[i][j] = self.matrix[i][j] / kernel
                else:
                    new_matrix[i][j] = self.matrix[i][j] - self.matrix[row][j] * self.matrix[i][column] / kernel
        # sorted_indices = np.argsort(self.columns)
        # self.columns = np.sort(self.columns)
        # self.matrix = np.concatenate((new_matrix[:, sorted_indices], new_matrix[-1][:, np.newaxis]), axis=1)
        self.matrix = new_matrix
        # print(self.matrix)

    def get_current_solution(self, *, verbose: bool = False) -> nptp.NDArray | str:
        n = len(self.basis) + len(self.columns)
        result = np.array([0.0 for _ in range(n)])
        for base, value in zip(self.basis, self.matrix[:-1, -1]):
            result[base] = value
        if verbose:
            return f'Current solution: {result}\nLinear form value: {abs(self.matrix[-1, -1])}\n'
        return result

    def __str__(self):
        table = prettytable.PrettyTable(
            field_names=['x'] + [f'x{i}' for i in self.columns] + ['b'],
        )
        # table.set_style(prettytable.MARKDOWN)
        table.add_rows(
            [[f'x{j}'] + self.matrix[i].tolist() for i, j in enumerate(self.basis)],
        )
        table.add_row(['Q'] + self.matrix[-1].tolist())
        return table.get_string()

In [36]:
# @title **[3/5] Initialize Game Matrix**


@dataclass()
class GameMatrix:
    matrix: nptp.NDArray
    _maxmin_strat: nptp.NDArray | None = field(init=False, compare=False, default=None)
    _minmax_strat: nptp.NDArray | None = field(init=False, compare=False, default=None)
    _maxmin_val: float = field(init=False, compare=False, default=float('inf'))
    _minmax_val: float = field(init=False, compare=False, default=float('inf'))
    _first_strats: nptp.NDArray = field(init=False, compare=False)
    _second_strats: nptp.NDArray = field(init=False, compare=False)
    _diff_value: float = field(init=False, compare=False, default=0.0)
    _game_cost: nptp.NDArray | None = field(init=False, compare=False, default=None)

    def __init__(self, *, file_path: str | None = None, matrix: nptp.NDArray | None = None):
        self.matrix = matrix if matrix is not None else self.parse(self.__validate_path(file_path))
        self._first_strats = np.array([f'a{i}' for i in range(self.matrix.shape[0])])
        self._second_strats = np.array([f'b{i}' for i in range(self.matrix.shape[1])])
        self.__post_init__()

    def __post_init__(self):  # strictly ordered
        self.dump_rows_duplicates()
        self.dump_rows_subdominant()
        self.dump_cols_duplicates()
        self.dump_cols_subdominant()
        self.check_saddle_point()

    def dump_rows_duplicates(self):
        '''
        Removes duplicate rows from the matrix.
        '''
        _, unique_row_indices = np.unique(self.matrix, axis=0, return_index=True)
        unique_rows = np.sort(unique_row_indices)
        self.matrix = self.matrix[unique_rows]
        self._first_strats = self._first_strats[unique_rows]

    def dump_cols_duplicates(self):
        '''
        Removes duplicate columns from the matrix.
        '''
        _, unique_col_indices = np.unique(self.matrix.T, axis=0, return_index=True)
        unique_cols = np.sort(unique_col_indices)
        self.matrix = self.matrix[:, unique_cols]
        self._second_strats = self._second_strats[unique_cols]

    def dump_rows_subdominant(self):
        dominant_rows = []
        for i in range(self.matrix.shape[0]):
            is_subdominant = False
            for j in range(self.matrix.shape[0]):
                if i != j and np.all(self.matrix[i] <= self.matrix[j]):
                    is_subdominant = True
                    break
            if not is_subdominant:
                dominant_rows.append(i)
        dominant_rows = sorted(dominant_rows)
        self._first_strats = np.take(self._first_strats, dominant_rows)
        self.matrix = np.take(self.matrix, dominant_rows, axis=0)

    def dump_cols_subdominant(self):
        dominant_columns = []
        for i in range(self.matrix.shape[1]):
            is_subdominant = False
            for j in range(self.matrix.shape[1]):
                if i != j and np.all(self.matrix[:, i] >= self.matrix[:, j]):
                    is_subdominant = True
                    break
            if not is_subdominant:
                dominant_columns.append(i)
        dominant_columns = sorted(dominant_columns)
        self._second_strats = np.take(self._second_strats, dominant_columns)
        self.matrix = np.take(self.matrix, dominant_columns, axis=1)


    def check_saddle_point(self, get_matrix=False) -> nptp.NDArray | None:
        min_vals = np.min(self.matrix, axis=1)  # minimum values along each row
        max_vals = np.max(self.matrix, axis=0)  # maximum values along each column
        if get_matrix:
            matrix = np.copy(self.matrix)
            matrix = np.concatenate((matrix, min_vals[:, np.newaxis]), axis=1)
            max_vals = np.append(max_vals, 0)  # match shapes after first concatenation
            matrix = np.concatenate((matrix, max_vals[np.newaxis, :]), axis=0)
            return matrix
        self._maxmin_strat = np.zeros(self.matrix.shape[0])
        self._minmax_strat = np.zeros(self.matrix.shape[1])
        self._maxmin_strat[np.argmax(min_vals)] = 1.0
        self._minmax_strat[np.argmin(max_vals)] = 1.0
        self._maxmin_val = np.max(min_vals) - self._diff_value
        self._minmax_val = np.min(max_vals) - self._diff_value

    def solve_simplex(self, matrix: nptp.NDArray, *, mode='direct') -> None:
        if mode not in ['direct', 'ambivalent']:
            raise ValueError(f'Unexpected simplex mode: {mode}')
        try:
            # simplex = Simplex(matrix=matrix)
            # simplex.solve()
            # solution = tp.cast(nptp.NDArray, simplex.get_current_solution())
            solution = linprog(matrix[-1, :-1], A_ub=matrix[:-1, :-1], b_ub=matrix[:-1, -1],
                               bounds=(0, None), method='simplex').x
        except Exception as e:
            warnings.warn(f'Simplex failure on {mode}, cause:\n{e}')
        else:
            if mode == 'direct':
                solution = solution[:self.matrix.shape[1]]
                self._game_cost = np.reciprocal(solution @ np.ones((self.matrix.shape[1], 1)))
                # optimal ambivalent strategy
                self._minmax_strat = solution * self._game_cost
            else:
                solution = solution[:self.matrix.shape[0]]
                ambiv_cost = np.reciprocal(solution @ np.ones((self.matrix.shape[0], 1)))
                assert np.allclose(
                    tp.cast(nptp.NDArray, self._game_cost),
                    ambiv_cost
                    ), f'Cost direct: {self._game_cost} Cost ambivalent: {ambiv_cost}'
                # optimal direct strategy
                self._maxmin_strat = solution * self._game_cost


    def solve(self) -> tp.Mapping[str, tp.Any] | None:
        self.check_saddle_point()
        if self.maxmin != self.minmax:
            shifted_matrix = self.shift_matrix(self.matrix)

            direct_matrix = self.get_simplexable(shifted_matrix)
            self.solve_simplex(direct_matrix, mode='direct')
            ambiv_matrix = -self.get_simplexable(shifted_matrix.T)
            self.solve_simplex(ambiv_matrix, mode='ambivalent')

        return {
            'cost': self.cost,
            'opt_strats': {
                'direct': self.stratfirst,
                'ambivalent': self.stratsecond,
                },
            }

    def shift_matrix(self, matrix: nptp.NDArray) -> nptp.NDArray:
        '''
        Returns matrix with non-negative elements to ensure that min(matrix) is non-negative.
        '''
        absolute = np.copy(matrix)
        min_element = np.min(absolute)
        if min_element < 0:
            self._diff_value = abs(min_element)
            absolute += self._diff_value
        return absolute

    @staticmethod
    def get_simplexable(matrix: nptp.NDArray) -> nptp.NDArray:
        simplex = np.vstack([matrix, -np.ones((1, matrix.shape[1]))])
        simplex = np.hstack([simplex, np.ones((simplex.shape[0], 1))])
        simplex[-1, -1] = 0.0
        return simplex

    @property
    def maxmin(self) -> float:
        return self._maxmin_val

    @property
    def minmax(self) -> float:
        return self._minmax_val

    @property
    def cost(self) -> nptp.NDArray:
        if self._game_cost is not None:
            return self._game_cost - self._diff_value
        return np.array([self.maxmin, self.minmax])

    @property
    def stratfirst(self) -> nptp.NDArray | None:
        if self._maxmin_strat is not None:
            return self._maxmin_strat
        warnings.warn('Undefined direct hybrid strategy')

    @property
    def stratsecond(self) -> nptp.NDArray | None:
        if self._minmax_strat is not None:
            return self._minmax_strat
        warnings.warn('Undefined ambivalent hybrid strategy')

    @staticmethod
    def parse(file_path: str) -> nptp.NDArray:
        matrix = []
        with open(file_path, "r") as file_:
            data = json.loads(file_.read())
            matrix = np.array(data['matrix'])
        return matrix

    @staticmethod
    def __validate_path(path: str | None) -> str:
        if path is None or not pathlib.Path(path).is_file():
            raise ValueError(f'Error: no file with path {path}')
        return path

    def __str__(self):
        matrix = tp.cast(nptp.NDArray, self.check_saddle_point(get_matrix=True))
        table = prettytable.PrettyTable(
            field_names=["strat"] + self._second_strats.tolist() + ['min'],
        )
        table.add_rows(
            [[self._first_strats[j]] + matrix[j].tolist() for j in range(matrix.shape[0] - 1)],
        )
        table.add_row(['max'] + matrix[-1].tolist())
        return table.get_string()

# **Benchmark and Tests**

In [37]:
# @title **[4/5] Initialize MVP Tests** { display-mode: "form" }


@dataclass()
class GameTheoryCase:
    name: str
    json_: dict


TEST_CASES = [
    GameTheoryCase(
        name="3x3-pos",
        json_={
            "matrix": [
                [1, 2, 4],
                [5, 3, 6],
                [0, 7, 1],
            ],
        },
    ),
    GameTheoryCase(
        name="4x4-pos",
        json_={
            "matrix": [
                [3, 9, 2, 1],
                [7, 8, 5, 6],
                [4, 7, 3, 5],
                [4, 7, 3, 5],
            ],
        },
    ),
    GameTheoryCase(
        name="3x3",
        json_={
            "matrix": [
                [2, 1, 3],
                [3, 0, 1],
                [1, 2, 1],
            ],
        },
    ),
    GameTheoryCase(
        name="3x3-bigger",
        json_={
            "matrix": [
                [4, 2, 2],
                [2, 5, 0],
                [0, 2, 5],
            ],
        },
    ),
    GameTheoryCase(
        name="4x4-sign",
        json_={
            "matrix": [
                [1, 2, 0, -3],
                [2, -1, -1, -1],
                [-2, 0, 0, 1],
                [4, 1, 0, -2],
            ],
        },
    ),
    GameTheoryCase(
        name="4x4-abs",
        json_={
            "matrix": [
                [3, 4, 3, 5],
                [1, 6, 2, 0],
                [3, 5, 3, 4],
                [2, 0, 1, 6],
            ],
        },
    ),
    GameTheoryCase(
        name="3x5",
        json_={
            "matrix": [
                [5, -8, 7, -6, 0],
                [8, -5, 9, -3, 2],
                [-2, 7, -3, 6, -4],
            ],
        },
    ),
]

In [39]:
# @title **[5/5] Run MVP Tests!** { display-mode: "form" }
for test_id, test in enumerate(TEST_CASES):
    with open("./tmp.json", "w+") as tmp:
        tmp.write(str(test.json_).replace("'", '"'))
    try:
        gm = GameMatrix(file_path="./tmp.json")
        print(f'START-TEST[{test_id}][{test.name}]'.center(80, '~'))
        print(gm, end='\n')
        solution = gm.solve()
        print(gm, end='\n')
        print(solution, end='\n')
        print(f'Game cost is: {gm.cost}')
        print(f'END-TEST[{test_id}][{test.name}]'.center(80, '~'), end='\n\n\n\n')
    except Exception as e:
        print(f'Catched exception on TEST-{test_id}:\n{e}\n')

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~START-TEST[0][3x3-pos]~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+-------+----+----+-----+
| strat | b0 | b1 | min |
+-------+----+----+-----+
|   a1  | 5  | 3  |  3  |
|   a2  | 0  | 7  |  0  |
|  max  | 5  | 7  |  0  |
+-------+----+----+-----+
[[ 5.  3.  1.]
 [ 0.  7.  1.]
 [-1. -1.  0.]]
[0.11428571 0.14285714]
[[-5. -0. -1.]
 [-3. -7. -1.]
 [ 1.  1. -0.]]
[0.2        0.05714286]
+-------+----+----+-----+
| strat | b0 | b1 | min |
+-------+----+----+-----+
|   a1  | 5  | 3  |  3  |
|   a2  | 0  | 7  |  0  |
|  max  | 5  | 7  |  0  |
+-------+----+----+-----+
{'cost': array([3.88888889]), 'opt_strats': {'direct': array([0.77777778, 0.22222222]), 'ambivalent': array([0.44444444, 0.55555556])}}
Game cost is: [3.88888889]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~END-TEST[0][3x3-pos]~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~



~~~~~~~~~~~~~~~~~~~~~~~~~~~~~START-TEST[1][4x4-pos]~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+-------+----+----+-----+
| strat | b2 | b3 | min |
+-------+----+----+-----+
|   a0  |