In [1]:
from battlehack20 import Game
from battlehack20.engine.container.instrument import Instrument
from RestrictedPython import compile_restricted
import dis
import math
from types import CodeType
import faulthandler
from types import SimpleNamespace

In [2]:
faulthandler.enable()

In [80]:
examplefuncs = """
import random


# This is an example bot written by the developers!
# Use this to help write your own code, or run it against your bot to see how well you can do!

DEBUG = 1
def dlog(str):
    if DEBUG > 0:
        log(str)


def check_space_wrapper(r, c, board_size):
    # check space, except doesn't hit you with game errors
    if r < 0 or c < 0 or c >= board_size or r >= board_size:
        return False
    try:
        return check_space(r, c)
    except:
        return None

def turn():
    dlog('Starting Turn!')
    board_size = get_board_size()

    team = get_team()
    opp_team = Team.WHITE if team == Team.BLACK else team.BLACK
    dlog('Team: ' + str(team))

    robottype = get_type()
    dlog('Type: ' + str(robottype))

    if robottype == RobotType.PAWN:
        row, col = get_location()
        dlog('My location is: ' + str(row) + ' ' + str(col))

        if team == Team.WHITE:
            forward = 1
        else:
            forward = -1

        # try catpuring pieces
        if check_space_wrapper(row + forward, col + 1, board_size) == opp_team: # up and right
            capture(row + forward, col + 1)
            dlog('Captured at: (' + str(row + forward) + ', ' + str(col + 1) + ')')
            # import pdb
            # pdb.set_trace()

        elif check_space_wrapper(row + forward, col - 1, board_size) == opp_team: # up and left
            capture(row + forward, col - 1)
            dlog('Captured at: (' + str(row + forward) + ', ' + str(col - 1) + ')')

        # otherwise try to move forward
        elif row + forward != -1 and row + forward != board_size and not check_space_wrapper(row + forward, col, board_size):
            #               ^  not off the board    ^            and    ^ directly forward is empty
            move_forward()
            dlog('Moved forward!')

        # confusion = "you need a line here to avoid segfault. we aren't sure why but are working on it"
        # ^ I think this is related to the potential ambiguity of what the following else is referring to?

    else:
        if team == Team.WHITE:
            index = 0
        else:
            index = board_size - 1

        for _ in range(board_size):
            i = random.randint(0, board_size - 1)
            if not check_space(index, i):
                spawn(index, i)
                dlog('Spawned unit at: (' + str(index) + ', ' + str(i) + ')')
                break

    bytecode = get_bytecode()
    dlog('Done! Bytecode left: ' + str(bytecode))

"""

In [81]:
examplefuncsturn = """
dlog('Starting Turn!')
board_size = get_board_size()

team = get_team()
opp_team = Team.WHITE if team == Team.BLACK else team.BLACK
dlog('Team: ' + str(team))

robottype = get_type()
dlog('Type: ' + str(robottype))

if robottype == RobotType.PAWN:
    row, col = get_location()
    dlog('My location is: ' + str(row) + ' ' + str(col))

    if team == Team.WHITE:
        forward = 1
    else:
        forward = -1

    # try catpuring pieces
    if check_space_wrapper(row + forward, col + 1, board_size) == opp_team: # up and right
        capture(row + forward, col + 1)
        dlog('Captured at: (' + str(row + forward) + ', ' + str(col + 1) + ')')

    elif check_space_wrapper(row + forward, col - 1, board_size) == opp_team: # up and left
        capture(row + forward, col - 1)
        dlog('Captured at: (' + str(row + forward) + ', ' + str(col - 1) + ')')

    # otherwise try to move forward
    elif row + forward != -1 and row + forward != board_size and not check_space_wrapper(row + forward, col, board_size):
        #               ^  not off the board    ^            and    ^ directly forward is empty
        move_forward()
        dlog('Moved forward!')

    # confusion = "you need a line here to avoid segfault. we aren't sure why but are working on it"
    # ^ I think this is related to the potential ambiguity of what the following else is referring to?

else:
    if team == Team.WHITE:
        index = 0
    else:
        index = board_size - 1

    for _ in range(board_size):
        i = random.randint(0, board_size - 1)
        if not check_space(index, i):
            spawn(index, i)
            dlog('Spawned unit at: (' + str(index) + ', ' + str(i) + ')')
            break

bytecode = get_bytecode()
dlog('Done! Bytecode left: ' + str(bytecode))
"""

