In [1]:
import pandas as pd # i/o is through dataframes/csvs
import numpy as np # array objects used to store data for fast lookups and writes

# Core objects

In [19]:
class Team:
    """
    One team, together with everything the solver currently believes about it.

    Attributes
        name: identifier of the team
        known: points the team is known to have, not counting r7
        r7_est: current estimate of the r7 result (0-3), or None if not yet set
        r7_set_depth: solver depth at which r7_est was set (-10000 while unset)
        curr_est: known + r7_est, or None while r7_est is unset
        possible_scores: possible_scores[i] is True while an r7 result of i is still allowed
        score_elim_depth: depth at which each possibility was ruled out (-10000 if it wasn't)
        r7_room / r8_room: room objects the team sits in (None until linked)
        r7_opponents / r8_opponents: the other teams in those rooms

    Setting or unsetting an estimate also maintains the module-level counters
    TEAMS_UNALLOCATED, ON_K, ON_K_PULLED_UP and PULLUPS_FACING_K.
    """
    def __init__(self, name, known):
        """
        Create a team with no r7 estimate, all four r7 results possible,
        and no rooms or opponents linked yet
        """
        self.name = name
        self.known = known
        self.r7_est = None
        self.r7_set_depth = -10000
        self.curr_est = None
        # Index i says whether the team can still have scored i (0, 1, 2, or 3) in r7
        # Note when score is set, possibilities aren't changed
        # So that when a score is reversed, the possibilities don't need to be re-inferred
        self.possible_scores = [True, True, True, True]
        self.score_elim_depth = [-10000] * 4 # set to depth at which a possibility was ruled out
        self.r7_room = None
        self.r7_opponents = []
        self.r8_room = None
        self.r8_opponents = []

    def __repr__(self):
        """
        One-line summary: name, known points, r7 estimate, total estimate,
        and the digits of the r7 results that are still marked possible
        """
        return (
            f"{self.name}: "
            + f"known {self.known}, r7 est {self.r7_est}, total {self.curr_est}; "
            + f"possibilities {''.join([str(i) for i in range(4) if self.possible_scores[i]])}"
        )

    def rule_out_possibility(self, possibility, depth):
        """
        Mark r7 result `possibility` as not allowed, recording `depth` as when that happened
        Safe to call on something that's already been ruled out:
        in that case nothing changes, so the depth recorded the first time is kept
        TODO: add in interaction with the task stack
        """
        # print("  " * depth + f"ruling out possibility {possibility} for {self.name}")
        if self.possible_scores[possibility]:
            self.possible_scores[possibility] = False
            self.score_elim_depth[possibility] = depth

    def set_r7_est(self, est, depth):
        """
        Sets estimate for r7 performance, and records depth at which that happened
        Does NOT touch possible_scores (the loop that did so is commented out below)
        Also updates the global counters, and queues an EstablishBarrier task on
        BACK_TASKS whenever one of the counters reaches 3
        Only r8 opponents that already have an estimate are taken into account
        TODO: add in interaction with task stack
        """
        # print("  " * depth + f"setting estimate {est} for {self.name}")
        self.r7_est = est
        self.r7_set_depth = depth
        self.curr_est = self.known + self.r7_est
        # Disabled code, kept for reference
        # NOTE(review): as written it compares a boolean flag with `est`, not the index i
        # for i in range(4):
        #     # Mark the other ones as impossible
        #     if self.possible_scores[i] != est:
        #         self.rule_out_possibility(i, depth)
        # Update the various globals, what a nuisance! Will have to make this cleaner
        # And note, we append to BACK_TASKS because it's simpler to do here than later
        global TEAMS_UNALLOCATED, ON_K, ON_K_PULLED_UP, PULLUPS_FACING_K, BACK_TASKS
        TEAMS_UNALLOCATED -= 1
        ON_K[self.curr_est] += 1
        if ON_K[self.curr_est] == 3:
            # Third team on this total: queue a barrier at exactly that total, over all r8 rooms
            BACK_TASKS.append(EstablishBarrier(self.curr_est, R8ROOMS))
        r8_opp_scores = [opp.curr_est for opp in self.r8_opponents if opp.curr_est is not None]
        if len(r8_opp_scores) > 0:
            opp_max = max(r8_opp_scores)
            # First, update ON_K_PULLED_UP
            if opp_max > self.curr_est: # Case a: this team is a pullup
                ON_K_PULLED_UP[self.curr_est] += 1
                if ON_K_PULLED_UP[self.curr_est] == 3:
                    BACK_TASKS.append(EstablishBarrier(self.curr_est + 0.5, R8ROOMS))
            elif opp_max < self.curr_est: # Case b: other teams are now pullups
                # Note the teams that aren't opp_max are ALREADY pullups
                prev_num = ON_K_PULLED_UP[opp_max]
                ON_K_PULLED_UP[opp_max] += len([s for s in r8_opp_scores if s == opp_max])
                # Several teams can be added at once, so test for crossing 3 rather than hitting it
                if prev_num < 3 and ON_K_PULLED_UP[opp_max] >= 3:
                    BACK_TASKS.append(EstablishBarrier(opp_max + 0.5, R8ROOMS))
            # Next, update PULLUPS_FACING_K
            for opp_score in r8_opp_scores:
                if opp_score > self.curr_est:
                    # This team is a pullup facing the opponent's total
                    PULLUPS_FACING_K[opp_score] += 1
                    if PULLUPS_FACING_K[opp_score] == 3:
                        BACK_TASKS.append(EstablishBarrier(opp_score - 0.5, R8ROOMS))
                elif opp_score < self.curr_est:
                    # The opponent is a pullup facing this team's total
                    PULLUPS_FACING_K[self.curr_est] += 1
                    if PULLUPS_FACING_K[self.curr_est] == 3:
                        BACK_TASKS.append(EstablishBarrier(self.curr_est - 0.5, R8ROOMS))

    def repermit_possibility(self, possibility):
        """
        Mistakenly ruled out a possibility on the basis of a guess, or inference on that guess
        Now want to allow that possibility again (also resets its recorded depth)
        """
        self.possible_scores[possibility] = True
        self.score_elim_depth[possibility] = -10000

    def unset_r7_est(self):
        """
        After a wrong guess, or an inference made from a wrong guess, want to unset
        Reverses the counter updates done in set_r7_est
        Does not remove anything that set_r7_est queued on BACK_TASKS
        NOTE(review): assumes an estimate is currently set; there is no guard for curr_est being None
        """
        old_score = self.curr_est
        self.r7_est = None
        self.r7_set_depth = -10000
        self.curr_est = None
        # MAYBE update possibilities... (later note to self: why?)
        # Update the globals... this gets complicated...
        global TEAMS_UNALLOCATED, ON_K, ON_K_PULLED_UP, PULLUPS_FACING_K
        TEAMS_UNALLOCATED += 1
        ON_K[old_score] -= 1
        r8_opp_scores = [opp.curr_est for opp in self.r8_opponents if opp.curr_est is not None]
        if len(r8_opp_scores) > 0:
            opp_max = max(r8_opp_scores)
            # Again, first do ON_K_PULLED_UP
            if opp_max > old_score:
                # This team was counted as a pullup
                ON_K_PULLED_UP[old_score] -= 1
            elif opp_max < old_score:
                # The opponents on opp_max were counted as pullups only because of this team
                ON_K_PULLED_UP[opp_max] -= len([s for s in r8_opp_scores if s == opp_max])
            # Again, we next do PULLUPS_FACING_K
            for opp_score in r8_opp_scores:
                if opp_score > old_score:
                    PULLUPS_FACING_K[opp_score] -= 1
                elif opp_score < old_score:
                    PULLUPS_FACING_K[old_score] -= 1

