In [25]:
from random import shuffle
import gym
from gym import spaces

import ray
from ray.rllib.algorithms import ppo

In [26]:
# class DonutGame:
#     def __init__(self):
#         self.deck = list(range(13)) * 4

#         shuffle(self.deck)        

#         self.players = [
#             {
#                 "visible": [self.deck.pop(), self.deck.pop()],
#                 "visbleLockedIn": [False, False],
#                 "hidden": [self.deck.pop(), self.deck.pop()],
#                 "hiddenLockedIn": [False, False],
#                 "drawnCard": -1
#             },
#             {
#                 "visible": [self.deck.pop(), self.deck.pop()],
#                 "visbleLockedIn": [False, False],
#                 "hidden": [self.deck.pop(), self.deck.pop()],
#                 "hiddenLockedIn": [False, False],
#                 "drawnCard": -1
#             },
#                         {
#                 "visible": [self.deck.pop(), self.deck.pop()],
#                 "visbleLockedIn": [False, False],
#                 "hidden": [self.deck.pop(), self.deck.pop()],
#                 "hiddenLockedIn": [False, False],
#                 "drawnCard": -1
#             },
#                         {
#                 "visible": [self.deck.pop(), self.deck.pop()],
#                 "visbleLockedIn": [False, False],
#                 "hidden": [self.deck.pop(), self.deck.pop()],
#                 "hiddenLockedIn": [False, False],
#                 "drawnCard": -1
#             }
#         ]

#         self.burnedCards = []

#         self.currentPlayer = 0
#         self._stage = 0

#     def game_finished(self):
#         return all(all(p["visbleLockedIn"]) and all(p["hiddenLockedIn"]) for p in self.players)

#     def getCurrentCards(self):
#         player = self.players[self.currentPlayer].copy()

        
#         hiddenMasked = []

#         for h, l in zip(player["hidden"], player["hiddenLockedIn"]):
#             hiddenMasked.append(h if l else -1)

#         player["hidden"] = hiddenMasked    

#         player["pile"] = self.burnedCards[-1] if self._stage == 0 else -1

#         return player

#     def drawCard(self, chooseBurnPile):
#         "Choose to draw a card either from the random deck (False) or the known card (True)"
        
#         assert self._stage == 0
#         self._stage = 1

#         player = self.players[self.currentPlayer]


#         if chooseBurnPile:
#             player["drawnCard"] = self.deck.pop()
#         else:
#             player["drawnCard"] = self.burnedCards.pop()
        
#     def playCard(self, cardPos, shouldSwap):
#         "Choose which position (0-3) to flip over and whether to use the drawn card or the one in the pile currently"
        
#         assert self._stage == 1
#         self._stage = 0

#         player = self.players[self.currentPlayer]

#         arr = None
#         arrLockedIn = None


#         if cardPos < 2:
#             arr = player["visible"]
#             arrLockedIn = player["visibleLockedIn"]
#         else:
#             arr = player["hidden"]
#             arrLockedIn = player["hiddenLockedIn"]

#         arrLockedIn[cardPos % 2] = True

#         if shouldSwap:
#             arr[cardPos % 2] = player["drawnCard"]
#         else
#             self.burnedCards.append(player["drawnCard"])
        
#         player["drawnCard"] = -1





        





In [27]:
class SinglePlayerDonutGame:
    VISIBILITY_MASK = (
        True,
        True,
        False,
        False
    )

    def __init__(self):
        self.deck = list(range(13)) * 4
        shuffle(self.deck)

        self.cards = [
            self.deck.pop(),
            self.deck.pop(),
            self.deck.pop(),
            self.deck.pop()
        ]

        self.tempDrawnCard = -1

        self.discardPile = [self.deck.pop()]

        self.lockedCards = [
            False,
            False,
            False,
            False
        ]

        self.stage = 0 # 0 = drawing 1 = playing
    
    def isFinished(self):
        return all(self.lockedCards)

    def observe(self):
        cards = []
        for card, isVisible, isLocked in zip(self.cards, self.VISIBILITY_MASK, self.lockedCards):
            cards.append(card if isVisible or isLocked else -1)

        return {
            "cards": cards,
            "lockedCard": self.lockedCards,
            "tempDrawnCard": self.tempDrawnCard if self.stage == 1 else -1,
            "discardPileTopCard": self.discardPile[-1] if self.stage == 0 else -1,
            "stage": self.stage
        }

    def drawCard(self, isFromRandomPile):
        assert self.stage == 0
        self.stage = 1

        if isFromRandomPile:
            self.tempDrawnCard = self.deck.pop()
        else:
            self.tempDrawnCard = self.discardPile.pop()

        return 

    def playCard(self, position, shouldSwap):
        if self.lockedCards[position]: return -1 # to

        assert self.stage == 1
        self.stage = 0


        if shouldSwap:
            self.discardPile.append(self.cards[position])
            self.cards[position] = self.tempDrawnCard
            self.tempDrawnCard = -1

        else:
            self.discardPile.append(self.tempDrawnCard)
            self.tempDrawnCard = -1

        self.lockedCards[position] = True
        
        return 0

    def getFinalScore(self):
        # 0 = King
        # 1 = Ace
        # 2 = 2
        # ...
        # 10 = 10
        # 11 = J
        # 12 = Q

        assert self.isFinished()

        finalCards = self.cards.copy()

        score = 0

        while len(finalCards):
            c = finalCards.pop()

            if c in finalCards:
                finalCards.remove(c) # Remove one othe occurence of c to consume the pair - if there is 3 c's then it should count as 1 pair and 1 single
            else:
                score += c
                
        return score



