access.direct.demultiplexer: pipeline reads and writes.
This significantly improves throughput (up to ~21 MB/s) without any
particular action on the part of applets.

Hinting is now obsolete. Two applets used hinting:
  * memory-floppy: hinting is removed (untested);
  * audio-yamaha-opl: hinting and custom pipelining are removed.

The audio-yamaha-opl applet requires explicit flushes now, since
otherwise the playback coroutine will not yield nearly often enough.

The `benchmark loopback` mode also now works, since it was hanging
not because of the arbiter (the arbiter will not, in fact, cause
starvation) but because of a deadlock between read and write
coroutines inherent in the old demultiplexer implementation.

Fixes #73.
whitequark committed Apr 3, 2019
1 parent a3f7771 commit e59730f
Showing 7 changed files with 252 additions and 76 deletions.
2 changes: 1 addition & 1 deletion software/glasgow/access/__init__.py
@@ -131,7 +131,7 @@ async def reset(self):
pass

@abstractmethod
async def read(self, length=None, hint=0):
async def read(self, length=None):
pass

async def read_str(self, *args, encoding="utf-8", **kwargs):
187 changes: 142 additions & 45 deletions software/glasgow/access/direct/demultiplexer.py
@@ -1,8 +1,10 @@
import usb1
import math
import usb1
import asyncio

from ...support.logging import *
from ...support.chunked_fifo import *
from ...support.task_queue import *
from .. import AccessDemultiplexer, AccessDemultiplexerInterface


@@ -28,7 +30,33 @@
#
# To deal with this, use requests of at most 1024 EP buffer sizes (512 KiB with the FX2) as
# an arbitrary cutoff, and hope for the best.
_max_buffers_per_io = 1024
_max_packets_per_ep = 1024

# USB has the limitation that all transactions are host-initiated. Therefore, if we do not queue
# reads for the IN endpoints quickly enough, the HC will not even poll the device, and the buffer
# will quickly overflow (provided it is being filled with data). To address this, we issue many
# pipelined reads, to compensate for the non-realtime nature of Python and the host OS.
#
# This, however, has an inherent tradeoff. If we submit small reads (down to a single EP buffer
# size), we get the data back as early as possible, but the CPU load is much higher, and we have
# to submit many more buffers to tolerate the same amount of scheduling latency. If we submit large
# reads, it's much easier to service the device quickly enough, but the maximum latency of reads
# rises.
#
# The relationship between buffer size and latency is quite complex. If only one 512-byte buffer
# is available but a 10240-byte read is requested, the read will finish almost immediately with
# those 512 bytes. On the other hand, if 20 512-byte buffers are available and the HC can read one
# each time it sends an IN token, they will all be read before the read finishes; if we request
# a read of dozens of megabytes, this can take seconds.
#
# To try and balance these effects, we choose a medium buffer size that should work well with most
# applications. It's possible that this will need to become customizable later, but for now
# a single fixed value works.
_packets_per_xfer = 16

# Queue as many transfers as we can, but no more than 10, as the returns beyond that point
# are diminishing.
_xfers_per_queue = min(10, _max_packets_per_ep // _packets_per_xfer)
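
# To make this concrete, a sketch of the resulting figures (the 512-byte
# packet size assumed here is the FX2's high-speed bulk maximum; the real
# value is taken from the endpoint descriptor at runtime):
#
#   xfer_size  = 512 * _packets_per_xfer    # 8192 bytes per transfer
#   queue_size = min(10, 1024 // 16)        # 10 transfers in flight
#   in_flight  = xfer_size * queue_size     # 80 KiB buffered per IN endpoint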


class DirectDemultiplexer(AccessDemultiplexer):
@@ -125,83 +153,152 @@ def __init__(self, device, applet, mux_interface):
assert self._endpoint_in != None and self._endpoint_out != None

self._interface = self.device.usb.claimInterface(self._pipe_num)
self._buffer_in = ChunkedFIFO()
self._buffer_out = ChunkedFIFO()
self._in_tasks = TaskQueue()
self._in_buffer = ChunkedFIFO()
self._out_tasks = TaskQueue()
self._out_buffer = ChunkedFIFO()

async def reset(self):
if self._in_tasks or self._out_tasks:
self.logger.trace("cancelling transactions")
self._in_tasks .cancel()
self._out_tasks.cancel()

self.logger.trace("asserting reset")
await self.device.write_register(self._addr_reset, 1)

self.logger.trace("synchronizing FIFOs")
self.device.usb.setInterfaceAltSetting(self._pipe_num, 1)
self._buffer_in = ChunkedFIFO()
self._buffer_out = ChunkedFIFO()
self._in_buffer .clear()
self._out_buffer.clear()

# Pipeline reads before deasserting reset, so that if the applet immediately starts
# streaming data, there are no overflows. (This is perhaps not the best way to implement
# an applet, but we can support it easily enough, and it avoids surprise overflows.)
self.logger.trace("pipelining reads")
for _ in range(_xfers_per_queue):
self._in_tasks.submit(self._in_task())
# Give the IN tasks a chance to submit their transfers before deasserting reset.
await asyncio.sleep(0)

self.logger.trace("deasserting reset")
await self.device.write_register(self._addr_reset, 0)

async def _read_packet(self, hint=0):
buffers = min(_max_buffers_per_io, max(1, math.ceil(hint / self._endpoint_in)))
packet = await self.device.bulk_read(self._endpoint_in, self._in_packet_size * buffers)
self._buffer_in.write(packet)
async def _in_task(self):
size = self._in_packet_size * _packets_per_xfer
data = await self.device.bulk_read(self._endpoint_in, size)
self._in_buffer.write(data)

async def read(self, length=None, hint=0):
# Always try to allocate at least as many USB buffers as the amount of data we know we're
# going to read from the FIFO. The real value is capped to avoid hitting platform-specific
# limits for USB I/O size (see above).
if length is not None:
hint = max(hint, length)
self._in_tasks.submit(self._in_task())

if len(self._buffer_out) > 0:
async def read(self, length=None):
if len(self._out_buffer) > 0:
# Flush the buffer, so that everything written before the read reaches the device.
await self.flush()

if length is None and len(self._buffer_in) > 0:
if length is None and len(self._in_buffer) > 0:
# Just return whatever is in the buffer.
length = len(self._buffer_in)
length = len(self._in_buffer)
elif length is None:
# Return whatever is received in the next transfer, even if it's nothing.
await self._read_packet(hint)
length = len(self._buffer_in)
# (Gateware doesn't normally submit zero-length packets, so, unless that changes
# or customized gateware is used, we'll always get some data here.)
await self._in_tasks.wait_one()
length = len(self._in_buffer)
else:
# Return exactly the requested length.
while len(self._buffer_in) < length:
self.logger.trace("FIFO: need %d bytes", length - len(self._buffer_in))
await self._read_packet(hint)
while len(self._in_buffer) < length:
self.logger.trace("FIFO: need %d bytes", length - len(self._in_buffer))
await self._in_tasks.wait_one()

result = self._buffer_in.read(length)
result = self._in_buffer.read(length)
if len(result) < length:
result = bytearray(result)
while len(result) < length:
result += self._buffer_in.read(length - len(result))
result += self._in_buffer.read(length - len(result))
# Always return a memoryview object, to avoid hard to detect edge cases downstream.
result = memoryview(result)

self.logger.trace("FIFO: read <%s>", dump_hex(result))
return result
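
# A sketch of the resulting `read` contract (hypothetical usage, with
# `iface` standing for a DirectDemultiplexerInterface):
#
#   header = await iface.read(4)    # waits until exactly 4 bytes arrive
#   rest   = await iface.read()     # whatever the next transfer delivers
#   assert isinstance(header, memoryview)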

async def _write_packet(self):
# Fast path: read as much contiguous data as possible, but not too much, as there might
# be a platform-specific limit for USB I/O size (see above).
packet = self._buffer_out.read(self._out_packet_size * _max_buffers_per_io)
def _out_slice(self):
# Fast path: read as much contiguous data as possible, up to our transfer size.
size = self._out_packet_size * _packets_per_xfer
data = self._out_buffer.read(size)

if len(packet) < self._out_packet_size and self._buffer_out:
# Slow path: USB is annoyingly high latency with small packets, so if we only got a few
if len(data) < size and len(self._out_buffer) >= self._out_packet_size:
# Slow path: USB is very inefficient with small packets, so if we only got a few
# bytes from the FIFO, and there is much more in it, spend CPU time to aggregate that
# into at least one EP buffer sized packet, as this is likely to result in overall
# reduction of runtime.
packet = bytearray(packet)
while len(packet) < self._out_packet_size and self._buffer_out:
packet += self._buffer_out.read(self._out_packet_size)

await self.device.bulk_write(self._endpoint_out, packet)
# into a larger transfer, as this is likely to result in overall speedup.
data = bytearray(data)
# Make sure the resulting transfer is a multiple of packet size.
data += self._out_buffer.read(self._out_packet_size -
len(data) % self._out_packet_size)
# If we have more data in the buffer still, pad the transfer further, still keeping it
# a multiple of packet size.
while len(data) < size and len(self._out_buffer) >= self._out_packet_size:
data += self._out_buffer.read(self._out_packet_size)

return data
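
# A worked example of the aggregation above, assuming 512-byte packets and
# 16 packets per transfer (so `size` is 8192), with three discontiguous
# 300-byte chunks in the buffer:
#
#   fast path: read(8192) returns one contiguous chunk, i.e. 300 bytes;
#   slow path: 300 < 8192 and 600 bytes remain (>= 512), so pad to a packet
#              multiple: read(512 - 300 % 512) == read(212) -> 512 bytes;
#   the loop then stops (388 bytes < one packet), and those 388 bytes stay
#   buffered until the next write or an explicit flush.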

async def _out_task(self, data):
assert len(data) > 0
await self.device.bulk_write(self._endpoint_out, data)

# See the comment in `write` below for an explanation of the following code.
if len(self._out_buffer) >= self._out_packet_size * _packets_per_xfer:
self._out_tasks.submit(self._out_task(self._out_slice()))

async def write(self, data):
self.logger.trace("FIFO: write <%s>", dump_hex(data))
self._buffer_out.write(data)
# Eagerly check if any of our previous queued writes errored out.
await self._out_tasks.poll()

if len(self._buffer_out) > self._out_packet_size:
await self._write_packet()
self.logger.trace("FIFO: write <%s>", dump_hex(data))
self._out_buffer.write(data)

# The write scheduling algorithm attempts to satisfy several partially conflicting goals:
# * We want to schedule writes as early as possible, because this reduces buffer bloat and
# can dramatically improve responsiveness of the system.
# * We want to schedule writes that are as large as possible, up to _packets_per_xfer,
# because this reduces CPU utilization and improves latency.
# * We never want to automatically schedule writes smaller than _out_packet_size,
# because they occupy a whole microframe anyway.
#
# We use an approach that performs well when fed with a steady sequence of very large
# FIFO chunks, yet scales down to packet-size and byte-size FIFO chunks as well.
# * We only submit a write automatically once the buffer level crosses the threshold of
# `_out_packet_size * _packets_per_xfer`. In this case, _out_slice always returns
# `_out_packet_size * n` bytes, where n is between 1 and _packets_per_xfer.
# * We submit enough writes that there is at least one write for each transfer worth
# of data in the buffer, up to _xfers_per_queue outstanding writes.
# * We submit another write once one finishes, if the buffer level is still above
# the threshold, even if no more explicit write calls are performed.
#
# This provides predictable write behavior; only _packets_per_xfer packet writes are
# automatically submitted, and only the minimum necessary number of tasks are scheduled on
# calls to `write`.
while len(self._out_buffer) >= self._out_packet_size * _packets_per_xfer and \
len(self._out_tasks) < _xfers_per_queue:
self._out_tasks.submit(self._out_task(self._out_slice()))

async def flush(self):
self.logger.trace("FIFO: flush")
while self._buffer_out:
await self._write_packet()

# First, we ensure we can submit one more task. (There can be more tasks than
# _xfers_per_queue because a task may spawn another one just before it terminates.)
while len(self._out_tasks) >= _xfers_per_queue:
await self._out_tasks.wait_one()

# At this point, the buffer can contain at most _packets_per_xfer packets worth
# of data, as anything beyond that crosses the threshold of automatic submission.
# So, we can simply submit the rest of data, which by definition fits into a single
# transfer.
assert len(self._out_buffer) <= self._out_packet_size * _packets_per_xfer
if self._out_buffer:
data = bytearray()
while self._out_buffer:
data += self._out_buffer.read()
self._out_tasks.submit(self._out_task(data))

await self._out_tasks.wait_all()
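
Taken together, `write` and `flush` give applets a simple streaming pattern; a minimal sketch, with `iface` standing for the applet's lower interface:

    async def stream_out(iface, payload):
        # Transfer-sized slices are submitted automatically as the buffer
        # level crosses the threshold...
        await iface.write(payload)
        # ...but a trailing sub-packet remainder stays buffered until it is
        # explicitly flushed.
        await iface.flush()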
2 changes: 1 addition & 1 deletion software/glasgow/access/simulation/demultiplexer.py
@@ -22,7 +22,7 @@ def reset(self):
pass

@asyncio.coroutine
def read(self, length=None, hint=0):
def read(self, length=None):
data = []
if length is None:
while (yield self._in_fifo.readable):
26 changes: 8 additions & 18 deletions software/glasgow/applet/audio/yamaha_opl/__init__.py
@@ -444,10 +444,11 @@ async def wait_clocks(self, count):
await self.lower.write([OP_WAIT, *struct.pack(">H", 65535)])
count -= 65535
await self.lower.write([OP_WAIT, *struct.pack(">H", count)])
await self.lower.flush()

async def read_samples(self, count, hint=0):
async def read_samples(self, count):
self._log("read %d samples", count)
return await self.lower.read(count * 2, hint=hint * 2)
return await self.lower.read(count * 2)


class YamahaOPLInterface(YamahaOPxInterface):
@@ -514,25 +515,14 @@ async def play(self, disable=True):
if disable:
await self._opx_iface.disable()

async def record(self, queue, chunk_count=8192, concurrent=10):
async def record(self, queue, chunk_count=8192):
total_count = int(self._reader.total_seconds / self.sample_time)
done_count = 0

async def queue_samples(count):
samples = await self._opx_iface.read_samples(count)
while done_count < total_count:
chunk_count = min(chunk_count, total_count - done_count)
samples = await self._opx_iface.read_samples(chunk_count)
await queue.put(samples)

tasks = set()
while tasks or done_count == 0:
if tasks:
done, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
await task

while done_count < total_count and len(tasks) < concurrent:
chunk_count = min(chunk_count, total_count - done_count)
tasks.add(asyncio.ensure_future(queue_samples(chunk_count)))
done_count += chunk_count
done_count += chunk_count

await queue.put(b"")

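Since pipelining now lives in the demultiplexer, `record` stays a plain sequential loop; the consuming end of the queue is equally simple (a sketch, not part of this diff):

    async def consume(queue, file):
        while True:
            samples = await queue.get()
            if not samples:      # b"" marks the end of the recording
                break
            file.write(samples)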
19 changes: 9 additions & 10 deletions software/glasgow/applet/memory/floppy/__init__.py
@@ -589,19 +589,19 @@ async def measure_track(self):
self._sys_clk_freq / cycles * 60)
return cycles

async def _read_packet(self, hint):
data = await self.lower.read(254, hint)
trailer, = await self.lower.read(1, hint)
async def _read_packet(self):
data = await self.lower.read(254)
trailer, = await self.lower.read(1)
if trailer != TLR_ERROR:
return data[:trailer]

async def read_track_raw(self, hint, redundancy=1):
async def read_track_raw(self, redundancy=1):
self._log("read track raw")
index = 0
data = bytearray()
await self.lower.write([CMD_READ_RAW, redundancy])
while True:
packet = await self._read_packet(hint * redundancy)
packet = await self._read_packet()
if packet is None:
raise GlasgowAppletError("FIFO overflow while reading track")

@@ -881,21 +881,20 @@ def add_interact_arguments(cls, parser):
async def interact(self, device, args, floppy_iface):
self.logger.info("starting up the drive")
await floppy_iface.start()
cycles = await floppy_iface.measure_track()
await floppy_iface.measure_track()

try:
if args.operation == "read-raw":
for track in range(args.first, args.last + 1):
await floppy_iface.seek_track(track)
data = await floppy_iface.read_track_raw(hint=cycles // 8,
redundancy=args.redundancy)
data = await floppy_iface.read_track_raw(redundancy=args.redundancy)
args.file.write(struct.pack(">BBL", track & 1, track >> 1, len(data)))
args.file.write(data)
args.file.flush()

if args.operation == "read-track":
await floppy_iface.seek_track(args.track)
bytestream = await floppy_iface.read_track_raw(hint=cycles // 8)
bytestream = await floppy_iface.read_track_raw()
mfm = SoftwareMFMDecoder(self.logger)
datastream = mfm.demodulate(mfm.lock(itertools.cycle(mfm.bits(bytestream))))
for comma, data in itertools.islice(datastream, 10):
@@ -907,7 +906,7 @@ async def interact(self, device, args, floppy_iface):

if args.operation == "test-pll":
await floppy_iface.seek_track(args.track)
bytestream = await floppy_iface.read_track_raw(hint=cycles // 8)
bytestream = await floppy_iface.read_track_raw()
mfm = SoftwareMFMDecoder(self.logger)
bitstream = list(mfm.bits(bytestream)) * 2

14 changes: 13 additions & 1 deletion software/glasgow/support/chunked_fifo.py
@@ -1,6 +1,9 @@
from collections import deque


__all__ = ["ChunkedFIFO"]


class ChunkedFIFO:
"""
A first-in first-out byte buffer that uses discontiguous storage to operate without copying.
@@ -10,6 +13,12 @@ def __init__(self):
self._chunk = None
self._offset = 0

def clear(self):
"""Remove all data from the buffer."""
self._queue.clear()
self._chunk = None
self._offset = 0

def write(self, data):
"""Enqueue ``data``."""
if not data:
@@ -36,7 +45,10 @@ def read(self, max_length=None):
return memoryview(b"")

if self._chunk is None:
self._chunk = self._queue.popleft()
if not self._queue:
return memoryview(b"")

self._chunk = self._queue.popleft()
self._offset = 0

if max_length is None:
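A short sketch of the `ChunkedFIFO` contract, covering the new `clear()` and the empty-queue case fixed above (assuming the package is importable as `glasgow`):

    from glasgow.support.chunked_fifo import ChunkedFIFO

    fifo = ChunkedFIFO()
    fifo.write(b"ab")
    fifo.write(b"cd")                     # kept as a separate chunk; no copy
    assert bytes(fifo.read(4)) == b"ab"   # a read never crosses chunks
    assert bytes(fifo.read())  == b"cd"
    assert bytes(fifo.read())  == b""     # empty FIFO returns an empty view
    fifo.write(b"ef")
    fifo.clear()                          # e.g. on interface reset
    assert len(fifo) == 0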
