# TichuAgents: Training the Agents

This notebook is used for training Agents using Reinforcement Learning to play Tichu.

# Setup

In [None]:
# Execute after restarting runtime
!git clone https://github.com/alxwdm/tichuagent

In [None]:
# Execute when content on github changed
%cd /content/tichuagent
!git pull
%cd /content/

In [1]:
import sys
import numpy as np
sys.path.append('/content/tichuagent')
# import Environment
from env.env import Env
# import all Agents
from agents.heuristic.greedy import greedyAgent
from agents.ddpg.ddpg_agent import DDPGAgent
# import utility functions
from utils import play_dumb_game, play_greedy_game

# Shared utils

In [None]:
def battle(agents, n_games=1000):
    """ Lets trained agents play games against each other. """
    pass # TODO

# DDPG Agent

Deep Deterministic Policy Gradient is an off-policy RL approach that combines both value- and policy-based learning. The DDPG actor learns how to act (i.e. policy-based), and a critic learns how to estimate the current situation (i.e. value-based). 

In [11]:
import sys
import numpy as np
sys.path.append('/content/tichuagent')
# import Environment
from env.env import Env
# import all Agents
from agents.heuristic.greedy import greedyAgent
from agents.ddpg.ddpg_agent import DDPGAgent
# import utility functions
from utils import play_dumb_game, play_greedy_game

PRINT_EVERY = 20

def ddpg(n_episodes=100, episode_offset=0, checkpoint_path=None,
         eps_start = 0.3, eps_decay = 0.995, max_steps = 1000):
    """ Trains a DDPG Agent on Tichu. """
    # initialize environment and agent
    env = Env(train_mode=True)
    state_size, action_size = env.info()
    heuristic_agent = greedyAgent()
    agent = DDPGAgent(state_size=39, action_size=17,
                  random_seed=8765, heuristic_agent=heuristic_agent)
    all_scores = []
    all_steps = []
    all_valid_moves = []
    eps = eps_start
    # reload checkpoint from previous training if available
    if checkpoint_path:
        agent.load_checkpoint(filepath=checkpoint_path)
    # train for n_episodes
    for i_episode in range(episode_offset, n_episodes + episode_offset):
        state, reward, done, active_player = env.reset()
        action_buffer = [None, None, None, None]
        scores = [0, 0, 0, 0]
        nstep = 0
        init_cnt = 0
        agent_move_cnt = 0
        eps_move_cnt = 0
        # make a valid initial move from heuristic agent (first steps)
        # each player must make an initial move before learning,
        # because of reward-design (reward is valid every 4 steps)
        idle_cnt = 0
        while any(elem is None for elem in action_buffer):
            action_buffer[active_player], eps_move = agent.act(
                                                    state[active_player], 1)
            state, reward, done, active_player = env.step(active_player,
                                            action_buffer[active_player])            
            idle_cnt += 1
            if idle_cnt > 10:
                raise EnvironmentError("Something went wrong.")
                #return state[active_player]
        # train one episode
        while nstep < max_steps:
            prev_state = state
            prev_active = active_player
            # regular learning routine after initialization:
            # learn from previous step, then take next step
            # vice-versa not possible (because of state/reward validity)
            agent.step(prev_state[active_player],
                       action_buffer[active_player],
                       reward[active_player],
                       state[active_player],
                       done, nstep)
            # add rewards to scores list
            scores[active_player] += reward[active_player]
            # take an action in the environment
            prev_state[active_player] = state[active_player]
            action_buffer[active_player], eps_move = agent.act(
                                                     state[active_player], eps)
            state, reward, done, active_player = env.step(active_player,
                                                  action_buffer[active_player])
            nstep += 1
            # count successfull ddpg moves
            if not(prev_active == active_player) and not(eps_move):
                agent_move_cnt += 1
            elif not(prev_active == active_player) and eps_move:
                eps_move_cnt += 1
            # all agents take a step when game is finished
            if done:
                for i in range(4):
                    agent.step(prev_state[i], action_buffer[i],
                               reward[i], state[i], done, nstep)
                break
        # print episode info
        print(('\rEpisode: {} \t Total Steps: {} \t Valid Agent Steps: {} \t' +
               'Valid eps steps: {} \t Avg score: {} \t Current eps: {}').format(
            i_episode, nstep, agent_move_cnt, eps_move_cnt, np.mean(scores), eps),
              end='')
        if i_episode > 0 and i_episode % PRINT_EVERY == 0:
            print('')
        # take average statistics of all agents
        all_scores.append(np.mean(scores))
        all_steps.append(nstep)
        all_valid_moves.append(agent_move_cnt)
        eps = eps_decay * eps # decrease epsilon
    # save checkpoints
    fpath = 'checkpoint_' + str(i_episode)
    agent.save_checkpoint(filename=fpath)
    return all_scores, all_steps, all_valid_moves

In [None]:
all_scores, all_steps, all_valid_moves = ddpg(n_episodes=1000, eps_decay=0.997)

# Debugging Area

In [25]:
from itertools import compress

from env.cards import Cards
from env.deck import Deck
from env.player import Player