In [3]:
class R7Room:
    """
    A round-7 room: nothing more than a holder for the teams drawn into it
    Extra bells and whistles may be added later to make the program run faster
    """
    def __init__(self, teams):
        """Keep a reference to the teams debating in this room"""
        self.teams = teams

In [4]:
class R8Room:
    """
    A round-8 room: a holder for the teams drawn into it
    """
    def __init__(self, teams):
        """Keep a reference to the teams debating in this room"""
        self.teams = teams

# Initialisation

In [5]:
def initialise(year):
    """
    Initialises a bunch of globals from the csvs under data/<year>/, and returns None

    Reads
        standings.csv: one row per team, with columns "team" and "points"
        r7_draw.csv, r8_draw.csv: one row per room, with columns "og", "oo", "cg", "co"
    Fills
        TEAMS (keyed by team name with non-ascii chars deleted), R7ROOMS, R8ROOMS,
        TEAMS_UNALLOCATED, and each team's rooms and opponents

    Team names are stripped of non-ascii chars everywhere (standings and both draws),
    so that the lookups of draw entries in TEAMS are consistent with how TEAMS is keyed
    """
    global TEAMS_UNALLOCATED

    def ascii_only(text):
        # delete non-ascii chars
        return text.encode("ascii", "ignore").decode("ascii")

    def teams_in(row):
        # The four Team objects named in one row of a draw
        return [TEAMS[ascii_only(row[p])] for p in ["og", "oo", "cg", "co"]]

    # Create teams
    standings = pd.read_csv(f"data/{year}/standings.csv", dtype={"team": str}, index_col="team")
    for idx, row in standings.iterrows():
        name = ascii_only(idx)
        # iterrows can hand back floats when other columns are floats,
        # and the points get used to index arrays, so force an int
        TEAMS[name] = Team(name, int(row["points"]))
    TEAMS_UNALLOCATED = len(TEAMS)

    # Bring in r7 data
    r7_draw = pd.read_csv(f"data/{year}/r7_draw.csv", dtype=str)
    for _, row in r7_draw.iterrows():
        # Make rooms, and ensure teams linked to them
        teams = teams_in(row)
        temp_r7_room = R7Room(teams)
        R7ROOMS.append(temp_r7_room)
        for team in teams:
            team.r7_room = temp_r7_room
    for team in TEAMS.values():
        # Teams that skip r7 get 0 points
        # NOTE(review): r8 opponents aren't linked yet at this point, so two teams that
        # both skip r7 and share an r8 room never get counted against each other as pullups
        if team.r7_room is None:
            team.set_r7_est(0, 0)
        # Add in opponents
        else:
            for room_team in team.r7_room.teams:
                if room_team is not team:
                    team.r7_opponents.append(room_team)

    # Bring in r8 data
    r8_draw = pd.read_csv(f"data/{year}/r8_draw.csv", dtype=str)
    for _, row in r8_draw.iterrows():
        # Make rooms, and ensure teams linked to them
        teams = teams_in(row)
        temp_r8_room = R8Room(teams)
        R8ROOMS.append(temp_r8_room)
        for team in teams:
            team.r8_room = temp_r8_room
    for team in TEAMS.values():
        # Teams missing from the r8 draw keep r8_room None and have no r8 opponents
        if team.r8_room is not None:
            for room_team in team.r8_room.teams:
                if room_team is not team:
                    team.r8_opponents.append(room_team)

