From f500b60d44f2efb15a9b8bd471dcd04e7c6b25e2 Mon Sep 17 00:00:00 2001 From: greateggsgreg Date: Sun, 4 Mar 2018 14:20:31 -0500 Subject: [PATCH 1/3] PEP8 formatting fixes and merge conflict fixes --- adb/adb_commands.py | 691 ++++++++++++++--------------- adb/adb_debug.py | 317 +++++++------- adb/adb_protocol.py | 925 +++++++++++++++++++-------------------- adb/common.py | 587 +++++++++++++------------ adb/common_cli.py | 215 ++++----- adb/fastboot.py | 677 ++++++++++++++-------------- adb/fastboot_debug.py | 164 +++---- adb/filesync_protocol.py | 412 ++++++++--------- adb/sign_m2crypto.py | 19 +- adb/sign_pycryptodome.py | 2 +- adb/sign_pythonrsa.py | 83 ++-- adb/usb_exceptions.py | 50 +-- 12 files changed, 2076 insertions(+), 2066 deletions(-) diff --git a/adb/adb_commands.py b/adb/adb_commands.py index c9ca631..549810f 100644 --- a/adb/adb_commands.py +++ b/adb/adb_commands.py @@ -38,355 +38,356 @@ # pylint: disable=invalid-name DeviceIsAvailable = common.InterfaceMatcher(CLASS, SUBCLASS, PROTOCOL) - try: - # Imported locally to keep compatibility with previous code. - from adb.sign_m2crypto import M2CryptoSigner + # Imported locally to keep compatibility with previous code. + from adb.sign_m2crypto import M2CryptoSigner except ImportError: - # Ignore this error when M2Crypto is not installed, there are other options. - pass + # Ignore this error when M2Crypto is not installed, there are other options. + pass class AdbCommands(object): - """Exposes adb-like methods for use. - - Some methods are more-pythonic and/or have more options. - """ - protocol_handler = adb_protocol.AdbMessage - filesync_handler = filesync_protocol.FilesyncProtocol - - def __init__(self): - - self.__reset() - - def __reset(self): - self.build_props = None - self._handle = None - self._device_state = None - - # Connection table tracks each open AdbConnection objects per service type for program functions - # that choose to persist an AdbConnection object for their functionality, using - # self._get_service_connection - self._service_connections = {} - - def _get_service_connection(self, service, service_command=None, create=True, timeout_ms=None): - """ - Based on the service, get the AdbConnection for that service or create one if it doesnt exist - - :param service: - :param service_command: Additional service parameters to append - :param create: If False, dont create a connection if it does not exist - :return: - """ - - connection = self._service_connections.get(service, None) - - if connection: - return connection - - if not connection and not create: - return None - - if service_command: - destination_str = b'%s:%s' % (service, service_command) - else: - destination_str = service - - connection = self.protocol_handler.Open( - self._handle, destination=destination_str, timeout_ms=timeout_ms) - - self._service_connections.update({service: connection}) - - return connection - - def ConnectDevice(self, port_path=None, serial=None, default_timeout_ms=None, **kwargs): - """Convenience function to setup a transport handle for the adb device from - usb path or serial then connect to it. - - Args: - port_path: The filename of usb port to use. - serial: The serial number of the device to use. - default_timeout_ms: The default timeout in milliseconds to use. - kwargs: handle: Device handle to use (instance of common.TcpHandle or common.UsbHandle) - banner: Connection banner to pass to the remote device - rsa_keys: List of AuthSigner subclass instances to be used for - authentication. The device can either accept one of these via the Sign - method, or we will send the result of GetPublicKey from the first one - if the device doesn't accept any of them. - auth_timeout_ms: Timeout to wait for when sending a new public key. This - is only relevant when we send a new public key. The device shows a - dialog and this timeout is how long to wait for that dialog. If used - in automation, this should be low to catch such a case as a failure - quickly; while in interactive settings it should be high to allow - users to accept the dialog. We default to automation here, so it's low - by default. - - If serial specifies a TCP address:port, then a TCP connection is - used instead of a USB connection. - """ - - # If there isnt a handle override (used by tests), build one here - if 'handle' in kwargs: - self._handle = kwargs.pop('handle') - elif serial and b':' in serial: - self._handle = common.TcpHandle(serial, timeout_ms=default_timeout_ms) - else: - self._handle = common.UsbHandle.FindAndOpen( - DeviceIsAvailable, port_path=port_path, serial=serial, - timeout_ms=default_timeout_ms) - - self._Connect(**kwargs) - - return self - - def Close(self): - for conn in list(self._service_connections.values()): - if conn: - try: - conn.Close() - except: - pass - - if self._handle: - self._handle.Close() - - self.__reset() - - def _Connect(self, banner=None, **kwargs): - """Connect to the device. - - Args: - banner: See protocol_handler.Connect. - **kwargs: See protocol_handler.Connect and adb_commands.ConnectDevice for kwargs. - Includes handle, rsa_keys, and auth_timeout_ms. - Returns: - An instance of this class if the device connected successfully. - """ - - if not banner: - banner = socket.gethostname().encode() - - conn_str = self.protocol_handler.Connect(self._handle, banner=banner, **kwargs) - - # Remove banner and colons after device state (state::banner) - parts = conn_str.split(b'::') - self._device_state = parts[0] - - # Break out the build prop info - self.build_props = str(parts[1].split(b';')) - - return True - - @classmethod - def Devices(cls): - """Get a generator of UsbHandle for devices available.""" - return common.UsbHandle.FindDevices(DeviceIsAvailable) + """Exposes adb-like methods for use. - def GetState(self): - return self._device_state - - def Install(self, apk_path, destination_dir='', timeout_ms=None, replace_existing=True, transfer_progress_callback=None): - """Install an apk to the device. - - Doesn't support verifier file, instead allows destination directory to be - overridden. - - Args: - apk_path: Local path to apk to install. - destination_dir: Optional destination directory. Use /system/app/ for - persistent applications. - replace_existing: whether to replace existing application - timeout_ms: Expected timeout for pushing and installing. - transfer_progress_callback: callback method that accepts filename, bytes_written and total_bytes of APK transfer - - Returns: - The pm install output. - """ - if not destination_dir: - destination_dir = '/data/local/tmp/' - basename = os.path.basename(apk_path) - destination_path = posixpath.join(destination_dir, basename) - self.Push(apk_path, destination_path, timeout_ms=timeout_ms, progress_callback=transfer_progress_callback) - - cmd = ['pm install'] - if replace_existing: - cmd.append('-r') - cmd.append('"{}"'.format(destination_path)) - return self.Shell(' '.join(cmd), timeout_ms=timeout_ms) - - def Uninstall(self, package_name, keep_data=False, timeout_ms=None): - """Removes a package from the device. - - Args: - package_name: Package name of target package. - keep_data: whether to keep the data and cache directories - timeout_ms: Expected timeout for pushing and installing. - - Returns: - The pm uninstall output. - """ - cmd = ['pm uninstall'] - if keep_data: - cmd.append('-k') - cmd.append('"%s"' % package_name) - return self.Shell(' '.join(cmd), timeout_ms=timeout_ms) - - def Push(self, source_file, device_filename, mtime='0', timeout_ms=None, progress_callback=None): - """Push a file or directory to the device. - - Args: - source_file: Either a filename, a directory or file-like object to push to - the device. - device_filename: Destination on the device to write to. - mtime: Optional, modification time to set on the file. - timeout_ms: Expected timeout for any part of the push. - progress_callback: callback method that accepts filename, bytes_written and total_bytes, - total_bytes will be -1 for file-like objects + Some methods are more-pythonic and/or have more options. """ - if isinstance(source_file, str): - if os.path.isdir(source_file): - self.Shell("mkdir " + device_filename) - for f in os.listdir(source_file): - self.Push(os.path.join(source_file, f), device_filename + '/' + f, progress_callback=progress_callback) - return - source_file = open(source_file, "rb") - - with source_file: - connection = self.protocol_handler.Open( - self._handle, destination=b'sync:', timeout_ms=timeout_ms) - self.filesync_handler.Push(connection, source_file, device_filename, - mtime=int(mtime), progress_callback=progress_callback) - connection.Close() - - def Pull(self, device_filename, dest_file=None, timeout_ms=None, progress_callback=None): - """Pull a file from the device. - - Args: - device_filename: Filename on the device to pull. - dest_file: If set, a filename or writable file-like object. - timeout_ms: Expected timeout for any part of the pull. - progress_callback: callback method that accepts filename, bytes_written and total_bytes, - total_bytes will be -1 for file-like objects - - Returns: - The file data if dest_file is not set. Otherwise, True if the destination file exists - """ - if not dest_file: - dest_file = io.BytesIO() - elif isinstance(dest_file, str): - dest_file = open(dest_file, 'w') - else: - raise ValueError("destfile is of unknown type") - - conn = self.protocol_handler.Open( - self._handle, destination=b'sync:', timeout_ms=timeout_ms) - - self.filesync_handler.Pull(conn, device_filename, dest_file, progress_callback) - - conn.Close() - if isinstance(dest_file, io.BytesIO): - return dest_file.getvalue() - else: - dest_file.close() - return os.path.exists(dest_file) - - def Stat(self, device_filename): - """Get a file's stat() information.""" - connection = self.protocol_handler.Open(self._handle, destination=b'sync:') - mode, size, mtime = self.filesync_handler.Stat( - connection, device_filename) - connection.Close() - return mode, size, mtime - - def List(self, device_path): - """Return a directory listing of the given path. - - Args: - device_path: Directory to list. - """ - connection = self.protocol_handler.Open(self._handle, destination=b'sync:') - listing = self.filesync_handler.List(connection, device_path) - connection.Close() - return listing - - def Reboot(self, destination=b''): - """Reboot the device. - - Args: - destination: Specify 'bootloader' for fastboot. - """ - self.protocol_handler.Open(self._handle, b'reboot:%s' % destination) - - def RebootBootloader(self): - """Reboot device into fastboot.""" - self.Reboot(b'bootloader') - - def Remount(self): - """Remount / as read-write.""" - return self.protocol_handler.Command(self._handle, service=b'remount') - - def Root(self): - """Restart adbd as root on the device.""" - return self.protocol_handler.Command(self._handle, service=b'root') - - def EnableVerity(self): - """Re-enable dm-verity checking on userdebug builds""" - return self.protocol_handler.Command(self._handle, service=b'enable-verity') - - def DisableVerity(self): - """Disable dm-verity checking on userdebug builds""" - return self.protocol_handler.Command(self._handle, service=b'disable-verity') - - def Shell(self, command, timeout_ms=None): - """Run command on the device, returning the output. - - Args: - command: Shell command to run - timeout_ms: Maximum time to allow the command to run. - """ - return self.protocol_handler.Command( - self._handle, service=b'shell', command=command, - timeout_ms=timeout_ms) - - def StreamingShell(self, command, timeout_ms=None): - """Run command on the device, yielding each line of output. - - Args: - command: Command to run on the target. - timeout_ms: Maximum time to allow the command to run. - - Yields: - The responses from the shell command. - """ - return self.protocol_handler.StreamingCommand( - self._handle, service=b'shell', command=command, - timeout_ms=timeout_ms) - - def Logcat(self, options, timeout_ms=None): - """Run 'shell logcat' and stream the output to stdout. - - Args: - options: Arguments to pass to 'logcat'. - timeout_ms: Maximum time to allow the command to run. - """ - return self.StreamingShell('logcat %s' % options, timeout_ms) - - def InteractiveShell(self, cmd=None, strip_cmd=True, delim=None, strip_delim=True): - """Get stdout from the currently open interactive shell and optionally run a command - on the device, returning all output. - - Args: - command: Optional. Command to run on the target. - strip_cmd: Optional (default True). Strip command name from stdout. - delim: Optional. Delimiter to look for in the output to know when to stop expecting more output - (usually the shell prompt) - strip_delim: Optional (default True): Strip the provided delimiter from the output - - Returns: - The stdout from the shell command. - """ - conn = self._get_service_connection(b'shell:') - - return self.protocol_handler.InteractiveShellCommand( - conn, cmd=cmd, strip_cmd=strip_cmd, - delim=delim, strip_delim=strip_delim) + protocol_handler = adb_protocol.AdbMessage + filesync_handler = filesync_protocol.FilesyncProtocol + + def __init__(self): + + self.__reset() + + def __reset(self): + self.build_props = None + self._handle = None + self._device_state = None + + # Connection table tracks each open AdbConnection objects per service type for program functions + # that choose to persist an AdbConnection object for their functionality, using + # self._get_service_connection + self._service_connections = {} + + def _get_service_connection(self, service, service_command=None, create=True, timeout_ms=None): + """ + Based on the service, get the AdbConnection for that service or create one if it doesnt exist + + :param service: + :param service_command: Additional service parameters to append + :param create: If False, dont create a connection if it does not exist + :return: + """ + + connection = self._service_connections.get(service, None) + + if connection: + return connection + + if not connection and not create: + return None + + if service_command: + destination_str = b'%s:%s' % (service, service_command) + else: + destination_str = service + + connection = self.protocol_handler.Open( + self._handle, destination=destination_str, timeout_ms=timeout_ms) + + self._service_connections.update({service: connection}) + + return connection + + def ConnectDevice(self, port_path=None, serial=None, default_timeout_ms=None, **kwargs): + """Convenience function to setup a transport handle for the adb device from + usb path or serial then connect to it. + + Args: + port_path: The filename of usb port to use. + serial: The serial number of the device to use. + default_timeout_ms: The default timeout in milliseconds to use. + kwargs: handle: Device handle to use (instance of common.TcpHandle or common.UsbHandle) + banner: Connection banner to pass to the remote device + rsa_keys: List of AuthSigner subclass instances to be used for + authentication. The device can either accept one of these via the Sign + method, or we will send the result of GetPublicKey from the first one + if the device doesn't accept any of them. + auth_timeout_ms: Timeout to wait for when sending a new public key. This + is only relevant when we send a new public key. The device shows a + dialog and this timeout is how long to wait for that dialog. If used + in automation, this should be low to catch such a case as a failure + quickly; while in interactive settings it should be high to allow + users to accept the dialog. We default to automation here, so it's low + by default. + + If serial specifies a TCP address:port, then a TCP connection is + used instead of a USB connection. + """ + + # If there isnt a handle override (used by tests), build one here + if 'handle' in kwargs: + self._handle = kwargs.pop('handle') + elif serial and b':' in serial: + self._handle = common.TcpHandle(serial, timeout_ms=default_timeout_ms) + else: + self._handle = common.UsbHandle.FindAndOpen( + DeviceIsAvailable, port_path=port_path, serial=serial, + timeout_ms=default_timeout_ms) + + self._Connect(**kwargs) + + return self + + def Close(self): + for conn in list(self._service_connections.values()): + if conn: + try: + conn.Close() + except: + pass + + if self._handle: + self._handle.Close() + + self.__reset() + + def _Connect(self, banner=None, **kwargs): + """Connect to the device. + + Args: + banner: See protocol_handler.Connect. + **kwargs: See protocol_handler.Connect and adb_commands.ConnectDevice for kwargs. + Includes handle, rsa_keys, and auth_timeout_ms. + Returns: + An instance of this class if the device connected successfully. + """ + + if not banner: + banner = socket.gethostname().encode() + + conn_str = self.protocol_handler.Connect(self._handle, banner=banner, **kwargs) + + # Remove banner and colons after device state (state::banner) + parts = conn_str.split(b'::') + self._device_state = parts[0] + + # Break out the build prop info + self.build_props = str(parts[1].split(b';')) + + return True + + @classmethod + def Devices(cls): + """Get a generator of UsbHandle for devices available.""" + return common.UsbHandle.FindDevices(DeviceIsAvailable) + + def GetState(self): + return self._device_state + + def Install(self, apk_path, destination_dir='', timeout_ms=None, replace_existing=True, + transfer_progress_callback=None): + """Install an apk to the device. + + Doesn't support verifier file, instead allows destination directory to be + overridden. + + Args: + apk_path: Local path to apk to install. + destination_dir: Optional destination directory. Use /system/app/ for + persistent applications. + replace_existing: whether to replace existing application + timeout_ms: Expected timeout for pushing and installing. + transfer_progress_callback: callback method that accepts filename, bytes_written and total_bytes of APK transfer + + Returns: + The pm install output. + """ + if not destination_dir: + destination_dir = '/data/local/tmp/' + basename = os.path.basename(apk_path) + destination_path = posixpath.join(destination_dir, basename) + self.Push(apk_path, destination_path, timeout_ms=timeout_ms, progress_callback=transfer_progress_callback) + + cmd = ['pm install'] + if replace_existing: + cmd.append('-r') + cmd.append('"{}"'.format(destination_path)) + return self.Shell(' '.join(cmd), timeout_ms=timeout_ms) + + def Uninstall(self, package_name, keep_data=False, timeout_ms=None): + """Removes a package from the device. + + Args: + package_name: Package name of target package. + keep_data: whether to keep the data and cache directories + timeout_ms: Expected timeout for pushing and installing. + + Returns: + The pm uninstall output. + """ + cmd = ['pm uninstall'] + if keep_data: + cmd.append('-k') + cmd.append('"%s"' % package_name) + return self.Shell(' '.join(cmd), timeout_ms=timeout_ms) + + def Push(self, source_file, device_filename, mtime='0', timeout_ms=None, progress_callback=None): + """Push a file or directory to the device. + + Args: + source_file: Either a filename, a directory or file-like object to push to + the device. + device_filename: Destination on the device to write to. + mtime: Optional, modification time to set on the file. + timeout_ms: Expected timeout for any part of the push. + progress_callback: callback method that accepts filename, bytes_written and total_bytes, + total_bytes will be -1 for file-like objects + """ + if isinstance(source_file, str): + if os.path.isdir(source_file): + self.Shell("mkdir " + device_filename) + for f in os.listdir(source_file): + self.Push(os.path.join(source_file, f), device_filename + '/' + f, + progress_callback=progress_callback) + return + source_file = open(source_file, "rb") + + with source_file: + connection = self.protocol_handler.Open( + self._handle, destination=b'sync:', timeout_ms=timeout_ms) + self.filesync_handler.Push(connection, source_file, device_filename, + mtime=int(mtime), progress_callback=progress_callback) + connection.Close() + + def Pull(self, device_filename, dest_file=None, timeout_ms=None, progress_callback=None): + """Pull a file from the device. + + Args: + device_filename: Filename on the device to pull. + dest_file: If set, a filename or writable file-like object. + timeout_ms: Expected timeout for any part of the pull. + progress_callback: callback method that accepts filename, bytes_written and total_bytes, + total_bytes will be -1 for file-like objects + + Returns: + The file data if dest_file is not set. Otherwise, True if the destination file exists + """ + if not dest_file: + dest_file = io.BytesIO() + elif isinstance(dest_file, str): + dest_file = open(dest_file, 'w') + else: + raise ValueError("destfile is of unknown type") + + conn = self.protocol_handler.Open( + self._handle, destination=b'sync:', timeout_ms=timeout_ms) + + self.filesync_handler.Pull(conn, device_filename, dest_file, progress_callback) + + conn.Close() + if isinstance(dest_file, io.BytesIO): + return dest_file.getvalue() + else: + dest_file.close() + return os.path.exists(dest_file) + + def Stat(self, device_filename): + """Get a file's stat() information.""" + connection = self.protocol_handler.Open(self._handle, destination=b'sync:') + mode, size, mtime = self.filesync_handler.Stat( + connection, device_filename) + connection.Close() + return mode, size, mtime + + def List(self, device_path): + """Return a directory listing of the given path. + + Args: + device_path: Directory to list. + """ + connection = self.protocol_handler.Open(self._handle, destination=b'sync:') + listing = self.filesync_handler.List(connection, device_path) + connection.Close() + return listing + + def Reboot(self, destination=b''): + """Reboot the device. + + Args: + destination: Specify 'bootloader' for fastboot. + """ + self.protocol_handler.Open(self._handle, b'reboot:%s' % destination) + + def RebootBootloader(self): + """Reboot device into fastboot.""" + self.Reboot(b'bootloader') + + def Remount(self): + """Remount / as read-write.""" + return self.protocol_handler.Command(self._handle, service=b'remount') + + def Root(self): + """Restart adbd as root on the device.""" + return self.protocol_handler.Command(self._handle, service=b'root') + + def EnableVerity(self): + """Re-enable dm-verity checking on userdebug builds""" + return self.protocol_handler.Command(self._handle, service=b'enable-verity') + + def DisableVerity(self): + """Disable dm-verity checking on userdebug builds""" + return self.protocol_handler.Command(self._handle, service=b'disable-verity') + + def Shell(self, command, timeout_ms=None): + """Run command on the device, returning the output. + + Args: + command: Shell command to run + timeout_ms: Maximum time to allow the command to run. + """ + return self.protocol_handler.Command( + self._handle, service=b'shell', command=command, + timeout_ms=timeout_ms) + + def StreamingShell(self, command, timeout_ms=None): + """Run command on the device, yielding each line of output. + + Args: + command: Command to run on the target. + timeout_ms: Maximum time to allow the command to run. + + Yields: + The responses from the shell command. + """ + return self.protocol_handler.StreamingCommand( + self._handle, service=b'shell', command=command, + timeout_ms=timeout_ms) + + def Logcat(self, options, timeout_ms=None): + """Run 'shell logcat' and stream the output to stdout. + + Args: + options: Arguments to pass to 'logcat'. + timeout_ms: Maximum time to allow the command to run. + """ + return self.StreamingShell('logcat %s' % options, timeout_ms) + + def InteractiveShell(self, cmd=None, strip_cmd=True, delim=None, strip_delim=True): + """Get stdout from the currently open interactive shell and optionally run a command + on the device, returning all output. + + Args: + cmd: Optional. Command to run on the target. + strip_cmd: Optional (default True). Strip command name from stdout. + delim: Optional. Delimiter to look for in the output to know when to stop expecting more output + (usually the shell prompt) + strip_delim: Optional (default True): Strip the provided delimiter from the output + + Returns: + The stdout from the shell command. + """ + conn = self._get_service_connection(b'shell:') + + return self.protocol_handler.InteractiveShellCommand( + conn, cmd=cmd, strip_cmd=strip_cmd, + delim=delim, strip_delim=strip_delim) diff --git a/adb/adb_debug.py b/adb/adb_debug.py index 4fa974f..18cb8ad 100644 --- a/adb/adb_debug.py +++ b/adb/adb_debug.py @@ -27,184 +27,185 @@ from adb import common_cli try: - from adb import sign_m2crypto - rsa_signer = sign_m2crypto.M2CryptoSigner + from adb import sign_m2crypto + + rsa_signer = sign_m2crypto.M2CryptoSigner except ImportError: - try: - from adb import sign_pythonrsa - rsa_signer = sign_pythonrsa.PythonRSASigner.FromRSAKeyPath - except ImportError: try: - from adb import sign_pycryptodome + from adb import sign_pythonrsa - rsa_signer = sign_pycryptodome.PycryptodomeAuthSigner + rsa_signer = sign_pythonrsa.PythonRSASigner.FromRSAKeyPath except ImportError: - rsa_signer = None + try: + from adb import sign_pycryptodome + + rsa_signer = sign_pycryptodome.PycryptodomeAuthSigner + except ImportError: + rsa_signer = None def Devices(args): - """Lists the available devices. - - Mimics 'adb devices' output: - List of devices attached - 015DB7591102001A device 1,2 - """ - for d in adb_commands.AdbCommands.Devices(): - if args.output_port_path: - print('%s\tdevice\t%s' % ( - d.serial_number, ','.join(str(p) for p in d.port_path))) - else: - print('%s\tdevice' % d.serial_number) - return 0 + """Lists the available devices. + + Mimics 'adb devices' output: + List of devices attached + 015DB7591102001A device 1,2 + """ + for d in adb_commands.AdbCommands.Devices(): + if args.output_port_path: + print('%s\tdevice\t%s' % ( + d.serial_number, ','.join(str(p) for p in d.port_path))) + else: + print('%s\tdevice' % d.serial_number) + return 0 def List(device, device_path): - """Prints a directory listing. - - Args: - device_path: Directory to list. - """ - files = device.List(device_path) - files.sort(key=lambda x: x.filename) - maxname = max(len(f.filename) for f in files) - maxsize = max(len(str(f.size)) for f in files) - for f in files: - mode = ( - ('d' if stat.S_ISDIR(f.mode) else '-') + - ('r' if f.mode & stat.S_IRUSR else '-') + - ('w' if f.mode & stat.S_IWUSR else '-') + - ('x' if f.mode & stat.S_IXUSR else '-') + - ('r' if f.mode & stat.S_IRGRP else '-') + - ('w' if f.mode & stat.S_IWGRP else '-') + - ('x' if f.mode & stat.S_IXGRP else '-') + - ('r' if f.mode & stat.S_IROTH else '-') + - ('w' if f.mode & stat.S_IWOTH else '-') + - ('x' if f.mode & stat.S_IXOTH else '-')) - t = time.gmtime(f.mtime) - yield '%s %*d %04d-%02d-%02d %02d:%02d:%02d %-*s\n' % ( - mode, maxsize, f.size, - t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec, - maxname, f.filename) + """Prints a directory listing. + + Args: + device_path: Directory to list. + """ + files = device.List(device_path) + files.sort(key=lambda x: x.filename) + maxname = max(len(f.filename) for f in files) + maxsize = max(len(str(f.size)) for f in files) + for f in files: + mode = ( + ('d' if stat.S_ISDIR(f.mode) else '-') + + ('r' if f.mode & stat.S_IRUSR else '-') + + ('w' if f.mode & stat.S_IWUSR else '-') + + ('x' if f.mode & stat.S_IXUSR else '-') + + ('r' if f.mode & stat.S_IRGRP else '-') + + ('w' if f.mode & stat.S_IWGRP else '-') + + ('x' if f.mode & stat.S_IXGRP else '-') + + ('r' if f.mode & stat.S_IROTH else '-') + + ('w' if f.mode & stat.S_IWOTH else '-') + + ('x' if f.mode & stat.S_IXOTH else '-')) + t = time.gmtime(f.mtime) + yield '%s %*d %04d-%02d-%02d %02d:%02d:%02d %-*s\n' % ( + mode, maxsize, f.size, + t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec, + maxname, f.filename) @functools.wraps(adb_commands.AdbCommands.Logcat) def Logcat(device, *options): - return device.Logcat( - self, ' '.join(options), timeout_ms=0) + return device.Logcat( + device, ' '.join(options), timeout_ms=0) def Shell(device, *command): - """Runs a command on the device and prints the stdout. - - Args: - command: Command to run on the target. - """ - if command: - return device.StreamingShell(' '.join(command)) - else: - # Retrieve the initial terminal prompt to use as a delimiter for future reads - terminal_prompt = device.InteractiveShell() - print(terminal_prompt.decode('utf-8')) - - # Accept user input in a loop and write that into the interactive shells stdin, then print output - while True: - cmd = input('> ') - if not cmd: - continue - elif cmd == 'exit': - break - else: - stdout = device.InteractiveShell(cmd, strip_cmd=True, delim=terminal_prompt, strip_delim=True) - if stdout: - if isinstance(stdout, bytes): - stdout = stdout.decode('utf-8') - print(stdout) - - - device.Close() + """Runs a command on the device and prints the stdout. + Args: + command: Command to run on the target. + """ + if command: + return device.StreamingShell(' '.join(command)) + else: + # Retrieve the initial terminal prompt to use as a delimiter for future reads + terminal_prompt = device.InteractiveShell() + print(terminal_prompt.decode('utf-8')) + + # Accept user input in a loop and write that into the interactive shells stdin, then print output + while True: + cmd = input('> ') + if not cmd: + continue + elif cmd == 'exit': + break + else: + stdout = device.InteractiveShell(cmd, strip_cmd=True, delim=terminal_prompt, strip_delim=True) + if stdout: + if isinstance(stdout, bytes): + stdout = stdout.decode('utf-8') + print(stdout) + + device.Close() -def main(): - common = common_cli.GetCommonArguments() - common.add_argument( - '--rsa_key_path', action='append', default=[], - metavar='~/.android/adbkey', - help='RSA key(s) to use, use multiple times to load mulitple keys') - common.add_argument( - '--auth_timeout_s', default=60., metavar='60', type=int, - help='Seconds to wait for the dialog to be accepted when using ' - 'authenticated ADB.') - device = common_cli.GetDeviceArguments() - parents = [common, device] - - parser = argparse.ArgumentParser( - description=sys.modules[__name__].__doc__, parents=[common]) - subparsers = parser.add_subparsers(title='Commands', dest='command_name') - - subparser = subparsers.add_parser( - name='help', help='Prints the commands available') - subparser = subparsers.add_parser( - name='devices', help='Lists the available devices', parents=[common]) - subparser.add_argument( - '--output_port_path', action='store_true', - help='Outputs the port_path alongside the serial') - - common_cli.MakeSubparser( - subparsers, parents, adb_commands.AdbCommands.Install) - common_cli.MakeSubparser(subparsers, parents, adb_commands.AdbCommands.Uninstall) - common_cli.MakeSubparser(subparsers, parents, List) - common_cli.MakeSubparser(subparsers, parents, Logcat) - common_cli.MakeSubparser( - subparsers, parents, adb_commands.AdbCommands.Push, - {'source_file': 'Filename or directory to push to the device.'}) - common_cli.MakeSubparser( - subparsers, parents, adb_commands.AdbCommands.Pull, - { - 'dest_file': 'Filename to write to on the host, if not specified, ' - 'prints the content to stdout.', - }) - common_cli.MakeSubparser( - subparsers, parents, adb_commands.AdbCommands.Reboot) - common_cli.MakeSubparser( - subparsers, parents, adb_commands.AdbCommands.RebootBootloader) - common_cli.MakeSubparser( - subparsers, parents, adb_commands.AdbCommands.Remount) - common_cli.MakeSubparser(subparsers, parents, adb_commands.AdbCommands.Root) - common_cli.MakeSubparser(subparsers, parents, adb_commands.AdbCommands.EnableVerity) - common_cli.MakeSubparser(subparsers, parents, adb_commands.AdbCommands.DisableVerity) - common_cli.MakeSubparser(subparsers, parents, Shell) - - if len(sys.argv) == 1: - parser.print_help() - return 2 - - args = parser.parse_args() - if args.verbose: - logging.basicConfig(level=logging.DEBUG) - if not args.rsa_key_path: - default = os.path.expanduser('~/.android/adbkey') - if os.path.isfile(default): - args.rsa_key_path = [default] - if args.rsa_key_path and not rsa_signer: - parser.error('Please install either M2Crypto, python-rsa, or PycryptoDome') - - # Hacks so that the generated doc is nicer. - if args.command_name == 'devices': - return Devices(args) - if args.command_name == 'help': - parser.print_help() - return 0 - if args.command_name == 'logcat': - args.positional = args.options - elif args.command_name == 'shell': - args.positional = args.command - return common_cli.StartCli( - args, - adb_commands.AdbCommands, - auth_timeout_ms=int(args.auth_timeout_s * 1000), - rsa_keys=[rsa_signer(path) for path in args.rsa_key_path]) +def main(): + common = common_cli.GetCommonArguments() + common.add_argument( + '--rsa_key_path', action='append', default=[], + metavar='~/.android/adbkey', + help='RSA key(s) to use, use multiple times to load mulitple keys') + common.add_argument( + '--auth_timeout_s', default=60., metavar='60', type=int, + help='Seconds to wait for the dialog to be accepted when using ' + 'authenticated ADB.') + device = common_cli.GetDeviceArguments() + parents = [common, device] + + parser = argparse.ArgumentParser( + description=sys.modules[__name__].__doc__, parents=[common]) + subparsers = parser.add_subparsers(title='Commands', dest='command_name') + + subparser = subparsers.add_parser( + name='help', help='Prints the commands available') + subparser = subparsers.add_parser( + name='devices', help='Lists the available devices', parents=[common]) + subparser.add_argument( + '--output_port_path', action='store_true', + help='Outputs the port_path alongside the serial') + + common_cli.MakeSubparser( + subparsers, parents, adb_commands.AdbCommands.Install) + common_cli.MakeSubparser(subparsers, parents, adb_commands.AdbCommands.Uninstall) + common_cli.MakeSubparser(subparsers, parents, List) + common_cli.MakeSubparser(subparsers, parents, Logcat) + common_cli.MakeSubparser( + subparsers, parents, adb_commands.AdbCommands.Push, + {'source_file': 'Filename or directory to push to the device.'}) + common_cli.MakeSubparser( + subparsers, parents, adb_commands.AdbCommands.Pull, + { + 'dest_file': 'Filename to write to on the host, if not specified, ' + 'prints the content to stdout.', + }) + common_cli.MakeSubparser( + subparsers, parents, adb_commands.AdbCommands.Reboot) + common_cli.MakeSubparser( + subparsers, parents, adb_commands.AdbCommands.RebootBootloader) + common_cli.MakeSubparser( + subparsers, parents, adb_commands.AdbCommands.Remount) + common_cli.MakeSubparser(subparsers, parents, adb_commands.AdbCommands.Root) + common_cli.MakeSubparser(subparsers, parents, adb_commands.AdbCommands.EnableVerity) + common_cli.MakeSubparser(subparsers, parents, adb_commands.AdbCommands.DisableVerity) + common_cli.MakeSubparser(subparsers, parents, Shell) + + if len(sys.argv) == 1: + parser.print_help() + return 2 + + args = parser.parse_args() + if args.verbose: + logging.basicConfig(level=logging.DEBUG) + if not args.rsa_key_path: + default = os.path.expanduser('~/.android/adbkey') + if os.path.isfile(default): + args.rsa_key_path = [default] + if args.rsa_key_path and not rsa_signer: + parser.error('Please install either M2Crypto, python-rsa, or PycryptoDome') + + # Hacks so that the generated doc is nicer. + if args.command_name == 'devices': + return Devices(args) + if args.command_name == 'help': + parser.print_help() + return 0 + if args.command_name == 'logcat': + args.positional = args.options + elif args.command_name == 'shell': + args.positional = args.command + + return common_cli.StartCli( + args, + adb_commands.AdbCommands, + auth_timeout_ms=int(args.auth_timeout_s * 1000), + rsa_keys=[rsa_signer(path) for path in args.rsa_key_path]) if __name__ == '__main__': - sys.exit(main()) + sys.exit(main()) diff --git a/adb/adb_protocol.py b/adb/adb_protocol.py index 32ea651..6f91867 100644 --- a/adb/adb_protocol.py +++ b/adb/adb_protocol.py @@ -34,526 +34,525 @@ def find_backspace_runs(stdout_bytes, start_pos): + first_backspace_pos = stdout_bytes[start_pos:].find(b'\x08') + if first_backspace_pos == -1: + return -1, 0 - first_backspace_pos = stdout_bytes[start_pos:].find(b'\x08') - if first_backspace_pos == -1: - return -1, 0 + end_backspace_pos = (start_pos + first_backspace_pos) + 1 + while True: + if chr(stdout_bytes[end_backspace_pos]) == '\b': + end_backspace_pos += 1 + else: + break - end_backspace_pos = (start_pos + first_backspace_pos) + 1 - while True: - if chr(stdout_bytes[end_backspace_pos]) == '\b': - end_backspace_pos += 1 - else: - break + num_backspaces = end_backspace_pos - (start_pos + first_backspace_pos) - num_backspaces = end_backspace_pos - (start_pos + first_backspace_pos) + return (start_pos + first_backspace_pos), num_backspaces - return (start_pos + first_backspace_pos), num_backspaces class InvalidCommandError(Exception): - """Got an invalid command over USB.""" + """Got an invalid command over USB.""" - def __init__(self, message, response_header, response_data): - if response_header == b'FAIL': - message = 'Command failed, device said so. (%s)' % message - super(InvalidCommandError, self).__init__( - message, response_header, response_data) + def __init__(self, message, response_header, response_data): + if response_header == b'FAIL': + message = 'Command failed, device said so. (%s)' % message + super(InvalidCommandError, self).__init__( + message, response_header, response_data) class InvalidResponseError(Exception): - """Got an invalid response to our command.""" + """Got an invalid response to our command.""" class InvalidChecksumError(Exception): - """Checksum of data didn't match expected checksum.""" + """Checksum of data didn't match expected checksum.""" class InterleavedDataError(Exception): - """We only support command sent serially.""" + """We only support command sent serially.""" def MakeWireIDs(ids): - id_to_wire = { - cmd_id: sum(c << (i * 8) for i, c in enumerate(bytearray(cmd_id))) - for cmd_id in ids - } - wire_to_id = {wire: cmd_id for cmd_id, wire in id_to_wire.items()} - return id_to_wire, wire_to_id + id_to_wire = { + cmd_id: sum(c << (i * 8) for i, c in enumerate(bytearray(cmd_id))) + for cmd_id in ids + } + wire_to_id = {wire: cmd_id for cmd_id, wire in id_to_wire.items()} + return id_to_wire, wire_to_id class AuthSigner(object): - """Signer for use with authenticated ADB, introduced in 4.4.x/KitKat.""" + """Signer for use with authenticated ADB, introduced in 4.4.x/KitKat.""" - def Sign(self, data): - """Signs given data using a private key.""" - raise NotImplementedError() + def Sign(self, data): + """Signs given data using a private key.""" + raise NotImplementedError() - def GetPublicKey(self): - """Returns the public key in PEM format without headers or newlines.""" - raise NotImplementedError() + def GetPublicKey(self): + """Returns the public key in PEM format without headers or newlines.""" + raise NotImplementedError() class _AdbConnection(object): - """ADB Connection.""" - - def __init__(self, usb, local_id, remote_id, timeout_ms): - self.usb = usb - self.local_id = local_id - self.remote_id = remote_id - self.timeout_ms = timeout_ms - - def _Send(self, command, arg0, arg1, data=b''): - message = AdbMessage(command, arg0, arg1, data) - message.Send(self.usb, self.timeout_ms) - - def Write(self, data): - """Write a packet and expect an Ack.""" - self._Send(b'WRTE', arg0=self.local_id, arg1=self.remote_id, data=data) - # Expect an ack in response. - cmd, okay_data = self.ReadUntil(b'OKAY') - if cmd != b'OKAY': - if cmd == b'FAIL': - raise usb_exceptions.AdbCommandFailureException( - 'Command failed.', okay_data) - raise InvalidCommandError( - 'Expected an OKAY in response to a WRITE, got %s (%s)', - cmd, okay_data) - return len(data) - - def Okay(self): - self._Send(b'OKAY', arg0=self.local_id, arg1=self.remote_id) - - def ReadUntil(self, *expected_cmds): - """Read a packet, Ack any write packets.""" - cmd, remote_id, local_id, data = AdbMessage.Read( - self.usb, expected_cmds, self.timeout_ms) - if local_id != 0 and self.local_id != local_id: - raise InterleavedDataError("We don't support multiple streams...") - if remote_id != 0 and self.remote_id != remote_id: - raise InvalidResponseError( - 'Incorrect remote id, expected %s got %s' % ( - self.remote_id, remote_id)) - # Ack write packets. - if cmd == b'WRTE': - self.Okay() - return cmd, data - - def ReadUntilClose(self): - """Yield packets until a Close packet is received.""" - while True: - cmd, data = self.ReadUntil(b'CLSE', b'WRTE') - if cmd == b'CLSE': + """ADB Connection.""" + + def __init__(self, usb, local_id, remote_id, timeout_ms): + self.usb = usb + self.local_id = local_id + self.remote_id = remote_id + self.timeout_ms = timeout_ms + + def _Send(self, command, arg0, arg1, data=b''): + message = AdbMessage(command, arg0, arg1, data) + message.Send(self.usb, self.timeout_ms) + + def Write(self, data): + """Write a packet and expect an Ack.""" + self._Send(b'WRTE', arg0=self.local_id, arg1=self.remote_id, data=data) + # Expect an ack in response. + cmd, okay_data = self.ReadUntil(b'OKAY') + if cmd != b'OKAY': + if cmd == b'FAIL': + raise usb_exceptions.AdbCommandFailureException( + 'Command failed.', okay_data) + raise InvalidCommandError( + 'Expected an OKAY in response to a WRITE, got %s (%s)', + cmd, okay_data) + return len(data) + + def Okay(self): + self._Send(b'OKAY', arg0=self.local_id, arg1=self.remote_id) + + def ReadUntil(self, *expected_cmds): + """Read a packet, Ack any write packets.""" + cmd, remote_id, local_id, data = AdbMessage.Read( + self.usb, expected_cmds, self.timeout_ms) + if local_id != 0 and self.local_id != local_id: + raise InterleavedDataError("We don't support multiple streams...") + if remote_id != 0 and self.remote_id != remote_id: + raise InvalidResponseError( + 'Incorrect remote id, expected %s got %s' % ( + self.remote_id, remote_id)) + # Ack write packets. + if cmd == b'WRTE': + self.Okay() + return cmd, data + + def ReadUntilClose(self): + """Yield packets until a Close packet is received.""" + while True: + cmd, data = self.ReadUntil(b'CLSE', b'WRTE') + if cmd == b'CLSE': + self._Send(b'CLSE', arg0=self.local_id, arg1=self.remote_id) + break + if cmd != b'WRTE': + if cmd == b'FAIL': + raise usb_exceptions.AdbCommandFailureException( + 'Command failed.', data) + raise InvalidCommandError('Expected a WRITE or a CLOSE, got %s (%s)', + cmd, data) + yield data + + def Close(self): self._Send(b'CLSE', arg0=self.local_id, arg1=self.remote_id) - break - if cmd != b'WRTE': - if cmd == b'FAIL': - raise usb_exceptions.AdbCommandFailureException( - 'Command failed.', data) - raise InvalidCommandError('Expected a WRITE or a CLOSE, got %s (%s)', - cmd, data) - yield data - - def Close(self): - self._Send(b'CLSE', arg0=self.local_id, arg1=self.remote_id) - cmd, data = self.ReadUntil(b'CLSE') - if cmd != b'CLSE': - if cmd == b'FAIL': - raise usb_exceptions.AdbCommandFailureException('Command failed.', data) - raise InvalidCommandError('Expected a CLSE response, got %s (%s)', - cmd, data) + cmd, data = self.ReadUntil(b'CLSE') + if cmd != b'CLSE': + if cmd == b'FAIL': + raise usb_exceptions.AdbCommandFailureException('Command failed.', data) + raise InvalidCommandError('Expected a CLSE response, got %s (%s)', + cmd, data) class AdbMessage(object): - """ADB Protocol and message class. - - Protocol Notes - - local_id/remote_id: - Turns out the documentation is host/device ambidextrous, so local_id is the - id for 'the sender' and remote_id is for 'the recipient'. So since we're - only on the host, we'll re-document with host_id and device_id: - - OPEN(host_id, 0, 'shell:XXX') - READY/OKAY(device_id, host_id, '') - WRITE(0, host_id, 'data') - CLOSE(device_id, host_id, '') - """ - - ids = [b'SYNC', b'CNXN', b'AUTH', b'OPEN', b'OKAY', b'CLSE', b'WRTE'] - commands, constants = MakeWireIDs(ids) - # An ADB message is 6 words in little-endian. - format = b'<6I' - - connections = 0 - - def __init__(self, command=None, arg0=None, arg1=None, data=b''): - self.command = self.commands[command] - self.magic = self.command ^ 0xFFFFFFFF - self.arg0 = arg0 - self.arg1 = arg1 - self.data = data - - @property - def checksum(self): - return self.CalculateChecksum(self.data) - - @staticmethod - def CalculateChecksum(data): - # The checksum is just a sum of all the bytes. I swear. - if isinstance(data, bytearray): - total = sum(data) - elif isinstance(data, bytes): - if data and isinstance(data[0], bytes): - # Python 2 bytes (str) index as single-character strings. - total = sum(map(ord, data)) - else: - # Python 3 bytes index as numbers (and PY2 empty strings sum() to 0) - total = sum(data) - else: - # Unicode strings (should never see?) - total = sum(map(ord, data)) - return total & 0xFFFFFFFF - - def Pack(self): - """Returns this message in an over-the-wire format.""" - return struct.pack(self.format, self.command, self.arg0, self.arg1, - len(self.data), self.checksum, self.magic) - - @classmethod - def Unpack(cls, message): - try: - cmd, arg0, arg1, data_length, data_checksum, unused_magic = struct.unpack( - cls.format, message) - except struct.error as e: - raise ValueError('Unable to unpack ADB command.', cls.format, message, e) - return cmd, arg0, arg1, data_length, data_checksum - - def Send(self, usb, timeout_ms=None): - """Send this message over USB.""" - usb.BulkWrite(self.Pack(), timeout_ms) - usb.BulkWrite(self.data, timeout_ms) - - @classmethod - def Read(cls, usb, expected_cmds, timeout_ms=None, total_timeout_ms=None): - """Receive a response from the device.""" - total_timeout_ms = usb.Timeout(total_timeout_ms) - start = time.time() - while True: - msg = usb.BulkRead(24, timeout_ms) - cmd, arg0, arg1, data_length, data_checksum = cls.Unpack(msg) - command = cls.constants.get(cmd) - if not command: - raise InvalidCommandError( - 'Unknown command: %x' % cmd, cmd, (arg0, arg1)) - if command in expected_cmds: - break - - if time.time() - start > total_timeout_ms: - raise InvalidCommandError( - 'Never got one of the expected responses (%s)' % expected_cmds, - cmd, (timeout_ms, total_timeout_ms)) - - if data_length > 0: - data = bytearray() - while data_length > 0: - temp = usb.BulkRead(data_length, timeout_ms) - if len(temp) != data_length: - print("Data_length {} does not match actual number of bytes read: {}".format(data_length, len(temp))) - data += temp - - data_length -= len(temp) - - actual_checksum = cls.CalculateChecksum(data) - if actual_checksum != data_checksum: - raise InvalidChecksumError( - 'Received checksum %s != %s', (actual_checksum, data_checksum)) - else: - data = b'' - return command, arg0, arg1, bytes(data) - - @classmethod - def Connect(cls, usb, banner=b'notadb', rsa_keys=None, auth_timeout_ms=100): - """Establish a new connection to the device. - - Args: - usb: A USBHandle with BulkRead and BulkWrite methods. - banner: A string to send as a host identifier. - rsa_keys: List of AuthSigner subclass instances to be used for - authentication. The device can either accept one of these via the Sign - method, or we will send the result of GetPublicKey from the first one - if the device doesn't accept any of them. - auth_timeout_ms: Timeout to wait for when sending a new public key. This - is only relevant when we send a new public key. The device shows a - dialog and this timeout is how long to wait for that dialog. If used - in automation, this should be low to catch such a case as a failure - quickly; while in interactive settings it should be high to allow - users to accept the dialog. We default to automation here, so it's low - by default. - - Returns: - The device's reported banner. Always starts with the state (device, - recovery, or sideload), sometimes includes information after a : with - various product information. - - Raises: - usb_exceptions.DeviceAuthError: When the device expects authentication, - but we weren't given any valid keys. - InvalidResponseError: When the device does authentication in an - unexpected way. - """ - msg = cls( - command=b'CNXN', arg0=VERSION, arg1=MAX_ADB_DATA, - data=b'host::%s\0' % banner) - msg.Send(usb) - cmd, arg0, arg1, banner = cls.Read(usb, [b'CNXN', b'AUTH']) - if cmd == b'AUTH': - if not rsa_keys: - raise usb_exceptions.DeviceAuthError( - 'Device authentication required, no keys available.') - # Loop through our keys, signing the last 'banner' or token. - for rsa_key in rsa_keys: - if arg0 != AUTH_TOKEN: - raise InvalidResponseError( - 'Unknown AUTH response: %s %s %s' % (arg0, arg1, banner)) - - # Do not mangle the banner property here by converting it to a string - signed_token = rsa_key.Sign(banner) - msg = cls( - command=b'AUTH', arg0=AUTH_SIGNATURE, arg1=0, data=signed_token) - msg.Send(usb) - cmd, arg0, unused_arg1, banner = cls.Read(usb, [b'CNXN', b'AUTH']) - if cmd == b'CNXN': - return banner - # None of the keys worked, so send a public key. - msg = cls( - command=b'AUTH', arg0=AUTH_RSAPUBLICKEY, arg1=0, - data=rsa_keys[0].GetPublicKey() + b'\0') - msg.Send(usb) - try: - cmd, arg0, unused_arg1, banner = cls.Read( - usb, [b'CNXN'], timeout_ms=auth_timeout_ms) - except usb_exceptions.ReadFailedError as e: - if e.usb_error.value == -7: # Timeout. - raise usb_exceptions.DeviceAuthError( - 'Accept auth key on device, then retry.') - raise - # This didn't time-out, so we got a CNXN response. - return banner - return banner - - @classmethod - def Open(cls, usb, destination, timeout_ms=None): - """Opens a new connection to the device via an OPEN message. - - Not the same as the posix 'open' or any other google3 Open methods. - - Args: - usb: USB device handle with BulkRead and BulkWrite methods. - destination: The service:command string. - timeout_ms: Timeout in milliseconds for USB packets. - - Raises: - InvalidResponseError: Wrong local_id sent to us. - InvalidCommandError: Didn't get a ready response. - - Returns: - The local connection id. - """ - local_id = 1 - msg = cls( - command=b'OPEN', arg0=local_id, arg1=0, - data=destination + b'\0') - msg.Send(usb, timeout_ms) - cmd, remote_id, their_local_id, _ = cls.Read(usb, [b'CLSE', b'OKAY'], - timeout_ms=timeout_ms) - if local_id != their_local_id: - raise InvalidResponseError( - 'Expected the local_id to be {}, got {}'.format(local_id, their_local_id)) - if cmd == b'CLSE': - # Some devices seem to be sending CLSE once more after a request, this *should* handle it - cmd, remote_id, their_local_id, _ = cls.Read(usb, [b'CLSE', b'OKAY'], - timeout_ms=timeout_ms) - # Device doesn't support this service. - if cmd == b'CLSE': - return None - if cmd != b'OKAY': - raise InvalidCommandError('Expected a ready response, got {}'.format(cmd), - cmd, (remote_id, their_local_id)) - return _AdbConnection(usb, local_id, remote_id, timeout_ms) - - @classmethod - def Command(cls, usb, service, command='', timeout_ms=None): - """One complete set of USB packets for a single command. - - Sends service:command in a new connection, reading the data for the - response. All the data is held in memory, large responses will be slow and - can fill up memory. - - Args: - usb: USB device handle with BulkRead and BulkWrite methods. - service: The service on the device to talk to. - command: The command to send to the service. - timeout_ms: Timeout for USB packets, in milliseconds. - - Raises: - InterleavedDataError: Multiple streams running over usb. - InvalidCommandError: Got an unexpected response command. - - Returns: - The response from the service. - """ - return ''.join(cls.StreamingCommand(usb, service, command, timeout_ms)) - - @classmethod - def StreamingCommand(cls, usb, service, command='', timeout_ms=None): - """One complete set of USB packets for a single command. + """ADB Protocol and message class. - Sends service:command in a new connection, reading the data for the - response. All the data is held in memory, large responses will be slow and - can fill up memory. + Protocol Notes - Args: - usb: USB device handle with BulkRead and BulkWrite methods. - service: The service on the device to talk to. - command: The command to send to the service. - timeout_ms: Timeout for USB packets, in milliseconds. + local_id/remote_id: + Turns out the documentation is host/device ambidextrous, so local_id is the + id for 'the sender' and remote_id is for 'the recipient'. So since we're + only on the host, we'll re-document with host_id and device_id: - Raises: - InterleavedDataError: Multiple streams running over usb. - InvalidCommandError: Got an unexpected response command. - - Yields: - The responses from the service. - """ - if isinstance(command, str): - command = command.encode('utf8') - connection = cls.Open( - usb, destination=b'%s:%s' % (service, command), - timeout_ms=timeout_ms) - for data in connection.ReadUntilClose(): - yield data.decode('utf8') - - @classmethod - def InteractiveShellCommand(cls, conn, cmd=None, strip_cmd=True, delim=None, strip_delim=True, clean_stdout=True): - """Retrieves stdout of the current InteractiveShell and sends a shell command if provided - TODO: Should we turn this into a yield based function so we can stream all output? - - Args: - conn: Instance of AdbConnection - cmd: Optional. Command to run on the target. - strip_cmd: Optional (default True). Strip command name from stdout. - delim: Optional. Delimiter to look for in the output to know when to stop expecting more output - (usually the shell prompt) - strip_delim: Optional (default True): Strip the provided delimiter from the output - clean_stdout: Cleanup the stdout stream of any backspaces and the characters that were deleted by the backspace - Returns: - The stdout from the shell command. + OPEN(host_id, 0, 'shell:XXX') + READY/OKAY(device_id, host_id, '') + WRITE(0, host_id, 'data') + CLOSE(device_id, host_id, '') """ - if isinstance(delim, str): - delimiter = delim.encode('utf-8') - - # Delimiter may be shell@hammerhead:/ $ - # The user or directory could change, making the delimiter somthing like root@hammerhead:/data/local/tmp $ - # Handle a partial delimiter to search on and clean up - if delim: - user_pos = delim.find(b'@') - dir_pos = delim.rfind(b':/') - if user_pos != -1 and dir_pos != -1: - partial_delim = delim[user_pos:dir_pos+1] # e.g. @hammerhead: - else: - partial_delim = delim - else: - partial_delim = None - - stdout = '' - stdout_stream = BytesIO() - original_cmd = '' - - try: + ids = [b'SYNC', b'CNXN', b'AUTH', b'OPEN', b'OKAY', b'CLSE', b'WRTE'] + commands, constants = MakeWireIDs(ids) + # An ADB message is 6 words in little-endian. + format = b'<6I' + + connections = 0 + + def __init__(self, command=None, arg0=None, arg1=None, data=b''): + self.command = self.commands[command] + self.magic = self.command ^ 0xFFFFFFFF + self.arg0 = arg0 + self.arg1 = arg1 + self.data = data + + @property + def checksum(self): + return self.CalculateChecksum(self.data) + + @staticmethod + def CalculateChecksum(data): + # The checksum is just a sum of all the bytes. I swear. + if isinstance(data, bytearray): + total = sum(data) + elif isinstance(data, bytes): + if data and isinstance(data[0], bytes): + # Python 2 bytes (str) index as single-character strings. + total = sum(map(ord, data)) + else: + # Python 3 bytes index as numbers (and PY2 empty strings sum() to 0) + total = sum(data) + else: + # Unicode strings (should never see?) + total = sum(map(ord, data)) + return total & 0xFFFFFFFF + + def Pack(self): + """Returns this message in an over-the-wire format.""" + return struct.pack(self.format, self.command, self.arg0, self.arg1, + len(self.data), self.checksum, self.magic) + + @classmethod + def Unpack(cls, message): + try: + cmd, arg0, arg1, data_length, data_checksum, unused_magic = struct.unpack( + cls.format, message) + except struct.error as e: + raise ValueError('Unable to unpack ADB command.', cls.format, message, e) + return cmd, arg0, arg1, data_length, data_checksum + + def Send(self, usb, timeout_ms=None): + """Send this message over USB.""" + usb.BulkWrite(self.Pack(), timeout_ms) + usb.BulkWrite(self.data, timeout_ms) + + @classmethod + def Read(cls, usb, expected_cmds, timeout_ms=None, total_timeout_ms=None): + """Receive a response from the device.""" + total_timeout_ms = usb.Timeout(total_timeout_ms) + start = time.time() + while True: + msg = usb.BulkRead(24, timeout_ms) + cmd, arg0, arg1, data_length, data_checksum = cls.Unpack(msg) + command = cls.constants.get(cmd) + if not command: + raise InvalidCommandError( + 'Unknown command: %x' % cmd, cmd, (arg0, arg1)) + if command in expected_cmds: + break + + if time.time() - start > total_timeout_ms: + raise InvalidCommandError( + 'Never got one of the expected responses (%s)' % expected_cmds, + cmd, (timeout_ms, total_timeout_ms)) + + if data_length > 0: + data = bytearray() + while data_length > 0: + temp = usb.BulkRead(data_length, timeout_ms) + if len(temp) != data_length: + print( + "Data_length {} does not match actual number of bytes read: {}".format(data_length, len(temp))) + data += temp + + data_length -= len(temp) + + actual_checksum = cls.CalculateChecksum(data) + if actual_checksum != data_checksum: + raise InvalidChecksumError( + 'Received checksum %s != %s', (actual_checksum, data_checksum)) + else: + data = b'' + return command, arg0, arg1, bytes(data) + + @classmethod + def Connect(cls, usb, banner=b'notadb', rsa_keys=None, auth_timeout_ms=100): + """Establish a new connection to the device. + + Args: + usb: A USBHandle with BulkRead and BulkWrite methods. + banner: A string to send as a host identifier. + rsa_keys: List of AuthSigner subclass instances to be used for + authentication. The device can either accept one of these via the Sign + method, or we will send the result of GetPublicKey from the first one + if the device doesn't accept any of them. + auth_timeout_ms: Timeout to wait for when sending a new public key. This + is only relevant when we send a new public key. The device shows a + dialog and this timeout is how long to wait for that dialog. If used + in automation, this should be low to catch such a case as a failure + quickly; while in interactive settings it should be high to allow + users to accept the dialog. We default to automation here, so it's low + by default. + + Returns: + The device's reported banner. Always starts with the state (device, + recovery, or sideload), sometimes includes information after a : with + various product information. + + Raises: + usb_exceptions.DeviceAuthError: When the device expects authentication, + but we weren't given any valid keys. + InvalidResponseError: When the device does authentication in an + unexpected way. + """ + msg = cls( + command=b'CNXN', arg0=VERSION, arg1=MAX_ADB_DATA, + data=b'host::%s\0' % banner) + msg.Send(usb) + cmd, arg0, arg1, banner = cls.Read(usb, [b'CNXN', b'AUTH']) + if cmd == b'AUTH': + if not rsa_keys: + raise usb_exceptions.DeviceAuthError( + 'Device authentication required, no keys available.') + # Loop through our keys, signing the last 'banner' or token. + for rsa_key in rsa_keys: + if arg0 != AUTH_TOKEN: + raise InvalidResponseError( + 'Unknown AUTH response: %s %s %s' % (arg0, arg1, banner)) + + # Do not mangle the banner property here by converting it to a string + signed_token = rsa_key.Sign(banner) + msg = cls( + command=b'AUTH', arg0=AUTH_SIGNATURE, arg1=0, data=signed_token) + msg.Send(usb) + cmd, arg0, unused_arg1, banner = cls.Read(usb, [b'CNXN', b'AUTH']) + if cmd == b'CNXN': + return banner + # None of the keys worked, so send a public key. + msg = cls( + command=b'AUTH', arg0=AUTH_RSAPUBLICKEY, arg1=0, + data=rsa_keys[0].GetPublicKey() + b'\0') + msg.Send(usb) + try: + cmd, arg0, unused_arg1, banner = cls.Read( + usb, [b'CNXN'], timeout_ms=auth_timeout_ms) + except usb_exceptions.ReadFailedError as e: + if e.usb_error.value == -7: # Timeout. + raise usb_exceptions.DeviceAuthError( + 'Accept auth key on device, then retry.') + raise + # This didn't time-out, so we got a CNXN response. + return banner + return banner + + @classmethod + def Open(cls, usb, destination, timeout_ms=None): + """Opens a new connection to the device via an OPEN message. + + Not the same as the posix 'open' or any other google3 Open methods. + + Args: + usb: USB device handle with BulkRead and BulkWrite methods. + destination: The service:command string. + timeout_ms: Timeout in milliseconds for USB packets. + + Raises: + InvalidResponseError: Wrong local_id sent to us. + InvalidCommandError: Didn't get a ready response. + + Returns: + The local connection id. + """ + local_id = 1 + msg = cls( + command=b'OPEN', arg0=local_id, arg1=0, + data=destination + b'\0') + msg.Send(usb, timeout_ms) + cmd, remote_id, their_local_id, _ = cls.Read(usb, [b'CLSE', b'OKAY'], + timeout_ms=timeout_ms) + if local_id != their_local_id: + raise InvalidResponseError( + 'Expected the local_id to be {}, got {}'.format(local_id, their_local_id)) + if cmd == b'CLSE': + # Some devices seem to be sending CLSE once more after a request, this *should* handle it + cmd, remote_id, their_local_id, _ = cls.Read(usb, [b'CLSE', b'OKAY'], + timeout_ms=timeout_ms) + # Device doesn't support this service. + if cmd == b'CLSE': + return None + if cmd != b'OKAY': + raise InvalidCommandError('Expected a ready response, got {}'.format(cmd), + cmd, (remote_id, their_local_id)) + return _AdbConnection(usb, local_id, remote_id, timeout_ms) + + @classmethod + def Command(cls, usb, service, command='', timeout_ms=None): + """One complete set of USB packets for a single command. + + Sends service:command in a new connection, reading the data for the + response. All the data is held in memory, large responses will be slow and + can fill up memory. + + Args: + usb: USB device handle with BulkRead and BulkWrite methods. + service: The service on the device to talk to. + command: The command to send to the service. + timeout_ms: Timeout for USB packets, in milliseconds. + + Raises: + InterleavedDataError: Multiple streams running over usb. + InvalidCommandError: Got an unexpected response command. + + Returns: + The response from the service. + """ + return ''.join(cls.StreamingCommand(usb, service, command, timeout_ms)) + + @classmethod + def StreamingCommand(cls, usb, service, command='', timeout_ms=None): + """One complete set of USB packets for a single command. + + Sends service:command in a new connection, reading the data for the + response. All the data is held in memory, large responses will be slow and + can fill up memory. + + Args: + usb: USB device handle with BulkRead and BulkWrite methods. + service: The service on the device to talk to. + command: The command to send to the service. + timeout_ms: Timeout for USB packets, in milliseconds. + + Raises: + InterleavedDataError: Multiple streams running over usb. + InvalidCommandError: Got an unexpected response command. + + Yields: + The responses from the service. + """ + if isinstance(command, str): + command = command.encode('utf8') + connection = cls.Open( + usb, destination=b'%s:%s' % (service, command), + timeout_ms=timeout_ms) + for data in connection.ReadUntilClose(): + yield data.decode('utf8') + + @classmethod + def InteractiveShellCommand(cls, conn, cmd=None, strip_cmd=True, delim=None, strip_delim=True, clean_stdout=True): + """Retrieves stdout of the current InteractiveShell and sends a shell command if provided + TODO: Should we turn this into a yield based function so we can stream all output? + + Args: + conn: Instance of AdbConnection + cmd: Optional. Command to run on the target. + strip_cmd: Optional (default True). Strip command name from stdout. + delim: Optional. Delimiter to look for in the output to know when to stop expecting more output + (usually the shell prompt) + strip_delim: Optional (default True): Strip the provided delimiter from the output + clean_stdout: Cleanup the stdout stream of any backspaces and the characters that were deleted by the backspace + Returns: + The stdout from the shell command. + """ + + if isinstance(delim, str): + delim = delim.encode('utf-8') + + # Delimiter may be shell@hammerhead:/ $ + # The user or directory could change, making the delimiter somthing like root@hammerhead:/data/local/tmp $ + # Handle a partial delimiter to search on and clean up + if delim: + user_pos = delim.find(b'@') + dir_pos = delim.rfind(b':/') + if user_pos != -1 and dir_pos != -1: + partial_delim = delim[user_pos:dir_pos + 1] # e.g. @hammerhead: + else: + partial_delim = delim + else: + partial_delim = None - if cmd: - original_cmd = str(cmd) - cmd += '\r' # Required. Send a carriage return right after the cmd - cmd = cmd.encode('utf8') + stdout = '' + stdout_stream = BytesIO() + original_cmd = '' - # Send the cmd raw - bytes_written = conn.Write(cmd) + try: - if delim: - # Expect multiple WRTE cmds until the delim (usually terminal prompt) is detected + if cmd: + original_cmd = str(cmd) + cmd += '\r' # Required. Send a carriage return right after the cmd + cmd = cmd.encode('utf8') - data = b'' - while partial_delim not in data: + # Send the cmd raw + bytes_written = conn.Write(cmd) - cmd, data = conn.ReadUntil(b'WRTE') - stdout_stream.write(data) + if delim: + # Expect multiple WRTE cmds until the delim (usually terminal prompt) is detected - else: - # Otherwise, expect only a single WRTE - cmd, data = conn.ReadUntil(b'WRTE') + data = b'' + while partial_delim not in data: + cmd, data = conn.ReadUntil(b'WRTE') + stdout_stream.write(data) - # WRTE cmd from device will follow with stdout data - stdout_stream.write(data) + else: + # Otherwise, expect only a single WRTE + cmd, data = conn.ReadUntil(b'WRTE') - else: + # WRTE cmd from device will follow with stdout data + stdout_stream.write(data) - # No cmd provided means we should just expect a single line from the terminal. Use this sparingly - cmd, data = conn.ReadUntil(b'WRTE') - if cmd == b'WRTE': - # WRTE cmd from device will follow with stdout data - stdout_stream.write(data) - else: - print("Unhandled cmd: {}".format(cmd)) + else: - cleaned_stdout_stream = BytesIO() - if clean_stdout: - stdout_bytes = stdout_stream.getvalue() + # No cmd provided means we should just expect a single line from the terminal. Use this sparingly + cmd, data = conn.ReadUntil(b'WRTE') + if cmd == b'WRTE': + # WRTE cmd from device will follow with stdout data + stdout_stream.write(data) + else: + print("Unhandled cmd: {}".format(cmd)) - bsruns = {} # Backspace runs tracking - next_start_pos = 0 - last_run_pos, last_run_len = find_backspace_runs(stdout_bytes, next_start_pos) + cleaned_stdout_stream = BytesIO() + if clean_stdout: + stdout_bytes = stdout_stream.getvalue() - if last_run_pos != -1 and last_run_len != 0: - bsruns.update({last_run_pos: last_run_len}) - cleaned_stdout_stream.write(stdout_bytes[next_start_pos:(last_run_pos-last_run_len)]) - next_start_pos += last_run_pos + last_run_len + bsruns = {} # Backspace runs tracking + next_start_pos = 0 + last_run_pos, last_run_len = find_backspace_runs(stdout_bytes, next_start_pos) - while last_run_pos != -1: - last_run_pos, last_run_len = find_backspace_runs(stdout_bytes[next_start_pos:], next_start_pos) + if last_run_pos != -1 and last_run_len != 0: + bsruns.update({last_run_pos: last_run_len}) + cleaned_stdout_stream.write(stdout_bytes[next_start_pos:(last_run_pos - last_run_len)]) + next_start_pos += last_run_pos + last_run_len - if last_run_pos != -1: - bsruns.update({last_run_pos: last_run_len}) - cleaned_stdout_stream.write(stdout_bytes[next_start_pos:(last_run_pos - last_run_len)]) - next_start_pos += last_run_pos + last_run_len + while last_run_pos != -1: + last_run_pos, last_run_len = find_backspace_runs(stdout_bytes[next_start_pos:], next_start_pos) - cleaned_stdout_stream.write(stdout_bytes[next_start_pos:]) + if last_run_pos != -1: + bsruns.update({last_run_pos: last_run_len}) + cleaned_stdout_stream.write(stdout_bytes[next_start_pos:(last_run_pos - last_run_len)]) + next_start_pos += last_run_pos + last_run_len - else: - cleaned_stdout_stream.write(stdout_stream.getvalue()) + cleaned_stdout_stream.write(stdout_bytes[next_start_pos:]) - stdout = cleaned_stdout_stream.getvalue() + else: + cleaned_stdout_stream.write(stdout_stream.getvalue()) - # Strip original cmd that will come back in stdout - if original_cmd and strip_cmd: - findstr = original_cmd.encode('utf-8') + b'\r\r\n' - pos = stdout.find(findstr) - while pos >= 0: - stdout = stdout.replace(findstr, b'') - pos = stdout.find(findstr) + stdout = cleaned_stdout_stream.getvalue() - if b'\r\r\n' in stdout: - stdout = stdout.split(b'\r\r\n')[1] + # Strip original cmd that will come back in stdout + if original_cmd and strip_cmd: + findstr = original_cmd.encode('utf-8') + b'\r\r\n' + pos = stdout.find(findstr) + while pos >= 0: + stdout = stdout.replace(findstr, b'') + pos = stdout.find(findstr) - # Strip delim if requested - # TODO: Handling stripping partial delims here - not a deal breaker the way we're handling it now - if delim and strip_delim: + if b'\r\r\n' in stdout: + stdout = stdout.split(b'\r\r\n')[1] - stdout = stdout.replace(delim, b'') + # Strip delim if requested + # TODO: Handling stripping partial delims here - not a deal breaker the way we're handling it now + if delim and strip_delim: + stdout = stdout.replace(delim, b'') - stdout = stdout.rstrip() + stdout = stdout.rstrip() - except Exception as e: - print("InteractiveShell exception (most likely timeout): {}".format(e)) + except Exception as e: + print("InteractiveShell exception (most likely timeout): {}".format(e)) - return stdout + return stdout diff --git a/adb/common.py b/adb/common.py index 2c95f8e..a68414f 100644 --- a/adb/common.py +++ b/adb/common.py @@ -33,308 +33,311 @@ def GetInterface(setting): - """Get the class, subclass, and protocol for the given USB setting.""" - return (setting.getClass(), setting.getSubClass(), setting.getProtocol()) + """Get the class, subclass, and protocol for the given USB setting.""" + return (setting.getClass(), setting.getSubClass(), setting.getProtocol()) def InterfaceMatcher(clazz, subclass, protocol): - """Returns a matcher that returns the setting with the given interface.""" - interface = (clazz, subclass, protocol) - def Matcher(device): - for setting in device.iterSettings(): - if GetInterface(setting) == interface: - return setting - return Matcher + """Returns a matcher that returns the setting with the given interface.""" + interface = (clazz, subclass, protocol) + def Matcher(device): + for setting in device.iterSettings(): + if GetInterface(setting) == interface: + return setting -class UsbHandle(object): - """USB communication object. Not thread-safe. - - Handles reading and writing over USB with the proper endpoints, exceptions, - and interface claiming. + return Matcher - Important methods: - FlushBuffers() - BulkRead(int length) - BulkWrite(bytes data) - """ - _HANDLE_CACHE = weakref.WeakValueDictionary() - _HANDLE_CACHE_LOCK = threading.Lock() +class UsbHandle(object): + """USB communication object. Not thread-safe. - def __init__(self, device, setting, usb_info=None, timeout_ms=None): - """Initialize USB Handle. + Handles reading and writing over USB with the proper endpoints, exceptions, + and interface claiming. - Arguments: - device: libusb_device to connect to. - setting: libusb setting with the correct endpoints to communicate with. - usb_info: String describing the usb path/serial/device, for debugging. - timeout_ms: Timeout in milliseconds for all I/O. - """ - self._setting = setting - self._device = device - self._handle = None - - self._usb_info = usb_info or '' - self._timeout_ms = timeout_ms if timeout_ms else DEFAULT_TIMEOUT_MS - self._max_read_packet_len = 0 - - @property - def usb_info(self): - try: - sn = self.serial_number - except libusb1.USBError: - sn = '' - if sn and sn != self._usb_info: - return '%s %s' % (self._usb_info, sn) - return self._usb_info - - def Open(self): - """Opens the USB device for this setting, and claims the interface.""" - # Make sure we close any previous handle open to this usb device. - port_path = tuple(self.port_path) - with self._HANDLE_CACHE_LOCK: - old_handle = self._HANDLE_CACHE.get(port_path) - if old_handle is not None: - old_handle.Close() - - self._read_endpoint = None - self._write_endpoint = None - - for endpoint in self._setting.iterEndpoints(): - address = endpoint.getAddress() - if address & libusb1.USB_ENDPOINT_DIR_MASK: - self._read_endpoint = address - self._max_read_packet_len = endpoint.getMaxPacketSize() - else: - self._write_endpoint = address - - assert self._read_endpoint is not None - assert self._write_endpoint is not None - - handle = self._device.open() - iface_number = self._setting.getNumber() - try: - if (platform.system() != 'Windows' - and handle.kernelDriverActive(iface_number)): - handle.detachKernelDriver(iface_number) - except libusb1.USBError as e: - if e.value == libusb1.LIBUSB_ERROR_NOT_FOUND: - _LOG.warning('Kernel driver not found for interface: %s.', iface_number) - else: - raise - handle.claimInterface(iface_number) - self._handle = handle - self._interface_number = iface_number - - with self._HANDLE_CACHE_LOCK: - self._HANDLE_CACHE[port_path] = self - # When this object is deleted, make sure it's closed. - weakref.ref(self, self.Close) - - @property - def serial_number(self): - return self._device.getSerialNumber() - - @property - def port_path(self): - return [self._device.getBusNumber()] + self._device.getPortNumberList() - - def Close(self): - if self._handle is None: - return - try: - self._handle.releaseInterface(self._interface_number) - self._handle.close() - except libusb1.USBError: - _LOG.info('USBError while closing handle %s: ', - self.usb_info, exc_info=True) - finally: - self._handle = None - - def Timeout(self, timeout_ms): - return timeout_ms if timeout_ms is not None else self._timeout_ms - - def FlushBuffers(self): - while True: - try: - self.BulkRead(self._max_read_packet_len, timeout_ms=10) - except usb_exceptions.ReadFailedError as e: - if e.usb_error.value == libusb1.LIBUSB_ERROR_TIMEOUT: - break - raise - - def BulkWrite(self, data, timeout_ms=None): - if self._handle is None: - raise usb_exceptions.WriteFailedError( - 'This handle has been closed, probably due to another being opened.', - None) - try: - return self._handle.bulkWrite( - self._write_endpoint, data, timeout=self.Timeout(timeout_ms)) - except libusb1.USBError as e: - raise usb_exceptions.WriteFailedError( - 'Could not send data to %s (timeout %sms)' % ( - self.usb_info, self.Timeout(timeout_ms)), e) - - def BulkRead(self, length, timeout_ms=None): - if self._handle is None: - raise usb_exceptions.ReadFailedError( - 'This handle has been closed, probably due to another being opened.', - None) - try: - # python-libusb1 > 1.6 exposes bytearray()s now instead of bytes/str. - # To support older and newer versions, we ensure everything's bytearray() - # from here on out. - return bytearray(self._handle.bulkRead( - self._read_endpoint, length, timeout=self.Timeout(timeout_ms))) - except libusb1.USBError as e: - raise usb_exceptions.ReadFailedError( - 'Could not receive data from %s (timeout %sms)' % ( - self.usb_info, self.Timeout(timeout_ms)), e) - - def BulkReadAsync(self, length, timeout_ms=None): - # See: https://pypi.python.org/pypi/libusb1 "Asynchronous I/O" section - return - - @classmethod - def PortPathMatcher(cls, port_path): - """Returns a device matcher for the given port path.""" - if isinstance(port_path, str): - # Convert from sysfs path to port_path. - port_path = [int(part) for part in SYSFS_PORT_SPLIT_RE.split(port_path)] - return lambda device: device.port_path == port_path - - @classmethod - def SerialMatcher(cls, serial): - """Returns a device matcher for the given serial.""" - return lambda device: device.serial_number == serial - - @classmethod - def FindAndOpen(cls, setting_matcher, - port_path=None, serial=None, timeout_ms=None): - dev = cls.Find( - setting_matcher, port_path=port_path, serial=serial, - timeout_ms=timeout_ms) - dev.Open() - dev.FlushBuffers() - return dev - - @classmethod - def Find(cls, setting_matcher, port_path=None, serial=None, timeout_ms=None): - """Gets the first device that matches according to the keyword args.""" - if port_path: - device_matcher = cls.PortPathMatcher(port_path) - usb_info = port_path - elif serial: - device_matcher = cls.SerialMatcher(serial) - usb_info = serial - else: - device_matcher = None - usb_info = 'first' - return cls.FindFirst(setting_matcher, device_matcher, - usb_info=usb_info, timeout_ms=timeout_ms) - - @classmethod - def FindFirst(cls, setting_matcher, device_matcher=None, **kwargs): - """Find and return the first matching device. - - Args: - setting_matcher: See cls.FindDevices. - device_matcher: See cls.FindDevices. - **kwargs: See cls.FindDevices. - - Returns: - An instance of UsbHandle. - - Raises: - DeviceNotFoundError: Raised if the device is not available. - """ - try: - return next(cls.FindDevices( - setting_matcher, device_matcher=device_matcher, **kwargs)) - except StopIteration: - raise usb_exceptions.DeviceNotFoundError( - 'No device available, or it is in the wrong configuration.') - - @classmethod - def FindDevices(cls, setting_matcher, device_matcher=None, - usb_info='', timeout_ms=None): - """Find and yield the devices that match. - - Args: - setting_matcher: Function that returns the setting to use given a - usb1.USBDevice, or None if the device doesn't have a valid setting. - device_matcher: Function that returns True if the given UsbHandle is - valid. None to match any device. - usb_info: Info string describing device(s). - timeout_ms: Default timeout of commands in milliseconds. - - Yields: - UsbHandle instances + Important methods: + FlushBuffers() + BulkRead(int length) + BulkWrite(bytes data) """ - ctx = usb1.USBContext() - for device in ctx.getDeviceList(skip_on_error=True): - setting = setting_matcher(device) - if setting is None: - continue - handle = cls(device, setting, usb_info=usb_info, timeout_ms=timeout_ms) - if device_matcher is None or device_matcher(handle): - yield handle + _HANDLE_CACHE = weakref.WeakValueDictionary() + _HANDLE_CACHE_LOCK = threading.Lock() + + def __init__(self, device, setting, usb_info=None, timeout_ms=None): + """Initialize USB Handle. + + Arguments: + device: libusb_device to connect to. + setting: libusb setting with the correct endpoints to communicate with. + usb_info: String describing the usb path/serial/device, for debugging. + timeout_ms: Timeout in milliseconds for all I/O. + """ + self._setting = setting + self._device = device + self._handle = None + + self._usb_info = usb_info or '' + self._timeout_ms = timeout_ms if timeout_ms else DEFAULT_TIMEOUT_MS + self._max_read_packet_len = 0 + + @property + def usb_info(self): + try: + sn = self.serial_number + except libusb1.USBError: + sn = '' + if sn and sn != self._usb_info: + return '%s %s' % (self._usb_info, sn) + return self._usb_info + + def Open(self): + """Opens the USB device for this setting, and claims the interface.""" + # Make sure we close any previous handle open to this usb device. + port_path = tuple(self.port_path) + with self._HANDLE_CACHE_LOCK: + old_handle = self._HANDLE_CACHE.get(port_path) + if old_handle is not None: + old_handle.Close() + + self._read_endpoint = None + self._write_endpoint = None + + for endpoint in self._setting.iterEndpoints(): + address = endpoint.getAddress() + if address & libusb1.USB_ENDPOINT_DIR_MASK: + self._read_endpoint = address + self._max_read_packet_len = endpoint.getMaxPacketSize() + else: + self._write_endpoint = address + + assert self._read_endpoint is not None + assert self._write_endpoint is not None + + handle = self._device.open() + iface_number = self._setting.getNumber() + try: + if (platform.system() != 'Windows' + and handle.kernelDriverActive(iface_number)): + handle.detachKernelDriver(iface_number) + except libusb1.USBError as e: + if e.value == libusb1.LIBUSB_ERROR_NOT_FOUND: + _LOG.warning('Kernel driver not found for interface: %s.', iface_number) + else: + raise + handle.claimInterface(iface_number) + self._handle = handle + self._interface_number = iface_number + + with self._HANDLE_CACHE_LOCK: + self._HANDLE_CACHE[port_path] = self + # When this object is deleted, make sure it's closed. + weakref.ref(self, self.Close) + + @property + def serial_number(self): + return self._device.getSerialNumber() + + @property + def port_path(self): + return [self._device.getBusNumber()] + self._device.getPortNumberList() + + def Close(self): + if self._handle is None: + return + try: + self._handle.releaseInterface(self._interface_number) + self._handle.close() + except libusb1.USBError: + _LOG.info('USBError while closing handle %s: ', + self.usb_info, exc_info=True) + finally: + self._handle = None + + def Timeout(self, timeout_ms): + return timeout_ms if timeout_ms is not None else self._timeout_ms + + def FlushBuffers(self): + while True: + try: + self.BulkRead(self._max_read_packet_len, timeout_ms=10) + except usb_exceptions.ReadFailedError as e: + if e.usb_error.value == libusb1.LIBUSB_ERROR_TIMEOUT: + break + raise + + def BulkWrite(self, data, timeout_ms=None): + if self._handle is None: + raise usb_exceptions.WriteFailedError( + 'This handle has been closed, probably due to another being opened.', + None) + try: + return self._handle.bulkWrite( + self._write_endpoint, data, timeout=self.Timeout(timeout_ms)) + except libusb1.USBError as e: + raise usb_exceptions.WriteFailedError( + 'Could not send data to %s (timeout %sms)' % ( + self.usb_info, self.Timeout(timeout_ms)), e) + + def BulkRead(self, length, timeout_ms=None): + if self._handle is None: + raise usb_exceptions.ReadFailedError( + 'This handle has been closed, probably due to another being opened.', + None) + try: + # python-libusb1 > 1.6 exposes bytearray()s now instead of bytes/str. + # To support older and newer versions, we ensure everything's bytearray() + # from here on out. + return bytearray(self._handle.bulkRead( + self._read_endpoint, length, timeout=self.Timeout(timeout_ms))) + except libusb1.USBError as e: + raise usb_exceptions.ReadFailedError( + 'Could not receive data from %s (timeout %sms)' % ( + self.usb_info, self.Timeout(timeout_ms)), e) + + def BulkReadAsync(self, length, timeout_ms=None): + # See: https://pypi.python.org/pypi/libusb1 "Asynchronous I/O" section + return + + @classmethod + def PortPathMatcher(cls, port_path): + """Returns a device matcher for the given port path.""" + if isinstance(port_path, str): + # Convert from sysfs path to port_path. + port_path = [int(part) for part in SYSFS_PORT_SPLIT_RE.split(port_path)] + return lambda device: device.port_path == port_path + + @classmethod + def SerialMatcher(cls, serial): + """Returns a device matcher for the given serial.""" + return lambda device: device.serial_number == serial + + @classmethod + def FindAndOpen(cls, setting_matcher, + port_path=None, serial=None, timeout_ms=None): + dev = cls.Find( + setting_matcher, port_path=port_path, serial=serial, + timeout_ms=timeout_ms) + dev.Open() + dev.FlushBuffers() + return dev + + @classmethod + def Find(cls, setting_matcher, port_path=None, serial=None, timeout_ms=None): + """Gets the first device that matches according to the keyword args.""" + if port_path: + device_matcher = cls.PortPathMatcher(port_path) + usb_info = port_path + elif serial: + device_matcher = cls.SerialMatcher(serial) + usb_info = serial + else: + device_matcher = None + usb_info = 'first' + return cls.FindFirst(setting_matcher, device_matcher, + usb_info=usb_info, timeout_ms=timeout_ms) + + @classmethod + def FindFirst(cls, setting_matcher, device_matcher=None, **kwargs): + """Find and return the first matching device. + + Args: + setting_matcher: See cls.FindDevices. + device_matcher: See cls.FindDevices. + **kwargs: See cls.FindDevices. + + Returns: + An instance of UsbHandle. + + Raises: + DeviceNotFoundError: Raised if the device is not available. + """ + try: + return next(cls.FindDevices( + setting_matcher, device_matcher=device_matcher, **kwargs)) + except StopIteration: + raise usb_exceptions.DeviceNotFoundError( + 'No device available, or it is in the wrong configuration.') + + @classmethod + def FindDevices(cls, setting_matcher, device_matcher=None, + usb_info='', timeout_ms=None): + """Find and yield the devices that match. + + Args: + setting_matcher: Function that returns the setting to use given a + usb1.USBDevice, or None if the device doesn't have a valid setting. + device_matcher: Function that returns True if the given UsbHandle is + valid. None to match any device. + usb_info: Info string describing device(s). + timeout_ms: Default timeout of commands in milliseconds. + + Yields: + UsbHandle instances + """ + ctx = usb1.USBContext() + for device in ctx.getDeviceList(skip_on_error=True): + setting = setting_matcher(device) + if setting is None: + continue + + handle = cls(device, setting, usb_info=usb_info, timeout_ms=timeout_ms) + if device_matcher is None or device_matcher(handle): + yield handle -class TcpHandle(object): - """TCP connection object. - - Provides same interface as UsbHandle. """ - - def __init__(self, serial, timeout_ms=None): - """Initialize the TCP Handle. - Arguments: - serial: Android device serial of the form host or host:port. - Host may be an IP address or a host name. - """ - if b':' in serial: - (host, port) = serial.split(b':') - else: - host = serial - port = 5555 - self._serial_number = '%s:%s' % (host, port) - self._timeout_ms = float(timeout_ms) if timeout_ms else None - timeout = self.TimeoutSeconds(self._timeout_ms) - self._connection = socket.create_connection((host, port), timeout=timeout) - if timeout: - self._connection.setblocking(0) - - @property - def serial_number(self): - return self._serial_number - - def BulkWrite(self, data, timeout=None): - t = self.TimeoutSeconds(timeout) - _, writeable, _ = select.select([], [self._connection], [], t) - if writeable: - return self._connection.send(data) - msg = 'Sending data to {} timed out after {}s. No data was sent.'.format( - self.serial_number, t) - raise usb_exceptions.TcpTimeoutException(msg) - - def BulkRead(self, numbytes, timeout=None): - t = self.TimeoutSeconds(timeout) - readable, _, _ = select.select([self._connection], [], [], t) - if readable: - return self._connection.recv(numbytes) - msg = 'Reading from {} timed out (Timeout {}s)'.format( - self._serial_number, t) - raise usb_exceptions.TcpTimeoutException(msg) - - def Timeout(self, timeout_ms): - return float(timeout_ms) if timeout_ms is not None else self._timeout_ms - - def TimeoutSeconds(self, timeout_ms): - timeout = self.Timeout(timeout_ms) - return timeout / 1000.0 if timeout is not None else timeout - - def Close(self): - return self._connection.close() +class TcpHandle(object): + """TCP connection object. + + Provides same interface as UsbHandle. """ + + def __init__(self, serial, timeout_ms=None): + """Initialize the TCP Handle. + Arguments: + serial: Android device serial of the form host or host:port. + + Host may be an IP address or a host name. + """ + if b':' in serial: + (host, port) = serial.split(b':') + else: + host = serial + port = 5555 + self._serial_number = '%s:%s' % (host, port) + self._timeout_ms = float(timeout_ms) if timeout_ms else None + timeout = self.TimeoutSeconds(self._timeout_ms) + self._connection = socket.create_connection((host, port), timeout=timeout) + if timeout: + self._connection.setblocking(0) + + @property + def serial_number(self): + return self._serial_number + + def BulkWrite(self, data, timeout=None): + t = self.TimeoutSeconds(timeout) + _, writeable, _ = select.select([], [self._connection], [], t) + if writeable: + return self._connection.send(data) + msg = 'Sending data to {} timed out after {}s. No data was sent.'.format( + self.serial_number, t) + raise usb_exceptions.TcpTimeoutException(msg) + + def BulkRead(self, numbytes, timeout=None): + t = self.TimeoutSeconds(timeout) + readable, _, _ = select.select([self._connection], [], [], t) + if readable: + return self._connection.recv(numbytes) + msg = 'Reading from {} timed out (Timeout {}s)'.format( + self._serial_number, t) + raise usb_exceptions.TcpTimeoutException(msg) + + def Timeout(self, timeout_ms): + return float(timeout_ms) if timeout_ms is not None else self._timeout_ms + + def TimeoutSeconds(self, timeout_ms): + timeout = self.Timeout(timeout_ms) + return timeout / 1000.0 if timeout is not None else timeout + + def Close(self): + return self._connection.close() diff --git a/adb/common_cli.py b/adb/common_cli.py index f3d2be6..fce327e 100644 --- a/adb/common_cli.py +++ b/adb/common_cli.py @@ -32,131 +32,132 @@ class _PortPathAction(argparse.Action): - def __call__(self, parser, namespace, values, option_string=None): - setattr( - namespace, self.dest, - [int(i) for i in values.replace('/', ',').split(',')]) + def __call__(self, parser, namespace, values, option_string=None): + setattr( + namespace, self.dest, + [int(i) for i in values.replace('/', ',').split(',')]) class PositionalArg(argparse.Action): - def __call__(self, parser, namespace, values, option_string=None): - namespace.positional.append(values) + def __call__(self, parser, namespace, values, option_string=None): + namespace.positional.append(values) def GetDeviceArguments(): - group = argparse.ArgumentParser('Device', add_help=False) - group.add_argument( - '--timeout_ms', default=10000, type=int, metavar='10000', - help='Timeout in milliseconds.') - group.add_argument( - '--port_path', action=_PortPathAction, - help='USB port path integers (eg 1,2 or 2,1,1)') - group.add_argument( - '-s', '--serial', - help='Device serial to look for (host:port or USB serial)') - return group + group = argparse.ArgumentParser('Device', add_help=False) + group.add_argument( + '--timeout_ms', default=10000, type=int, metavar='10000', + help='Timeout in milliseconds.') + group.add_argument( + '--port_path', action=_PortPathAction, + help='USB port path integers (eg 1,2 or 2,1,1)') + group.add_argument( + '-s', '--serial', + help='Device serial to look for (host:port or USB serial)') + return group def GetCommonArguments(): - group = argparse.ArgumentParser('Common', add_help=False) - group.add_argument('--verbose', action='store_true', help='Enable logging') - return group + group = argparse.ArgumentParser('Common', add_help=False) + group.add_argument('--verbose', action='store_true', help='Enable logging') + return group def _DocToArgs(doc): - """Converts a docstring documenting arguments into a dict.""" - offset = None - in_arg = False - out = {} - for l in doc.splitlines(): - if l.strip() == 'Args:': - in_arg = True - elif in_arg: - if not l.strip(): - break - if offset is None: - offset = len(l) - len(l.lstrip()) - l = l[offset:] - if l[0] == ' ': - out[m.group(1)] += ' ' + l.lstrip() - else: - m = re.match(r'^([a-z_]+): (.+)$', l.strip()) - out[m.group(1)] = m.group(2) - return out + """Converts a docstring documenting arguments into a dict.""" + offset = None + in_arg = False + out = {} + for l in doc.splitlines(): + if l.strip() == 'Args:': + in_arg = True + elif in_arg: + if not l.strip(): + break + if offset is None: + offset = len(l) - len(l.lstrip()) + l = l[offset:] + if l[0] == ' ': + out[m.group(1)] += ' ' + l.lstrip() + else: + m = re.match(r'^([a-z_]+): (.+)$', l.strip()) + out[m.group(1)] = m.group(2) + return out def MakeSubparser(subparsers, parents, method, arguments=None): - """Returns an argparse subparser to create a 'subcommand' to adb.""" - name = ('-'.join(re.split(r'([A-Z][a-z]+)', method.__name__)[1:-1:2])).lower() - help = method.__doc__.splitlines()[0] - subparser = subparsers.add_parser( - name=name, description=help, help=help.rstrip('.'), parents=parents) - subparser.set_defaults(method=method, positional=[]) - argspec = inspect.getargspec(method) - - # Figure out positionals and default argument, if any. Explicitly includes - # arguments that default to '' but excludes arguments that default to None. - offset = len(argspec.args) - len(argspec.defaults or []) - 1 - positional = [] - for i in range(1, len(argspec.args)): - if i > offset and argspec.defaults[i-offset-1] is None: - break - positional.append(argspec.args[i]) - defaults = [None] * offset + list(argspec.defaults or []) - - # Add all arguments so they append to args.positional. - args_help = _DocToArgs(method.__doc__) - for name, default in zip(positional, defaults): - if not isinstance(default, (None.__class__, str)): - continue - subparser.add_argument( - name, help=(arguments or {}).get(name, args_help.get(name)), - default=default, nargs='?' if default is not None else None, - action=PositionalArg) - if argspec.varargs: - subparser.add_argument( - argspec.varargs, nargs=argparse.REMAINDER, - help=(arguments or {}).get(argspec.varargs, args_help.get(argspec.varargs))) - return subparser + """Returns an argparse subparser to create a 'subcommand' to adb.""" + name = ('-'.join(re.split(r'([A-Z][a-z]+)', method.__name__)[1:-1:2])).lower() + help = method.__doc__.splitlines()[0] + subparser = subparsers.add_parser( + name=name, description=help, help=help.rstrip('.'), parents=parents) + subparser.set_defaults(method=method, positional=[]) + argspec = inspect.getargspec(method) + + # Figure out positionals and default argument, if any. Explicitly includes + # arguments that default to '' but excludes arguments that default to None. + offset = len(argspec.args) - len(argspec.defaults or []) - 1 + positional = [] + for i in range(1, len(argspec.args)): + if i > offset and argspec.defaults[i - offset - 1] is None: + break + positional.append(argspec.args[i]) + defaults = [None] * offset + list(argspec.defaults or []) + + # Add all arguments so they append to args.positional. + args_help = _DocToArgs(method.__doc__) + for name, default in zip(positional, defaults): + if not isinstance(default, (None.__class__, str)): + continue + subparser.add_argument( + name, help=(arguments or {}).get(name, args_help.get(name)), + default=default, nargs='?' if default is not None else None, + action=PositionalArg) + if argspec.varargs: + subparser.add_argument( + argspec.varargs, nargs=argparse.REMAINDER, + help=(arguments or {}).get(argspec.varargs, args_help.get(argspec.varargs))) + return subparser def _RunMethod(dev, args, extra): - """Runs a method registered via MakeSubparser.""" - logging.info('%s(%s)', args.method.__name__, ', '.join(args.positional)) - result = args.method(dev, *args.positional, **extra) - if result is not None: - if isinstance(result, io.StringIO): - sys.stdout.write(result.getvalue()) - elif isinstance(result, (list, types.GeneratorType)): - r = '' - for r in result: - r = str(r) - sys.stdout.write(r) - if not r.endswith('\n'): - sys.stdout.write('\n') - else: - result = str(result) - sys.stdout.write(result) - if not result.endswith('\n'): - sys.stdout.write('\n') - return 0 + """Runs a method registered via MakeSubparser.""" + logging.info('%s(%s)', args.method.__name__, ', '.join(args.positional)) + result = args.method(dev, *args.positional, **extra) + if result is not None: + if isinstance(result, io.StringIO): + sys.stdout.write(result.getvalue()) + elif isinstance(result, (list, types.GeneratorType)): + r = '' + for r in result: + r = str(r) + sys.stdout.write(r) + if not r.endswith('\n'): + sys.stdout.write('\n') + else: + result = str(result) + sys.stdout.write(result) + if not result.endswith('\n'): + sys.stdout.write('\n') + return 0 def StartCli(args, adb_commands, extra=None, **device_kwargs): - """Starts a common CLI interface for this usb path and protocol.""" - try: - dev = adb_commands() - dev.ConnectDevice(port_path=args.port_path, serial=args.serial, default_timeout_ms=args.timeout_ms, **device_kwargs) - except usb_exceptions.DeviceNotFoundError as e: - print('No device found: {}'.format(e), file=sys.stderr) - return 1 - except usb_exceptions.CommonUsbError as e: - print('Could not connect to device: {}'.format(e), file=sys.stderr) - return 1 - try: - return _RunMethod(dev, args, extra or {}) - except Exception as e: # pylint: disable=broad-except - sys.stdout.write(str(e)) - return 1 - finally: - dev.Close() + """Starts a common CLI interface for this usb path and protocol.""" + try: + dev = adb_commands() + dev.ConnectDevice(port_path=args.port_path, serial=args.serial, default_timeout_ms=args.timeout_ms, + **device_kwargs) + except usb_exceptions.DeviceNotFoundError as e: + print('No device found: {}'.format(e), file=sys.stderr) + return 1 + except usb_exceptions.CommonUsbError as e: + print('Could not connect to device: {}'.format(e), file=sys.stderr) + return 1 + try: + return _RunMethod(dev, args, extra or {}) + except Exception as e: # pylint: disable=broad-except + sys.stdout.write(str(e)) + return 1 + finally: + dev.Close() diff --git a/adb/fastboot.py b/adb/fastboot.py index e9028a7..abc482b 100644 --- a/adb/fastboot.py +++ b/adb/fastboot.py @@ -25,7 +25,6 @@ from adb import common from adb import usb_exceptions - _LOG = logging.getLogger('fastboot') DEFAULT_MESSAGE_CALLBACK = lambda m: logging.info('Got %s from device', m) @@ -45,357 +44,357 @@ # pylint doesn't understand cross-module exception baseclasses. # pylint: disable=nonstandard-exception class FastbootTransferError(usb_exceptions.FormatMessageWithArgumentsException): - """Transfer error.""" + """Transfer error.""" class FastbootRemoteFailure(usb_exceptions.FormatMessageWithArgumentsException): - """Remote error.""" + """Remote error.""" class FastbootStateMismatch(usb_exceptions.FormatMessageWithArgumentsException): - """Fastboot and uboot's state machines are arguing. You Lose.""" + """Fastboot and uboot's state machines are arguing. You Lose.""" class FastbootInvalidResponse( usb_exceptions.FormatMessageWithArgumentsException): - """Fastboot responded with a header we didn't expect.""" + """Fastboot responded with a header we didn't expect.""" class FastbootProtocol(object): - """Encapsulates the fastboot protocol.""" - FINAL_HEADERS = {b'OKAY', b'DATA'} - - def __init__(self, usb, chunk_kb=1024): - """Constructs a FastbootProtocol instance. - - Args: - usb: UsbHandle instance. - chunk_kb: Packet size. For older devices, 4 may be required. - """ - self.usb = usb - self.chunk_kb = chunk_kb - - @property - def usb_handle(self): - return self.usb - - def SendCommand(self, command, arg=None): - """Sends a command to the device. - - Args: - command: The command to send. - arg: Optional argument to the command. - """ - if arg is not None: - if not isinstance(arg, bytes): - arg = arg.encode('utf8') - command = b'%s:%s' % (command, arg) - - self._Write(io.BytesIO(command), len(command)) - - def HandleSimpleResponses( - self, timeout_ms=None, info_cb=DEFAULT_MESSAGE_CALLBACK): - """Accepts normal responses from the device. - - Args: - timeout_ms: Timeout in milliseconds to wait for each response. - info_cb: Optional callback for text sent from the bootloader. - - Returns: - OKAY packet's message. - """ - return self._AcceptResponses(b'OKAY', info_cb, timeout_ms=timeout_ms) - - def HandleDataSending(self, source_file, source_len, - info_cb=DEFAULT_MESSAGE_CALLBACK, - progress_callback=None, timeout_ms=None): - """Handles the protocol for sending data to the device. - - Args: - source_file: File-object to read from for the device. - source_len: Amount of data, in bytes, to send to the device. - info_cb: Optional callback for text sent from the bootloader. - progress_callback: Callback that takes the current and the total progress - of the current file. - timeout_ms: Timeout in milliseconds to wait for each response. - - Raises: - FastbootTransferError: When fastboot can't handle this amount of data. - FastbootStateMismatch: Fastboot responded with the wrong packet type. - FastbootRemoteFailure: Fastboot reported failure. - FastbootInvalidResponse: Fastboot responded with an unknown packet type. - - Returns: - OKAY packet's message. - """ - accepted_size = self._AcceptResponses( - b'DATA', info_cb, timeout_ms=timeout_ms) - - accepted_size = binascii.unhexlify(accepted_size[:8]) - accepted_size, = struct.unpack(b'>I', accepted_size) - if accepted_size != source_len: - raise FastbootTransferError( - 'Device refused to download %s bytes of data (accepts %s bytes)', - source_len, accepted_size) - self._Write(source_file, accepted_size, progress_callback) - return self._AcceptResponses(b'OKAY', info_cb, timeout_ms=timeout_ms) - - def _AcceptResponses(self, expected_header, info_cb, timeout_ms=None): - """Accepts responses until the expected header or a FAIL. - - Args: - expected_header: OKAY or DATA - info_cb: Optional callback for text sent from the bootloader. - timeout_ms: Timeout in milliseconds to wait for each response. - - Raises: - FastbootStateMismatch: Fastboot responded with the wrong packet type. - FastbootRemoteFailure: Fastboot reported failure. - FastbootInvalidResponse: Fastboot responded with an unknown packet type. - - Returns: - OKAY packet's message. - """ - while True: - response = self.usb.BulkRead(64, timeout_ms=timeout_ms) - header = bytes(response[:4]) - remaining = bytes(response[4:]) - - if header == b'INFO': - info_cb(FastbootMessage(remaining, header)) - elif header in self.FINAL_HEADERS: - if header != expected_header: - raise FastbootStateMismatch( - 'Expected %s, got %s', expected_header, header) - if header == b'OKAY': - info_cb(FastbootMessage(remaining, header)) - return remaining - elif header == b'FAIL': - info_cb(FastbootMessage(remaining, header)) - raise FastbootRemoteFailure('FAIL: %s', remaining) - else: - raise FastbootInvalidResponse( - 'Got unknown header %s and response %s', header, remaining) - - def _HandleProgress(self, total, progress_callback): - """Calls the callback with the current progress and total .""" - current = 0 - while True: - current += yield - try: - progress_callback(current, total) - except Exception: # pylint: disable=broad-except - _LOG.exception('Progress callback raised an exception. %s', - progress_callback) - continue - - def _Write(self, data, length, progress_callback=None): - """Sends the data to the device, tracking progress with the callback.""" - if progress_callback: - progress = self._HandleProgress(length, progress_callback) - next(progress) - while length: - tmp = data.read(self.chunk_kb * 1024) - length -= len(tmp) - self.usb.BulkWrite(tmp) - - if progress_callback: - progress.send(len(tmp)) + """Encapsulates the fastboot protocol.""" + FINAL_HEADERS = {b'OKAY', b'DATA'} + + def __init__(self, usb, chunk_kb=1024): + """Constructs a FastbootProtocol instance. + + Args: + usb: UsbHandle instance. + chunk_kb: Packet size. For older devices, 4 may be required. + """ + self.usb = usb + self.chunk_kb = chunk_kb + + @property + def usb_handle(self): + return self.usb + + def SendCommand(self, command, arg=None): + """Sends a command to the device. + + Args: + command: The command to send. + arg: Optional argument to the command. + """ + if arg is not None: + if not isinstance(arg, bytes): + arg = arg.encode('utf8') + command = b'%s:%s' % (command, arg) + + self._Write(io.BytesIO(command), len(command)) + + def HandleSimpleResponses( + self, timeout_ms=None, info_cb=DEFAULT_MESSAGE_CALLBACK): + """Accepts normal responses from the device. + + Args: + timeout_ms: Timeout in milliseconds to wait for each response. + info_cb: Optional callback for text sent from the bootloader. + + Returns: + OKAY packet's message. + """ + return self._AcceptResponses(b'OKAY', info_cb, timeout_ms=timeout_ms) + + def HandleDataSending(self, source_file, source_len, + info_cb=DEFAULT_MESSAGE_CALLBACK, + progress_callback=None, timeout_ms=None): + """Handles the protocol for sending data to the device. + + Args: + source_file: File-object to read from for the device. + source_len: Amount of data, in bytes, to send to the device. + info_cb: Optional callback for text sent from the bootloader. + progress_callback: Callback that takes the current and the total progress + of the current file. + timeout_ms: Timeout in milliseconds to wait for each response. + + Raises: + FastbootTransferError: When fastboot can't handle this amount of data. + FastbootStateMismatch: Fastboot responded with the wrong packet type. + FastbootRemoteFailure: Fastboot reported failure. + FastbootInvalidResponse: Fastboot responded with an unknown packet type. + + Returns: + OKAY packet's message. + """ + accepted_size = self._AcceptResponses( + b'DATA', info_cb, timeout_ms=timeout_ms) + + accepted_size = binascii.unhexlify(accepted_size[:8]) + accepted_size, = struct.unpack(b'>I', accepted_size) + if accepted_size != source_len: + raise FastbootTransferError( + 'Device refused to download %s bytes of data (accepts %s bytes)', + source_len, accepted_size) + self._Write(source_file, accepted_size, progress_callback) + return self._AcceptResponses(b'OKAY', info_cb, timeout_ms=timeout_ms) + + def _AcceptResponses(self, expected_header, info_cb, timeout_ms=None): + """Accepts responses until the expected header or a FAIL. + + Args: + expected_header: OKAY or DATA + info_cb: Optional callback for text sent from the bootloader. + timeout_ms: Timeout in milliseconds to wait for each response. + + Raises: + FastbootStateMismatch: Fastboot responded with the wrong packet type. + FastbootRemoteFailure: Fastboot reported failure. + FastbootInvalidResponse: Fastboot responded with an unknown packet type. + + Returns: + OKAY packet's message. + """ + while True: + response = self.usb.BulkRead(64, timeout_ms=timeout_ms) + header = bytes(response[:4]) + remaining = bytes(response[4:]) + + if header == b'INFO': + info_cb(FastbootMessage(remaining, header)) + elif header in self.FINAL_HEADERS: + if header != expected_header: + raise FastbootStateMismatch( + 'Expected %s, got %s', expected_header, header) + if header == b'OKAY': + info_cb(FastbootMessage(remaining, header)) + return remaining + elif header == b'FAIL': + info_cb(FastbootMessage(remaining, header)) + raise FastbootRemoteFailure('FAIL: %s', remaining) + else: + raise FastbootInvalidResponse( + 'Got unknown header %s and response %s', header, remaining) + + def _HandleProgress(self, total, progress_callback): + """Calls the callback with the current progress and total .""" + current = 0 + while True: + current += yield + try: + progress_callback(current, total) + except Exception: # pylint: disable=broad-except + _LOG.exception('Progress callback raised an exception. %s', + progress_callback) + continue + + def _Write(self, data, length, progress_callback=None): + """Sends the data to the device, tracking progress with the callback.""" + if progress_callback: + progress = self._HandleProgress(length, progress_callback) + next(progress) + while length: + tmp = data.read(self.chunk_kb * 1024) + length -= len(tmp) + self.usb.BulkWrite(tmp) + + if progress_callback: + progress.send(len(tmp)) class FastbootCommands(object): - """Encapsulates the fastboot commands.""" - - def __init__(self): - """Constructs a FastbootCommands instance. - - Args: - usb: UsbHandle instance. - """ - self.__reset() - - def __reset(self): - self._handle = None - self._protocol = None - - @property - def usb_handle(self): - return self._handle - - def Close(self): - self._handle.Close() - - def ConnectDevice(self, port_path=None, serial=None, default_timeout_ms=None, chunk_kb=1024, **kwargs): - """Convenience function to get an adb device from usb path or serial. - - Args: - port_path: The filename of usb port to use. - serial: The serial number of the device to use. - default_timeout_ms: The default timeout in milliseconds to use. - chunk_kb: Amount of data, in kilobytes, to break fastboot packets up into - kwargs: handle: Device handle to use (instance of common.TcpHandle or common.UsbHandle) - banner: Connection banner to pass to the remote device - rsa_keys: List of AuthSigner subclass instances to be used for - authentication. The device can either accept one of these via the Sign - method, or we will send the result of GetPublicKey from the first one - if the device doesn't accept any of them. - auth_timeout_ms: Timeout to wait for when sending a new public key. This - is only relevant when we send a new public key. The device shows a - dialog and this timeout is how long to wait for that dialog. If used - in automation, this should be low to catch such a case as a failure - quickly; while in interactive settings it should be high to allow - users to accept the dialog. We default to automation here, so it's low - by default. - - If serial specifies a TCP address:port, then a TCP connection is - used instead of a USB connection. - """ - - if 'handle' in kwargs: - self._handle = kwargs['handle'] - - else: - self._handle = common.UsbHandle.FindAndOpen( - DeviceIsAvailable, port_path=port_path, serial=serial, - timeout_ms=default_timeout_ms) - - self._protocol = FastbootProtocol(self._handle, chunk_kb) - - return self - - @classmethod - def Devices(cls): - """Get a generator of UsbHandle for devices available.""" - return common.UsbHandle.FindDevices(DeviceIsAvailable) - - def _SimpleCommand(self, command, arg=None, **kwargs): - self._protocol.SendCommand(command, arg) - return self._protocol.HandleSimpleResponses(**kwargs) - - def FlashFromFile(self, partition, source_file, source_len=0, - info_cb=DEFAULT_MESSAGE_CALLBACK, progress_callback=None): - """Flashes a partition from the file on disk. - - Args: - partition: Partition name to flash to. - source_file: Filename to download to the device. - source_len: Optional length of source_file, uses os.stat if not provided. - info_cb: See Download. - progress_callback: See Download. - - Returns: - Download and flash responses, normally nothing. - """ - if source_len == 0: - # Fall back to stat. - source_len = os.stat(source_file).st_size - download_response = self.Download( - source_file, source_len=source_len, info_cb=info_cb, - progress_callback=progress_callback) - flash_response = self.Flash(partition, info_cb=info_cb) - return download_response + flash_response - - def Download(self, source_file, source_len=0, - info_cb=DEFAULT_MESSAGE_CALLBACK, progress_callback=None): - """Downloads a file to the device. - - Args: - source_file: A filename or file-like object to download to the device. - source_len: Optional length of source_file. If source_file is a file-like - object and source_len is not provided, source_file is read into - memory. - info_cb: Optional callback accepting FastbootMessage for text sent from - the bootloader. - progress_callback: Optional callback called with the percent of the - source_file downloaded. Note, this doesn't include progress of the - actual flashing. - - Returns: - Response to a download request, normally nothing. - """ - if isinstance(source_file, str): - source_len = os.stat(source_file).st_size - source_file = open(source_file) - - with source_file: - if source_len == 0: - # Fall back to storing it all in memory :( - data = source_file.read() - source_file = io.BytesIO(data.encode('utf8')) - source_len = len(data) - - self._protocol.SendCommand(b'download', b'%08x' % source_len) - return self._protocol.HandleDataSending( - source_file, source_len, info_cb, progress_callback=progress_callback) - - def Flash(self, partition, timeout_ms=0, info_cb=DEFAULT_MESSAGE_CALLBACK): - """Flashes the last downloaded file to the given partition. - - Args: - partition: Partition to overwrite with the new image. - timeout_ms: Optional timeout in milliseconds to wait for it to finish. - info_cb: See Download. Usually no messages. - - Returns: - Response to a download request, normally nothing. - """ - return self._SimpleCommand(b'flash', arg=partition, info_cb=info_cb, - timeout_ms=timeout_ms) - - def Erase(self, partition, timeout_ms=None): - """Erases the given partition. - - Args: - partition: Partition to clear. - """ - self._SimpleCommand(b'erase', arg=partition, timeout_ms=timeout_ms) - - def Getvar(self, var, info_cb=DEFAULT_MESSAGE_CALLBACK): - """Returns the given variable's definition. - - Args: - var: A variable the bootloader tracks. Use 'all' to get them all. - info_cb: See Download. Usually no messages. - - Returns: - Value of var according to the current bootloader. - """ - return self._SimpleCommand(b'getvar', arg=var, info_cb=info_cb) - - def Oem(self, command, timeout_ms=None, info_cb=DEFAULT_MESSAGE_CALLBACK): - """Executes an OEM command on the device. - - Args: - command: Command to execute, such as 'poweroff' or 'bootconfig read'. - timeout_ms: Optional timeout in milliseconds to wait for a response. - info_cb: See Download. Messages vary based on command. - - Returns: - The final response from the device. - """ - if not isinstance(command, bytes): - command = command.encode('utf8') - return self._SimpleCommand( - b'oem %s' % command, timeout_ms=timeout_ms, info_cb=info_cb) - - def Continue(self): - """Continues execution past fastboot into the system.""" - return self._SimpleCommand(b'continue') - - def Reboot(self, target_mode=b'', timeout_ms=None): - """Reboots the device. - - Args: - target_mode: Normal reboot when unspecified. Can specify other target - modes such as 'recovery' or 'bootloader'. - timeout_ms: Optional timeout in milliseconds to wait for a response. - - Returns: - Usually the empty string. Depends on the bootloader and the target_mode. - """ - return self._SimpleCommand( - b'reboot', arg=target_mode or None, timeout_ms=timeout_ms) - - def RebootBootloader(self, timeout_ms=None): - """Reboots into the bootloader, usually equiv to Reboot('bootloader').""" - return self._SimpleCommand(b'reboot-bootloader', timeout_ms=timeout_ms) + """Encapsulates the fastboot commands.""" + + def __init__(self): + """Constructs a FastbootCommands instance. + + Args: + usb: UsbHandle instance. + """ + self.__reset() + + def __reset(self): + self._handle = None + self._protocol = None + + @property + def usb_handle(self): + return self._handle + + def Close(self): + self._handle.Close() + + def ConnectDevice(self, port_path=None, serial=None, default_timeout_ms=None, chunk_kb=1024, **kwargs): + """Convenience function to get an adb device from usb path or serial. + + Args: + port_path: The filename of usb port to use. + serial: The serial number of the device to use. + default_timeout_ms: The default timeout in milliseconds to use. + chunk_kb: Amount of data, in kilobytes, to break fastboot packets up into + kwargs: handle: Device handle to use (instance of common.TcpHandle or common.UsbHandle) + banner: Connection banner to pass to the remote device + rsa_keys: List of AuthSigner subclass instances to be used for + authentication. The device can either accept one of these via the Sign + method, or we will send the result of GetPublicKey from the first one + if the device doesn't accept any of them. + auth_timeout_ms: Timeout to wait for when sending a new public key. This + is only relevant when we send a new public key. The device shows a + dialog and this timeout is how long to wait for that dialog. If used + in automation, this should be low to catch such a case as a failure + quickly; while in interactive settings it should be high to allow + users to accept the dialog. We default to automation here, so it's low + by default. + + If serial specifies a TCP address:port, then a TCP connection is + used instead of a USB connection. + """ + + if 'handle' in kwargs: + self._handle = kwargs['handle'] + + else: + self._handle = common.UsbHandle.FindAndOpen( + DeviceIsAvailable, port_path=port_path, serial=serial, + timeout_ms=default_timeout_ms) + + self._protocol = FastbootProtocol(self._handle, chunk_kb) + + return self + + @classmethod + def Devices(cls): + """Get a generator of UsbHandle for devices available.""" + return common.UsbHandle.FindDevices(DeviceIsAvailable) + + def _SimpleCommand(self, command, arg=None, **kwargs): + self._protocol.SendCommand(command, arg) + return self._protocol.HandleSimpleResponses(**kwargs) + + def FlashFromFile(self, partition, source_file, source_len=0, + info_cb=DEFAULT_MESSAGE_CALLBACK, progress_callback=None): + """Flashes a partition from the file on disk. + + Args: + partition: Partition name to flash to. + source_file: Filename to download to the device. + source_len: Optional length of source_file, uses os.stat if not provided. + info_cb: See Download. + progress_callback: See Download. + + Returns: + Download and flash responses, normally nothing. + """ + if source_len == 0: + # Fall back to stat. + source_len = os.stat(source_file).st_size + download_response = self.Download( + source_file, source_len=source_len, info_cb=info_cb, + progress_callback=progress_callback) + flash_response = self.Flash(partition, info_cb=info_cb) + return download_response + flash_response + + def Download(self, source_file, source_len=0, + info_cb=DEFAULT_MESSAGE_CALLBACK, progress_callback=None): + """Downloads a file to the device. + + Args: + source_file: A filename or file-like object to download to the device. + source_len: Optional length of source_file. If source_file is a file-like + object and source_len is not provided, source_file is read into + memory. + info_cb: Optional callback accepting FastbootMessage for text sent from + the bootloader. + progress_callback: Optional callback called with the percent of the + source_file downloaded. Note, this doesn't include progress of the + actual flashing. + + Returns: + Response to a download request, normally nothing. + """ + if isinstance(source_file, str): + source_len = os.stat(source_file).st_size + source_file = open(source_file) + + with source_file: + if source_len == 0: + # Fall back to storing it all in memory :( + data = source_file.read() + source_file = io.BytesIO(data.encode('utf8')) + source_len = len(data) + + self._protocol.SendCommand(b'download', b'%08x' % source_len) + return self._protocol.HandleDataSending( + source_file, source_len, info_cb, progress_callback=progress_callback) + + def Flash(self, partition, timeout_ms=0, info_cb=DEFAULT_MESSAGE_CALLBACK): + """Flashes the last downloaded file to the given partition. + + Args: + partition: Partition to overwrite with the new image. + timeout_ms: Optional timeout in milliseconds to wait for it to finish. + info_cb: See Download. Usually no messages. + + Returns: + Response to a download request, normally nothing. + """ + return self._SimpleCommand(b'flash', arg=partition, info_cb=info_cb, + timeout_ms=timeout_ms) + + def Erase(self, partition, timeout_ms=None): + """Erases the given partition. + + Args: + partition: Partition to clear. + """ + self._SimpleCommand(b'erase', arg=partition, timeout_ms=timeout_ms) + + def Getvar(self, var, info_cb=DEFAULT_MESSAGE_CALLBACK): + """Returns the given variable's definition. + + Args: + var: A variable the bootloader tracks. Use 'all' to get them all. + info_cb: See Download. Usually no messages. + + Returns: + Value of var according to the current bootloader. + """ + return self._SimpleCommand(b'getvar', arg=var, info_cb=info_cb) + + def Oem(self, command, timeout_ms=None, info_cb=DEFAULT_MESSAGE_CALLBACK): + """Executes an OEM command on the device. + + Args: + command: Command to execute, such as 'poweroff' or 'bootconfig read'. + timeout_ms: Optional timeout in milliseconds to wait for a response. + info_cb: See Download. Messages vary based on command. + + Returns: + The final response from the device. + """ + if not isinstance(command, bytes): + command = command.encode('utf8') + return self._SimpleCommand( + b'oem %s' % command, timeout_ms=timeout_ms, info_cb=info_cb) + + def Continue(self): + """Continues execution past fastboot into the system.""" + return self._SimpleCommand(b'continue') + + def Reboot(self, target_mode=b'', timeout_ms=None): + """Reboots the device. + + Args: + target_mode: Normal reboot when unspecified. Can specify other target + modes such as 'recovery' or 'bootloader'. + timeout_ms: Optional timeout in milliseconds to wait for a response. + + Returns: + Usually the empty string. Depends on the bootloader and the target_mode. + """ + return self._SimpleCommand( + b'reboot', arg=target_mode or None, timeout_ms=timeout_ms) + + def RebootBootloader(self, timeout_ms=None): + """Reboots into the bootloader, usually equiv to Reboot('bootloader').""" + return self._SimpleCommand(b'reboot-bootloader', timeout_ms=timeout_ms) diff --git a/adb/fastboot_debug.py b/adb/fastboot_debug.py index f904b90..e168f69 100755 --- a/adb/fastboot_debug.py +++ b/adb/fastboot_debug.py @@ -28,98 +28,100 @@ from adb import fastboot try: - import progressbar + import progressbar except ImportError: - # progressbar is optional. - progressbar = None + # progressbar is optional. + progressbar = None def Devices(args): - """Lists the available devices. + """Lists the available devices. - List of devices attached - 015DB7591102001A device - """ - for device in fastboot.FastbootCommands.Devices(): - print('%s\tdevice' % device.serial_number) - return 0 + List of devices attached + 015DB7591102001A device + """ + for device in fastboot.FastbootCommands.Devices(): + print('%s\tdevice' % device.serial_number) + return 0 def _InfoCb(message): - # Use an unbuffered version of stdout. - if not message.message: - return - sys.stdout.write('%s: %s\n' % (message.header, message.message)) - sys.stdout.flush() + # Use an unbuffered version of stdout. + if not message.message: + return + sys.stdout.write('%s: %s\n' % (message.header, message.message)) + sys.stdout.flush() def main(): - common = common_cli.GetCommonArguments() - device = common_cli.GetDeviceArguments() - device.add_argument( - '--chunk_kb', type=int, default=1024, metavar='1024', - help='Size of packets to write in Kb. For older devices, it may be ' - 'required to use 4.') - parents = [common, device] - - parser = argparse.ArgumentParser( - description=sys.modules[__name__].__doc__, parents=[common]) - subparsers = parser.add_subparsers(title='Commands', dest='command_name') - - subparser = subparsers.add_parser( - name='help', help='Prints the commands available') - subparser = subparsers.add_parser( - name='devices', help='Lists the available devices', parents=[common]) - common_cli.MakeSubparser( - subparsers, parents, fastboot.FastbootCommands.Continue) - - common_cli.MakeSubparser( - subparsers, parents, fastboot.FastbootCommands.Download, - {'source_file': 'Filename on the host to push'}) - common_cli.MakeSubparser( - subparsers, parents, fastboot.FastbootCommands.Erase) - common_cli.MakeSubparser( - subparsers, parents, fastboot.FastbootCommands.Flash) - common_cli.MakeSubparser( - subparsers, parents, fastboot.FastbootCommands.Getvar) - common_cli.MakeSubparser( - subparsers, parents, fastboot.FastbootCommands.Oem) - common_cli.MakeSubparser( - subparsers, parents, fastboot.FastbootCommands.Reboot) - - if len(sys.argv) == 1: - parser.print_help() - return 2 - - args = parser.parse_args() - if args.verbose: - logging.basicConfig(level=logging.DEBUG) - if args.command_name == 'devices': - return Devices(args) - if args.command_name == 'help': - parser.print_help() - return 0 - - kwargs = {} - argspec = inspect.getargspec(args.method) - if 'info_cb' in argspec.args: - kwargs['info_cb'] = _InfoCb - if 'progress_callback' in argspec.args and progressbar: - bar = progressbar.ProgessBar( - widgets=[progressbar.Bar(), progressbar.Percentage()]) - bar.start() - def SetProgress(current, total): - bar.update(current / total * 100.0) - if current == total: - bar.finish() - kwargs['progress_callback'] = SetProgress - - return common_cli.StartCli( - args, - fastboot.FastbootCommands, - chunk_kb=args.chunk_kb, - extra=kwargs) + common = common_cli.GetCommonArguments() + device = common_cli.GetDeviceArguments() + device.add_argument( + '--chunk_kb', type=int, default=1024, metavar='1024', + help='Size of packets to write in Kb. For older devices, it may be ' + 'required to use 4.') + parents = [common, device] + + parser = argparse.ArgumentParser( + description=sys.modules[__name__].__doc__, parents=[common]) + subparsers = parser.add_subparsers(title='Commands', dest='command_name') + + subparser = subparsers.add_parser( + name='help', help='Prints the commands available') + subparser = subparsers.add_parser( + name='devices', help='Lists the available devices', parents=[common]) + common_cli.MakeSubparser( + subparsers, parents, fastboot.FastbootCommands.Continue) + + common_cli.MakeSubparser( + subparsers, parents, fastboot.FastbootCommands.Download, + {'source_file': 'Filename on the host to push'}) + common_cli.MakeSubparser( + subparsers, parents, fastboot.FastbootCommands.Erase) + common_cli.MakeSubparser( + subparsers, parents, fastboot.FastbootCommands.Flash) + common_cli.MakeSubparser( + subparsers, parents, fastboot.FastbootCommands.Getvar) + common_cli.MakeSubparser( + subparsers, parents, fastboot.FastbootCommands.Oem) + common_cli.MakeSubparser( + subparsers, parents, fastboot.FastbootCommands.Reboot) + + if len(sys.argv) == 1: + parser.print_help() + return 2 + + args = parser.parse_args() + if args.verbose: + logging.basicConfig(level=logging.DEBUG) + if args.command_name == 'devices': + return Devices(args) + if args.command_name == 'help': + parser.print_help() + return 0 + + kwargs = {} + argspec = inspect.getargspec(args.method) + if 'info_cb' in argspec.args: + kwargs['info_cb'] = _InfoCb + if 'progress_callback' in argspec.args and progressbar: + bar = progressbar.ProgessBar( + widgets=[progressbar.Bar(), progressbar.Percentage()]) + bar.start() + + def SetProgress(current, total): + bar.update(current / total * 100.0) + if current == total: + bar.finish() + + kwargs['progress_callback'] = SetProgress + + return common_cli.StartCli( + args, + fastboot.FastbootCommands, + chunk_kb=args.chunk_kb, + extra=kwargs) if __name__ == '__main__': - sys.exit(main()) + sys.exit(main()) diff --git a/adb/filesync_protocol.py b/adb/filesync_protocol.py index fdecd4a..4b7c1c3 100644 --- a/adb/filesync_protocol.py +++ b/adb/filesync_protocol.py @@ -35,15 +35,15 @@ class InvalidChecksumError(Exception): - """Checksum of data didn't match expected checksum.""" + """Checksum of data didn't match expected checksum.""" class InterleavedDataError(Exception): - """We only support command sent serially.""" + """We only support command sent serially.""" class PushFailedError(Exception): - """Pushing a file failed for some reason.""" + """Pushing a file failed for some reason.""" DeviceFile = collections.namedtuple('DeviceFile', [ @@ -51,211 +51,211 @@ class PushFailedError(Exception): class FilesyncProtocol(object): - """Implements the FileSync protocol as described in sync.txt.""" - - @staticmethod - def Stat(connection, filename): - cnxn = FileSyncConnection(connection, b'<4I') - cnxn.Send(b'STAT', filename) - command, (mode, size, mtime) = cnxn.Read((b'STAT',), read_data=False) - - if command != b'STAT': - raise adb_protocol.InvalidResponseError( - 'Expected STAT response to STAT, got %s' % command) - return mode, size, mtime - - @classmethod - def List(cls, connection, path): - cnxn = FileSyncConnection(connection, b'<5I') - cnxn.Send(b'LIST', path) - files = [] - for cmd_id, header, filename in cnxn.ReadUntil((b'DENT',), b'DONE'): - if cmd_id == b'DONE': - break - mode, size, mtime = header - files.append(DeviceFile(filename, mode, size, mtime)) - return files - - @classmethod - def Pull(cls, connection, filename, dest_file, progress_callback): - """Pull a file from the device into the file-like dest_file.""" - if progress_callback: - total_bytes = cls.Stat(connection, filename)[1] - progress = cls._HandleProgress(lambda current: progress_callback(filename, current, total_bytes)) - next(progress) - - cnxn = FileSyncConnection(connection, b'<2I') - cnxn.Send(b'RECV', filename) - for cmd_id, _, data in cnxn.ReadUntil((b'DATA',), b'DONE'): - if cmd_id == b'DONE': - break - dest_file.write(data) - if progress_callback: - progress.send(len(data)) - - @classmethod - def _HandleProgress(cls, progress_callback): - """Calls the callback with the current progress and total bytes written/received. - - Args: - progress_callback: callback method that accepts filename, bytes_written and total_bytes, - total_bytes will be -1 for file-like objects - """ - current = 0 - while True: - current += yield - try: - progress_callback(current) - except Exception: # pylint: disable=broad-except - continue - - @classmethod - def Push(cls, connection, datafile, filename, - st_mode=DEFAULT_PUSH_MODE, mtime=0, progress_callback=None): - """Push a file-like object to the device. - - Args: - connection: ADB connection - datafile: File-like object for reading from - filename: Filename to push to - st_mode: stat mode for filename - mtime: modification time - progress_callback: callback method that accepts filename, bytes_written and total_bytes - - Raises: - PushFailedError: Raised on push failure. - """ - - fileinfo = ('{},{}'.format(filename, int(st_mode))).encode('utf-8') - - cnxn = FileSyncConnection(connection, b'<2I') - cnxn.Send(b'SEND', fileinfo) - - if progress_callback: - total_bytes = os.fstat(datafile.fileno()).st_size if isinstance(datafile, file) else -1 - progress = cls._HandleProgress(lambda current: progress_callback(filename, current, total_bytes)) - next(progress) - - while True: - data = datafile.read(MAX_PUSH_DATA) - if data: - cnxn.Send(b'DATA', data) - + """Implements the FileSync protocol as described in sync.txt.""" + + @staticmethod + def Stat(connection, filename): + cnxn = FileSyncConnection(connection, b'<4I') + cnxn.Send(b'STAT', filename) + command, (mode, size, mtime) = cnxn.Read((b'STAT',), read_data=False) + + if command != b'STAT': + raise adb_protocol.InvalidResponseError( + 'Expected STAT response to STAT, got %s' % command) + return mode, size, mtime + + @classmethod + def List(cls, connection, path): + cnxn = FileSyncConnection(connection, b'<5I') + cnxn.Send(b'LIST', path) + files = [] + for cmd_id, header, filename in cnxn.ReadUntil((b'DENT',), b'DONE'): + if cmd_id == b'DONE': + break + mode, size, mtime = header + files.append(DeviceFile(filename, mode, size, mtime)) + return files + + @classmethod + def Pull(cls, connection, filename, dest_file, progress_callback): + """Pull a file from the device into the file-like dest_file.""" if progress_callback: - progress.send(len(data)) - else: - break + total_bytes = cls.Stat(connection, filename)[1] + progress = cls._HandleProgress(lambda current: progress_callback(filename, current, total_bytes)) + next(progress) + + cnxn = FileSyncConnection(connection, b'<2I') + cnxn.Send(b'RECV', filename) + for cmd_id, _, data in cnxn.ReadUntil((b'DATA',), b'DONE'): + if cmd_id == b'DONE': + break + dest_file.write(data) + if progress_callback: + progress.send(len(data)) + + @classmethod + def _HandleProgress(cls, progress_callback): + """Calls the callback with the current progress and total bytes written/received. + + Args: + progress_callback: callback method that accepts filename, bytes_written and total_bytes, + total_bytes will be -1 for file-like objects + """ + current = 0 + while True: + current += yield + try: + progress_callback(current) + except Exception: # pylint: disable=broad-except + continue + + @classmethod + def Push(cls, connection, datafile, filename, + st_mode=DEFAULT_PUSH_MODE, mtime=0, progress_callback=None): + """Push a file-like object to the device. + + Args: + connection: ADB connection + datafile: File-like object for reading from + filename: Filename to push to + st_mode: stat mode for filename + mtime: modification time + progress_callback: callback method that accepts filename, bytes_written and total_bytes + + Raises: + PushFailedError: Raised on push failure. + """ + + fileinfo = ('{},{}'.format(filename, int(st_mode))).encode('utf-8') + + cnxn = FileSyncConnection(connection, b'<2I') + cnxn.Send(b'SEND', fileinfo) - if mtime == 0: - mtime = int(time.time()) - # DONE doesn't send data, but it hides the last bit of data in the size - # field. - cnxn.Send(b'DONE', size=mtime) - for cmd_id, _, data in cnxn.ReadUntil((), b'OKAY', b'FAIL'): - if cmd_id == b'OKAY': - return - raise PushFailedError(data) + if progress_callback: + total_bytes = os.fstat(datafile.fileno()).st_size if isinstance(datafile, file) else -1 + progress = cls._HandleProgress(lambda current: progress_callback(filename, current, total_bytes)) + next(progress) + + while True: + data = datafile.read(MAX_PUSH_DATA) + if data: + cnxn.Send(b'DATA', data) + + if progress_callback: + progress.send(len(data)) + else: + break + + if mtime == 0: + mtime = int(time.time()) + # DONE doesn't send data, but it hides the last bit of data in the size + # field. + cnxn.Send(b'DONE', size=mtime) + for cmd_id, _, data in cnxn.ReadUntil((), b'OKAY', b'FAIL'): + if cmd_id == b'OKAY': + return + raise PushFailedError(data) class FileSyncConnection(object): - """Encapsulate a FileSync service connection.""" - - ids = [ - b'STAT', b'LIST', b'SEND', b'RECV', b'DENT', b'DONE', b'DATA', b'OKAY', - b'FAIL', b'QUIT', - ] - id_to_wire, wire_to_id = adb_protocol.MakeWireIDs(ids) - - def __init__(self, adb_connection, recv_header_format): - self.adb = adb_connection - - # Sending - # Using a bytearray() saves a copy later when using libusb. - self.send_buffer = bytearray(adb_protocol.MAX_ADB_DATA) - self.send_idx = 0 - self.send_header_len = struct.calcsize(b'<2I') - - # Receiving - self.recv_buffer = bytearray() - self.recv_header_format = recv_header_format - self.recv_header_len = struct.calcsize(recv_header_format) - - def Send(self, command_id, data=b'', size=0): - """Send/buffer FileSync packets. - - Packets are buffered and only flushed when this connection is read from. All - messages have a response from the device, so this will always get flushed. - - Args: - command_id: Command to send. - data: Optional data to send, must set data or size. - size: Optionally override size from len(data). - """ - if data: - if not isinstance(data, bytes): - data = data.encode('utf8') - size = len(data) - - if not self._CanAddToSendBuffer(len(data)): - self._Flush() - buf = struct.pack(b'<2I', self.id_to_wire[command_id], size) + data - self.send_buffer[self.send_idx:self.send_idx + len(buf)] = buf - self.send_idx += len(buf) - - def Read(self, expected_ids, read_data=True): - """Read ADB messages and return FileSync packets.""" - if self.send_idx: - self._Flush() - - # Read one filesync packet off the recv buffer. - header_data = self._ReadBuffered(self.recv_header_len) - header = struct.unpack(self.recv_header_format, header_data) - # Header is (ID, ...). - command_id = self.wire_to_id[header[0]] - - if command_id not in expected_ids: - if command_id == b'FAIL': - reason = '' - if self.recv_buffer: - reason = self.recv_buffer.decode('utf-8', errors='ignore') - raise usb_exceptions.AdbCommandFailureException('Command failed: {}'.format(reason)) - raise adb_protocol.InvalidResponseError( - 'Expected one of %s, got %s' % (expected_ids, command_id)) - - if not read_data: - return command_id, header[1:] - - # Header is (ID, ..., size). - size = header[-1] - data = self._ReadBuffered(size) - return command_id, header[1:-1], data - - def ReadUntil(self, expected_ids, *finish_ids): - """Useful wrapper around Read.""" - while True: - cmd_id, header, data = self.Read(expected_ids + finish_ids) - yield cmd_id, header, data - if cmd_id in finish_ids: - break - - def _CanAddToSendBuffer(self, data_len): - added_len = self.send_header_len + data_len - return self.send_idx + added_len < adb_protocol.MAX_ADB_DATA - - def _Flush(self): - try: - self.adb.Write(self.send_buffer[:self.send_idx]) - except libusb1.USBError as e: - raise adb_protocol.SendFailedError( - 'Could not send data %s' % self.send_buffer, e) - self.send_idx = 0 - - def _ReadBuffered(self, size): - # Ensure recv buffer has enough data. - while len(self.recv_buffer) < size: - _, data = self.adb.ReadUntil(b'WRTE') - self.recv_buffer += data - - result = self.recv_buffer[:size] - self.recv_buffer = self.recv_buffer[size:] - return result + """Encapsulate a FileSync service connection.""" + + ids = [ + b'STAT', b'LIST', b'SEND', b'RECV', b'DENT', b'DONE', b'DATA', b'OKAY', + b'FAIL', b'QUIT', + ] + id_to_wire, wire_to_id = adb_protocol.MakeWireIDs(ids) + + def __init__(self, adb_connection, recv_header_format): + self.adb = adb_connection + + # Sending + # Using a bytearray() saves a copy later when using libusb. + self.send_buffer = bytearray(adb_protocol.MAX_ADB_DATA) + self.send_idx = 0 + self.send_header_len = struct.calcsize(b'<2I') + + # Receiving + self.recv_buffer = bytearray() + self.recv_header_format = recv_header_format + self.recv_header_len = struct.calcsize(recv_header_format) + + def Send(self, command_id, data=b'', size=0): + """Send/buffer FileSync packets. + + Packets are buffered and only flushed when this connection is read from. All + messages have a response from the device, so this will always get flushed. + + Args: + command_id: Command to send. + data: Optional data to send, must set data or size. + size: Optionally override size from len(data). + """ + if data: + if not isinstance(data, bytes): + data = data.encode('utf8') + size = len(data) + + if not self._CanAddToSendBuffer(len(data)): + self._Flush() + buf = struct.pack(b'<2I', self.id_to_wire[command_id], size) + data + self.send_buffer[self.send_idx:self.send_idx + len(buf)] = buf + self.send_idx += len(buf) + + def Read(self, expected_ids, read_data=True): + """Read ADB messages and return FileSync packets.""" + if self.send_idx: + self._Flush() + + # Read one filesync packet off the recv buffer. + header_data = self._ReadBuffered(self.recv_header_len) + header = struct.unpack(self.recv_header_format, header_data) + # Header is (ID, ...). + command_id = self.wire_to_id[header[0]] + + if command_id not in expected_ids: + if command_id == b'FAIL': + reason = '' + if self.recv_buffer: + reason = self.recv_buffer.decode('utf-8', errors='ignore') + raise usb_exceptions.AdbCommandFailureException('Command failed: {}'.format(reason)) + raise adb_protocol.InvalidResponseError( + 'Expected one of %s, got %s' % (expected_ids, command_id)) + + if not read_data: + return command_id, header[1:] + + # Header is (ID, ..., size). + size = header[-1] + data = self._ReadBuffered(size) + return command_id, header[1:-1], data + + def ReadUntil(self, expected_ids, *finish_ids): + """Useful wrapper around Read.""" + while True: + cmd_id, header, data = self.Read(expected_ids + finish_ids) + yield cmd_id, header, data + if cmd_id in finish_ids: + break + + def _CanAddToSendBuffer(self, data_len): + added_len = self.send_header_len + data_len + return self.send_idx + added_len < adb_protocol.MAX_ADB_DATA + + def _Flush(self): + try: + self.adb.Write(self.send_buffer[:self.send_idx]) + except libusb1.USBError as e: + raise adb_protocol.SendFailedError( + 'Could not send data %s' % self.send_buffer, e) + self.send_idx = 0 + + def _ReadBuffered(self, size): + # Ensure recv buffer has enough data. + while len(self.recv_buffer) < size: + _, data = self.adb.ReadUntil(b'WRTE') + self.recv_buffer += data + + result = self.recv_buffer[:size] + self.recv_buffer = self.recv_buffer[size:] + return result diff --git a/adb/sign_m2crypto.py b/adb/sign_m2crypto.py index c8185fa..b8b6d0d 100644 --- a/adb/sign_m2crypto.py +++ b/adb/sign_m2crypto.py @@ -18,17 +18,16 @@ class M2CryptoSigner(adb_protocol.AuthSigner): - """AuthSigner using M2Crypto.""" + """AuthSigner using M2Crypto.""" - def __init__(self, rsa_key_path): - with open(rsa_key_path + '.pub') as rsa_pub_file: - self.public_key = rsa_pub_file.read() + def __init__(self, rsa_key_path): + with open(rsa_key_path + '.pub') as rsa_pub_file: + self.public_key = rsa_pub_file.read() - self.rsa_key = RSA.load_key(rsa_key_path) + self.rsa_key = RSA.load_key(rsa_key_path) - def Sign(self, data): - return self.rsa_key.sign(data, 'sha1') - - def GetPublicKey(self): - return self.public_key + def Sign(self, data): + return self.rsa_key.sign(data, 'sha1') + def GetPublicKey(self): + return self.public_key diff --git a/adb/sign_pycryptodome.py b/adb/sign_pycryptodome.py index 1a56f6a..6a61ce9 100644 --- a/adb/sign_pycryptodome.py +++ b/adb/sign_pycryptodome.py @@ -4,10 +4,10 @@ from Crypto.PublicKey import RSA from Crypto.Signature import pkcs1_15 + class PycryptodomeAuthSigner(adb_protocol.AuthSigner): def __init__(self, rsa_key_path=None): - super(PycryptodomeAuthSigner, self).__init__() if rsa_key_path: diff --git a/adb/sign_pythonrsa.py b/adb/sign_pythonrsa.py index 5c00bb2..bc6a221 100644 --- a/adb/sign_pythonrsa.py +++ b/adb/sign_pythonrsa.py @@ -23,50 +23,55 @@ # need to slap a signature on top of already hashed message. Introduce "fake" # hashing algo for this. class _Accum(object): - def __init__(self): - self._buf = '' - def update(self, msg): - self._buf += msg - def digest(self): - return self._buf + def __init__(self): + self._buf = '' + + def update(self, msg): + self._buf += msg + + def digest(self): + return self._buf + + pkcs1.HASH_METHODS['SHA-1-PREHASHED'] = _Accum pkcs1.HASH_ASN1['SHA-1-PREHASHED'] = pkcs1.HASH_ASN1['SHA-1'] def _load_rsa_private_key(pem): - """PEM encoded PKCS#8 private key -> rsa.PrivateKey.""" - # ADB uses private RSA keys in pkcs#8 format. 'rsa' library doesn't support - # them natively. Do some ASN unwrapping to extract naked RSA key - # (in der-encoded form). See https://www.ietf.org/rfc/rfc2313.txt. - # Also http://superuser.com/a/606266. - try: - der = rsa.pem.load_pem(pem, 'PRIVATE KEY') - keyinfo, _ = decoder.decode(der) - if keyinfo[1][0] != univ.ObjectIdentifier( - '1.2.840.113549.1.1.1'): # pragma: no cover - raise ValueError('Not a DER-encoded OpenSSL private RSA key') - private_key_der = keyinfo[2].asOctets() - except IndexError: # pragma: no cover - raise ValueError('Not a DER-encoded OpenSSL private RSA key') - return rsa.PrivateKey.load_pkcs1(private_key_der, format='DER') + """PEM encoded PKCS#8 private key -> rsa.PrivateKey.""" + # ADB uses private RSA keys in pkcs#8 format. 'rsa' library doesn't support + # them natively. Do some ASN unwrapping to extract naked RSA key + # (in der-encoded form). See https://www.ietf.org/rfc/rfc2313.txt. + # Also http://superuser.com/a/606266. + try: + der = rsa.pem.load_pem(pem, 'PRIVATE KEY') + keyinfo, _ = decoder.decode(der) + if keyinfo[1][0] != univ.ObjectIdentifier( + '1.2.840.113549.1.1.1'): # pragma: no cover + raise ValueError('Not a DER-encoded OpenSSL private RSA key') + private_key_der = keyinfo[2].asOctets() + except IndexError: # pragma: no cover + raise ValueError('Not a DER-encoded OpenSSL private RSA key') + return rsa.PrivateKey.load_pkcs1(private_key_der, format='DER') class PythonRSASigner(object): - """Implements adb_protocol.AuthSigner using http://stuvel.eu/rsa.""" - @classmethod - def FromRSAKeyPath(cls, rsa_key_path): - with open(rsa_key_path + '.pub') as f: - pub = f.read() - with open(rsa_key_path) as f: - priv = f.read() - return cls(pub, priv) - - def __init__(self, pub=None, priv=None): - self.priv_key = _load_rsa_private_key(priv) - self.pub_key = pub - - def Sign(self, data): - return rsa.sign(data, self.priv_key, 'SHA-1-PREHASHED') - - def GetPublicKey(self): - return self.pub_key + """Implements adb_protocol.AuthSigner using http://stuvel.eu/rsa.""" + + @classmethod + def FromRSAKeyPath(cls, rsa_key_path): + with open(rsa_key_path + '.pub') as f: + pub = f.read() + with open(rsa_key_path) as f: + priv = f.read() + return cls(pub, priv) + + def __init__(self, pub=None, priv=None): + self.priv_key = _load_rsa_private_key(priv) + self.pub_key = pub + + def Sign(self, data): + return rsa.sign(data, self.priv_key, 'SHA-1-PREHASHED') + + def GetPublicKey(self): + return self.pub_key diff --git a/adb/usb_exceptions.py b/adb/usb_exceptions.py index d563784..54f7e0b 100644 --- a/adb/usb_exceptions.py +++ b/adb/usb_exceptions.py @@ -15,61 +15,61 @@ class CommonUsbError(Exception): - """Base class for usb communication errors.""" + """Base class for usb communication errors.""" class FormatMessageWithArgumentsException(CommonUsbError): - """Exception that both looks good and is functional. + """Exception that both looks good and is functional. - Okay, not that kind of functional, it's still a class. + Okay, not that kind of functional, it's still a class. - This interpolates the message with the given arguments to make it - human-readable, but keeps the arguments in case other code try-excepts it. - """ + This interpolates the message with the given arguments to make it + human-readable, but keeps the arguments in case other code try-excepts it. + """ - def __init__(self, message, *args): - message %= args - super(FormatMessageWithArgumentsException, self).__init__(message, *args) + def __init__(self, message, *args): + message %= args + super(FormatMessageWithArgumentsException, self).__init__(message, *args) class DeviceNotFoundError(FormatMessageWithArgumentsException): - """Device isn't on USB.""" + """Device isn't on USB.""" class DeviceAuthError(FormatMessageWithArgumentsException): - """Device authentication failed.""" + """Device authentication failed.""" class LibusbWrappingError(CommonUsbError): - """Wraps libusb1 errors while keeping its original usefulness. + """Wraps libusb1 errors while keeping its original usefulness. - Attributes: - usb_error: Instance of libusb1.USBError - """ + Attributes: + usb_error: Instance of libusb1.USBError + """ - def __init__(self, msg, usb_error): - super(LibusbWrappingError, self).__init__(msg) - self.usb_error = usb_error + def __init__(self, msg, usb_error): + super(LibusbWrappingError, self).__init__(msg) + self.usb_error = usb_error - def __str__(self): - return '%s: %s' % ( - super(LibusbWrappingError, self).__str__(), str(self.usb_error)) + def __str__(self): + return '%s: %s' % ( + super(LibusbWrappingError, self).__str__(), str(self.usb_error)) class WriteFailedError(LibusbWrappingError): - """Raised when the device doesn't accept our command.""" + """Raised when the device doesn't accept our command.""" class ReadFailedError(LibusbWrappingError): - """Raised when the device doesn't respond to our commands.""" + """Raised when the device doesn't respond to our commands.""" class AdbCommandFailureException(Exception): - """ADB Command returned a FAIL.""" + """ADB Command returned a FAIL.""" class AdbOperationException(Exception): - """Failed to communicate over adb with device after multiple retries.""" + """Failed to communicate over adb with device after multiple retries.""" class TcpTimeoutException(FormatMessageWithArgumentsException): From 803d457d5ba057d00ac6bf6a5588534d38260823 Mon Sep 17 00:00:00 2001 From: greateggsgreg Date: Sun, 4 Mar 2018 14:28:57 -0500 Subject: [PATCH 2/3] Fixed reference before use checks, python3 compatible bytes vs str isinstance --- adb/adb_protocol.py | 4 ++-- adb/common_cli.py | 2 +- adb/fastboot.py | 4 +--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/adb/adb_protocol.py b/adb/adb_protocol.py index 6f91867..ee4920c 100644 --- a/adb/adb_protocol.py +++ b/adb/adb_protocol.py @@ -426,7 +426,7 @@ def StreamingCommand(cls, usb, service, command='', timeout_ms=None): Yields: The responses from the service. """ - if isinstance(command, str): + if not isinstance(command, bytes): command = command.encode('utf8') connection = cls.Open( usb, destination=b'%s:%s' % (service, command), @@ -451,7 +451,7 @@ def InteractiveShellCommand(cls, conn, cmd=None, strip_cmd=True, delim=None, str The stdout from the shell command. """ - if isinstance(delim, str): + if not isinstance(delim, bytes): delim = delim.encode('utf-8') # Delimiter may be shell@hammerhead:/ $ diff --git a/adb/common_cli.py b/adb/common_cli.py index fce327e..519b7e0 100644 --- a/adb/common_cli.py +++ b/adb/common_cli.py @@ -77,7 +77,7 @@ def _DocToArgs(doc): if offset is None: offset = len(l) - len(l.lstrip()) l = l[offset:] - if l[0] == ' ': + if l[0] == ' ' and m: out[m.group(1)] += ' ' + l.lstrip() else: m = re.match(r'^([a-z_]+): (.+)$', l.strip()) diff --git a/adb/fastboot.py b/adb/fastboot.py index abc482b..1507494 100644 --- a/adb/fastboot.py +++ b/adb/fastboot.py @@ -13,14 +13,12 @@ # limitations under the License. """A libusb1-based fastboot implementation.""" -import argparse import binascii import collections import io import logging import os import struct -import sys from adb import common from adb import usb_exceptions @@ -198,7 +196,7 @@ def _Write(self, data, length, progress_callback=None): length -= len(tmp) self.usb.BulkWrite(tmp) - if progress_callback: + if progress_callback and progress: progress.send(len(tmp)) From 074f51b2baa4b3a88382fbf900713a713f0aceed Mon Sep 17 00:00:00 2001 From: greateggsgreg Date: Sun, 4 Mar 2018 14:29:49 -0500 Subject: [PATCH 3/3] common_cli: Fixed reference before set --- adb/common_cli.py | 1 + 1 file changed, 1 insertion(+) diff --git a/adb/common_cli.py b/adb/common_cli.py index 519b7e0..b4ab5e8 100644 --- a/adb/common_cli.py +++ b/adb/common_cli.py @@ -65,6 +65,7 @@ def GetCommonArguments(): def _DocToArgs(doc): """Converts a docstring documenting arguments into a dict.""" + m = None offset = None in_arg = False out = {}