In [82]:

class Instruction(SimpleNamespace):
    def __init__(self, instruction, in_dict=None):
        if in_dict is not None:
            super().__init__(**in_dict)
        else:
            super().__init__(**{a:b for a,b in zip(dis.Instruction._fields+('jump_to', 'was_there', 'extra_extended_args'), instruction + (None, True, 0))})

    def is_jumper(self):
        return self.is_abs_jumper() or self.is_rel_jumper()

    def is_rel_jumper(self):
        return self.opcode in dis.hasjrel

    def is_abs_jumper(self):
        return self.opcode in dis.hasjabs

    @classmethod
    def ExtendedArgs(self, value):
        return Instruction(None, in_dict={
            'opcode':144, 'opname':'EXTENDED_ARGS', 'arg':value,
            'argval':value, 'argrepr':value, 'offset':None,
            'starts_line':None, 'is_jump_target':False, 'was_there': False,
            'extra_extended_args': 0,
        })

    def calculate_offset(self, instructions):
        # Return the offset (rel or abs) to self.jump_to in instructions
        target_loc = 2 * instructions.index(self.jump_to) - 2 * self.jump_to.extra_extended_args

        if self.is_abs_jumper():
            return target_loc

        self_loc = 2 * instructions.index(self)

        return target_loc - self_loc - 2