In [22]:
# for debugging, use this function to nicely print action vector as cards:
# example: _vec_to_cards(action).show()
def _vec_to_cards(vec):
    """ Turns a vector representation into a Cards instance. """
    all_cards = Deck().all_cards
    return Cards(list(compress(all_cards, vec)))

In [17]:
def _flatten_state(state):
    """ A very ugly state flattening function. TODO! """
    flattened_list = [item for sublist in state for item in sublist]
    flattened_state = []
    for elem in flattened_list:
        if type(elem) != list:
            flattened_state.append(elem)
        else:
            for e in elem:
                flattened_state.append(e)
    return np.asarray(flattened_state, dtype='int32')

In [29]:
# Try alternative state/action design
env = Env()
state, reward, done, active_player = env.reset()

In [38]:
def state_conv_suitless(state_vec):
    """
    An alternative state with reduced state space size.
    
    state design:
    [hand_size, tichu_flag, suitless_cards] of active player
    [is_opponent, hand_size, tichu_flag, suitless_cards] of stack leader

    returns state vector in flattened format
    """
    def suitless_enc(crd_state):
        suitless_cards = np.zeros(17, int).tolist()
        for i in range(13):
            suitless_cards[i] = sum(crd_state[i*4:i*4+4])
        suitless_cards[13] = crd_state[13]
        suitless_cards[14] = crd_state[14]
        suitless_cards[15] = crd_state[15]
        suitless_cards[16] = crd_state[16]
        return suitless_cards

    def _flatten_conv_state(state):
        """ A state flattening function for suitless state. """
        flattened_list = state
        flattened_state = []
        for elem in flattened_list:
            if type(elem) != list:
                flattened_state.append(elem)
            else:
                for e in elem:
                    flattened_state.append(e)
        return np.asarray(flattened_state, dtype='int32')

    # get info from full state
    hand_size = state_vec[0][0][0]
    hand_cards = _vec_to_cards(state_vec[0][0][2])
    opp_cards_0 = _vec_to_cards(state_vec[0][1][2])
    teammate_cards = _vec_to_cards(state_vec[0][2][2])
    opp_cards_1 = _vec_to_cards(state_vec[0][3][2])
    # determine leading cards
    # new stack
    if (teammate_cards.type == 'pass' and opp_cards_0.type == 'pass' and 
            opp_cards_1.type == 'pass'):
        leading_idx = 0
        is_opponent = 0
        leading_cards = Cards([])
    # teammate leading
    elif teammate_cards.power > max(opp_cards_0.power, opp_cards_1.power):
        leading_idx = 2
        is_opponent = 0
        leading_cards = teammate_cards
    # opponent 0 leading
    elif ((opp_cards_0.power > opp_cards_1.power) or 
            (opp_cards_1.type == 'pass')):
        leading_idx = 1
        is_opponent = 1
        leading_cards = opp_cards_0
    # opponent 1 leading
    else:
        leading_idx = 3
        is_opponent = 1
        leading_cards = opp_cards_1
    # get first part of state: self perspective
    conv_state = []
    conv_state.append(state[0][0][0]) # Hand Size
    conv_state.append(state[0][0][1]) # Tichu Flag
    conv_state.append(suitless_enc(state[0][0][2])) # suitless encoded hand
    # get second part of state: leading player perspective
    if leading_idx == 0:
        leading_size = 0
        leading_tichu = 0
        leading_suitless = np.zeros(17, int).tolist()
    else:
        leading_size = leading_cards.size
        leading_tichu = state[0][leading_idx][1]
        leading_suitless = suitless_enc(state[0][leading_idx][2])
    conv_state.append(is_opponent) # opponent yes/no
    conv_state.append(leading_size) # hand size
    conv_state.append(leading_tichu) # tichu flag
    conv_state.append(leading_suitless) # suitless encoded leading cards

    print(conv_state[0:3])
    print(conv_state[3:])
    return _flatten_conv_state(conv_state)

_ = state_conv_suitless(state)

[14, 0, [2, 0, 3, 1, 0, 2, 1, 0, 2, 0, 1, 0, 1, 0, 0, 0, 0]]
[0, 0, 0, [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]]


In [None]:
def action_conv_suitless(suitless_action, state_cards):
    """
    Converts a "suitless" action into action vector expected by env.
    
    Example:
    suitless_action: [2, 2, 0, 0, ...] for a 2-3-pair sequence
    action_vector: [1, 1, 0, 0, 1, 1, 0, 0, ...] depending on available cards
    """
    action_vec = np.zeros(len(56), int)
    # encode regular cards:
    for i in range(13):
        card_count = suitless_action[i]
        available_cards = state_cards[i*4:i*4+4]
        if card_count == 0:
            pass
        elif sum(available_cards) < card_count:
            suc = False
            break
        else:
            for j in range(4):
                if available_cards[j] == 1 and card_count > 0:
                    action_vec[i*4+j] = 1
                    card_count -= 1
                else:
                    pass
    # encode special cards
    action_vec[-4] = suitless_action[13]
    action_vec[-3] = suitless_action[14]
    action_vec[-2] = suitless_action[15]
    action_vec[-1] = suitless_action[16]

    return action_vec