# Micro level solver logic

In [6]:
class Task:
    """
    Base class for the solver's deduction steps
    It does nothing itself; it exists to show the shape that subclasses follow
    """
    def __init__(self):
        """Nothing to store for the shell task"""

    def execute(self, depth):
        """
        Contract for subclasses:
        1. Attempt one specific deduction somewhere in the dataset
        2. Any change made queues up further Tasks
        3. Return True if all went fine, False if a contradiction was hit
        The shell itself always reports False
        """
        outcome = False
        return outcome

In [7]:
class CheckSingleTeamPossiblities(Task):
    """
    Simplest rule of the lot: look at one team's remaining r7 possibilities
    """
    def __init__(self, team):
        self.team = team

    def execute(self, depth):
        """
        Inspect the possibilities left for this one team
        Exactly one left -> that is their r7 result, so set it and queue follow-ups
        None left -> dead end, report a contradiction
        A team that already has an estimate is left alone
        """
        # print("  " * depth + f"check_single_team_possibilities, team {self.team.name}")
        target = self.team
        if target.r7_est is not None:
            # Already decided, nothing to infer
            return True
        open_scores = [score for score, allowed in enumerate(target.possible_scores) if allowed]
        if not open_scores:
            return False
        if len(open_scores) == 1:
            forced = open_scores[0]
            target.set_r7_est(forced, depth)
            turn_history_into_tasks("set_r7_score", {"team": target, "score": forced})
        return True

