In [2]:
# Подключаем нужные библиотеки
from kaggle_environments import make, evaluate
import numpy as np
import random
import collections
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
import math
from sklearn.tree import DecisionTreeClassifier
import pydash
from collections import Counter

warnings.filterwarnings('ignore')

# Создаем окружение для игры с заданным количеством шагов (ходов)
env = make("rps", configuration={"episodeSteps": 100})

Loading environment lux_ai_s3 failed: No module named 'gymnax'


In [3]:
%%writefile rock_agent.py

# 1. Агент, который всегда выбирает "камень"
def rock_agent(observation, configuration):
    """
    Агент, который всегда выбирает 'камень' (0).
    """
    return 0

Overwriting rock_agent.py


In [4]:
%%writefile paper_agent.py

# 2. Агент, который всегда выбирает "бумагу"
def paper_agent(observation, configuration):
    """
    Агент, который всегда выбирает 'бумагу' (1).
    """
    return 1

Overwriting paper_agent.py


In [5]:
%%writefile scissors_agent.py

# 3. Агент, который всегда выбирает "ножницы"
def scissors_agent(observation, configuration):
    """
    Агент, который всегда выбирает 'ножницы' (2).
    """
    return 2

Overwriting scissors_agent.py


In [6]:
%%writefile copy_opponent_agent.py

# 4. Агент, который копирует последний ход оппонента
import random
def copy_opponent_agent(observation, configuration):
    """
    Копирует последний ход противника. Если ход первый, выбирает случайно.
    """
    if observation.step > 0:
        return observation.lastOpponentAction  # копирует предыдущий ход противника
    else:
        return random.randrange(0, configuration.signs)  # случайный ход на первом шаге

Overwriting copy_opponent_agent.py


In [7]:
%%writefile reactionary_agent.py
# 5. Агент, который реагирует на последние действия противника

def get_score(left_move, right_move):
    """
    Вычисляет результат последнего действия. 
    Возвращает 0 при ничьей, 1 при выигрыше и -1 при проигрыше.
    """
    delta = (
        right_move - left_move
        if (left_move + right_move) % 2 == 0
        else left_move - right_move
    )
    return 0 if delta == 0 else math.copysign(1, delta)

last_react_action = None
def reactionary_agent(observation, configuration):
    """
    Агент реагирует на последние действия противника.
    Если предыдущий ход был проигран, выбирает новый ход.
    """
    global last_react_action
    if observation.step == 0:
        last_react_action = random.randrange(0, configuration.signs)  # случайный первый ход
    elif get_score(last_react_action, observation.lastOpponentAction) <= 1:
        last_react_action = (observation.lastOpponentAction + 1) % configuration.signs  # выбирает следующий ход

    return last_react_action

Overwriting reactionary_agent.py


In [8]:
%%writefile counter_reactionary_agent.py
# 6. Агент, который выбирает действие в зависимости от результатов последних ходов

last_counter_action = None
def get_score(left_move, right_move):
    """
    Возвращает счет на основе разницы между ходами.
    """
    delta = (
        right_move - left_move
        if (left_move + right_move) % 2 == 0
        else left_move - right_move
    )
    return 0 if delta == 0 else math.copysign(1, delta)

def counter_reactionary_agent(observation, configuration):
    """
    Если агент выиграл, выбирает действие, чтобы противодействовать.
    Иначе адаптируется к ходу противника.
    """
    global last_counter_action
    if observation.step == 0:
        last_counter_action = random.randrange(0, configuration.signs)  # случайный первый ход
    elif get_score(last_counter_action, observation.lastOpponentAction) == 1:
        last_counter_action = (last_counter_action + 2) % configuration.signs  # если выиграл, выбирает контр-ход
    else:
        last_counter_action = (observation.lastOpponentAction + 1) % configuration.signs  # выбирает ход противника + 1

    return last_counter_action

Overwriting counter_reactionary_agent.py


In [9]:
%%writefile statistical_agent.py
# 7. Агент, который анализирует статистику ходов противника

