Skip to content

Commit

Permalink
Rewrite the entire thing to use Python asyncio.
Browse files Browse the repository at this point in the history
  • Loading branch information
whitequark committed Jul 30, 2018
1 parent 40a7f93 commit 66aa942
Show file tree
Hide file tree
Showing 14 changed files with 612 additions and 591 deletions.
28 changes: 13 additions & 15 deletions software/glasgow/access/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,16 @@ def get_pads(self, args, pins=[], pin_sets=[]):
class AccessDemultiplexer(metaclass=ABCMeta):
def __init__(self, device):
self.device = device
self._interfaces = []

@abstractmethod
def claim_interface(self, applet, mux_interface, args, timeout=None, async=False):
async def claim_interface(self, applet, mux_interface, args, timeout=None):
pass

async def flush(self):
for iface in self._interfaces:
await iface.flush()


class AccessDemultiplexerInterface(metaclass=ABCMeta):
def __init__(self, device, applet):
Expand All @@ -112,30 +117,23 @@ def __init__(self, device, applet):
self.logger = applet.logger

@abstractmethod
def has_buffered_data(self):
async def read(self, length=None):
pass

@abstractmethod
def read(self, length=None):
pass

def read_str(self, *args, encoding="utf-8", **kwargs):
result = self.read(*args, **kwargs)
async def read_str(self, *args, encoding="utf-8", **kwargs):
result = await self.read(*args, **kwargs)
if result is None:
return None
else:
return result.decode(encoding)

@abstractmethod
def write(self, data):
async def write(self, data):
pass

def write_str(self, data, *args, encoding="utf-8", **kwargs):
return self.write(data.encode(encoding), *args, **kwargs)
async def write_str(self, data, *args, encoding="utf-8", **kwargs):
await self.write(data.encode(encoding), *args, **kwargs)

@abstractmethod
def flush(self):
async def flush(self):
pass

def __del__(self):
self.flush()
152 changes: 30 additions & 122 deletions software/glasgow/access/direct/demultiplexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,21 @@ class DirectDemultiplexer(AccessDemultiplexer):
def __init__(self, device):
super().__init__(device)
self._claimed = set()
self._interfaces = []

def claim_interface(self, applet, mux_interface, args, timeout=None, async=False):
async def claim_interface(self, applet, mux_interface, args):
assert mux_interface._fifo_num not in self._claimed
self._claimed.add(mux_interface._fifo_num)

if async:
self.device.get_poller()

iface = DirectDemultiplexerInterface(self.device, applet, mux_interface, timeout, async)
iface = DirectDemultiplexerInterface(self.device, applet, mux_interface)
self._interfaces.append(iface)