In [9]:
class CheckPullupPossibility(Task):
    """
    Sanity check that throws out states with too many pullups
    """
    def __init__(self):
        """No state needed: the check only reads the global counters"""

    def execute(self, depth):
        """
        Fine only while every entry of ON_K_PULLED_UP and of PULLUPS_FACING_K is below 4
        """
        pulled_up_ok = max(ON_K_PULLED_UP) < 4
        facing_ok = max(PULLUPS_FACING_K) < 4
        return pulled_up_ok and facing_ok

In [10]:
class RuleOutTeamsInSameRoomFromHavingSameScore(Task):
    """
    Rule 7a: two teams in the same r7 room cannot have the same r7 result
    """
    def __init__(self, team):
        self.team = team
        # Snapshot of the team's estimate at the moment the task is created
        self.score_set = team.r7_est

    def execute(self, depth):
        """
        Run once an r7 estimate has been set for some team:
        every other team in that r7 room loses that result as a possibility
        Reports a contradiction if that leaves an opponent with nothing possible
        """
        # print("  " * depth + f"rule_out_team_from_having_same_score, team {self.team.name}")
        taken = self.score_set
        for rival in self.team.r7_opponents:
            if not rival.possible_scores[taken]:
                # Already excluded for this rival, nothing new to record
                continue
            rival.rule_out_possibility(taken, depth)
            if not any(rival.possible_scores):
                return False
            turn_history_into_tasks("deleted_r7_poss", {"team": rival, "score": taken})
        return True

In [11]:
class CheckR7RoomPossibilities(Task):
    """
    Rule 7b, plus some extra bits: every r7 room needs a 3, a 2, a 1, and a 0
    """
    def __init__(self, room):
        self.room = room

    def execute(self, depth):
        """
        General idea: for a given r7 room, make sure there is a 3, 2, 1, and a 0
        Specifically:
            1. If no team can get some result, we're at a contradiction
            2. If only one team can get a certain result, that team gets that result
        A team with an estimate counts as able to get that result and nothing else
        Returns False on a contradiction, True otherwise
        TODO: account for case where some team cannot get any result
        """
        # print("  " * depth + f"check_r7_room_possibilities, room {[t.name for t in self.room.teams]}")
        # Snapshot of who can take each result, taken before anything gets set below
        # (plain lists rather than a dataframe: this runs constantly during the search)
        candidates = [[] for _ in range(4)]
        for team in self.room.teams:
            for score in range(4):
                if team.r7_est is None:
                    can_take = team.possible_scores[score]
                else:
                    can_take = score == team.r7_est
                if can_take:
                    candidates[score].append(team)
        if any(len(takers) == 0 for takers in candidates):
            # i.e., some result that no team in the room can get
            return False
        for hinge, takers in enumerate(candidates):
            # A hinge is a result that only one team could have got
            if len(takers) != 1:
                continue
            rel_team = takers[0]
            if rel_team.r7_est is not None: # i.e., already have score assigned
                continue
            rel_team.set_r7_est(hinge, depth)
            turn_history_into_tasks("set_r7_score", {"team": rel_team, "score": hinge})
        return True

