diff --git a/.travis.yml b/.travis.yml index 2d4cc58d88..927f6ca199 100644 --- a/.travis.yml +++ b/.travis.yml @@ -42,6 +42,7 @@ script: - docker exec ubuntu bash -c 'cd avatar2/ && nosetests-3.4 ./tests/test_remote_memoryprotocol.py' - docker exec ubuntu bash -c 'cd avatar2/ && nosetests-3.4 ./tests/test_gdbprotocol.py' - docker exec ubuntu bash -c 'cd avatar2/ && nosetests-3.4 ./tests/test_gdbplugin.py' + - docker exec ubuntu bash -c 'cd avatar2/ && nosetests-3.4 ./tests/test_inceptionprotocol.py' - docker exec ubuntu bash -c 'cd avatar2/ && echo Yes | bash ./targets/build_qemu.sh' - docker exec ubuntu bash -c 'cd avatar2/ && AVATAR2_GDB_EXECUTABLE=gdb-multiarch AVATAR2_QEMU_EXECUTABLE=./targets/build/qemu/arm-softmmu/qemu-system-arm nosetests-2.7 ./tests/test_qemutarget.py' diff --git a/avatar2/protocols/inception.py b/avatar2/protocols/inception.py new file mode 100644 index 0000000000..db1fd43aa0 --- /dev/null +++ b/avatar2/protocols/inception.py @@ -0,0 +1,812 @@ +import sys +import logging +import struct +import ctypes + +if sys.version_info < (3, 0): + import Queue as queue +else: + import queue + +from math import ceil +from collections import OrderedDict +from os.path import abspath + +# Python USB package used to interact with Inception-debugger +# Could be replace by the Inception driver +import usb.core +import usb.util + +from avatar2.targets import TargetStates +from avatar2.message import AvatarMessage, UpdateStateMessage, BreakpointHitMessage + + +class InceptionProtocol(object): + """ + This class implements the Inception protocol. + It enables communication with the Inception-debugger hardware. + + :param additional_args: Additional arguments delivered to Inception. + :type additional_args: list + :param device_vendor_id: The usb device vendor id to connect to. + :param device_product_id: The usb device product id to connect to. + """ + + def __init__(self, additional_args=[], + avatar=None, origin=None, + device_vendor_id=0x04b4, device_product_id=0x00f1, + output_directory='/tmp'): + + # USB device information + self._device = None + self._device_vendor_id = device_vendor_id + self._device_product_id = device_product_id + + # pyusb device handler + self._ep_out = None + self._ep_in_response = None + self._ep_in_irq = None + + # internal variables + self._bkpt_limit = 0 + self._bkpt_list = [None] * 1 + self._debug_enabled = False + + self._queue = queue.Queue() if avatar is None \ + else avatar.queue + self._fast_queue = queue.Queue() if avatar is None \ + else avatar.fast_queue + + self._origin = origin + self.log = logging.getLogger('%s.%s' % + (origin.log.name, self.__class__.__name__) + ) if origin else \ + logging.getLogger(self.__class__.__name__) + + def shutdown(self): + """ + Shuts down Inception + + returns: True on success + """ + + usb.util.dispose_resources(self._device) + return True + + def connect(self): + """ + Connects to USB3 Inception-debugger for all subsequent communication + + returns: True on success + """ + + self._device = usb.core.find(idVendor=self._device_vendor_id, + idProduct=self._device_product_id) + + if self._device is None: + self.log.critical('Failed to connect to Inception-debugger') + raise ConnectionRefusedError("Inception-debugger is not connected") + + try: + self._device.set_configuration() + self._device.reset() + except usb.core.USBError as e: + self.log.critical("Could not set configuration: %s" % str(e)) + raise ConnectionRefusedError("Could not set configuration: %s" % str(e)) + + # get an endpoint instance + intf = self._device[0][(0,0)] + + if self._device.is_kernel_driver_active(0): + self._device.detach_kernel_driver(0) + + self._ep_out = usb.util.find_descriptor(intf, bEndpointAddress=0x01) + if self._ep_out is None: + self.log.critical("Inception-debugger is connected but no endpoint 0x01 found") + raise ConnectionRefusedError("Inception-debugger is connected but no endpoint 0x01 found") + + self._ep_in_response = usb.util.find_descriptor(intf, bEndpointAddress=0x81) + if self._ep_in_response is None: + self.log.critical("Inception-debugger is connected but no endpoint 0x81 found") + raise ConnectionRefusedError("Inception-debugger is connected but no endpoint 0x81 found") + + self._ep_in_irq = usb.util.find_descriptor(intf, bEndpointAddress=0x82) + if self._ep_in_irq is None: + self.log.critical("Inception-debugger is connected but no endpoint 0x82 found") + raise ConnectionRefusedError("Inception-debugger is connected but no endpoint 0x82 found") + + self.log.info("Connected to Inception-debugger") + + UpdateStateMessage(self, TargetStates.INITIALIZED) + + return True + + #def is_breakpoint_hitten(self): + # """ + # Check if the processor hit a breakpoint + + # :return: PC or None if no breakpoint is hitten + # """ + + # self.stop() + # pc = self.read_pc() + # for i, j in enumerate(self._bkpt_list): + # if j == pc: + # self.log.info("Breakpoint hit") + # avatar_msg = BreakpointHitMessage(self._origin, bktp_nb, pc) + # self._queue.put(avatar_msg) + # return i, j + # self.cont() + + # return None + + def reset(self): + """ + Resets the target + + returns: True on success + """ + pass + + def cont(self): + """ + Resume the execution of the target + + :returns: True on success + """ + pass + + def stop(self): + """ + Stops the execution of the target + + :returns: True on success + """ + pass + + def step(self): + """ + Steps one instruction + + :returns: True on success + """ + pass + + def wait(self, state): + """ + Wait until CPU reach provided state. + + :state: state of the CPU + """ + pass + + def check_halt(self): + """ + Return if the CPU is halted or running. + + :return: True for halted, False for running + """ + pass + + def read_pc(self): + """ + Read PC register value + The target must be stopped before reading the pc. + + :return: program counter + """ + pass + + def write_register(self, register, value): + """ + Writing a register to the target + + :param register: The name of the register + :param value: The actual value written to the register + :return: True on success + """ + pass + + def read_register(self, register): + """ + Reading a register from the target + + :param register: The name of the register + :return: The actual value read from the register + """ + pass + + def read_memory(self, address, wordsize=4, num_words=1, raw=False): + """ + Read from memory of the target + + :param address: The address to read from + :param wordsize: The size of a read word (1, 2, 4 or 8) + :param words: The amount of words to read (default: 1) + :param raw: Whether the read memory is returned unprocessed + :return: The read memory + """ + pass + + def write_memory(self, address, wordsize, value, num_words=1, raw=False): + """ + Writing to memory of the target + + :param address: The address to write to + :param wordsize: The size of the write (1, 2, 4 or 8) + :param value: The actual value written to memory + :type value: int if num_words == 1 and raw == False + list if num_words > 1 and raw == False + str or byte if raw == True + :param num_words: The amount of words to read + :param raw: Specifies whether to write in raw or word mode + :returns: True on success else False + """ + pass + + def set_breakpoint(self, address, + hardware=True, + temporary=False, + regex=False, + condition=None, + ignore_count=0, + thread=0, + pending=False): + """Inserts a breakpoint + + :param bool hardware: Hardware breakpoint + :param bool tempory: Tempory breakpoint + :param str regex: If set, inserts breakpoints matching the regex + :param str condition: If set, inserts a breakpoint with the condition + :param int ignore_count: Amount of times the bp should be ignored + :param int thread: Threadno in which this breakpoints should be added + :return: True on success + """ + pass + + def remove_breakpoint(self, bkptnb): + """ + Deletes a breakpoint + + :bkptnb: Breakpoint number + :return: True on success + """ + pass + + def set_watchpoint(self, variable, write=True, read=False): + """Inserts a watchpoint + + :param variable: The name of a variable or an address to watch + :param bool write: Write watchpoint + :param bool read: Read watchpoint + :return: True on success + """ + pass + + +class IPCortexM3(InceptionProtocol): + """ + Inception protocol for cortex-m3. + """ + + ''' + ARM documentation usefull for the Cortex M3 System Debug: + * FPB: http://infocenter.arm.com/help/index.jsp?topic=/com.arm.doc.ddi0337e/ch11s04s01.html + * Core debug registers: http://infocenter.arm.com/help/index.jsp?topic=/com.arm.doc.ddi0337e/ch11s04s01.html + ''' + + + def __init__(self, additional_args=[], + avatar=None, origin=None, + device_vendor_id=0x04b4, device_product_id=0x00f1, + output_directory='/tmp'): + + super(IPCortexM3, self).__init__(additional_args, + avatar, origin, device_vendor_id, device_product_id, + output_directory) + + self.regs = OrderedDict() + self.regs.update({ "R0" : 0 }) + self.regs.update({ "R1" : 1 }) + self.regs.update({ "R2" : 2 }) + self.regs.update({ "R3" : 3 }) + self.regs.update({ "R4" : 4 }) + self.regs.update({ "R5" : 5 }) + self.regs.update({ "R6" : 6 }) + self.regs.update({ "R7" : 7 }) + self.regs.update({ "R8" : 8 }) + self.regs.update({ "R9" : 9 }) + self.regs.update({ "R10" : 10 }) + self.regs.update({ "R11" : 11 }) + self.regs.update({ "R12" : 12 }) + self.regs.update({ "SP" : 13 }) + self.regs.update({ "LR" : 14 }) + self.regs.update({ "PC" : 15 }) + self.regs.update({ "CPSR" : 16}) + + def reset(self): + """ + Resets the target + + returns: True on success + """ + + self.log.debug("Resetting target") + + # Reset JTAG + data = '3000000030000000' + self._ep_out.write(bytearray.fromhex(data)) + + # Enable the FlashPatch module : breakpoint + # FlashPatch Control Register (FP_CTRL) + self.write_memory(0xE0002000, 4, 3) + + # Now we need to retrive the number of supported hw bkpt from the core + FP_CTRL = self.read_memory(0xE0002000) + + # bits [7:4] are number of code slots field + self._bkpt_limit = (FP_CTRL >> 4) & 0xF + self.log.debug(("Number of available breakpoints read %d") % (self._bkpt_limit)) + if self._bkpt_limit == 0: + raise Exception("No hardware breakpoint found") + + # Watchpoint are located @ bits [11:8] + #w = (FP_CTRL >> 8) & 0xF + + # Set SYSRESETREG bit at 1 in AIRCR register to request a system reset. + # (system reset of all major components except for debug) + self.write_memory(0xE000ED0C, 4, ((0xFA05 << 16) | (1 << 2))) + + return True + + def cont(self): + """ + Resume the execution of the target + + :returns: True on success + """ + + self.log.debug("Attempted to continue execution on the target.") + + # Set C_HALT bit at 0 in the DHCSR register. + self.write_memory(0xE000EDF0, 4, (0xA05F << 16) | 0b01) + + avatar_msg = UpdateStateMessage(self._origin, TargetStates.RUNNING) + self._fast_queue.put(avatar_msg) + + return True + + def stop(self): + """ + Stops the execution of the target + + :returns: True on success + """ + + self.log.debug("Attempted to stop execution of the target.") + #DHCSR = self.read_memory(0xE000EDF0, 4) + + # Set C_HALT and C_DEBUGEN bits at 1 in DHCSR register + self.write_memory(0xE000EDF0, 4, (0xA05F << 16) | 0b11) + self._debug_enabled = True + + # Check the core is halted + DHCSR = self.read_memory(0xE000EDF0, 4) + if not ((DHCSR >> 1) & 1): + self.log.warning("Core not halted after stop") + # Check the core acknowledges by reading S_HALT bit in DHCSR + if not ((DHCSR >> 17) & 1): + self.log.warning("Core not in debug state after stop") + self._debug_enabled = False + + avatar_msg = UpdateStateMessage(self._origin, TargetStates.STOPPED) + self._fast_queue.put(avatar_msg) + + return True + + def step(self): + """ + Steps one instruction + + :returns: True on success + """ + + self.log.debug("Attempted to step on the target.") + + # Enable Debug mode if not activated + if not self._debug_enabled: + self.write_memory(0xE000EDF0, 4, ((0xA05F << 16) | 0b11)) + self._debug_enabled = True + + # Check the core acknowledges by reading S_HALT bit in DHCSR + DHCSR = self.read_memory(0xE000EDF0, 4) + if not ((DHCSR >> 17) & 1): + self.log.warning("Core is not in debug state before stepping") + + # Execute a step by setting the C_STEP bit to 1 in DHCSR register + self.write_memory(0xE000EDF0, 4, (0xA05F << 16) | 0b101) + + avatar_msg = UpdateStateMessage(self._origin, TargetStates.RUNNING) + self._fast_queue.put(avatar_msg) + + # Check the core is halted + DHCSR = self.read_memory(0xE000EDF0, 4) + if not ((DHCSR >> 1) & 1): + self.log.warning("Core is not halted after single step") + + avatar_msg = UpdateStateMessage(self._origin, TargetStates.STOPPED) + self._fast_queue.put(avatar_msg) + + return True + + def wait(self, state): + """ + Wait until CPU reach provided state. + + :state: state of the CPU + """ + + halt = True if state == TargetStates.STOPPED else False + + self.log.debug("Wait until target reach %s state" % state) + while (self.check_halt() != halt): + pass + + avatar_msg = UpdateStateMessage(self._origin, state) + self._fast_queue.put(avatar_msg) + + def read_register(self, register): + """ + Reading a register from the target + + :param register: The name of the register + :return: The actual value read from the register + """ + + self.log.debug("Read register %s" % register.upper()) + # Before reading the processor register value in DCRDR, + # we must first ask to transfer it via DCRSR + self.write_memory(0xE000EDF4, 4, self.regs[register.upper()]) + + return self.read_memory(0xE000EDF8) + + def write_register(self, register, value): + """ + Writing a register to the target + + :param register: The name of the register + :param value: The actual value written to the register + :return: True on success + """ + + self.log.debug("Write register %s" % register.upper()) + # Write register value in DCRDR + self.write_memory(0xE000EDF8, 4, value) + # Ask DCRSR to transfer it to the processor register + self.write_memory(0xE000EDF4, 4, (self.regs[register.upper()] | (1 << 16)) ) + + return True + + def read_memory(self, address, wordsize=4, num_words=1, raw=False): + """ + Read from memory of the target + + :param address: The address to read from + :param wordsize: The size of a read word (1, 2, 4 or 8) + :param num_words: The amount of words to read (default: 1) + :param raw: Whether the read memory is returned unprocessed + :return: The read memory + """ + + # Note: Inception does not support burst of read more than 4 bytes + + # Command to order a read + command = 0x24000001 + + burst_size = 1 + start_addr = address + + # number of packets to be sent (header + payload) + # 1 packet = 64 bits + nb_packet = ceil((wordsize * num_words) / 4) + nb_packet_init = nb_packet + buf = ctypes.create_string_buffer(8 * burst_size) + if raw: + raw_ret = ctypes.create_string_buffer(nb_packet * 4) + else: + ret = [] + + self.log.debug("Read_memory %s bytes starting from @=%s" % (nb_packet * 4, hex(address))) + + while (nb_packet > 0): + + burst = nb_packet if nb_packet < burst_size else burst_size + nb_packet -= burst_size + + self.log.debug("Sending read for %s bytes from address @=%s" % (burst_size * 4, hex(start_addr))) + # Create a burst + for i in range(burst): + struct.pack_into(">I", buf, (i * 8), command) + struct.pack_into(">I", buf, (i * 8) + 4, start_addr) + start_addr += 4 + + self._ep_out.write(buf) + + # Unpack the bitstream of USB packets received + for i in range(burst): + + response = self._ep_in_response.read(8, 0) + # message is a bitstream of 64bits integer. + # The highest 32bits are the status code + if response[3] != 2: + raise Exception("Debugger returned an error") + + value = struct.unpack_from(">I", response, 4)[0] + self.log.debug("Read value: %s" % hex(value)) + + if raw: + offset = (ceil((wordsize * num_words) / 4) - nb_packet - burst) * 4 + struct.pack_into(">I", raw_ret, offset, value) + else: + ret.append(value) + + if raw: + return raw_ret.raw + elif num_words == 1: + return ret[0] + else: + return ret + + + def write_memory(self, address, wordsize, value, num_words=1, raw=False): + """ + Writing to memory of the target + + :param address: The address to write to + :param wordsize: The size of the write (1, 2, 4 or 8) + :param value: The actual value written to memory + :type value: int if num_words == 1 and raw == False + list if num_words > 1 and raw == False + str or byte if raw == True + :param num_words: The amount of words to read + :param raw: Specifies whether to write in raw or word mode + :returns: True on success else False + """ + + # Command to order a write + command = 0x14000001 + + # Convert value to a list and compute the remainder bytes + value, remainder = self._convert_to_list(wordsize, value, num_words, raw) + + # one packet is 12 bytes: 4 bytes commands, + # 4 bytes addresses and 4 bytes values + nb_packet = len(value) + buf = ctypes.create_string_buffer(12) + struct.pack_into(">I", buf, 0, command) + + self.log.debug("Write_memory %s bytes starting from @=%s" % (nb_packet * 4, hex(address))) + + nb_full_packet = len(value) - 1 if remainder else nb_packet + + for i in range(0, nb_full_packet): + val = value.pop(0) + struct.pack_into(">I", buf, 4, address + i * 4) + struct.pack_into(">I", buf, 8, val) + + self.log.debug("Write %s at address @=%s" % (hex(val), hex(address + i * 4))) + self._ep_out.write(buf) + + # Write the remainder bytes when value is not multiple of 4 bytes + if remainder: + val = value.pop(0) + self._write_remainder(remainder, address, val) + + return True + + def _convert_to_list(self, wordsize, value, num_words, raw=False): + """ + Convert the value to be written into a list of 4 bytes and computes the remainder. + + :param wordsize: The size of the write (1, 2, 4 or 8) + :param value: The actual value written to memory + :type value: int if num_words == 1 and raw == False + list if num_words > 1 and raw == False + str or byte if raw == True + :param num_words: The amount of words to read + :param raw: Specifies whether to write in raw or word mode + :returns: The value in a list of integer and the remainder + """ + + remainder = 0 + + if raw: + if not len(value): + raise ValueError("value has zero length") + data = [] + + for i in range(0, len(value), 4): + data.append(struct.unpack_from(">I", value, i)[0]) + + remainder = len(value) % 4 + if remainder == 1: + data.append(struct.unpack_from(">B", value, len(value))[0]) + if remainder == 2: + data.append(struct.unpack_from(">H", value, len(value))[0]) + + elif num_words == 1: + data = [] + data.append(value) + remainder = wordsize % 4 + + else: + data = value[:] + remainder = (wordsize * num_words) % 4 + + if remainder == 3: + raise ValueError("value must be 1 or multiple of 2") + + return data, remainder + + def _write_remainder(self, remainder, address, value): + """ + When the value to be written is not a multiple of 4 bytes, + we must write the remainder by reading first the previous value. + + :param remainder: the number of remainder bytes to write + :param address: the address where to write + :param value: the value to write + :type value: int + :returns: True on success + """ + + command = 0x14000001 + buf = ctypes.create_string_buffer(12) + + if remainder == 3: + raise ValueError("value must be 1 or multiple of 2") + + read_value = self.read_memory(address, 4, 1, raw=True) + + struct.pack_into(">I", buf, 0, command) + struct.pack_into(">I", buf, 4, address) + + if remainder == 1: + read_value = struct.unpack_from(">BBB", read_value, 0) + #rem_value = struct.unpack_from(">B", struct.pack(">I", value), 0)[0] + rem_value = struct.unpack_from(">B", value.raw, 0)[0] + + struct.pack_into(">B", buf, 8, read_value[0]) + struct.pack_into(">B", buf, 9, read_value[1]) + struct.pack_into(">B", buf, 10, read_value[2]) + struct.pack_into(">B", buf, 11, rem_value) + + if remainder == 2: + read_value = struct.unpack_from(">H", read_value, 0) + #rem_value = struct.unpack_from(">H", struct.pack(">I", value), 0)[0] + rem_value = struct.unpack_from(">H", value, 0)[0] + + struct.pack_into(">H", buf, 8, read_value[0]) + struct.pack_into(">H", buf, 10, rem_value) + + self._ep_out.write(buf) + + return True + + def set_breakpoint(self, address, + hardware=False, + temporary=False, + regex=False, + condition=None, + ignore_count=0, + thread=0, + pending=False): + """Inserts a breakpoint + + :param bool hardware: Hardware breakpoint + :param bool tempory: Tempory breakpoint + :param str regex: If set, inserts breakpoints matching the regex + :param str condition: If set, inserts a breakpoint with the condition + :param int ignore_count: Amount of times the bp should be ignored + :param int thread: Threadno in which this breakpoints should be added + :return: True on success + """ + + #if hardware == False: + # raise Exception("Software breakpoint not implemented") + + # Update bkpt counter and update bkpt register address + indexes = [i for i, j in enumerate(self._bkpt_list) if j == None] + + # If no bkpt are available, raise an exception + if indexes == []: + raise Exception("Breakpoint limitation reaches") + + # FP_CTRL bit must also be set to enable breakpoints + self.write_memory(0xE0002000, 4, 3) + + # Compute a free comparator register address + FPCRegAddress = 0xE0002008 + ( indexes[0] * 4 ) + + #set the flash patch comparator register value (FP_COMPx) + FPCRegValue = 0 + FPCRegValue += 0b11 << 30 # Breakpoint on match on lower halfword + FPCRegValue += address << 2 # Address to compare against + FPCRegValue += 0b11 # Enable the comparator + + self.write_memory(FPCRegAddress, 4, FPCRegValue) + self._bkpt_list[indexes[0]] = address + self.log.info("Breakpoint set") + + return True + + def set_watchpoint(self, variable, write=True, read=False): + """Inserts a watchpoint + Not implemented yet + + :param variable: The name of a variable or an address to watch + :param bool write: Write watchpoint + :param bool read: Read watchpoint + :return: True on success + """ + + self.log.critical("Watchpoint not implemented") + raise Exception("Watchpoint not implemented") + return False + + def remove_breakpoint(self, bkptnb): + """ + Deletes a breakpoint + + :bkptnb: Breakpoint number + :return: True on success + """ + + if self._bkpt_limit < bkptnb : + raise Execption("bkptnb higher than supported breakpoint") + + # Update bkpt counter and update bkpt register address + FPCRegAddress = 0xE0002008 + ( bkptnb * 4 ) + + #set the flash patch comparator register value (FP_COMPx) + FPCRegValue += b'0x00' # Enable the comparator + + write_memory(self, FPCRegAddress, 4, FPCRegValue) + + self._bkpt_list[bkptnb] = None + self.log.info("Breakpoint removed") + + return True + + def check_halt(self): + """ + Return if the CPU is halted or running. + + :return: True for halted, False for running + """ + + halted = False + + # Check the C_HALT bit in the DHCSR register. + DHCSR = self.read_memory(0xE000EDF0) + if ((DHCSR >> 1) & 1): + halted = True + + return halted + + def read_pc(self): + """ + Read PC register value. + The target must be stopped before reading the pc. + + :return: pc + """ + + # Read pc + DCRDR = self.read_register("PC") + #PSR = self.read_register("CPSR") + + # We are always in Thumb mode with Cortex M3 + #return DCRDR - 4 + return DCRDR + diff --git a/avatar2/targets/inception_target.py b/avatar2/targets/inception_target.py new file mode 100644 index 0000000000..323ff73dee --- /dev/null +++ b/avatar2/targets/inception_target.py @@ -0,0 +1,71 @@ +from avatar2.targets import Target, TargetStates +from avatar2.protocols.inception import InceptionProtocol, IPCortexM3 + +from avatar2.watchmen import watch + +class InceptionTarget(Target): + ''' + Inception is a framework to perform security testing of real world + firmware programs. The inception target and protocol allow Avatar2 + to interact with the low latency inception debugger and speed up + read/write operations between the host and the device. + + For more information, please visit: + https://inception-framework.github.io/inception/debugger.html + + Publication: + Inception: System-wide security testing of real-world embedded systems + software + Nassim Corteggiani, Giovanni Camurati, Aurélien Francillon + 27th USENIX Security Symposium (USENIX Security 18), Baltimore, MD + ''' + + def __init__(self, avatar, + processor='cortex-m3', + device_vendor_id=0x04b4, + device_product_id=0x00f1, + **kwargs): + + super(InceptionTarget, self).__init__(avatar, **kwargs) + + self.processor = processor + self._device_product_id = device_product_id + self._device_vendor_id = device_vendor_id + + @watch('TargetInit') + def init(self): + + if self.processor == 'cortex-m3': + inception = IPCortexM3(avatar=self.avatar, origin=self, + device_vendor_id=self._device_vendor_id, + device_product_id=self._device_product_id, + output_directory=self.avatar.output_directory) + else: + inception = None + self.log.warning("Target board not implemented") + raise Exception("Target board not implemented") + + + if inception.connect(): + inception.reset() + self.update_state(TargetStates.RUNNING) + self.log.info("Connected to Target") + else: + self.log.warning("Connecting failed") + raise Exception("Connecting to target failed") + + if inception.stop(): + self.update_state(TargetStates.STOPPED) + + self.protocols.set_all(inception) + self.protocols.monitor = inception + + #self.wait() + + def reset(self): + return self.protocols.execution.reset() + + @watch('TargetWait') + def wait(self, state=TargetStates.STOPPED): + return self.protocols.execution.wait(state) + diff --git a/setup.py b/setup.py index 93d8c00106..37dff896b0 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,8 @@ 'enum34;python_version<"3.4"', 'unicorn', 'bitstring', - 'pylink-square', + 'pylink-square', + 'pyusb', ], url='https://github.com/avatartwo/avatar2', description='A Dynamic Multi-Target Orchestration Framework', diff --git a/tests/inception/README.md b/tests/inception/README.md new file mode 100644 index 0000000000..0c2ceabe82 --- /dev/null +++ b/tests/inception/README.md @@ -0,0 +1,35 @@ +# Inception + +[Inception][1] is a framework to perform security testing of real world firmware programs. +It features several components, but here we are interested into its low latency [hardware debugger][2]. + +The inception target and protocol allow Avatar2 to interact with the inception debugger and speed up read/write operations between the host and the device. +To install the debugger, please refer to: https://github.com/Inception-framework/debugger + +In the same folder is present a script `test_inception_hardware_perf.py` that test read/write operations using the [nucleo L152RE example][3]. +Following is the output with the nucleo board: +``` +# python3 test-inception.py +Targets initialized +Targets stopped, start tests for n = 100 +[*] Raw read / writes tests + - Read the full memory + -> On average raw read of 81920 bytes takes 1.99 sec, speed: 40.21 KB/sec + - Write the full memory + -> On average raw write of 81920 bytes takes 1.11 sec, speed: 72.02 KB/sec + - Read and write the full memory + -> On average raw read&write of 81920 bytes takes 3.10 sec, speed: 25.79 KB/sec + - Random read / writes of random size in the ram +[*] Transfer state to dummy target + - Transfer state + -> On average transfer state from nucleo to dum of 81920 bytes takes 2.00 sec, speed: 40.06 KB/sec +[*] Test completed +``` + +# Publication: +1. N. Corteggiani, G. Camurati, A. Francillon. "Inception: System-wide security testing of real-world embedded systems software" 27th USENIX Security Symposium (USENIX Security 18), Baltimore, MD, August 2018. + -[Paper](http://s3.eurecom.fr/docs/usenixsec18_corteggiani.pdf) - [Website](https://inception-framework.github.io/inception/) - [Code](https://github.com/Inception-framework) + +[1]: https://inception-framework.github.io/inception/ +[2]: https://inception-framework.github.io/inception/debugger.html +[3]: https://github.com/avatartwo/avatar2-examples/tree/master/nucleo_l152re diff --git a/tests/inception/test_inception_hardware_perf.py b/tests/inception/test_inception_hardware_perf.py new file mode 100644 index 0000000000..8934d13678 --- /dev/null +++ b/tests/inception/test_inception_hardware_perf.py @@ -0,0 +1,252 @@ +from avatar2 import * + +import sys +import os +import logging +import serial +import time +import argparse +import pyudev + +import struct +import ctypes +from random import randint +# For profiling +import pstats + + +logging.basicConfig(filename='/tmp/inception-tests.log', level=logging.INFO) + + +# **************************************************************************** +def single_step(target, nb_test): + print("[*] Single step target %d times" % nb_test) + + for i in range(nb_test): + pc = target.protocols.execution.read_pc() + print(pc) + target.step() + print('stepped') + next_pc = target.protocols.execution.read_pc() + print(next_pc) + +# **************************************************************************** +def read_full_mem(target, nb_test, raw=True, summary=True): + print(" - Read the full memory") + nb_test = 1 + average_read = 0 + + for i in range(nb_test): + t0 = time.time() + target.read_memory(ram.address, 1, ram.size, raw=raw) + t1 = time.time() + average_read += t1 - t0 + + if summary: + average_read = average_read / nb_test + speed_read = ram.size / average_read / 1024 + print(" -> On average raw read of %s bytes takes %.2f sec, speed: %.2f KB/sec" % (ram.size, average_read, speed_read)) + +# **************************************************************************** +def write_full_mem(target, nb_test, raw=True, summary=True): + print(" - Write the full memory") + nb_test = 1 + average_write = 0 + buf = ctypes.create_string_buffer(ram.size) + for i in range(int(ram.size / 4)): + struct.pack_into(">I", buf, i * 4, randint(0, 0xffffffff)) + + for i in range(nb_test): + t0 = time.time() + target.write_memory(ram.address, 1, buf, raw=raw) + t1 = time.time() + average_write += t1 - t0 + + if summary: + average_write = average_write / nb_test + speed_write = ram.size / average_write / 1024 + print(" -> On average raw write of %s bytes takes %.2f sec, speed: %.2f KB/sec" % (ram.size, average_write, speed_write)) + +# **************************************************************************** +def read_write_full_mem(target, nb_test, raw=True, summary=True): + print(" - Read and write the full memory") + reads = [] + average_read_write = 0 + + for i in range(nb_test): + if raw: + t0 = time.time() + reads.append(target.read_memory(ram.address, 1, ram.size, raw=raw)) + target.write_memory(ram.address, 1, reads[i], raw=raw) + t1 = time.time() + else: + t0 = time.time() + reads.append(target.read_memory(ram.address, 1, ram.size, raw=raw)) + target.write_memory(ram.address, 1, reads[i], len(reads[i]), raw=raw) + t1 = time.time() + + average_read_write += t1 - t0 + + if summary: + average_read_write = average_read_write / nb_test + speed_read_write = ram.size / average_read_write / 1024 + print(" -> On average raw read&write of %s bytes takes %.2f sec, speed: %.2f KB/sec" % (ram.size, average_read_write, speed_read_write)) + + # Verify all reads are identical + for i in range(len(reads) - 1): + assert(reads[i] == reads[i+1]) + #print("[!] Multiple reads produce different values !") + +# **************************************************************************** +def random_read_write(target, nb_test, raw=True): + print(" - Random read / writes of random size in the ram") + for i in range(0, nb_test): + size = randint(0, int(ram.size / 8)) * 8 + #size = 2**4 + + # Reset the board and wait to reach the breakpoint + target.reset() + target.wait() + + if raw: + m1 = ctypes.create_string_buffer(size) + for j in range(int(size / 4)): + struct.pack_into(">I", m1, j * 4, randint(0, 0xFFFFFFFF)) + + target.write_memory(ram.address, 1, m1, raw=True) + m2 = target.read_memory(ram.address, 1, size, raw=True) + + n1, n2 = ([] for i in range(2)) + for j in range(int(size / 4)): + n1.append(struct.unpack_from(">I", m1, j)[0]) + n2.append(struct.unpack_from(">I", m2, j)[0]) + assert(n1 == n2) + #print("i=%s m1: %s m2: %s" % (i, m1.raw, m2)) + #print("[!] Multiple random reads produce different values !") + + else: + m1 = [] + for j in range(int(size / 4)): + m1.append(randint(0, 0xFFFFFFFF)) + + target.write_memory(ram.address, 1, m1, size, raw=False) + m2 = target.read_memory(ram.address, 1, size, raw=False) + + for j in range(int(size / 4)): + assert(m1[j] == m2[j]) + #print("[!] Multiple random reads produce different values !") + #print("i=%s j=%s m1[j]: %s m2[j]: %s" % (i, j, m1[j], m2[j])) + +# **************************************************************************** +def random_4bytes_read_write(target, nb_test): + print(" - Random read / writes of 4 bytes in the ram") + + for i in range(nb_test): + written_word = randint(0, 0xFFFFFFFF) + address = randint(ram.address, ram.address + ram.size - 4) + + target.write_memory(address, 4, written_word, 1, raw=False) + read_word = target.read_memory(address, 4, 1, raw=False) + + assert(written_word == read_word) + +# **************************************************************************** +def read_write_registers(target, nb_test): + print(" - Read / write registers") + + regs = ['R0', 'R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7', 'R8', 'R9', 'R10', + 'R11', 'R12', 'SP', 'LR', 'PC', 'CPSR'] + + for i in range(nb_test): + + for j in range(17): + written_reg = randint(0, 0xFFFFFFFF) + saved_reg = target.read_register(regs[j]) + + target.write_register(regs[j], written_reg) + read_reg = target.read_register(regs[j]) + + ''' + if read_reg != written_reg: + print(i) + print(j) + print(hex(read_reg)) + print(hex(written_reg)) + ''' + + target.write_register(regs[j], saved_reg) + + +# **************************************************************************** +def transfer_state(av, target_from, target_to, nb_test, summary=True): + print(" - Transfer state") + average = 0 + + for i in range(nb_test): + + t0 = time.time() + av.transfer_state(target_from, target_to, synced_ranges=[ram]) + t1 = time.time() + average += t1 - t0 + + if summary: + average = average / nb_test + speed = ram.size / average / 1024 + print(" -> On average transfer state from %s to %s of %s bytes takes %.2f sec, speed: %.2f KB/sec" % (target_from.name, target_to.name, ram.size, average, speed)) + + + + +if __name__ == '__main__': + + # Number each test is repeated + n = 2 + + avatar = Avatar(arch=ARMV7M, output_directory='/tmp/inception-tests') + nucleo = avatar.add_target(InceptionTarget, name='nucleo') + dum = avatar.add_target(DummyTarget, name='dum') + #qemu = avatar.add_target(QemuTarget, gdb_port=1236) + + + # Memory mapping of NUCLEO-L152RE + rom = avatar.add_memory_range(0x08000000, 0x1000000, 'rom', + file=firmware) + ram = avatar.add_memory_range(0x20000000, 0x14000, 'ram') + mmio = avatar.add_memory_range(0x40000000, 0x1000000, + forwarded=True, forwarded_to=nucleo) + + ram = avatar.get_memory_range(0x20000000) + + avatar.init_targets() + print("Targets initialized") + + nucleo.reset() + nucleo.cont() + nucleo.stop() + print("Targets stopped, start tests for n = %s" % n) + + print("[*] Raw read / writes tests") + read_full_mem(nucleo, n) + write_full_mem(nucleo, n) + read_write_full_mem(nucleo, n) + random_read_write(nucleo, n) + + print("[*] !raw read / writes tests") + read_full_mem(nucleo, n, raw=False, summary=False) + write_full_mem(nucleo, n, raw=False, summary=False) + read_write_full_mem(nucleo, n, raw=False, summary=False) + random_read_write(nucleo, n, raw=False) + + random_4bytes_read_write(nucleo, 100 * n) + + print("[*] Read / Write registers") + read_write_registers(nucleo, n) + + print("[*] Transfer state to dummy target") + transfer_state(avatar, nucleo, dum, n) + + + #Stop all threads for the profiler + print("[*] Test completed") + avatar.stop() + diff --git a/tests/test_inceptionprotocol.py b/tests/test_inceptionprotocol.py new file mode 100644 index 0000000000..e54992d0d4 --- /dev/null +++ b/tests/test_inceptionprotocol.py @@ -0,0 +1,158 @@ +from avatar2.protocols.inception import IPCortexM3 +#import avatar2 + +import struct +import ctypes + +from nose.tools import * + +SLEEP_TIME = 1 +MEM_ADDR = 0x12345678 +i = None + + + +class FakeCortexM3(): + + def __init__(self): + + # constants + self.write_cmd = 0x14000001 + self.read_cmd = 0x24000001 + self.burst_size = 1 + + # Cortex M3 registers + self.fake_regs = dict() + # FP_CTRL reg + self.fake_regs[0xE0002000] = 0b1001100011 + # DHCSR reg + self.fake_regs[0xE000EDF0] = 0b11 + + self.fake_write_addr = 0x0 + self.fake_write_val = 0x0 + self.fake_read_val = 0x0 + self.read_token = 0 + + def write(self, buf): + + if len(buf) == 8: + h_bits = struct.unpack_from('>I', buf, 0)[0] + l_bits = struct.unpack_from('>I', buf, 4)[0] + + # Command to reset the JTAG + if h_bits == 0x30000000 and l_bits == 0x30000000: + return True + + # Read memory command + elif h_bits == self.read_cmd: + self.read_token += 1 + if l_bits in self.fake_regs: + self.fake_read_val = self.fake_regs[l_bits] + else: + self.fake_read_val = 0xdeadbeef + return True + + else: + return False + + elif len(buf) == 12: + cmd = struct.unpack_from('>I', buf, 0)[0] + addr = struct.unpack_from('>I', buf, 4)[0] + val = struct.unpack_from('>I', buf, 8)[0] + + # Write memory command + if cmd == self.write_cmd: + self.fake_write_addr = addr + self.fake_write_val = val + return True + + return False + + def read(self, size, na): + resp = bytearray(8) + struct.pack_into('>I', resp, 0, 0x00000002) + struct.pack_into('>I', resp, 4, self.fake_read_val) + + self.read_token -= 1 + if self.read_token <= 0: + self.fake_read_val = 0x0 + + return resp + +class FakeIPCortexM3(IPCortexM3): + ''' + Because IPCortexM3 rely on pyusb to communicate with the hardware + debugger, we need to overwrite some functions in order to test reads + and writes. + ''' + + def __init__(self, **kwargs): + super(FakeIPCortexM3, self).__init__(**kwargs) + self._fakecm3 = FakeCortexM3() + + def connect(self): + + self._device = None + self._ep_in_irq = None + + # used to write + self._ep_out = self._fakecm3 + # used to read + self._ep_in_response = self._fakecm3 + + return True + + def shutdown(self): + self._ep_out = None + self._ep_in_response = None + return True + + +# **************************************************************************** + +def setup(): + global i + + i = FakeIPCortexM3() + i.connect() + i.reset() + + +def teardown(): + i.shutdown() + + +@with_setup(setup, teardown) +def test_register_read_and_write(): + + ret = i.write_register('R0', 2020) + assert_equal(ret, True) + + ret = i.read_register('r0') + assert_equal(ret, 0xdeadbeef) + + +@with_setup(setup, teardown) +def test_break_run_and_read_write_mem(): + + ret = i.set_breakpoint(0x8000000) + assert_equal(ret, True) + + ret = i.cont() + assert_equal(ret, True) + + #time.sleep(SLEEP_TIME) + + ret = i.read_memory(MEM_ADDR, 4) + assert_equal(ret, 0xdeadbeef) + + ret = i.write_memory(MEM_ADDR, 4, 0x8badf00d) + assert_equal(ret, True) + assert_equal(i._fakecm3.fake_write_addr, MEM_ADDR) + assert_equal(i._fakecm3.fake_write_val, 0x8badf00d) + + +if __name__ == '__main__': + setup() + test_break_run_and_read_write_mem() + teardown()