In [28]:
def map_observation(obs):
    # some fields can value -1 for invalid or hidden. Map these to +ve intager indexes
    return {
        **obs,
        "cards": [c + 1 for c in obs["cards"]],
        "tempDrawnCard": obs["tempDrawnCard"] + 1,
        "discardPileTopCard": obs["discardPileTopCard"] + 1
    }
    

In [29]:
class SinglePlayerDonutEnv(gym.Env):
    def __init__(self, env_config={}):
        self.action_space = spaces.Dict({
            "drawFromRandom": spaces.Discrete(2),

            "position": spaces.MultiDiscrete(4),
            "shouldSwap": spaces.Discrete(2)
        })

        self.observation_space = spaces.Dict({
            "cards": spaces.MultiDiscrete([14, 14, 14, 14]),
            "lockedCard": spaces.MultiBinary(4),
            "tempDrawnCard": spaces.Discrete(14),
            "discardPileTopCard": spaces.Discrete(14),
            "stage": spaces.Discrete(2)
        })

        self.game = SinglePlayerDonutGame()        

    def reset(self):
        self.game = SinglePlayerDonutGame()

        return map_observation(self.game.observe())
        
    def step(self, action):
        reward = 0

        if self.game.stage == 0:
            self.game.drawCard(action["drawFromRandom"])
            
        elif self.game.stage == 1:
            e = self.game.playCard(
                action["position"],
                action["shouldSwap"]
            )

            reward = 10 * e

        obs = self.game.observe()
        done = self.game.isFinished()

        if done:
            reward = -self.game.getFinalScore()

            if reward == 0: # Lots more reward for donut. Should really do a proper test with this.
                reward = 10

        return map_observation(obs), reward, done, {}


In [30]:
def test_with_synthetic_deck(fake_deck, correct_score):
    f = SinglePlayerDonutEnv()
    f.reset()


    f.game.cards = [fake_deck.pop() for i in range(4)]
    f.game.discardPile = [fake_deck.pop()]

    f.game.deck = fake_deck

    f.step({"drawFromRandom": 1})
    f.step({
        "position": 0,
        "shouldSwap": 0
    })
    f.step({"drawFromRandom": 1})
    f.step({
        "position": 1,
        "shouldSwap": 0
    })
    f.step({"drawFromRandom": 1})
    f.step({
        "position": 2,
        "shouldSwap": 0
    })
    f.step({"drawFromRandom": 1})
    _, reward, *_ = f.step({
        "position": 3,
        "shouldSwap": 0
    })

    assert reward == correct_score, f"Got reward {reward} but expected {correct_score} with final cards {f.game.cards}"


test_with_synthetic_deck([5, 5, 5, 5, 5, 7, 8, 9, 0], -7-8-9) # Dummy test deck to check no matches
test_with_synthetic_deck([5, 5, 5, 5, 5, 7, 7, 8, 9], -8-9) # Dummy test deck to check a double
test_with_synthetic_deck([5, 5, 5, 5, 5, 7, 7, 8, 8], 10) # Dummy test deck to check 2 doubles
test_with_synthetic_deck([5, 5, 5, 5, 5, 7, 7, 7, 8], -7-8) # Dummy test deck to check a triple
test_with_synthetic_deck([5, 5, 5, 5, 5, 7, 7, 7, 7], 10) # Dummy test deck to check a quad

In [31]:
def test_invalid_position():
    f = SinglePlayerDonutEnv()
    f.reset()

    f.step({"drawFromRandom": 1})
    f.step({
        "position": 0,
        "shouldSwap": 0
    })
    f.step({"drawFromRandom": 1})
    _, reward, *_ = f.step({
        "position": 0,
        "shouldSwap": 0
    })

    assert reward == -10, f"Got reward {reward} but expected -10 when playing the same position twice"


test_invalid_position()

In [32]:


ray.init()

algo = ppo.PPO(env=SinglePlayerDonutEnv, config={})

2022-11-15 23:03:41,855	INFO worker.py:1518 -- Started a local Ray instance.
2022-11-15 23:03:42,532	INFO algorithm.py:1871 -- Your framework setting is 'tf', meaning you are using static-graph mode. Set framework='tf2' to enable eager execution with tf2.x. You may also then want to set eager_tracing=True in order to reach similar execution speed as with static-graph mode.
2022-11-15 23:03:42,533	INFO ppo.py:378 -- In multi-agent mode, policies will be optimized sequentially by the multi-GPU optimizer. Consider setting simple_optimizer=True if this doesn't work for you.
2022-11-15 23:03:42,534	INFO algorithm.py:351 -- Current log_level is WARN. For more information, set 'log_level': 'INFO' / 'DEBUG' or use the -v and -vv flags.
[2m[36m(RolloutWorker pid=192390)[0m 2022-11-15 23:03:45,026	ERROR worker.py:756 -- Exception raised in creation task: The actor died because of an error raised in its creation task, [36mray::RolloutWorker.__init__()[39m (pid=192390, ip=138.38.233.129, repr

TypeError: iteration over a 0-d array

In [None]:
for i in range(10):
    print(algo.train())