Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple streams #177

Merged
merged 11 commits into from
Jul 17, 2021
1 change: 0 additions & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ on:
push:
branches: [ master ]
pull_request:
branches: [ master ]

jobs:
build:
Expand Down
808 changes: 509 additions & 299 deletions adb_shell/adb_device.py

Large diffs are not rendered by default.

806 changes: 507 additions & 299 deletions adb_shell/adb_device_async.py

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions adb_shell/adb_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ def unpack(message):
arg1 : int
TODO
data_length : int
The length of the data sent by the device (used by :meth:`adb_shell.adb_device.AdbDevice._read` and :meth:`adb_shell.adb_device_async.AdbDeviceAsync._read`)
The length of the message's data
data_checksum : int
The checksum of the data sent by the device
The checksum of the message's data

Raises
------
Expand Down
4 changes: 2 additions & 2 deletions adb_shell/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
SEND = b'SEND'
STAT = b'STAT'

#: Commands that are recognized by :meth:`adb_shell.adb_device.AdbDevice._read` and :meth:`adb_shell.adb_device_async.AdbDeviceAsync._read`
#: Commands that are recognized by :meth:`adb_shell.adb_device._AdbIOManager._read_packet_from_device` and :meth:`adb_shell.adb_device_async._AdbIOManagerAsync._read_packet_from_device`
IDS = (AUTH, CLSE, CNXN, OKAY, OPEN, SYNC, WRTE)

#: A dictionary where the keys are the commands in :const:`IDS` and the values are the keys converted to integers
Expand Down Expand Up @@ -118,5 +118,5 @@
#: Default authentication timeout (in s) for :meth:`adb_shell.adb_device.AdbDevice.connect` and :meth:`adb_shell.adb_device_async.AdbDeviceAsync.connect`
DEFAULT_AUTH_TIMEOUT_S = 10.

#: Default total timeout (in s) for :meth:`adb_shell.adb_device.AdbDevice._read`, :meth:`adb_shell.adb_device.AdbDevice._read_until`, :meth:`adb_shell.adb_device_async.AdbDeviceAsync._read`, and :meth:`adb_shell.adb_device_async.AdbDeviceAsync._read_until`
#: Default total timeout (in s) for reading data from the device
DEFAULT_READ_TIMEOUT_S = 10.
6 changes: 0 additions & 6 deletions adb_shell/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ def __init__(self, message, *args):
super(DeviceAuthError, self).__init__(message, *args)


class InterleavedDataError(Exception):
"""We only support command sent serially.

"""


