In [1]:
import numpy as np
import pandas as pd
from fractions import Fraction

In [2]:
NUM_PLAYERS = 3
PLAYERS = list(range(1, NUM_PLAYERS+1))
# all possible (giver, receiver) pairs
CANDIDATE_MOVES = [(g, r) for idx, g in enumerate(PLAYERS) for r in PLAYERS if g != r]
LOSER_IDX = 0

# Helper functions

## Functional

In [3]:
def getChildStates(initState):
    """
    Applies all possible moves to the initial state.
    Returns the child states.
    """
    return [applyMove(initState, move) for move in CANDIDATE_MOVES]

def getCoef(initState):
    """
    Returns a pair consisting of:
    - a dictionary of (state:coefficient) pairs, representing the coefficient of the hitting probability 
      for each state in the first-step analysis, starting from the given initial state
    - a rational number representing the constant in the equation.
    """
    # assign all states the individual outcome probability
    childStates = getChildStates(initState)
    stateCoefs = {s:OUTCOME_PROB for s in childStates}
    const = 0
    
    # apply any possible simplifications to the states
    copy = stateCoefs.copy()
    for state in copy:
        # prune the 0 hitting prob states
        if zeroHittingProb(state):
            stateCoefs.pop(state)
        # convert the hitting prob 1 states to constants
        elif (status := alreadyHit(state)):
            const += stateCoefs[state] * status  # 1 for True
            stateCoefs.pop(state)
        # optimsation 1: convert equal wealth states to constants
        elif equalWealth(state):
            const += stateCoefs[state] / NUM_PLAYERS
            stateCoefs.pop(state)
    
    # optimization 2: combine symmetrical states
    copy = list(stateCoefs.copy().keys())
    n = len(copy)
    for i in range(n):
        for j in range(i+1, n):
            s1, s2 = copy[i], copy[j]
            if (symmetricalStates(s1, s2)):
                combinedProb = stateCoefs.pop(s1) + stateCoefs.pop(s2)
                stateCoefs[standardiseState(s1)] = combinedProb
    
    return standardiseStates(stateCoefs), const

def zeroHittingProb(state):
    """
    Returns true if the given state cannot reach the state of Player 1 being the loser, false otherwise.
    """
    return (0 in state) and (state[LOSER_IDX] > 0)

def symmetricalStates(s1, s2):
    """
    Returns true if the two states are symmetrical (undistinguished apart from the loser), false otherwise.
    """
    return s1[LOSER_IDX] == s2[LOSER_IDX] and set(s1[LOSER_IDX+1:]) == set(s2[LOSER_IDX+1:])
    
def equalWealth(s):
    """
    Returns true if all players have the same wealth in the current state (an integer triple), 
    false otherwise.
    """
    return all(amt == s[0] for amt in s)

## Formatting / Representational

In [4]:
from sympy import *
from sympy import nsolve
from sympy.solvers import solve
from ast import literal_eval as make_tuple  # for parsing state as a tuple

In [5]:
def standardiseState(s):
    """
    Returns the standardised form of the state (a triple).
    The convention is to keep the smaller value of the undistinguished players at the front.
    """
    return (s[LOSER_IDX],) + tuple(sorted(s[LOSER_IDX+1:]))

def standardiseStates(stateCoefs):
    """
    Standardises all states (keys) in the dictionary of (state:coefficient) pairs.
    Returns the standardised form of the dictionary.
    """
    return {standardiseState(s):p for s, p in stateCoefs.items()}

def getEquation(initState, stateCoefs, const):
    """
    Converts the dictionary of state:coefficient pairs and constant
    into an equality of the form: h(initState) = coefs * h(states) + const
    """
    # equation will be in the form of human readable equalities (like how we normally write down)
    symbols = [stateToSymbol(s) for s in stateCoefs]
    eqn = Eq(stateToSymbol(initState), getExpr(stateCoefs, const))
    return eqn, symbols
    
def stateToSymbol(state):
    """
    Converts the state (triple) to a Symbol object.
    """
    return Symbol(f'h{state}')

def symbolToState(symbol):
    """
    Converts the Symbol object to a state (triple).
    """
    return make_tuple(str(symbol)[1:])