if hasattr(args, "mirror_voltage") and args.mirror_voltage:
for port in args.port_spec:
self.device.mirror_voltage(port)
await self.device.mirror_voltage(port)
applet.logger.info("port %s voltage set to %.1f V",
port, self.device.get_voltage(port))
elif hasattr(args, "voltage") and args.voltage is not None:
self.device.set_voltage(args.port_spec, args.voltage)
await self.device.set_voltage(args.port_spec, args.voltage)
applet.logger.info("port(s) %s voltage set to %.1f V",
", ".join(sorted(args.port_spec)), args.voltage)
elif hasattr(args, "keep_voltage") and args.keep_voltage:
Expand All @@ -35,14 +31,11 @@ def claim_interface(self, applet, mux_interface, args, timeout=None, async=False


class DirectDemultiplexerInterface(AccessDemultiplexerInterface):
def __init__(self, device, applet, mux_interface, timeout, async):
def __init__(self, device, applet, mux_interface):
super().__init__(device, applet)
self._usb = device.usb
self._timeout = None if timeout is None else round(timeout * 1000)
self._async = async

config_num = self._usb.getConfiguration()
for config in self._usb.getDevice().iterConfigurations():
config_num = self.device.usb.getConfiguration()
for config in self.device.usb.getDevice().iterConfigurations():
if config.getConfigurationValue() == config_num:
break

Expand All @@ -63,137 +56,52 @@ def __init__(self, device, applet, mux_interface, timeout, async):
self._out_packet_size = packet_size
assert self._endpoint_in != None and self._endpoint_out != None

self._interface = self._usb.claimInterface(mux_interface._fifo_num)
self._interface = self.device.usb.claimInterface(mux_interface._fifo_num)
self._buffer_in = bytearray()
self._buffer_out = bytearray()

if self._async:
self._in_transfer = self._usb.getTransfer()
def callback(transfer):
self.logger.trace("USB: EP%x IN (completed)", self._endpoint_in & 0x7f)
self._in_transfer.setBulk(self._endpoint_in, self._in_packet_size, callback)
self._in_transfer.submit()
self.logger.trace("USB: EP%x IN (submit)", self._endpoint_in & 0x7f)

self._out_transfer = self._usb.getTransfer()
def callback(transfer):
self.logger.trace("USB: EP%x OUT (completed)", self._endpoint_out)
self._write_packet_async()
self._out_transfer.setBulk(self._endpoint_out, 0, callback)

def has_buffered_data(self):
if len(self._buffer_in) > 0 or len(self._buffer_out) > 0:
return True
if self._async:
return not self._in_transfer.isSubmitted() or self._out_transfer.isSubmitted()
else:
return False

def _append_in_packet(self, packet):
self.logger.trace("USB: EP%x IN: %s", self._endpoint_in & 0x7f, packet.hex())
async def _read_packet(self):
packet = await self.device.bulk_read(self._endpoint_in, self._in_packet_size)
self._buffer_in += packet

def _read_packet_async(self):
if self._in_transfer.isSubmitted():
return False
elif self._in_transfer.getStatus() != usb1.TRANSFER_COMPLETED:
usb1.raiseUSBError(self._in_transfer.getStatus())
else:
packet = self._in_transfer.getBuffer()[:self._in_transfer.getActualLength()]
self._append_in_packet(packet)
self.logger.trace("USB: EP%x IN (submit)", self._endpoint_in & 0x7f)
self._in_transfer.submit()
return True

def _read_packet_sync(self):
packet = self._usb.bulkRead(self._endpoint_in, self._in_packet_size, self._timeout)
self._append_in_packet(packet)

def read(self, length=None):
async def read(self, length=None):
if len(self._buffer_out) > 0:
# Flush the buffer, so that everything written before the read reaches the device.
self.flush()
await self.flush()

if length is None and len(self._buffer_in) > 0:
# Just return whatever is in the buffer.
length = len(self._buffer_in)
if self._async:
# Always check if we have new data waiting to be read, and rearm the transfer.
# This ensures that the poll call will have something to wait on.
self._read_packet_async()
if length is None:
# Return whatever is in the buffer, even if it's nothing.
length = len(self._buffer_in)
elif len(self._buffer_in) >= length:
# Return exactly the requested length if we have it.
pass
else:
# Return None if we can't fulfill the request.
return None
elif length is None:
# Return whatever is received in the next transfer, even if it's nothing.
await self._read_packet()
length = len(self._buffer_in)
else:
if length is None:
# Sync reads with no requested length always block if there's nothing
# in the buffer, or we'll never get a chance to refill the buffer if
# the application code only issues reads with no requested length.
self._read_packet_sync()
length = len(self._buffer_in)
else:
# Sync reads always return exactly the requested length, if any.
while len(self._buffer_in) < length:
self._read_packet_sync()
# Return exactly the requested length.
while len(self._buffer_in) < length:
await self._read_packet()

result = self._buffer_in[:length]
self._buffer_in = self._buffer_in[length:]
self.logger.trace("FIFO: read <%s>", result.hex())
return result

def _slice_out_packet(self):
async def _write_packet(self):
packet = self._buffer_out[:self._out_packet_size]
self._buffer_out = self._buffer_out[self._out_packet_size:]
self.logger.trace("USB: EP%x OUT: <%s>", self._endpoint_out, packet.hex())
return packet

def _write_packet_async(self):
if self._out_transfer.isSubmitted():
pass
elif self._out_transfer.getStatus() != usb1.TRANSFER_COMPLETED:
usb1.raiseUSBError(self._out_transfer.getStatus())
elif len(self._buffer_out) > 0:
packet = self._slice_out_packet()
self._out_transfer.setBuffer(packet)
self.logger.trace("USB: EP%x OUT (submit)", self._endpoint_out)
self._out_transfer.submit()

def _write_packet_sync(self):
packet = self._slice_out_packet()
self._usb.bulkWrite(self._endpoint_out, packet, self._timeout)

def write(self, data, async=False):
data = bytearray(data)
await self.device.bulk_write(self._endpoint_out, packet)

async def write(self, data):
if not isinstance(data, (bytes, bytearray)):
data = bytes(data)

self.logger.trace("FIFO: write <%s>", data.hex())
self._buffer_out += data

if self._async:
if len(self._buffer_out) > self._out_packet_size:
self._write_packet_async()
else:
while len(self._buffer_out) > self._out_packet_size:
self._write_packet_sync()
if len(self._buffer_out) > self._out_packet_size:
await self._write_packet()

def flush(self):
async def flush(self):
self.logger.trace("FIFO: flush")
if self._async:
self._write_packet_async()
else:
while len(self._buffer_out) > 0:
self._write_packet_sync()

def poll(self):
if self.has_buffered_data():
# If we have data in IN endpoint buffers, always return right away, but also
# peek at what other fds might have become ready, for efficiency.
return self.device.poll(0)
else:
# Otherwise, just wait on USB transfers and any other registered fds.
return self.device.poll(self._timeout)
while len(self._buffer_out) > 0:
await self._write_packet()
29 changes: 13 additions & 16 deletions software/glasgow/access/simulation/demultiplexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,28 @@


class SimulationDemultiplexer(AccessDemultiplexer):
def claim_interface(self, applet, mux_interface, args, timeout=None, async=False):
return SimulationDemultiplexerInterface(self.device, applet, mux_interface, timeout, async)
async def claim_interface(self, applet, mux_interface, args):
return SimulationDemultiplexerInterface(self.device, applet, mux_interface)


class SimulationDemultiplexerInterface(AccessDemultiplexerInterface):
def __init__(self, device, applet, mux_interface, timeout, async):
def __init__(self, device, applet, mux_interface):
super().__init__(device, applet)

self._mux = mux_interface
self.in_fifo = mux_interface.in_fifo
self.out_fifo = mux_interface.out_fifo

def has_buffered_data(self):
return False
self._in_fifo = mux_interface.in_fifo
self._out_fifo = mux_interface.out_fifo

@asyncio.coroutine
def read(self, length):
def read(self, length=None):
data = []
if length is None:
while (yield self.in_fifo.readable):
data.append((yield from self.in_fifo.read()))
while (yield self._in_fifo.readable):
data.append((yield from self._in_fifo.read()))
else:
while len(data) < length:
while not (yield self.in_fifo.readable):
while not (yield self._in_fifo.readable):
yield
data.append((yield from self.in_fifo.read()))
data.append((yield from self._in_fifo.read()))

data = bytes(data)
self.logger.trace("FIFO: read <%s>", data.hex())
Expand All @@ -42,9 +38,10 @@ def write(self, data):
self.logger.trace("FIFO: write <%s>", data.hex())

for byte in data:
while not (yield self.out_fifo.writable):
while not (yield self._out_fifo.writable):
yield
yield from self.out_fifo.write(byte)
yield from self._out_fifo.write(byte)

@asyncio.coroutine
def flush(self):
pass

0 comments on commit 66aa942

Please sign in to comment.