Permalink
Browse files

Add Windows support with libusbK

  • Loading branch information...
Qyriad committed May 17, 2018
1 parent 8eaa3d4 commit 62ff4dba45d47990a6d7b96fac5dbb8d6337eabc
Showing with 367 additions and 32 deletions.
  1. +177 −32 fusee-launcher.py
  2. +190 −0 libusbK.py
@@ -12,7 +12,7 @@
# don't like it? suck it up, or find your own damned exploit ^-^
#
# special thanks to:
# SciresM, motezazer -- guidance and support
# ScirèsM, motezazer -- guidance and support
# hedgeberg, andeor -- dumping the Jetson bootROM
# TuxSH -- for IDB notes that were nice to peek at
#
@@ -60,16 +60,18 @@ class HaxBackend:

# USB constants used
STANDARD_REQUEST_DEVICE_TO_HOST_TO_ENDPOINT = 0x82
STANDARD_REQUEST_DEVICE_TO_HOST = 0x80
GET_DESCRIPTOR = 0x6
GET_CONFIGURATION = 0x8

# Interface requests
GET_STATUS = 0x0

# List of OSs this class supports.
SUPPORTED_SYSTEMS = []

def __init__(self, usb_device, skip_checks=False):
def __init__(self, skip_checks=False):
""" Sets up the backend for the given device. """
self.dev = usb_device
self.skip_checks = skip_checks


@@ -101,18 +103,37 @@ def supported(cls, system_override=None):


@classmethod
def create_appropriate_backend(cls, usb_device, system_override=None, skip_checks=False):
def create_appropriate_backend(cls, system_override=None, skip_checks=False):
""" Creates a backend object appropriate for the current OS. """

# Search for a supportive backend, and try to create one.
for subclass in cls.__subclasses__():
if subclass.supported(system_override):
return subclass(usb_device, skip_checks=skip_checks)
return subclass(skip_checks=skip_checks)

# ... if we couldn't, bail out.
raise IOError("No backend to trigger the vulnerability-- it's likely we don't support your OS!")


def read(self, length):
""" Reads data from the RCM protocol endpoint. """
return bytes(self.dev.read(0x81, length, 1000))


def write_single_buffer(self, data):
"""
Writes a single RCM buffer, which should be 0x1000 long.
The last packet may be shorter, and should trigger a ZLP (e.g. not divisible by 512).
If it's not, send a ZLP.
"""
return self.dev.write(0x01, data, 1000)


def find_device(self, vid=None, pid=None):
""" Set and return the device to be used """
self.dev = usb.core.find(idVendor=vid, idProduct=pid)
return self.dev