def getExpr(stateCoefs, const):
    """
    Converts the dictionary of state:coefficient pairs and constant
    into an expression.
    """
    expr = const
    for s in stateCoefs:
        expr = Add(expr, stateCoefs[s] * stateToSymbol(s))
    return expr

def formatEquation(eqn):
    """
    Returns the formatted string for the equation.
    """
    return f'{eqn.lhs} = {eqn.rhs}'

def formatSolutions(sol):
    """
    Returns the solutions as a formatted list (states converted back to triples)
    """
    return [(symbolToState(s), p) for s, p in sol.items()]

## System

In [6]:
import signal

In [7]:
class TimeoutException(Exception):
    """
    Custom exception for timeout.
    """
    pass

def sigalarmHandler(signum, frame):
    """
    Handler function to be called when SIGALRM is received.
    """
    # We get signal!
    raise TimeoutException()

# Main class for the first-step analysis

In [25]:
class LoserAnalysis():
    def __init__(self, initState):
        self.initState = standardiseState(initState)
        self.generatedStates = set([initState])
        self.equations = []
        self.symbols = set([stateToSymbol(self.initState)]) # or can encode all generated states at once in the end
    
    def getEquations(self):
        """
        Returns a list of equations (as Equality objects) generated during the first step analysis.
        """
        self.recurseGenerateEqns(self.initState)
        return self.equations
        
    def recurseGenerateEqns(self, initState):
        """
        Recursively generates the first step analysis equations,
        and records the new symbols and equations.
        """
        childStatesProb, const = getCoef(initState)  # state:prob pairs, under unified representation
        newStates = set(childStatesProb).difference(self.generatedStates)
        
        # compute for the current state
        # do this before the recursion to close off the loop
        eqn, symbols = getEquation(initState, childStatesProb, const)
        self.equations.append(eqn)

        # base case: no new states
        if not newStates:
            return

        self.symbols = self.symbols.union(symbols)
        self.generatedStates = self.generatedStates.union(newStates)
        
        # recurse
        for state in newStates:
            self.recurseGenerateEqns(state)
    
    def solveEqns(self, asFloat=False):
        """
        Returns the hitting probabilities of all states involved in the analysis,
        as a dictionary of (state symbol : probability) pairs.
        If `asFloat` is True, the probabilities are returned as floating point numbers; 
        fractions otherwise.
        """
        self.getEquations()
        sol = solve(self.equations, self.symbols)
        return {s:float(p) for s, p in sol.items() if sol} if asFloat else sol
    
    def exportEqns(self, filename):
        """
        Exports the first step analysis equations to a text file under the provided filename.
        """
        self.getEquations()
        fp = open(filename, 'w')
        for e in self.equations:
            fp.write(formatEquation(e) + '\n')
        fp.close()
    
    def getHittingProb(self, asFloat=False, approx=False, t=30):
        """
        Returns the hitting probability given the initial state.
        - asFloat: if True, the probabilities are represented as floating point numbers; fractions otherwise.
        - t: number of time steps if using the approximation method.
        """
        if approx:
            x, y, z = self.initState
            return Q(x, y, z, t)  # memoisation + enumeration method
        
        sol = self.solveEqns()
        if sol:
            p = sol[stateToSymbol(self.initState)]
            return float(p) if asFloat else p
        else:
            return None

# Aggregate states and hitting probs to one file

In [11]:
import csv
from itertools import product

In [26]:
def getTargetProbsForStates(minWealth, maxWealth, asFloat=True, approx=False, t=30, printOnGo=False):
    """
    Returns hitting probabilities as a dictionary of state : P(Loser = Player 1) pairs, 
    beginning from all possible initial states within the range [`minWealth`, `maxWealth`] (inclusive).
    The hitting probabilities of any intermediate states are NOT included.
    
    - asFloat: if True, the probabilities are represented as floating point numbers; fractions otherwise.
    - approx: if True, approximate probabilities are returned; exact otherwise.
    - t: number of time steps if using the approximation method.
    - printOnGo: if True, prints the progress as it runs; silent otherwise.
    """
    
    stateProbs = dict()  # states as symbols : hitting prob

    for x in range(minWealth, maxWealth+1):
        for y in range(minWealth, maxWealth+1):
            for z in range(minWealth, maxWealth+1):
                initState = (x, y, z)
                
                try:
                    sol = LoserAnalysis(initState).getHittingProb(asFloat=asFloat, approx=approx, t=t)
                    if printOnGo:
                        print(f"{initState}: {sol}\n")
                    
                    stateProbs[initState] = sol
                
                except RecursionError:
                    print(f'Maximum recursion depth exceeded for state {initState}')

    return stateProbs

