Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 33 additions & 45 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -1053,9 +1053,9 @@ class ReadableStream:
t: ValType
read: Callable[[WritableBuffer, OnPartialCopy, OnCopyDone], Literal['done','blocked']]
cancel: Callable[[], None]
close: Callable[[]]
close: Callable[[Optional[ErrorContext]]]
closed: Callable[[], bool]
closed_with_error: Callable[[], Optional[ErrorContext]]
closed_with: Callable[[], Optional[ErrorContext]]
```
The key operation is `read` which works as follows:
* `read` is non-blocking, returning `'blocked'` if it would have blocked.
Expand Down Expand Up @@ -1091,7 +1091,7 @@ class in chunks, starting with the fields and initialization:
class ReadableStreamGuestImpl(ReadableStream):
impl: ComponentInstance
closed_: bool
errctx: Optional[ErrorContext]
maybe_errctx: Optional[ErrorContext]
pending_buffer: Optional[Buffer]
pending_on_partial_copy: Optional[OnPartialCopy]
pending_on_copy_done: Optional[OnCopyDone]
Expand All @@ -1100,7 +1100,7 @@ class ReadableStreamGuestImpl(ReadableStream):
self.t = t
self.impl = inst
self.closed_ = False
self.errctx = None
self.maybe_errctx = None
self.reset_pending()

def reset_pending(self):
Expand All @@ -1126,31 +1126,26 @@ been returned:
def cancel(self):
self.reset_and_notify_pending()

def close(self, errctx = None):
def close(self, maybe_errctx):
if not self.closed_:
self.closed_ = True
self.errctx = errctx
self.maybe_errctx = maybe_errctx
if self.pending_buffer:
self.reset_and_notify_pending()

def closed(self):
return self.closed_

def closed_with_error(self):
def closed_with(self):
assert(self.closed_)
return self.errctx
return self.maybe_errctx
```
While the abstract `ReadableStream` interface *allows* `cancel` to return
without having returned ownership of the buffer (which, in general, is
necessary for [various][OIO] [host][io_uring] APIs), when *wasm* is
implementing the stream, `cancel` always returns ownership of the buffer
immediately.

`close` takes an optional `error-context` value which can only be supplied to
the writable end of a stream via `stream.close-writable`. Thus, for the
readable end of the stream, `close` is effectively nullary, matching
`ReadableStream.close`.

Note that `cancel` and `close` notify in opposite directions:
* `cancel` *must* be called on a readable or writable end with an operation
pending, and thus `cancel` notifies the same end that called it.
Expand Down Expand Up @@ -1208,9 +1203,9 @@ class StreamEnd(Waitable):
self.stream = stream
self.copying = False

def drop(self, errctx):
def drop(self, maybe_errctx):
trap_if(self.copying)
self.stream.close(errctx)
self.stream.close(maybe_errctx)
Waitable.drop(self)

class ReadableStreamEnd(StreamEnd):
Expand Down Expand Up @@ -1247,11 +1242,11 @@ class FutureEnd(StreamEnd):
assert(buffer.remain() == 1)
def on_copy_done_wrapper():
if buffer.remain() == 0:
self.stream.close()
self.stream.close(maybe_errctx = None)
on_copy_done()
ret = copy_op(buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper)
if ret == 'done' and buffer.remain() == 0:
self.stream.close()
self.stream.close(maybe_errctx = None)
return ret

class ReadableFutureEnd(FutureEnd):
Expand All @@ -1262,9 +1257,9 @@ class WritableFutureEnd(FutureEnd):
paired: bool = False
def copy(self, src, on_partial_copy, on_copy_done):
return self.close_after_copy(self.stream.write, src, on_copy_done)
def drop(self, errctx):
trap_if(not self.stream.closed() and not errctx)
FutureEnd.drop(self, errctx)
def drop(self, maybe_errctx):
trap_if(not self.stream.closed() and not maybe_errctx)
FutureEnd.drop(self, maybe_errctx)
```
The `future.{read,write}` built-ins fix the buffer length to `1`, ensuring the
`assert(buffer.remain() == 1)` holds. Because of this, there are no partial
Expand Down Expand Up @@ -3541,9 +3536,8 @@ def pack_copy_result(task, buffer, e):
assert(not (buffer.progress & CLOSED))
return buffer.progress
else:
if (errctx := e.stream.closed_with_error()):
assert(isinstance(e, ReadableStreamEnd|ReadableFutureEnd))
errctxi = task.inst.error_contexts.add(errctx)
if (maybe_errctx := e.stream.closed_with()):
errctxi = task.inst.error_contexts.add(maybe_errctx)
assert(errctxi != 0)
else:
errctxi = 0
Expand All @@ -3555,11 +3549,6 @@ The order of tests here indicates that, if some progress was made and then the
stream was closed, only the progress is reported and the `CLOSED` status is
left to be discovered next time.

As asserted here, `error-context`s are only possible on the *readable* end of a
stream or future (since, as defined below, only the *writable* end can close
the stream with an `error-context`). Thus, `error-context`s only flow in the
same direction as values, as an optional last value of the stream or future.


### 🔀 `canon {stream,future}.cancel-{read,write}`

Expand Down Expand Up @@ -3632,14 +3621,8 @@ caller can assume that ownership of the buffer has been returned.
For canonical definitions:
```wat
(canon stream.close-readable $t (core func $f))
(canon future.close-readable $t (core func $f))
```
validation specifies:
* `$f` is given type `(func (param i32))`

and for canonical definitions:
```wat
(canon stream.close-writable $t (core func $f))
(canon future.close-readable $t (core func $f))
(canon future.close-writable $t (core func $f))
```
validation specifies:
Expand All @@ -3650,14 +3633,14 @@ the given index from the current component instance's `waitable` table,
performing the guards and bookkeeping defined by
`{Readable,Writable}{Stream,Future}End.drop()` above.
```python
async def canon_stream_close_readable(t, task, i):
return await close(ReadableStreamEnd, t, task, i, 0)
async def canon_stream_close_readable(t, task, i, errctxi):
return await close(ReadableStreamEnd, t, task, i, errctxi)