class MacOSBackend(HaxBackend):
"""
@@ -274,11 +295,144 @@ def _read_num_file(self, path):
raw = f.read()
return int(raw)

class WindowsBackend(HaxBackend):
"""
Use libusbK for most of it, and use the handle libusbK gets for us to call kernel32's DeviceIoControl
"""

BACKEND_NAME = "Windows"
SUPPORTED_SYSTEMS = ["Windows"]

# Windows and libusbK specific constants
WINDOWS_FILE_DEVICE_UNKNOWN = 0x00000022
LIBUSBK_FUNCTION_CODE_GET_STATUS = 0x807
WINDOWS_METHOD_BUFFERED = 0
WINDOWS_FILE_ANY_ACCESS = 0

RAW_REQUEST_STRUCT_SIZE = 24 # 24 is how big the struct is, just trust me
TO_ENDPOINT = 2

# Yoinked (with love) from Windows' CTL_CODE macro
def win_ctrl_code(self, DeviceType, Function, Method, Access):
""" Return a control code for use with DeviceIoControl() """
return ((DeviceType) << 16 | ((Access) << 14) | ((Function)) << 2 | (Method))

def __init__(self, skip_checks):
import libusbK
self.libk = libusbK
# Grab libusbK
self.lib = ctypes.cdll.libusbK


def find_device(self, Vid, Pid):
"""
Windows version of this function
Its return isn't actually significant, but it needs to be not None
"""

# Get a list of devices to use later
device_list = self.libk.KLST_HANDLE()
device_info = ctypes.pointer(self.libk.KLST_DEV_INFO())
ret = self.lib.LstK_Init(ctypes.byref(device_list), 0)

if ret == 0:
raise ctypes.WinError()

# Get info for a device with that vendor ID and product ID
device_info = ctypes.pointer(self.libk.KLST_DEV_INFO())
ret = self.lib.LstK_FindByVidPid(device_list, Vid, Pid, ctypes.byref(device_info))
self.lib.LstK_Free(ctypes.byref(device_list))
if device_info is None or ret == 0:
return None

# Populate function pointers for use with the driver our device uses (which should be libusbK)
self.dev = self.libk.KUSB_DRIVER_API()
ret = self.lib.LibK_LoadDriverAPI(ctypes.byref(self.dev), device_info.contents.DriverID)
if ret == 0:
raise ctypes.WinError()

# Initialize the driver for use with our device
self.handle = self.libk.KUSB_HANDLE(None)
ret = self.dev.Init(ctypes.byref(self.handle), device_info)
if ret == 0:
raise self.libk.WinError()

return self.dev


def read(self, length):
""" Read using libusbK """
# Create the buffer to store what we read
buffer = ctypes.create_string_buffer(length)

len_transferred = ctypes.c_uint(0)

# Call libusbK's ReadPipe using our specially-crafted function pointer and the opaque device handle
ret = self.dev.ReadPipe(self.handle, ctypes.c_ubyte(0x81), ctypes.addressof(buffer), ctypes.c_uint(length), ctypes.byref(len_transferred), None)

if ret == 0:
raise ctypes.WinError()

return buffer.raw

def write_single_buffer(self, data):
""" Write using libusbK """
# Copy construct to a bytearray so we Know™ what type it is
buffer = bytearray(data)

# Convert wrap the data for use with ctypes
cbuffer = (ctypes.c_ubyte * len(buffer))(*buffer)

# FIXME: Implement a Windows backend that talks to a patched version of libusbK
# so we can inject WdfUsbTargetDeviceSendControlTransferSynchronously to
# trigger the exploit.
len_transferred = ctypes.c_uint(0)

# Call libusbK's WritePipe using our specially-crafted function pointer and the opaque device handle
ret = self.dev.WritePipe(self.handle, ctypes.c_ubyte(0x01), cbuffer, len(data), ctypes.byref(len_transferred), None)
if ret == 0:
raise ctypes.WinError()

def ioctl(self, driver_handle: ctypes.c_void_p, ioctl_code: ctypes.c_ulong, input_bytes: ctypes.c_void_p, input_bytes_count: ctypes.c_size_t, output_bytes: ctypes.c_void_p, output_bytes_count: ctypes.c_size_t):
""" Wrapper for DeviceIoControl """
overlapped = self.libk.OVERLAPPED()
ctypes.memset(ctypes.addressof(overlapped), 0, ctypes.sizeof(overlapped))

ret = ctypes.windll.kernel32.DeviceIoControl(driver_handle, ioctl_code, input_bytes, input_bytes_count, output_bytes, output_bytes_count, None, ctypes.byref(overlapped))

# We expect this to error, which matches the others ^_^
if ret == False:
raise ctypes.WinError()

def trigger_vulnerability(self, length):
"""
Go over libusbK's head and get the master handle it's been using internally
and perform a direct DeviceIoControl call to the kernel to skip the length check
"""
# self.handle is KUSB_HANDLE, cast to KUSB_HANDLE_INTERNAL to transparent-ize it
internal = ctypes.cast(self.handle, ctypes.POINTER(self.libk.KUSB_HANDLE_INTERNAL))

# Get the handle libusbK has been secretly using in its ioctl calls this whole time
master_handle = internal.contents.Device.contents.MasterDeviceHandle

if master_handle is None or master_handle == self.libk.INVALID_HANDLE_VALUE:
raise ValueError("Failed to initialize master handle")

# the raw request struct is pretty annoying, so I'm just going to allocate enough memory and set the few fields I need
raw_request = ctypes.create_string_buffer(self.RAW_REQUEST_STRUCT_SIZE)

# set timeout to 1000 ms, timeout offset is 0 (since it's the first member), and it's an unsigned int
timeout_p = ctypes.cast(raw_request, ctypes.POINTER(ctypes.c_uint))
timeout_p.contents = ctypes.c_ulong(1000) # milliseconds

status_p = ctypes.cast(ctypes.byref(raw_request, 4), ctypes.POINTER(self.libk.status_t))
status_p.contents.index = self.GET_STATUS
status_p.contents.recipient = self.TO_ENDPOINT

buffer = ctypes.create_string_buffer(length)

code = self.win_ctrl_code(self.WINDOWS_FILE_DEVICE_UNKNOWN, self.LIBUSBK_FUNCTION_CODE_GET_STATUS, self.WINDOWS_METHOD_BUFFERED, self.WINDOWS_FILE_ANY_ACCESS)
ret = self.ioctl(master_handle, ctypes.c_ulong(code), raw_request, ctypes.c_size_t(24), buffer, ctypes.c_size_t(length))

if ret == False:
raise ctypes.WinError()


class RCMHax:
@@ -287,11 +441,6 @@ class RCMHax:
DEFAULT_VID = 0x0955
DEFAULT_PID = 0x7321

# USB constants used
STANDARD_REQUEST_DEVICE_TO_HOST_TO_DEVICE = 0x80
GET_DESCRIPTOR = 0x6
GET_CONFIGURATION = 0x8

# Exploit specifics
COPY_BUFFER_ADDRESSES = [0x40005000, 0x40009000] # The addresses of the DMA buffers we can trigger a copy _from_.
STACK_END = 0x40010000 # The address just after the end of the device's stack.
@@ -302,32 +451,32 @@ def __init__(self, wait_for_device=False, os_override=None, vid=None, pid=None,
# The first write into the bootROM touches the lowbuffer.
self.current_buffer = 0

# Grab a connection to the USB device itself.
self.dev = self._find_device(vid, pid)

# Keep track of the total amount written.
self.total_written = 0

# Create a vulnerability backend for the given device.
try:
self.backend = HaxBackend.create_appropriate_backend(system_override=os_override, skip_checks=override_checks)
except IOError:
print("It doesn't look like we support your OS, currently. Sorry about that!\n")
sys.exit(-1)

# Grab a connection to the USB device itself.
self.dev = self._find_device(vid, pid)

# If we don't have a device...
if self.dev is None:

# ... and we're allowed to wait for one, wait indefinitely for one to appear...
if wait_for_device:
print("Waiting for a TegraRCM device to come online...")
while self.dev is None:
self.dev = self._find_device()
self.dev = self._find_device(vid, pid)

# ... or bail out.
else:
raise IOError("No TegraRCM device found?")

# Create a vulnerability backend for the given device.
try:
self.backend = HaxBackend.create_appropriate_backend(self.dev, system_override=os_override, skip_checks=override_checks)
except IOError:
print("It doesn't look like we support your OS, currently. Sorry about that!\n")
sys.exit(-1)

# Print any use-related warnings.
self.backend.print_warnings()

@@ -343,15 +492,11 @@ def _find_device(self, vid=None, pid=None):
pid = pid if pid else self.DEFAULT_PID

# ... and use them to find a USB device.
return usb.core.find(idVendor=vid, idProduct=pid)


def get_device_descriptor(self):
return self.dev.ctrl_transfer(self.STANDARD_REQUEST_DEVICE_TO_HOST, self.GET_DESCRIPTOR, 1 << 8, 0, 18)
return self.backend.find_device(vid, pid)

def read(self, length):
""" Reads data from the RCM protocol endpoint. """
return self.dev.read(0x81, length, 1000)
return self.backend.read(length)


def write(self, data):
@@ -376,7 +521,7 @@ def write_single_buffer(self, data):
If it's not, send a ZLP.
"""
self._toggle_buffer()
return self.dev.write(0x01, data, 1000)
return self.backend.write_single_buffer(data)


def _toggle_buffer(self):
@@ -452,7 +597,7 @@ def parse_usb_id(id):

# Print the device's ID. Note that reading the device's ID is necessary to get it into
try:
device_id = switch.read_device_id().tostring()
device_id = switch.read_device_id()
print("Found a Tegra with Device ID: {}".format(device_id))
except usb.core.USBError as e:
# Raise the exception only if we're not being permissive about ID reads.
Oops, something went wrong.

0 comments on commit 62ff4db

Please sign in to comment.