In [16]:
def exportStateProbs(filename, stateProbs, append=False, header=True):
    """
    Creates a CSV file with columns (Initial state, P(Loser = Player 1)).
    The state is an integer triple, and the probability is a rational number.
    - filename: a string representing the filename of the CSV file;
    - stateProbs: a dictionary of state : hitting prob pairs, representing the contents to be exported.
    - append: if True, appends to the file; (over-)writes otherwise.
    - header: if True, adds a header to the file.
    """
    if append:
        fp = open(filename, 'a')
    else:
        fp = open(filename, 'w')
    writer = csv.writer(fp)
    if header:
        writer.writerow(['Initial state', 'P(Loser = Player 1)'])
    
    writer.writerows(stateProbs.items())
    fp.close()

In [27]:
# Explores the exact hitting probabilities within the given range
def writeProbsInRange(minWealth, maxWealth, exactProbFilename, floatProbFilename=None, 
                      startState=(0,0,0), timeLimit=30, printOnGo=False):
    """
    Writes hitting probabilities to the file(s) under the provided filenames, 
    for initial stack sizes within the range [`minWealth`, `maxWealth`].
    
    - exactProbFilename: filename (string) of the file to contain the exact hitting probs.
    - floatProbFilename: filename (string) of the file to contain the hitting probs as floating point numbers.
    - startState: state to begin writing from; allows to continue from where the program left off last time.
    - timeLimit: integer limit (in seconds) on the computation time of the hitting prob for each state.
    """
    
    allStates = product(range(minWealth, maxWealth+1), repeat=NUM_PLAYERS)
    
    if startState != (0,0,0) :  # allows to continue from where we left off
        allStates = list(allStates)
        assert(startState in allStates)  # defensive check
        start = allStates.index(startState)
        allStates = allStates[start:]
    
    for state in allStates:
        # Set up signal handler for SIGALRM, saving previous value
        oldHandler = signal.signal(signal.SIGALRM, sigalarmHandler)
        # Start timer
        signal.alarm(timeLimit)
        
        try:
            exactProb = LoserAnalysis(state).getHittingProb()
            exportStateProbs(exactProbFilename, {state:exactProb}, append=True, header=False)
            
            if floatProbFilename:
                floatProb = float(exactProb)
                exportStateProbs(floatProbFilename, {state:floatProb}, append=True, header=False)
            
            if printOnGo:
                print(f"{state}: {exactProb} ~ {float(exactProb)}\n")
                
        except TimeoutException:
            print(f'Took too long to find the hitting prob for state {state}')
        finally:
            # Turn off timer
            signal.alarm(0)
            # Restore handler to previous value
            signal.signal(signal.SIGALRM, oldHandler)

In [4]:
def readProbs(filename):
    """
    Reads the probabilities from the file under the provided filename.
    Returns the dictionary of state:hitting prob pairs.
    """
    stateProbs = {}
    
    fp = open(filename, 'r')
    data = csv.reader(fp)
    header = next(data)  # skip header
    for row in data:
        # first col is state, second col is prob
        stateProbs[eval(row[0])] = float(row[1])  # convert to correct data type
    
    fp.close()
    
    return stateProbs

# Further analysis

In [28]:
from itertools import product