class InvalidChecksumError(Exception):
"""Checksum of data didn't match expected checksum.

Expand Down
257 changes: 253 additions & 4 deletions adb_shell/hidden_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,21 @@

.. rubric:: Contents

* :class:`_AdbPacketStore`

* :meth:`_AdbPacketStore.__contains__`
* :meth:`_AdbPacketStore.__len__`
* :meth:`_AdbPacketStore.clear`
* :meth:`_AdbPacketStore.clear_all`
* :meth:`_AdbPacketStore.find`
* :meth:`_AdbPacketStore.find_allow_zeros`
* :meth:`_AdbPacketStore.get`
* :meth:`_AdbPacketStore.put`

* :class:`_AdbTransactionInfo`

* :meth:`_AdbTransactionInfo.args_match`

* :class:`_FileSyncTransactionInfo`

* :meth:`_FileSyncTransactionInfo.can_add_to_send_buffer`
Expand All @@ -38,6 +52,14 @@
import socket
import struct

try:
from asyncio import Queue
except ImportError: # pragma: no cover
try:
from queue import Queue
except ImportError:
from Queue import Queue

from . import constants


Expand Down Expand Up @@ -105,12 +127,12 @@ class _AdbTransactionInfo(object): # pylint: disable=too-few-public-methods
remote_id : int
The ID for the recipient
transport_timeout_s : float, None
Timeout in seconds for sending and receiving packets, or ``None``; see :meth:`BaseTransport.bulk_read() <adb_shell.transport.base_transport.BaseTransport.bulk_read>`,
Timeout in seconds for sending and receiving data, or ``None``; see :meth:`BaseTransport.bulk_read() <adb_shell.transport.base_transport.BaseTransport.bulk_read>`,
:meth:`BaseTransport.bulk_write() <adb_shell.transport.base_transport.BaseTransport.bulk_write>`,
:meth:`BaseTransportAsync.bulk_read() <adb_shell.transport.base_transport_async.BaseTransportAsync.bulk_read>`, and
:meth:`BaseTransportAsync.bulk_write() <adb_shell.transport.base_transport_async.BaseTransportAsync.bulk_write>`
read_timeout_s : float
The total time in seconds to wait for a command in ``expected_cmds`` in :meth:`AdbDevice._read` and :meth:`AdbDeviceAsync._read`
The total time in seconds to wait for data and packets from the device
timeout_s : float, None
The total time in seconds to wait for the ADB command to finish

Expand All @@ -119,13 +141,13 @@ class _AdbTransactionInfo(object): # pylint: disable=too-few-public-methods
local_id : int
The ID for the sender (i.e., the device running this code)
read_timeout_s : float
The total time in seconds to wait for a command in ``expected_cmds`` in :meth:`AdbDevice._read` and :meth:`AdbDeviceAsync._read`
The total time in seconds to wait for data and packets from the device
remote_id : int
The ID for the recipient
timeout_s : float, None
The total time in seconds to wait for the ADB command to finish
transport_timeout_s : float, None
Timeout in seconds for sending and receiving packets, or ``None``; see :meth:`BaseTransport.bulk_read() <adb_shell.transport.base_transport.BaseTransport.bulk_read>`,
Timeout in seconds for sending and receiving data, or ``None``; see :meth:`BaseTransport.bulk_read() <adb_shell.transport.base_transport.BaseTransport.bulk_read>`,
:meth:`BaseTransport.bulk_write() <adb_shell.transport.base_transport.BaseTransport.bulk_write>`,
:meth:`BaseTransportAsync.bulk_read() <adb_shell.transport.base_transport_async.BaseTransportAsync.bulk_read>`, and
:meth:`BaseTransportAsync.bulk_write() <adb_shell.transport.base_transport_async.BaseTransportAsync.bulk_write>`
Expand All @@ -138,6 +160,30 @@ def __init__(self, local_id, remote_id, transport_timeout_s=None, read_timeout_s
self.read_timeout_s = read_timeout_s if self.timeout_s is None else min(read_timeout_s, self.timeout_s)
self.transport_timeout_s = self.read_timeout_s if transport_timeout_s is None else min(transport_timeout_s, self.read_timeout_s)

def args_match(self, arg0, arg1, allow_zeros=False):
"""Check if ``arg0`` and ``arg1`` match this object's ``remote_id`` and ``local_id`` attributes, respectively.

Parameters
----------
arg0 : int
The ``arg0`` value from an ADB packet, which will be compared to this object's ``remote_id`` attribute
arg1 : int
The ``arg1`` value from an ADB packet, which will be compared to this object's ``local_id`` attribute
allow_zeros : bool
Whether to check if ``arg0`` and ``arg1`` match 0, in addition to this object's ``local_id`` and ``remote_id`` attributes

Returns
-------
bool
Whether ``arg0`` and ``arg1`` match this object's ``local_id`` and ``remote_id`` attributes

"""
if not allow_zeros:
return arg1 == self.local_id and (self.remote_id is None or arg0 == self.remote_id)

# https://github.com/JeffLIrion/adb_shell/blob/17540be9b3b84637aca9b994ae3e0b35d02b1a03/adb_shell/adb_device.py#L923-L929
return arg1 in (0, self.local_id) and (self.remote_id is None or arg0 in (0, self.remote_id))


class _FileSyncTransactionInfo(object): # pylint: disable=too-few-public-methods
"""A class for storing info used during a single FileSync "transaction."
Expand Down Expand Up @@ -191,3 +237,206 @@ def can_add_to_send_buffer(self, data_len):
"""
added_len = self.recv_message_size + data_len
return self.send_idx + added_len < self._maxdata


class _AdbPacketStore(object):
"""A class for storing ADB packets.