action_histogram = {}
def statistical_agent(observation, configuration):
    """
    Запоминает ходы противника и выбирает действие, чтобы противодействовать
    наиболее частому ходу противника.
    """
    global action_histogram
    if observation.step == 0:
        action_histogram = {}  # очищает историю в начале
        return random.randint(0, 2)  # случайный ход в начале
    
    action = observation.lastOpponentAction
    action_histogram[action] = action_histogram.get(action, 0) + 1  # считает ходы противника

    mode_action = max(action_histogram, key=action_histogram.get)  # находит наиболее частый ход
    return (mode_action + 1) % configuration.signs  # возвращает контр-ход

Overwriting statistical_agent.py


In [10]:
%%writefile Jonson_agent.py
# 8. Агент Джонсона с анализом паттернов противника

def Jonson_agent(observation, configuration):
    """
    Агент с анализом паттернов противника. Использует таблицу частот для предсказания.
    """
    k = 2
    global table, action_seq
    if observation.step % 25 == 0:
        action_seq, table = [], collections.defaultdict(lambda: [1, 1, 1])  # обновляет таблицу каждые 25 шагов
    if len(action_seq) <= 2 * k + 1:
        action = int(np.random.randint(3))
        if observation.step > 0:
            action_seq.extend([observation.lastOpponentAction, action])  # добавляет последний ход противника
        else:
            action_seq.append(action)
        return action

    key = ''.join([str(a) for a in action_seq[:-1]])  # ключ на основе последовательности ходов
    table[key][observation.lastOpponentAction] += 1  # обновляет таблицу по ключу

    action_seq[:-2] = action_seq[2:]
    action_seq[-2] = observation.lastOpponentAction
    key = ''.join([str(a) for a in action_seq[:-1]])
    
    if observation.step < 50:
        next_opponent_action_pred = np.argmax(table[key])  # предсказание по таблице
    else:
        scores = np.array(table[key])
        next_opponent_action_pred = np.random.choice(3, p=scores/scores.sum())
        
    action = (next_opponent_action_pred + 1) % 3
    if observation.step > 90:
        action = next_opponent_action_pred
    action_seq[-1] = action
    return int(action)

Overwriting Jonson_agent.py


In [11]:
%%writefile nash_equilibrium_agent.py
# 9. Агент по принципу равновесия Нэша (случайный выбор)

def nash_equilibrium_agent(observation, configuration):
    """
    Случайный выбор действия для достижения равновесия Нэша.
    """
    return random.randint(0, 2)

Overwriting nash_equilibrium_agent.py


In [12]:
%%writefile rock_paper_agent.py
# 10. Агент, который случайно выбирает между "камнем" и "бумагой"

def rock_paper_agent(observation, configuration):
    """
    Случайно выбирает между 'камнем' (0) и 'бумагой' (1).
    """
    return random.randint(0, 1)

Overwriting rock_paper_agent.py


In [13]:
%%writefile paper_scissors_agent.py
# 11. Агент, который случайно выбирает между "бумагой" и "ножницами"

def paper_scissors_agent(observation, configuration):
    """
    Случайно выбирает между 'бумагой' (1) и 'ножницами' (2).
    """
    return random.randint(1, 2)

Overwriting paper_scissors_agent.py


In [14]:
%%writefile rock_scissors_agent.py
# 12. Агент, который случайно выбирает между "камнем", "ножницами" и "бумагой"
import random

def rock_scissors_agent(observation, configuration):
    """
    Случайно выбирает 'камень' (0), 'ножницы' (2) или 'бумагу' (1).
    """
    return random.randint(0, 2)

Overwriting rock_scissors_agent.py


In [15]:
%%writefile transition_agent.py
# 13. Агент с матрицей переходов для предсказания действий противника

T = np.zeros((3, 3))
P = np.zeros((3, 3))

a1, a2 = None, None

def transition_agent(observation, configuration):
    """
    Агент анализирует переходы между действиями противника и предсказывает следующее.
    """
    global T, P, a1, a2
    if observation.step > 1:
        a1 = observation.lastOpponentAction
        T[a2, a1] += 1  # обновляет матрицу переходов
        P = np.divide(T, np.maximum(1, T.sum(axis=1)).reshape(-1, 1))
        a2 = a1
        if np.sum(P[a1, :]) == 1:
            return int((np.random.choice([0, 1, 2], p=P[a1, :]) + 1) % 3)  # предсказание на основе вероятностей
        else:
            return int(np.random.randint(3))  # случайный ход
    else:
        if observation.step == 1:
            a2 = observation.lastOpponentAction
        return int(np.random.randint(3))