In [2]:
# When is player 1 better off giving away $1 / $1 to others?
def findBetterGiving1(minWealth, maxWealth, printOnGo=False, t=30, giveToAnother=False):
    """
    Identifies and returns a list of states such that L(x-1,y,z) < L(x,y,z).
    Input:
    - minWealth, maxWealth: integer bounds on each player's amount of money.
    - printOnGo: boolean indicating if we print our progress along the way
    - t: number of time steps if using the approximation method
    - giveToAnother: boolean indicating if the $1 is to be given to another player
    """
    
    allStates = product(range(minWealth, maxWealth+1), repeat=NUM_PLAYERS)
    hittingProbs = {(x,y,z):Q(x,y,z,t) for (x,y,z) in allStates}
    betterGiving1 = {}  # states where player 1 is better off giving away $1
    given1 = {}  # states for comparison
    
    for x in range(minWealth+1, maxWealth+1):
        for y in range(minWealth, maxWealth+1):
            for z in range(minWealth, maxWealth+1):
                sSmall = standardiseState((x-1,y+1,z)) if giveToAnother else standardiseState((x-1,y,z))
                sLarge = standardiseState((x,y,z))
                
                if sSmall in hittingProbs and sLarge in hittingProbs:
                    pSmall = hittingProbs[sSmall]
                    pLarge = hittingProbs[sLarge]
                    
                    if pSmall < pLarge:
                        given1[sSmall] = pSmall
                        betterGiving1[sLarge] = pLarge
                        
                    if printOnGo:
                        print(f"{sSmall}: {pSmall}")
                        print(f"{sLarge}: {pLarge}")

    return betterGiving1, given1

In [20]:
def readApproxBetterGiving1Probs(filename):
    """
    Reads the estimated states (using approximate probabilities) where player 1 is better off giving saway $1,
    from the file under the provided filename.
    Returns two dictionaries of state:hitting prob pairs:
    1) betterGiving1: states where player 1 is better off giving saway $1
    2) given1: states for comparison, where player 1 has already given away $1
    """
    
    stateProbs = readProbs(filename)
    states = list(stateProbs.keys())
    n = len(stateProbs)
    betterGiving1 = {}
    given1 = {}
    
    for i in range(0,n-1,2):
        sSmall = states[i]
        sLarge = states[i+1]
        given1[sSmall] = stateProbs[sSmall]
        betterGiving1[sLarge] = stateProbs[sLarge]

    return betterGiving1, given1

In [21]:
# Apply stricter filter to a narrower search space

def stricterFilter(candidates, t=40, printOnGo=False, 
                   filename=None, continueFrom=None, giveToAnother=False):
    """
    Applies a stricter filter to search for states where player 1 is better off giving saway $1, 
    from the provided candidate states.
    - candidates: dictionary of state:prob pairs, representing the search space of states.
    - t: number of time steps if using the approximation method.
    - printOnGo: if True, prints the progress as it runs; silent otherwise.
    - filename: filename (string) of the file to export the selected states to.
    - continueFrom: state to begin writing from; allows to continue from where the program left off last time.
    - giveToAnother: boolean indicating if the $1 is to be given to another player.
    """
    
    betterGiving1 = {}
    given1 = {}
    
    states = list(candidates.keys())
    if continueFrom and continueFrom in states and continueFrom != states[-1]:
        states = states[states.index(continueFrom)+1:]
    
    for i in range(len(states)):
        # Set up signal handler for SIGALRM, saving previous value
        oldHandler = signal.signal(signal.SIGALRM, sigalarmHandler)
        # Start timer
        signal.alarm(t)
        
        x, y, z = states[i]
        sLarge = (x, y, z)
        sSmall = (x-1, y, z)
        if giveToAnother:
            sSmall = (x-1, y+1, z)
        
        try:
            pLarge = LoserAnalysis(sLarge).getHittingProb()  # exact probs
            pSmall = LoserAnalysis(sSmall).getHittingProb()
                
        except (TimeoutException, RecursionError):
            print(f'Approximating hitting prob for {sLarge}')
            # use approx probs
            pLarge = LoserAnalysis(sLarge).getHittingProb(approx=True, t=t)
            pSmall = LoserAnalysis(sSmall).getHittingProb(approx=True, t=t)
            
        finally:
            if pLarge > pSmall:
                given1[sSmall] = pSmall
                betterGiving1[sLarge] = pLarge
                exportStateProbs(filename, {sSmall: pSmall, sLarge: pLarge}, append=True, header=False)
                
            if printOnGo:
                print(f"{sLarge}: {pLarge}")
                print(f"{sSmall}: {pSmall}\n")
            
            # Turn off timer
            signal.alarm(0)
            # Restore handler to previous value
            signal.signal(signal.SIGALRM, oldHandler)
        
    
    return betterGiving1, given1