In [83]:
def instrument(bytecode):
    """
    The primary method of instrumenting code, which involves injecting a bytecode counter between every instruction to be executed

    :param bytecode: a code object, the bytecode submitted by the player
    :return: a new code object that has been injected with our bytecode counter
    """

    # Ensure all code constants (e.g. list comprehensions) are also instrumented.
    new_consts = []
    for i, constant in enumerate(bytecode.co_consts):
        if type(constant) == CodeType:
            new_consts.append(instrument(constant))
        else:
            new_consts.append(constant)
    new_consts = tuple(new_consts)

    instructions = list(dis.get_instructions(bytecode))

    function_name_index = len(bytecode.co_names)  # we will be inserting our __instrument__ call at the end of co_names

    # the injection, which consists of a function call to an __instrument__ method which increments bytecode
    # these three instructions will be inserted between every line of instrumented code
    injection = [
        dis.Instruction(opcode=116, opname='LOAD_GLOBAL', arg=function_name_index%256, argval='__instrument__', argrepr='__instrument__', offset=None, starts_line=None, is_jump_target=False),
        dis.Instruction(opcode=131, opname='CALL_FUNCTION', arg=0, argval=0, argrepr=0, offset=None, starts_line=None, is_jump_target=False),
        dis.Instruction(opcode=1, opname='POP_TOP', arg=None, argval=None, argrepr=None, offset=None, starts_line=None, is_jump_target=False)
    ]
    #extends the opargs so that it can store the index of __instrument__
    while function_name_index > 255: #(255 = 2^8 -1 = 1 oparg)
        function_name_index >>= 8
        injection = [
            dis.Instruction(
                opcode=144,
                opname='EXTENDED_ARGS',
                arg=function_name_index%256,
                argval=function_name_index%256,
                argrepr=function_name_index%256,
                offset=None,
                starts_line=None,
                is_jump_target=False
            )
        ] + injection

    # For maintenance we add an empty jump_to field to each instruction
    for i, instruction in enumerate(instructions):
        instructions[i] = Instruction(instruction)

    # Next, we cache a reference to the jumpers to each jump target in the targets
    for i, instruction in enumerate(instructions):
        # We're only looking for jumpers
        if not instruction.is_jumper():
            continue

        target = [t for t in instructions if instruction.argval == t.offset][0]
        instruction.jump_to = target

        # If any targets jump to themselves, that's not kosher.
        if instruction == target:
            raise SyntaxError('No self-referential loops.')

    unsafe = {110, 113, 114, 115, 116, 120, 124, 125, 131, 93, 110, 120, 121, 122, 143, 154, 111, 112, 113, 114, 115, 119}  # bytecode ops that break the instrument

    # We then inject the injection before every call, except for those following an EXTENDED_ARGS.
    cur_index = -1
    for (cur, last) in zip(instructions[:], [None]+instructions[:-1]):
        cur_index += 1
        if last is not None and last.opcode == 144: #EXTEND_ARG
            continue

        if last is not None and last.opcode in unsafe:
            continue

        for j, inject in enumerate(injection):
            injected_instruction = Instruction(inject)
            injected_instruction.was_there = False # keeping track of the instructions added by us
            instructions.insert(cur_index + j, injected_instruction)
        cur_index += len(injection)


    # Iterate through instructions. If it's a jumper, calculate the new correct offset. For each new offset, if it
    # is too large to fit in the current number of EXTENDED_ARGS, inject a new EXTENDED_ARG before it. If you never
    # insert a new EXTENDED_ARGS, break out of the loop.
    fixed = False
    while not fixed:
        fixed = True
        
        print('not fixed!')

        i = 0
        for instruction in instructions[:]:
            instruction.offset = 2 * i

            if not instruction.is_jumper():
                i += 1
                continue

            correct_offset = instruction.calculate_offset(instructions)
            instruction.arg = correct_offset % 256
            correct_offset >>= 8
            print('fwt')
            extended_args = 0
            while correct_offset > 0:
                # Check if there is already an EXTENDED_ARGS behind
                if i > extended_args and instructions[i - extended_args - 1].opcode == 144:
                    instructions[i - extended_args - 1].arg = correct_offset % 256
                    print('hheekkkoo')

                # Otherwise, insert a new one
                else:
                    instructions.insert(i, Instruction.ExtendedArgs(correct_offset % 256))
                    instruction.extra_extended_args += 1
                    print('helo')
                    i += 1
                    fixed = False

                correct_offset >>= 8
                extended_args += 1
            i += 1
    #Maintaining correct line info ( traceback bug fix)
    #co_lnotab stores line information in Byte form
    # It stores alterantively, the number of instructions to the next increase in line number and
    # the increase in line number then
    #We need to ensure that these are bytes (You might want to break an increase into two see the article or code below)
    #The code did not update these bytes, we need to update the number of instructions before the beginning of each line
    #It should be similar to the way the jump to statement were fixed, I tried to mimick them but failed, I feel like I do not inderstand instruction.py
    # I am overestimating the number of instructions before the start of the line in this fix
    # you might find the end of this article helpful: https://towardsdatascience.com/understanding-python-bytecode-e7edaae8734d
    old_lnotab = {} #stores the old right info in a more usefull way (maps instruction num to line num)
    i = 0
    line_num = 0 #maintains line number by adding differences
    instruction_num = 0 #maintains the instruction num by addind differences
    while 2*i < len(bytecode.co_lnotab):
        instruction_num += bytecode.co_lnotab[2 * i]
        line_num += bytecode.co_lnotab[2 * i + 1]
        old_lnotab[instruction_num] = line_num
        i += 1
    #Construct a map from old instruction numbers, to new ones.
    num_injected = 0
    instruction_index = 0
    old_to_new_instruction_num = {}
    for instruction in instructions:
        if instruction.was_there:
            old_to_new_instruction_num[2 * (instruction_index - num_injected)] = 2 * instruction_index
        instruction_index += 1
        if not instruction.was_there:
            num_injected += 1
    new_lnotab = {}
    for key in old_lnotab:
        new_lnotab[old_to_new_instruction_num[key]] = old_lnotab[key]

    #Creating a differences list of integers, while ensuring integers in it are bytes
    pairs = sorted(new_lnotab.items())
    new_lnotab = []
    previous_pair = (0, 0)
    for pair in pairs:
        num_instructions = pair[0] - previous_pair[0]
        num_lines = pair[1] - previous_pair[1]
        while num_instructions > 127:
            new_lnotab.append(127)
            new_lnotab.append(0)
            num_instructions -= 127
        new_lnotab.append(num_instructions)
        while num_lines > 127:
            new_lnotab.append(127)
            new_lnotab.append(0)
            num_lines -= 127
        new_lnotab.append(num_lines)
        previous_pair = pair
    #tranfer to bytes and we are good :)
    new_lnotab = bytes(new_lnotab)

    # Finally, we repackage up our instructions into a byte string and use it to build a new code object
    byte_array = [[inst.opcode, 0 if inst.arg is None else inst.arg % 256] for inst in instructions]
    new_code = bytes(sum(byte_array, []))

    print('wtf')
    
    # Make sure our code can locate the __instrument__ call
    new_names = tuple(bytecode.co_names) + ('__instrument__', )

    return Instrument.build_code(bytecode, new_code, new_names, new_consts, new_lnotab)