This class is used to support multiple streams.

Attributes
----------
_dict : dict[int: dict[int: Queue]]
A dictionary of dictionaries of queues. The first (outer) dictionary keys are the ``arg1`` return values from
the :meth:`adb_shell.adb_device._AdbIOManager._read_packet_from_device` and
:meth:`adb_shell.adb_device_async._AdbIOManagerAsync._read_packet_from_device` methods. The second (inner)
dictionary keys are the ``arg0`` return values from those methods. And the values of this inner dictionary are
queues of ``(cmd, data)`` tuples.

"""

def __init__(self):
self._dict = {}

def __contains__(self, value):
"""Check if there are any entries in a queue for the specified value.

Note that ``None`` is used as a wildcard.

Parameters
----------
value : tuple[int, int]
An ``(arg0, arg1)`` pair; either or both values can be ``None``

Returns
-------
bool
Whether the ``(arg0, arg1)`` tuple has any corresponding queue entries

"""
return bool(self.find(value[0], value[1]))

def __len__(self):
"""Get the number of non-empty queues.

Returns
-------
int
The number of non-empty queues

"""
return sum(not val0.empty() for val1 in self._dict.values() for val0 in val1.values())

def clear(self, arg0, arg1):
"""Delete the entry for ``(arg0, arg1)``, if it exists.

Parameters
----------
arg0 : int
The ``arg0`` return value from the :meth:`adb_shell.adb_device._AdbIOManager._read_packet_from_device` and :meth:`adb_shell.adb_device_async._AdbIOManagerAsync._read_packet_from_device` methods
arg1 : int
The ``arg1`` return value from the :meth:`adb_shell.adb_device._AdbIOManager._read_packet_from_device` and :meth:`adb_shell.adb_device_async._AdbIOManagerAsync._read_packet_from_device` methods

"""
if arg1 in self._dict and arg0 in self._dict[arg1]:
del self._dict[arg1][arg0]

if not self._dict[arg1]:
# `self._dict[arg1]` is an empty dictionary now, so delete it
del self._dict[arg1]

def clear_all(self):
"""Clear all the entries."""
self._dict = {}

def find(self, arg0, arg1):
"""Find the entry corresponding to ``arg0`` and ``arg1``.

Parameters
----------
arg0 : int, None
The ``arg0`` value that we are looking for; ``None`` serves as a wildcard
arg1 : int, None
The ``arg1`` value that we are looking for; ``None`` serves as a wildcard

Returns
-------
tuple[int, int], None
The ``(arg0, arg1)`` pair that was found in the dictionary of dictionaries, or ``None`` if no match was found

"""
if not self._dict:
return None

if arg1 is None:
if arg0 is None:
# `value = (None, None)` -> search for any non-empty queue
return next(((key0, key1) for key1, val1 in self._dict.items() for key0, val0 in val1.items() if not val0.empty()), None)

# Search for a non-empty queue with a key of `arg0 == value[0]`
return next(((arg0, key1) for key1, val1 in self._dict.items() for key0, val0 in val1.items() if key0 == arg0 and not val0.empty()), None)

if arg1 not in self._dict:
return None

if arg0 is None:
# Look for a non-empty queue in the `self._dict[value[1]]` dictionary
return next(((key0, arg1) for key0, val0 in self._dict[arg1].items() if not val0.empty()), None)

if arg0 in self._dict[arg1] and not self._dict[arg1][arg0].empty():
return (arg0, arg1)

return None

def find_allow_zeros(self, arg0, arg1):
"""Find the entry corresponding to (``arg0`` or 0) and (``arg1`` or 0).

Parameters
----------
arg0 : int, None
The ``arg0`` value that we are looking for; ``None`` serves as a wildcard
arg1 : int, None
The ``arg1`` value that we are looking for; ``None`` serves as a wildcard