Overwriting transition_agent.py


In [16]:
%%writefile decision_tree.py


def construct_local_features(rollouts):
    step_mode_features = np.array([[step % k for step in rollouts['steps']] for k in (2, 3, 5)])
    step_div_features = np.array([[step // k for step in rollouts['steps']] for k in (100, 150, 250)])
    features = np.concatenate([step_mode_features, step_div_features])
    features = np.append(features, rollouts['actions'])
    features = np.append(features, rollouts['opp-actions'])
    return features

def construct_global_features(rollouts):
    features = []
    for key in ['actions', 'opp-actions']:
        for i in range(3):
            actions_count = np.mean([r == i for r in rollouts[key]])
            features.append(actions_count)
    return np.array(features)

def construct_features(short_stat_rollouts, long_stat_rollouts):
    lf = construct_local_features(short_stat_rollouts)
    gf = construct_global_features(long_stat_rollouts)
    features = np.concatenate([lf, gf])
    return features

def predict_opponent_move(train_data, test_sample):
    classifier = DecisionTreeClassifier(random_state=42)
    classifier.fit(train_data['x'], train_data['y'])
    return classifier.predict(test_sample)

def update_rollouts_hist(rollouts_hist, last_move, opp_last_action):
    rollouts_hist['steps'].append(last_move['step'])
    rollouts_hist['actions'].append(last_move['action'])
    rollouts_hist['opp-actions'].append(opp_last_action)
    return rollouts_hist

def warmup_strategy(observation, configuration):
    global rollouts_hist, last_move
    action = int(np.random.randint(3))
    if observation.step == 0:
        last_move = {'step': 0, 'action': action}
        rollouts_hist = {'steps': [], 'actions': [], 'opp-actions': []}
    else:
        rollouts_hist = update_rollouts_hist(rollouts_hist, last_move, observation.lastOpponentAction)
        last_move = {'step': observation.step, 'action': action}
    return int(action)

def init_training_data(rollouts_hist, k):
    for i in range(len(rollouts_hist['steps']) - k + 1):
        short_stat_rollouts = {key: rollouts_hist[key][i:i+k] for key in rollouts_hist}
        long_stat_rollouts = {key: rollouts_hist[key][:i+k] for key in rollouts_hist}
        features = construct_features(short_stat_rollouts, long_stat_rollouts)        
        data['x'].append(features)
    test_sample = data['x'][-1].reshape(1, -1)
    data['x'] = data['x'][:-1]
    data['y'] = rollouts_hist['opp-actions'][k:]
    return data, test_sample

def agent(observation, configuration):
    # hyperparameters
    k = 5
    min_samples = 25
    global rollouts_hist, last_move, data, test_sample
    if observation.step == 0:
        data = {'x': [], 'y': []}
    # if not enough data -> randomize
    if observation.step <= min_samples + k:
        return warmup_strategy(observation, configuration)
    # update statistics
    rollouts_hist = update_rollouts_hist(rollouts_hist, last_move, observation.lastOpponentAction)
    # update training data
    if len(data['x']) == 0:
        data, test_sample = init_training_data(rollouts_hist, k)
    else:        
        short_stat_rollouts = {key: rollouts_hist[key][-k:] for key in rollouts_hist}
        features = construct_features(short_stat_rollouts, rollouts_hist)
        data['x'].append(test_sample[0])
        data['y'] = rollouts_hist['opp-actions'][k:]
        test_sample = features.reshape(1, -1)
        
    # predict opponents move and choose an action
    next_opp_action_pred = predict_opponent_move(data, test_sample)
    action = int((next_opp_action_pred + 1) % 3)
    last_move = {'step': observation.step, 'action': action}
    return action

Writing decision_tree.py


In [17]:
%%writefile submission.py


# Create a small amount of starting history
history = {
    "guess":      [0,1,2],
    "prediction": [0,1,2],
    "expected":   [0,1,2],
    "action":     [1,2,0],
    "opponent":   [0,1],
    "rotn":       [0,1],
}
# observation   =  {'step': 1, 'lastOpponentAction': 1}
# configuration =  {'episodeSteps': 1000, 'agentTimeout': 60, 'actTimeout': 1, 'runTimeout': 1200, 'isProduction': False, 'signs': 3}
def statistical_prediction_agent(observation, configuration):    
    global history
    actions          = list(range(configuration.signs))  # [0,1,2]
    last_action      = history['action'][-1]
    prev_opp_action  = history['opponent'][-1]
    opponent_action  = observation.lastOpponentAction if observation.step > 0 else 2
    rotn             = (opponent_action - prev_opp_action) % configuration.signs

    history['opponent'].append(opponent_action)
    history['rotn'].append(rotn)
    
    # Make weighted random guess based on the complete move history, weighted towards relative moves based on our last action 
    move_frequency   = Counter(history['rotn'])
    action_frequency = Counter(zip(history['action'], history['rotn'])) 
    move_weights     = [   move_frequency.get(n, 1) 
                         + action_frequency.get((last_action,n), 1) 
                         for n in range(configuration.signs) ] 
    guess            = random.choices( population=actions, weights=move_weights, k=1 )[0]
    
    # Compare our guess to how our opponent actually played
    guess_frequency  = Counter(zip(history['guess'], history['rotn']))
    guess_weights    = [ guess_frequency.get((guess,n), 1) 
                         for n in range(configuration.signs) ]
    prediction       = random.choices( population=actions, weights=guess_weights, k=1 )[0]

    # Repeat, but based on how many times our prediction was correct
    pred_frequency   = Counter(zip(history['prediction'], history['rotn']))
    pred_weights     = [ pred_frequency.get((prediction,n), 1) 
                         for n in range(configuration.signs) ]
    expected         = random.choices( population=actions, weights=pred_weights, k=1 )[0]

    
    # Slowly decay to 50% pure randomness as the match progresses
    pure_random_chance = observation.step / (configuration.episodeSteps * 2)
    if random.random() < pure_random_chance:
        action = random.randint(0, configuration.signs-1)
        is_pure_random_chance = True
    else:
        # Play the +1 counter move
        # action = (expected + 1) % configuration.signs                  # without rotn
        action = (opponent_action + expected + 1) % configuration.signs  # using   rotn
        is_pure_random_chance = False
    
    # Persist state
    history['guess'].append(guess)
    history['prediction'].append(prediction)
    history['expected'].append(expected)
    history['action'].append(action)

    return action

Writing submission.py


In [20]:
# Турнир между агентами
agents = ['rock_agent', 'paper_agent', 'scissors_agent',
          'copy_opponent_agent', 'reactionary_agent', 
          'counter_reactionary_agent', 'statistical_agent',
          'Jonson_agent', 'nash_equilibrium_agent',
          'rock_paper_agent', 'paper_scissors_agent',
          'rock_scissors_agent', 'transition_agent',
          # 'decision_tree', 'submission'
          ]

# Добавляем .py к каждому агенту
agents_py = [i + ".py" for i in agents]
n_agents = len(agents)

# Инициализируем матрицу результатов
scores = np.zeros((n_agents, n_agents))

# Играем каждый агент против каждого
for i in range(n_agents):
    for j in range(i + 1, n_agents):
        result = evaluate("rps", [agents_py[i], agents_py[j]], configuration={"episodeSteps": 100}, num_episodes=1)
        scores[i][j] = result[0][0]  # сохраняем результат для агента i
        scores[j][i] = result[0][1]  # сохраняем результат для агента j

# Определяем победителя
vic = []
for i in range(n_agents):
    cnt_vic = sum(1 for j in range(n_agents) if scores[i][j] > 0)  # считаем победы агента
    vic.append(cnt_vic)

# Определяем индексы победителей
i_victors = [i for i, v in enumerate(vic) if v == max(vic)]
if len(i_victors) > 1:
    print("The winners are", '\n', *[agents_py[i] for i in i_victors], "!")
else:
    print("The winner is", agents_py[i_victors[0]], "!")

The winners are 
 rock_agent.py paper_agent.py scissors_agent.py !