In [84]:
exfuncs = compile_restricted(examplefuncs, 'bot.py', 'exec')

In [85]:
exfuncs = instrument(exfuncs)

not fixed!
fwt
wtf
not fixed!
fwt
fwt
fwt
fwt
fwt
wtf
not fixed!
fwt
fwt
fwt
hheekkkoo
fwt
helo
fwt
fwt
hheekkkoo
fwt
helo
fwt
hheekkkoo
fwt
fwt
hheekkkoo
fwt
hheekkkoo
fwt
hheekkkoo
fwt
helo
fwt
hheekkkoo
fwt
fwt
fwt
fwt
hheekkkoo
fwt
hheekkkoo
not fixed!
fwt
fwt
fwt
hheekkkoo
fwt
hheekkkoo
fwt
fwt
hheekkkoo
fwt
hheekkkoo
fwt
hheekkkoo
fwt
fwt
hheekkkoo
fwt
hheekkkoo
fwt
hheekkkoo
fwt
hheekkkoo
fwt
hheekkkoo
fwt
fwt
fwt
fwt
hheekkkoo
fwt
hheekkkoo
wtf
not fixed!
wtf


In [86]:
code = {'bot': exfuncs}

In [87]:
game = Game([code,code],debug=True)

[32m[Game info] Seed: 1337[0m


In [99]:
game.turn()

[32m[Game info] Turn 12[0m
[32m[Game info] Queue: {6: <ROBOT 6 WHITE>, 7: <ROBOT 7 BLACK>, 8: <ROBOT 8 BLACK>, 9: <ROBOT 9 WHITE>, 10: <ROBOT 10 WHITE>, 11: <ROBOT 11 BLACK>, 12: <ROBOT 12 BLACK>, 13: <ROBOT 13 WHITE>, 14: <ROBOT 14 WHITE>, 15: <ROBOT 15 BLACK>, 16: <ROBOT 16 BLACK>, 17: <ROBOT 17 WHITE>, 18: <ROBOT 18 WHITE>, 19: <ROBOT 19 BLACK>, 20: <ROBOT 20 BLACK>, 21: <ROBOT 21 WHITE>, 22: <ROBOT 22 WHITE>, 23: <ROBOT 23 BLACK>}[0m
[32m[Game info] Lords: [<ROBOT BLACK HQ BLACK>, <ROBOT WHITE HQ WHITE>][0m
[Robot 6 WHITE log] Starting Turn!
[Robot 6 WHITE log] Team: Team.WHITE
[Robot 6 WHITE log] Type: RobotType.PAWN
[Robot 6 WHITE log] My location is: 8 12
[Robot 6 WHITE log] Done! Bytecode left: 19917
[Robot 7 BLACK log] Starting Turn!
[Robot 7 BLACK log] Team: Team.BLACK
[Robot 7 BLACK log] Type: RobotType.PAWN
[Robot 7 BLACK log] My location is: 7 9
[Robot 7 BLACK log] Moved forward!
[Robot 7 BLACK log] Done! Bytecode left: 19909
[Robot 8 BLACK log] Starting Turn!
[Robot

In [3]:
dis.hasjrel

[93, 110, 120, 121, 122, 143, 154]

In [6]:
dis.hasjabs

[111, 112, 113, 114, 115, 119]

In [5]:
s = """
if x == 1:
    if y == 1:
        print(x)
    else:
        print(y)
    x = 1
else:
    print(2)
"""

In [9]:
s2 = """
if x == 1:
    if y == 1:
        print(x)
    elif x == 1:
        print(z)
    else:
        print(y)
else:
    print(2)
"""

In [10]:
dis.dis(s)

  2           0 LOAD_NAME                0 (x)
              2 LOAD_CONST               0 (1)
              4 COMPARE_OP               2 (==)
              6 POP_JUMP_IF_FALSE       40

  3           8 LOAD_NAME                1 (y)
             10 LOAD_CONST               0 (1)
             12 COMPARE_OP               2 (==)
             14 POP_JUMP_IF_FALSE       26

  4          16 LOAD_NAME                2 (print)
             18 LOAD_NAME                0 (x)
             20 CALL_FUNCTION            1
             22 POP_TOP
             24 JUMP_FORWARD             8 (to 34)

  6     >>   26 LOAD_NAME                2 (print)
             28 LOAD_NAME                1 (y)
             30 CALL_FUNCTION            1
             32 POP_TOP

  7     >>   34 LOAD_CONST               0 (1)
             36 STORE_NAME               0 (x)
             38 JUMP_FORWARD             8 (to 48)

  9     >>   40 LOAD_NAME                2 (print)
             42 LOAD_CONST               1 (2)
 

In [11]:
dis.dis(s2)

  2           0 LOAD_NAME                0 (x)
              2 LOAD_CONST               0 (1)
              4 COMPARE_OP               2 (==)
              6 POP_JUMP_IF_FALSE       54

  3           8 LOAD_NAME                1 (y)
             10 LOAD_CONST               0 (1)
             12 COMPARE_OP               2 (==)
             14 POP_JUMP_IF_FALSE       26

  4          16 LOAD_NAME                2 (print)
             18 LOAD_NAME                0 (x)
             20 CALL_FUNCTION            1
             22 POP_TOP
             24 JUMP_ABSOLUTE           62

  5     >>   26 LOAD_NAME                0 (x)
             28 LOAD_CONST               0 (1)
             30 COMPARE_OP               2 (==)
             32 POP_JUMP_IF_FALSE       44

  6          34 LOAD_NAME                2 (print)
             36 LOAD_NAME                3 (z)
             38 CALL_FUNCTION            1
             40 POP_TOP
             42 JUMP_ABSOLUTE           62

  8     >>   44 LOAD_NAM

In [75]:
dis.dis(exfuncs)

  2           0 LOAD_GLOBAL             30 (__instrument__)
              2 CALL_FUNCTION            0
              4 POP_TOP
              6 LOAD_NAME                0 (dlog)
              8 LOAD_GLOBAL             30 (__instrument__)
             10 CALL_FUNCTION            0
             12 POP_TOP
             14 LOAD_CONST               0 ('Starting Turn!')
             16 LOAD_GLOBAL             30 (__instrument__)
             18 CALL_FUNCTION            0
             20 POP_TOP
             22 CALL_FUNCTION            1
             24 POP_TOP
             26 LOAD_GLOBAL             30 (__instrument__)
             28 CALL_FUNCTION            0
             30 POP_TOP

  3          32 LOAD_NAME                1 (get_board_size)
             34 LOAD_GLOBAL             30 (__instrument__)
             36 CALL_FUNCTION            0
             38 POP_TOP
             40 CALL_FUNCTION            0
             42 STORE_NAME               2 (board_size)
             44 LOAD_GLOBA

In [76]:
dis.dis(examplefuncs)

  2           0 LOAD_NAME                0 (dlog)
              2 LOAD_CONST               0 ('Starting Turn!')
              4 CALL_FUNCTION            1
              6 POP_TOP

  3           8 LOAD_NAME                1 (get_board_size)
             10 CALL_FUNCTION            0
             12 STORE_NAME               2 (board_size)

  5          14 LOAD_NAME                3 (get_team)
             16 CALL_FUNCTION            0
             18 STORE_NAME               4 (team)

  6          20 LOAD_NAME                4 (team)
             22 LOAD_NAME                5 (Team)
             24 LOAD_ATTR                6 (BLACK)
             26 COMPARE_OP               2 (==)
             28 POP_JUMP_IF_FALSE       36
             30 LOAD_NAME                5 (Team)
             32 LOAD_ATTR                7 (WHITE)
             34 JUMP_FORWARD             4 (to 40)
        >>   36 LOAD_NAME                4 (team)
             38 LOAD_ATTR                6 (BLACK)
        >>   40 S

In [69]:
a = list(dis.get_instructions(examplefuncs))

In [70]:
for ix,i in enumerate(a[:]):
    a[ix] = Instruction(i)

In [71]:
iss = [i for i in a if i.opcode == 143]

In [72]:
iss

[]

In [73]:
dis.hasjrel

[93, 110, 120, 121, 122, 143, 154]

In [74]:
a[0].opcode

101