In [12]:
class EstablishBarrier(Task):
    """
    This is how 8 gets called
    A barrier is a total that no r8 room may straddle. Applies for all of them
        a. If 3 teams on k are pulled up, can't have any <=k against >k
            so barrier is k + 0.5
        b. If 3 teams are pulled up to hit teams on k, can't have any <k against >=k
            so barrier is k - 0.5
        c. If 3 teams on k, can't have any teams on <k against >k
            so barrier is k (using strict inequalities here)
    """

    def __init__(self, barrier, rooms):
        """
        Barrier is a number
        Rooms is an iterable of R8Room objects
            For global call, it's all the rooms
            For local call, it's a one-length list whose sole element is that room
        Entries that are None (a team with no r8 room) are skipped on execution
        """
        self.barrier = barrier
        self.rooms = rooms

    def execute(self, depth):
        """
        For each room with at least one known total, stop the teams without an estimate
        from landing on the other side of the barrier from a known total
        Only rules possibilities out, so it always returns True
        """
        # Note: might come back later and add some fine-tuning
        # which is if a team that hasn't been allocated a score yet can only have got <=k
        # then no other teams can have got >k
        # and similar for the other direction
        for room in self.rooms:
            if room is None: # i.e., team isn't in the r8 draw, so nothing to constrain
                continue
            totals = [t.curr_est for t in room.teams if t.curr_est is not None]
            if len(totals) == 0: # i.e., nothing to be done here but cause errors
                continue

            # A room straddling the barrier is not treated as an error here:
            # pullup counts are checked in a different Task

            # First case: a room with a team >barrier -> no one on <barrier
            if max(totals) > self.barrier:
                self._rule_out_side(room, depth, below=True)
            # Final case: a room with a team <barrier -> no one on >barrier
            if min(totals) < self.barrier:
                self._rule_out_side(room, depth, below=False)
        return True

    def _rule_out_side(self, room, depth, below):
        # Rule out, for teams without an estimate, every r7 result that would put
        # their total below the barrier (below=True) or above it (below=False)
        for team in room.teams:
            if team.r7_est is not None:
                continue
            for i in range(4):
                total = i + team.known
                wrong_side = total < self.barrier if below else total > self.barrier
                if wrong_side and team.possible_scores[i]:
                    team.rule_out_possibility(i, depth)
                    turn_history_into_tasks("deleted_r7_poss", {"team": team, "score": i})

In [13]:
def turn_history_into_tasks(action_type, action_details):
    """
    When you make some change, call this function
    Which does the nasty thing of translating an action done
        into a list of actions to take next
    And appends them all to TASK_STACK, which is a global
    Currently only of the form:
        "set_r7_score", {"team": team, "score": score}
        "deleted_r7_poss", {"team": team, "score": score}
    Any other action_type queues nothing
    """
    # print("turning history into tasks")
    global TASK_STACK, BACK_TASKS
    team = action_details["team"]
    if action_type == "set_r7_score":
        TASK_STACK.append(CheckPullupPossibility())
        # Rules 7a and b
        TASK_STACK.append(RuleOutTeamsInSameRoomFromHavingSameScore(team))
        TASK_STACK.append(CheckR7RoomPossibilities(team.r7_room))

        # Local rule 8 calls (global ones handled elsewhere)
        # Skipped for a team with no r8 room, as there is no room to constrain
        if team.r8_room is not None:
            barriers = (
                [i for i in range(len(ON_K)) if ON_K[i] >= 3]
                + [i + 0.5 for i in range(len(ON_K_PULLED_UP)) if ON_K_PULLED_UP[i] >= 3]
                + [i - 0.5 for i in range(len(PULLUPS_FACING_K)) if PULLUPS_FACING_K[i] >= 3]
            ) # Note we only care about the barriers closest to the new score
            # Barriers are totals, so they are compared with the team's total,
            # not with its r7 score (which is only ever 0-3)
            total = team.curr_est if team.curr_est is not None else action_details["score"]
            # The defaults sit outside any reachable total, so those barriers rule nothing out
            lower_barrier = max([b for b in barriers if b < total], default=-1)
            upper_barrier = min([b for b in barriers if b > total], default=100)
            TASK_STACK.append(EstablishBarrier(upper_barrier, [team.r8_room]))
            TASK_STACK.append(EstablishBarrier(lower_barrier, [team.r8_room]))

        # Global rule 8 calls (queued up on BACK_TASKS when the estimate was set)
        TASK_STACK += BACK_TASKS
        BACK_TASKS = []
    elif action_type == "deleted_r7_poss":
        # 1. Rule 7b (see if there is a score in a r7 room that only one team can get)
        TASK_STACK.append(CheckR7RoomPossibilities(team.r7_room))
        # 2. NEW RULE (if a team has only one possibility, it must have that possibility)
        TASK_STACK.append(CheckSingleTeamPossiblities(team))
    # print(f"Task stack length: {len(TASK_STACK)}")