Returns
-------
tuple[int, int], None
The first matching ``(arg0, arg1)`` pair that was found in the dictionary of dictionaries, or ``None`` if no match was found

"""
for arg0_, arg1_ in ((arg0, arg1), (arg0, 0), (0, arg1), (0, 0)):
arg0_arg1 = self.find(arg0_, arg1_)
if arg0_arg1:
return arg0_arg1

return None

def get(self, arg0, arg1):
"""Get the next entry from the queue for ``arg0`` and ``arg1``.

This function assumes you have already checked that ``(arg0, arg1) in self``.

Parameters
----------
arg0 : int, None
The ``arg0`` return value from the :meth:`adb_shell.adb_device._AdbIOManager._read_packet_from_device` and :meth:`adb_shell.adb_device_async._AdbIOManagerAsync._read_packet_from_device` methods; ``None`` serves as a wildcard
arg1 : int, None
The ``arg1`` return value from the :meth:`adb_shell.adb_device._AdbIOManager._read_packet_from_device` and :meth:`adb_shell.adb_device_async._AdbIOManagerAsync._read_packet_from_device` methods; ``None`` serves as a wildcard

Returns
-------
cmd : bytes
The ADB packet's command
arg0 : int
The ``arg0`` value from the returned packet
arg1 : int
The ``arg1`` value from the returned packet
data : bytes
The ADB packet's data

"""
if arg0 is None or arg1 is None:
arg0, arg1 = self.find(arg0, arg1)

# Get the data from the queue
cmd, data = self._dict[arg1][arg0].get_nowait()

# If this is a `CLSE` packet, then clear the entry in the store
if cmd == constants.CLSE:
self.clear(arg0, arg1)

return cmd, arg0, arg1, data

def put(self, arg0, arg1, cmd, data):
"""Add an entry to the queue for ``arg0`` and ``arg1``.

Note that a new dictionary entry will not be created if ``cmd == constants.CLSE``.

Parameters
----------
arg0 : int
The ``arg0`` return value from the :meth:`adb_shell.adb_device._AdbIOManager._read_packet_from_device` and :meth:`adb_shell.adb_device_async._AdbIOManagerAsync._read_packet_from_device` methods
arg1 : int
The ``arg1`` return value from the :meth:`adb_shell.adb_device._AdbIOManager._read_packet_from_device` and :meth:`adb_shell.adb_device_async._AdbIOManagerAsync._read_packet_from_device` methods
cmd : bytes
The ADB packet's command
data : bytes
The ADB packet's data

"""
if arg1 in self._dict:
if arg0 not in self._dict[arg1]:
if cmd == constants.CLSE:
return

# Create the `arg0` entry in the `arg1` dict
self._dict[arg1][arg0] = Queue()
else:
if cmd == constants.CLSE:
return

# Create the `arg1` entry with a new dict
self._dict[arg1] = {arg0: Queue()}

# Put the data into the queue
self._dict[arg1][arg0].put_nowait((cmd, data))
10 changes: 5 additions & 5 deletions tests/async_patchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ async def read(self, numbytes):
class FakeTcpTransportAsync(TcpTransportAsync):
def __init__(self, *args, **kwargs):
TcpTransportAsync.__init__(self, *args, **kwargs)
self._bulk_read = b''
self._bulk_write = b''
self.bulk_read_data = b''
self.bulk_write_data = b''

async def close(self):
self._reader = None
Expand All @@ -90,12 +90,12 @@ async def connect(self, transport_timeout_s=None):

async def bulk_read(self, numbytes, transport_timeout_s=None):
num = min(numbytes, constants.MAX_ADB_DATA)
ret = self._bulk_read[:num]
self._bulk_read = self._bulk_read[num:]
ret = self.bulk_read_data[:num]
self.bulk_read_data = self.bulk_read_data[num:]
return ret

async def bulk_write(self, data, transport_timeout_s=None):
self._bulk_write += data
self.bulk_write_data += data
return len(data)


Expand Down
Loading