async def canon_stream_close_writable(t, task, hi, errctxi):
return await close(WritableStreamEnd, t, task, hi, errctxi)

async def canon_future_close_readable(t, task, i):
return await close(ReadableFutureEnd, t, task, i, 0)
async def canon_future_close_readable(t, task, i, errctxi):
return await close(ReadableFutureEnd, t, task, i, errctxi)

async def canon_future_close_writable(t, task, hi, errctxi):
return await close(WritableFutureEnd, t, task, hi, errctxi)
Expand All @@ -3666,17 +3649,22 @@ async def close(EndT, t, task, hi, errctxi):
trap_if(not task.inst.may_leave)
e = task.inst.waitables.remove(hi)
if errctxi == 0:
errctx = None
maybe_errctx = None
else:
errctx = task.inst.error_contexts.get(errctxi)
maybe_errctx = task.inst.error_contexts.get(errctxi)
trap_if(not isinstance(e, EndT))
trap_if(e.stream.t != t)
e.drop(errctx)
e.drop(maybe_errctx)
return []
```
Note that only the writable ends of streams and futures can be closed with a
final `error-context` value and thus `error-context`s only flow in the same
direction as values as an optional last value of the stream or future.
Passing a non-zero `errctxi` index indicates that this stream end is being
closed due to an error, with the given `error-context` providing information
that can be printed to aid in debugging. While, as explained above, the
*contents* of the `error-context` value are non-deterministic (and may, e.g.,
be empty), the presence or absence of an `error-context` value is semantically
meaningful for distinguishing between success or failure. Concretely, the
packed `i32` returned by `{stream,future}.{read,write}` operations indicates
success or failure by whether the `error-context` index is `0` or not.


### 🔀 `canon error-context.new`
Expand Down
7 changes: 3 additions & 4 deletions design/mvp/Explainer.md
Original file line number Diff line number Diff line change
Expand Up @@ -1878,12 +1878,11 @@ delivered to indicate the completion of the `read` or `write`. (See

| Synopsis | |
| ----------------------------------------------------- | ---------------------------------------------------------------- |
| Approximate WIT signature for `stream.close-readable` | `func<T>(e: readable-stream-end<T>)` |
| Approximate WIT signature for `stream.close-readable` | `func<T>(e: readable-stream-end<T>, err: option<error-context>)` |
| Approximate WIT signature for `stream.close-writable` | `func<T>(e: writable-stream-end<T>, err: option<error-context>)` |
| Approximate WIT signature for `future.close-readable` | `func<T>(e: readable-future-end<T>)` |
| Approximate WIT signature for `future.close-readable` | `func<T>(e: readable-future-end<T>, err: option<error-context>)` |
| Approximate WIT signature for `future.close-writable` | `func<T>(e: writable-future-end<T>, err: option<error-context>)` |
| Canonical ABI signature for `*.close-readable` | `[readable-end:i32] -> []` |
| Canonical ABI signature for `*.close-writable` | `[writable-end:i32 err:i32] -> []` |
| Canonical ABI signature | `[end:i32 err:i32] -> []` |

The `{stream,future}.close-{readable,writable}` built-ins remove the indicated
[stream or future] from the current component instance's table of [waitables],
Expand Down
49 changes: 24 additions & 25 deletions design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,14 +640,14 @@ class ReadableStream:
t: ValType
read: Callable[[WritableBuffer, OnPartialCopy, OnCopyDone], Literal['done','blocked']]
cancel: Callable[[], None]
close: Callable[[]]
close: Callable[[Optional[ErrorContext]]]
closed: Callable[[], bool]
closed_with_error: Callable[[], Optional[ErrorContext]]
closed_with: Callable[[], Optional[ErrorContext]]

class ReadableStreamGuestImpl(ReadableStream):
impl: ComponentInstance
closed_: bool
errctx: Optional[ErrorContext]
maybe_errctx: Optional[ErrorContext]
pending_buffer: Optional[Buffer]
pending_on_partial_copy: Optional[OnPartialCopy]
pending_on_copy_done: Optional[OnCopyDone]
Expand All @@ -656,7 +656,7 @@ def __init__(self, t, inst):
self.t = t
self.impl = inst
self.closed_ = False
self.errctx = None
self.maybe_errctx = None
self.reset_pending()

def reset_pending(self):
Expand All @@ -672,19 +672,19 @@ def reset_and_notify_pending(self):
def cancel(self):
self.reset_and_notify_pending()

def close(self, errctx = None):
def close(self, maybe_errctx):
if not self.closed_:
self.closed_ = True
self.errctx = errctx
self.maybe_errctx = maybe_errctx
if self.pending_buffer:
self.reset_and_notify_pending()

def closed(self):
return self.closed_

def closed_with_error(self):
def closed_with(self):
assert(self.closed_)
return self.errctx
return self.maybe_errctx

def read(self, dst, on_partial_copy, on_copy_done):
return self.copy(dst, on_partial_copy, on_copy_done, self.pending_buffer, dst)
Expand Down Expand Up @@ -719,9 +719,9 @@ def __init__(self, stream):
self.stream = stream
self.copying = False

def drop(self, errctx):
def drop(self, maybe_errctx):
trap_if(self.copying)
self.stream.close(errctx)
self.stream.close(maybe_errctx)
Waitable.drop(self)

class ReadableStreamEnd(StreamEnd):
Expand All @@ -740,11 +740,11 @@ def close_after_copy(self, copy_op, buffer, on_copy_done):
assert(buffer.remain() == 1)
def on_copy_done_wrapper():
if buffer.remain() == 0:
self.stream.close()
self.stream.close(maybe_errctx = None)
on_copy_done()
ret = copy_op(buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper)
if ret == 'done' and buffer.remain() == 0:
self.stream.close()
self.stream.close(maybe_errctx = None)
return ret

class ReadableFutureEnd(FutureEnd):
Expand All @@ -755,9 +755,9 @@ class WritableFutureEnd(FutureEnd):
paired: bool = False
def copy(self, src, on_partial_copy, on_copy_done):
return self.close_after_copy(self.stream.write, src, on_copy_done)
def drop(self, errctx):
trap_if(not self.stream.closed() and not errctx)
FutureEnd.drop(self, errctx)
def drop(self, maybe_errctx):
trap_if(not self.stream.closed() and not maybe_errctx)
FutureEnd.drop(self, maybe_errctx)

### Despecialization

Expand Down Expand Up @@ -2071,9 +2071,8 @@ def pack_copy_result(task, buffer, e):
assert(not (buffer.progress & CLOSED))
return buffer.progress
else:
if (errctx := e.stream.closed_with_error()):
assert(isinstance(e, ReadableStreamEnd|ReadableFutureEnd))
errctxi = task.inst.error_contexts.add(errctx)
if (maybe_errctx := e.stream.closed_with()):
errctxi = task.inst.error_contexts.add(maybe_errctx)
assert(errctxi != 0)
else:
errctxi = 0
Expand Down Expand Up @@ -2114,14 +2113,14 @@ async def cancel_copy(EndT, event_code, t, sync, task, i):

### 🔀 `canon {stream,future}.close-{readable,writable}`

async def canon_stream_close_readable(t, task, i):
return await close(ReadableStreamEnd, t, task, i, 0)
async def canon_stream_close_readable(t, task, i, errctxi):
return await close(ReadableStreamEnd, t, task, i, errctxi)

async def canon_stream_close_writable(t, task, hi, errctxi):
return await close(WritableStreamEnd, t, task, hi, errctxi)

async def canon_future_close_readable(t, task, i):
return await close(ReadableFutureEnd, t, task, i, 0)
async def canon_future_close_readable(t, task, i, errctxi):
return await close(ReadableFutureEnd, t, task, i, errctxi)

async def canon_future_close_writable(t, task, hi, errctxi):
return await close(WritableFutureEnd, t, task, hi, errctxi)
Expand All @@ -2130,12 +2129,12 @@ async def close(EndT, t, task, hi, errctxi):
trap_if(not task.inst.may_leave)
e = task.inst.waitables.remove(hi)
if errctxi == 0:
errctx = None
maybe_errctx = None
else:
errctx = task.inst.error_contexts.get(errctxi)
maybe_errctx = task.inst.error_contexts.get(errctxi)
trap_if(not isinstance(e, EndT))
trap_if(e.stream.t != t)
e.drop(errctx)
e.drop(maybe_errctx)
return []

### 🔀 `canon error-context.new`
Expand Down
Loading