# Macro level solver logic

In [14]:
def clean(depth):
    """
    Roll back after a guess at some depth has led to absurdity
        e.g., some team cannot get any possible result for r7
    Every estimate set, and every possibility ruled out, at depth >= `depth` is undone
    Everything recorded at a shallower depth is left as is
    """
    # All information is stored in the teams, so only need to do cleaning on teams
    # (Global stuff gets updated as each team is unset)
    for team in TEAMS.values():
        if team.r7_set_depth >= depth:
            team.unset_r7_est()
        too_deep = [score for score in range(4) if team.score_elim_depth[score] >= depth]
        for score in too_deep:
            team.repermit_possibility(score)

In [15]:
def get_guess_team():
    """
    Pick the team to make the next guess about
    Chooses a team without an estimate that has the fewest options left,
        so guesses are targeted (not obviously the best way to do things)
    Returns None if every team already has an estimate
    """
    chosen, fewest = None, 10000
    for candidate in TEAMS.values():
        if candidate.r7_est is None:
            option_count = sum(candidate.possible_scores)
            if option_count < fewest:
                chosen, fewest = candidate, option_count
            if option_count == 2: # Can't do any better than 2
                return chosen
    return chosen

In [None]:
def save_to_disk(filename):
    """
    Appends the current r7 estimates to the csv at `filename`, one value per team,
    in the iteration order of TEAMS

    What gets written is ",\\n" and then the row, i.e., the previous line gets closed off
    with a trailing comma and the new row is left without a newline
    NOTE(review): the trailing comma looks unintended, but is kept so the format is unchanged
    """
    row = ",".join(str(t.r7_est) for t in TEAMS.values())
    with open(filename, "a") as f:
        f.write(",\n")
        f.write(row)

In [22]:
def solve(depth):
    """
    This is the core solver logic
    Idea is it does deductive solving until
        a. It runs into a contradiction, and so cleans up and returns right there
        b. It runs out of deductions with every team allocated, and so saves and returns
        c. It runs out of deductions otherwise, in which case it makes guesses and recurses

    A solution is only saved once the task queue is empty: the checks queued by the
    last allocation (pullup limit, room consistency) have to pass before it counts
    """
    global TASK_STACK
    while len(TASK_STACK) > 0:
        # Tasks run oldest first; anything a task queues lands at the back
        current_task = TASK_STACK.pop(0)
        went_smoothly = current_task.execute(depth)
        if not went_smoothly:
            clean(depth)
            TASK_STACK = []
            return
    if TEAMS_UNALLOCATED == 0:
        save_to_disk("results.csv")
        return
    guess_team = get_guess_team()
    if guess_team is None: # i.e., no team left to guess on
        return
    for i in range(4):
        if guess_team.possible_scores[i]:
            TASK_STACK = []
            guess_team.set_r7_est(i, depth + 1)
            turn_history_into_tasks("set_r7_score", {"team": guess_team, "score": i})
            solve(depth + 1)
            # Undo the guess and everything inferred from it before trying the next one
            clean(depth + 1)

# Run it

In [24]:
# Primary cell: reset all solver state, load the data, then run the search
YEAR = 723

# Current info about the tournament
TEAMS = {}    # team name (string) -> Team object
R7ROOMS = []
R8ROOMS = []

# Bookkeeping used while the algo runs
TASK_STACK = []
BACK_TASKS = []
TEAMS_UNALLOCATED = -1           # number of teams still without an estimate
ON_K = np.zeros(28)              # [12] == 3 means there are 3 teams on 12
ON_K_PULLED_UP = np.zeros(28)    # [12] == 3 means 3 teams on 12 are pulled up
PULLUPS_FACING_K = np.zeros(28)  # [12] == 3 means 3 teams are pulled up to face 12

initialise(YEAR)

# Start the results file afresh with a header of team names; solutions get appended to it
with open("results.csv", "w") as f:
    f.write(",".join(t.name for t in TEAMS.values()))

solve(1)