diff --git a/design/mvp/CanonicalABI.md b/design/mvp/CanonicalABI.md index ad3c0469..9b2b5523 100644 --- a/design/mvp/CanonicalABI.md +++ b/design/mvp/CanonicalABI.md @@ -1053,9 +1053,9 @@ class ReadableStream: t: ValType read: Callable[[WritableBuffer, OnPartialCopy, OnCopyDone], Literal['done','blocked']] cancel: Callable[[], None] - close: Callable[[]] + close: Callable[[Optional[ErrorContext]]] closed: Callable[[], bool] - closed_with_error: Callable[[], Optional[ErrorContext]] + closed_with: Callable[[], Optional[ErrorContext]] ``` The key operation is `read` which works as follows: * `read` is non-blocking, returning `'blocked'` if it would have blocked. @@ -1091,7 +1091,7 @@ class in chunks, starting with the fields and initialization: class ReadableStreamGuestImpl(ReadableStream): impl: ComponentInstance closed_: bool - errctx: Optional[ErrorContext] + maybe_errctx: Optional[ErrorContext] pending_buffer: Optional[Buffer] pending_on_partial_copy: Optional[OnPartialCopy] pending_on_copy_done: Optional[OnCopyDone] @@ -1100,7 +1100,7 @@ class ReadableStreamGuestImpl(ReadableStream): self.t = t self.impl = inst self.closed_ = False - self.errctx = None + self.maybe_errctx = None self.reset_pending() def reset_pending(self): @@ -1126,19 +1126,19 @@ been returned: def cancel(self): self.reset_and_notify_pending() - def close(self, errctx = None): + def close(self, maybe_errctx): if not self.closed_: self.closed_ = True - self.errctx = errctx + self.maybe_errctx = maybe_errctx if self.pending_buffer: self.reset_and_notify_pending() def closed(self): return self.closed_ - def closed_with_error(self): + def closed_with(self): assert(self.closed_) - return self.errctx + return self.maybe_errctx ``` While the abstract `ReadableStream` interface *allows* `cancel` to return without having returned ownership of the buffer (which, in general, is @@ -1146,11 +1146,6 @@ necessary for [various][OIO] [host][io_uring] APIs), when *wasm* is implementing the stream, `cancel` always returns ownership of the buffer immediately. -`close` takes an optional `error-context` value which can only be supplied to -the writable end of a stream via `stream.close-writable`. Thus, for the -readable end of the stream, `close` is effectively nullary, matching -`ReadableStream.close`. - Note that `cancel` and `close` notify in opposite directions: * `cancel` *must* be called on a readable or writable end with an operation pending, and thus `cancel` notifies the same end that called it. @@ -1208,9 +1203,9 @@ class StreamEnd(Waitable): self.stream = stream self.copying = False - def drop(self, errctx): + def drop(self, maybe_errctx): trap_if(self.copying) - self.stream.close(errctx) + self.stream.close(maybe_errctx) Waitable.drop(self) class ReadableStreamEnd(StreamEnd): @@ -1247,11 +1242,11 @@ class FutureEnd(StreamEnd): assert(buffer.remain() == 1) def on_copy_done_wrapper(): if buffer.remain() == 0: - self.stream.close() + self.stream.close(maybe_errctx = None) on_copy_done() ret = copy_op(buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper) if ret == 'done' and buffer.remain() == 0: - self.stream.close() + self.stream.close(maybe_errctx = None) return ret class ReadableFutureEnd(FutureEnd): @@ -1262,9 +1257,9 @@ class WritableFutureEnd(FutureEnd): paired: bool = False def copy(self, src, on_partial_copy, on_copy_done): return self.close_after_copy(self.stream.write, src, on_copy_done) - def drop(self, errctx): - trap_if(not self.stream.closed() and not errctx) - FutureEnd.drop(self, errctx) + def drop(self, maybe_errctx): + trap_if(not self.stream.closed() and not maybe_errctx) + FutureEnd.drop(self, maybe_errctx) ``` The `future.{read,write}` built-ins fix the buffer length to `1`, ensuring the `assert(buffer.remain() == 1)` holds. Because of this, there are no partial @@ -3541,9 +3536,8 @@ def pack_copy_result(task, buffer, e): assert(not (buffer.progress & CLOSED)) return buffer.progress else: - if (errctx := e.stream.closed_with_error()): - assert(isinstance(e, ReadableStreamEnd|ReadableFutureEnd)) - errctxi = task.inst.error_contexts.add(errctx) + if (maybe_errctx := e.stream.closed_with()): + errctxi = task.inst.error_contexts.add(maybe_errctx) assert(errctxi != 0) else: errctxi = 0 @@ -3555,11 +3549,6 @@ The order of tests here indicates that, if some progress was made and then the stream was closed, only the progress is reported and the `CLOSED` status is left to be discovered next time. -As asserted here, `error-context`s are only possible on the *readable* end of a -stream or future (since, as defined below, only the *writable* end can close -the stream with an `error-context`). Thus, `error-context`s only flow in the -same direction as values, as an optional last value of the stream or future. - ### 🔀 `canon {stream,future}.cancel-{read,write}` @@ -3632,14 +3621,8 @@ caller can assume that ownership of the buffer has been returned. For canonical definitions: ```wat (canon stream.close-readable $t (core func $f)) -(canon future.close-readable $t (core func $f)) -``` -validation specifies: -* `$f` is given type `(func (param i32))` - -and for canonical definitions: -```wat (canon stream.close-writable $t (core func $f)) +(canon future.close-readable $t (core func $f)) (canon future.close-writable $t (core func $f)) ``` validation specifies: @@ -3650,14 +3633,14 @@ the given index from the current component instance's `waitable` table, performing the guards and bookkeeping defined by `{Readable,Writable}{Stream,Future}End.drop()` above. ```python -async def canon_stream_close_readable(t, task, i): - return await close(ReadableStreamEnd, t, task, i, 0) +async def canon_stream_close_readable(t, task, i, errctxi): + return await close(ReadableStreamEnd, t, task, i, errctxi) async def canon_stream_close_writable(t, task, hi, errctxi): return await close(WritableStreamEnd, t, task, hi, errctxi) -async def canon_future_close_readable(t, task, i): - return await close(ReadableFutureEnd, t, task, i, 0) +async def canon_future_close_readable(t, task, i, errctxi): + return await close(ReadableFutureEnd, t, task, i, errctxi) async def canon_future_close_writable(t, task, hi, errctxi): return await close(WritableFutureEnd, t, task, hi, errctxi) @@ -3666,17 +3649,22 @@ async def close(EndT, t, task, hi, errctxi): trap_if(not task.inst.may_leave) e = task.inst.waitables.remove(hi) if errctxi == 0: - errctx = None + maybe_errctx = None else: - errctx = task.inst.error_contexts.get(errctxi) + maybe_errctx = task.inst.error_contexts.get(errctxi) trap_if(not isinstance(e, EndT)) trap_if(e.stream.t != t) - e.drop(errctx) + e.drop(maybe_errctx) return [] ``` -Note that only the writable ends of streams and futures can be closed with a -final `error-context` value and thus `error-context`s only flow in the same -direction as values as an optional last value of the stream or future. +Passing a non-zero `errctxi` index indicates that this stream end is being +closed due to an error, with the given `error-context` providing information +that can be printed to aid in debugging. While, as explained above, the +*contents* of the `error-context` value are non-deterministic (and may, e.g., +be empty), the presence or absence of an `error-context` value is semantically +meaningful for distinguishing between success or failure. Concretely, the +packed `i32` returned by `{stream,future}.{read,write}` operations indicates +success or failure by whether the `error-context` index is `0` or not. ### 🔀 `canon error-context.new` diff --git a/design/mvp/Explainer.md b/design/mvp/Explainer.md index f26067c0..b34146b8 100644 --- a/design/mvp/Explainer.md +++ b/design/mvp/Explainer.md @@ -1878,12 +1878,11 @@ delivered to indicate the completion of the `read` or `write`. (See | Synopsis | | | ----------------------------------------------------- | ---------------------------------------------------------------- | -| Approximate WIT signature for `stream.close-readable` | `func(e: readable-stream-end)` | +| Approximate WIT signature for `stream.close-readable` | `func(e: readable-stream-end, err: option)` | | Approximate WIT signature for `stream.close-writable` | `func(e: writable-stream-end, err: option)` | -| Approximate WIT signature for `future.close-readable` | `func(e: readable-future-end)` | +| Approximate WIT signature for `future.close-readable` | `func(e: readable-future-end, err: option)` | | Approximate WIT signature for `future.close-writable` | `func(e: writable-future-end, err: option)` | -| Canonical ABI signature for `*.close-readable` | `[readable-end:i32] -> []` | -| Canonical ABI signature for `*.close-writable` | `[writable-end:i32 err:i32] -> []` | +| Canonical ABI signature | `[end:i32 err:i32] -> []` | The `{stream,future}.close-{readable,writable}` built-ins remove the indicated [stream or future] from the current component instance's table of [waitables], diff --git a/design/mvp/canonical-abi/definitions.py b/design/mvp/canonical-abi/definitions.py index f5d3e60d..89a219f4 100644 --- a/design/mvp/canonical-abi/definitions.py +++ b/design/mvp/canonical-abi/definitions.py @@ -640,14 +640,14 @@ class ReadableStream: t: ValType read: Callable[[WritableBuffer, OnPartialCopy, OnCopyDone], Literal['done','blocked']] cancel: Callable[[], None] - close: Callable[[]] + close: Callable[[Optional[ErrorContext]]] closed: Callable[[], bool] - closed_with_error: Callable[[], Optional[ErrorContext]] + closed_with: Callable[[], Optional[ErrorContext]] class ReadableStreamGuestImpl(ReadableStream): impl: ComponentInstance closed_: bool - errctx: Optional[ErrorContext] + maybe_errctx: Optional[ErrorContext] pending_buffer: Optional[Buffer] pending_on_partial_copy: Optional[OnPartialCopy] pending_on_copy_done: Optional[OnCopyDone] @@ -656,7 +656,7 @@ def __init__(self, t, inst): self.t = t self.impl = inst self.closed_ = False - self.errctx = None + self.maybe_errctx = None self.reset_pending() def reset_pending(self): @@ -672,19 +672,19 @@ def reset_and_notify_pending(self): def cancel(self): self.reset_and_notify_pending() - def close(self, errctx = None): + def close(self, maybe_errctx): if not self.closed_: self.closed_ = True - self.errctx = errctx + self.maybe_errctx = maybe_errctx if self.pending_buffer: self.reset_and_notify_pending() def closed(self): return self.closed_ - def closed_with_error(self): + def closed_with(self): assert(self.closed_) - return self.errctx + return self.maybe_errctx def read(self, dst, on_partial_copy, on_copy_done): return self.copy(dst, on_partial_copy, on_copy_done, self.pending_buffer, dst) @@ -719,9 +719,9 @@ def __init__(self, stream): self.stream = stream self.copying = False - def drop(self, errctx): + def drop(self, maybe_errctx): trap_if(self.copying) - self.stream.close(errctx) + self.stream.close(maybe_errctx) Waitable.drop(self) class ReadableStreamEnd(StreamEnd): @@ -740,11 +740,11 @@ def close_after_copy(self, copy_op, buffer, on_copy_done): assert(buffer.remain() == 1) def on_copy_done_wrapper(): if buffer.remain() == 0: - self.stream.close() + self.stream.close(maybe_errctx = None) on_copy_done() ret = copy_op(buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper) if ret == 'done' and buffer.remain() == 0: - self.stream.close() + self.stream.close(maybe_errctx = None) return ret class ReadableFutureEnd(FutureEnd): @@ -755,9 +755,9 @@ class WritableFutureEnd(FutureEnd): paired: bool = False def copy(self, src, on_partial_copy, on_copy_done): return self.close_after_copy(self.stream.write, src, on_copy_done) - def drop(self, errctx): - trap_if(not self.stream.closed() and not errctx) - FutureEnd.drop(self, errctx) + def drop(self, maybe_errctx): + trap_if(not self.stream.closed() and not maybe_errctx) + FutureEnd.drop(self, maybe_errctx) ### Despecialization @@ -2071,9 +2071,8 @@ def pack_copy_result(task, buffer, e): assert(not (buffer.progress & CLOSED)) return buffer.progress else: - if (errctx := e.stream.closed_with_error()): - assert(isinstance(e, ReadableStreamEnd|ReadableFutureEnd)) - errctxi = task.inst.error_contexts.add(errctx) + if (maybe_errctx := e.stream.closed_with()): + errctxi = task.inst.error_contexts.add(maybe_errctx) assert(errctxi != 0) else: errctxi = 0 @@ -2114,14 +2113,14 @@ async def cancel_copy(EndT, event_code, t, sync, task, i): ### 🔀 `canon {stream,future}.close-{readable,writable}` -async def canon_stream_close_readable(t, task, i): - return await close(ReadableStreamEnd, t, task, i, 0) +async def canon_stream_close_readable(t, task, i, errctxi): + return await close(ReadableStreamEnd, t, task, i, errctxi) async def canon_stream_close_writable(t, task, hi, errctxi): return await close(WritableStreamEnd, t, task, hi, errctxi) -async def canon_future_close_readable(t, task, i): - return await close(ReadableFutureEnd, t, task, i, 0) +async def canon_future_close_readable(t, task, i, errctxi): + return await close(ReadableFutureEnd, t, task, i, errctxi) async def canon_future_close_writable(t, task, hi, errctxi): return await close(WritableFutureEnd, t, task, hi, errctxi) @@ -2130,12 +2129,12 @@ async def close(EndT, t, task, hi, errctxi): trap_if(not task.inst.may_leave) e = task.inst.waitables.remove(hi) if errctxi == 0: - errctx = None + maybe_errctx = None else: - errctx = task.inst.error_contexts.get(errctxi) + maybe_errctx = task.inst.error_contexts.get(errctxi) trap_if(not isinstance(e, EndT)) trap_if(e.stream.t != t) - e.drop(errctx) + e.drop(maybe_errctx) return [] ### 🔀 `canon error-context.new` diff --git a/design/mvp/canonical-abi/run_tests.py b/design/mvp/canonical-abi/run_tests.py index 8af438d6..c8340b89 100644 --- a/design/mvp/canonical-abi/run_tests.py +++ b/design/mvp/canonical-abi/run_tests.py @@ -985,11 +985,11 @@ def reset_pending(self): def closed(self): return not self.remaining and self.destroy_if_empty - def closed_with_error(self): + def closed_with(self): assert(self.closed()) return None - def close(self, errctx = None): + def close(self, maybe_errctx = None): self.remaining = [] self.destroy_if_empty = True if self.pending_dst: @@ -1170,8 +1170,8 @@ async def core_func(task, args): assert(ret == 4) [ret] = await canon_stream_write(U8Type(), opts, task, wsi1, 0, 4) assert(ret == 4) - [] = await canon_stream_close_readable(U8Type(), task, rsi1) - [] = await canon_stream_close_readable(U8Type(), task, rsi2) + [] = await canon_stream_close_readable(U8Type(), task, rsi1, 0) + [] = await canon_stream_close_readable(U8Type(), task, rsi2, 0) [] = await canon_stream_close_writable(U8Type(), task, wsi1, 0) [] = await canon_stream_close_writable(U8Type(), task, wsi2, 0) return [] @@ -1261,7 +1261,7 @@ async def core_func(task, args): assert(ret == 4) [ret] = await canon_stream_read(U8Type(), sync_opts, task, rsi1, 0, 4) assert(ret == definitions.CLOSED) - [] = await canon_stream_close_readable(U8Type(), task, rsi1) + [] = await canon_stream_close_readable(U8Type(), task, rsi1, 0) assert(mem[0:4] == b'\x05\x06\x07\x08') [ret] = await canon_stream_write(U8Type(), opts, task, wsi2, 0, 4) assert(ret == 4) @@ -1275,7 +1275,7 @@ async def core_func(task, args): assert(mem[retp+4] == 4) [ret] = await canon_stream_read(U8Type(), opts, task, rsi2, 0, 4) assert(ret == definitions.CLOSED) - [] = await canon_stream_close_readable(U8Type(), task, rsi2) + [] = await canon_stream_close_readable(U8Type(), task, rsi2, 0) [ret] = await canon_stream_write(U8Type(), sync_opts, task, wsi1, 0, 4) assert(ret == 4) [] = await canon_stream_close_writable(U8Type(), task, wsi1, 0) @@ -1389,7 +1389,7 @@ async def core_func(task, args): assert(event == EventCode.STREAM_READ) assert(mem[retp+0] == rsi) assert(mem[retp+4] == 2) - [] = await canon_stream_close_readable(U8Type(), task, rsi) + [] = await canon_stream_close_readable(U8Type(), task, rsi, 0) [wsi] = await canon_stream_new(U8Type(), task) assert(wsi == 1) @@ -1511,7 +1511,7 @@ async def core_func2(task, args): [ret] = await canon_stream_read(U8Type(), opts2, task, rsi, 0, 2) errctxi = 1 assert(ret == (definitions.CLOSED | errctxi)) - [] = await canon_stream_close_readable(U8Type(), task, rsi) + [] = await canon_stream_close_readable(U8Type(), task, rsi, 0) [] = await canon_waitable_set_drop(task, seti) [] = await canon_error_context_debug_message(opts2, task, errctxi, 0) [] = await canon_error_context_drop(task, errctxi) @@ -1603,7 +1603,7 @@ async def core_func2(task, args): [ret] = await canon_stream_read(None, opts2, task, rsi, 1000000, 2) errctxi = 1 assert(ret == (definitions.CLOSED | errctxi)) - [] = await canon_stream_close_readable(None, task, rsi) + [] = await canon_stream_close_readable(None, task, rsi, 0) [] = await canon_error_context_debug_message(opts2, task, errctxi, 0) [] = await canon_error_context_drop(task, errctxi) return [] @@ -1674,7 +1674,7 @@ async def core_func(task, args): assert(ret == definitions.BLOCKED) [ret] = await canon_stream_cancel_read(U8Type(), True, task, rsi) assert(ret == 0) - [] = await canon_stream_close_readable(U8Type(), task, rsi) + [] = await canon_stream_close_readable(U8Type(), task, rsi, 0) [ret] = await canon_lower(lower_opts, host_ft2, host_func2, task, [retp]) assert(ret == 0) @@ -1693,7 +1693,7 @@ async def core_func(task, args): assert(mem[retp+0] == rsi) assert(mem[retp+4] == 2) assert(mem[0:2] == b'\x07\x08') - [] = await canon_stream_close_readable(U8Type(), task, rsi) + [] = await canon_stream_close_readable(U8Type(), task, rsi, 0) [] = await canon_waitable_set_drop(task, seti) return [] @@ -1731,7 +1731,7 @@ def __init__(self, t): self.pending_on_copy_done = None def closed(self): return self.is_closed - def closed_with_error(self): + def closed_with(self): assert(self.closed()) return None def read(self, dst, on_partial_copy, on_copy_done): @@ -1758,7 +1758,7 @@ def cancel(self): self.pending_buffer = None self.pending_on_copy_done = None pending_on_copy_done() - def close(self, errctx = None): + def close(self, maybe_errctx): self.is_closed = True if self.pending_buffer: self.cancel() @@ -1806,7 +1806,7 @@ async def core_func(task, args): assert(mem[readp] == 43) [] = await canon_future_close_writable(U8Type(), task, wfi, 0) - [] = await canon_future_close_readable(U8Type(), task, rfi) + [] = await canon_future_close_readable(U8Type(), task, rfi, 0) [] = await canon_waitable_set_drop(task, seti) [wfi] = await canon_future_new(U8Type(), task) @@ -1831,7 +1831,7 @@ async def core_func(task, args): assert(mem[readp] == 43) [] = await canon_future_close_writable(U8Type(), task, wfi, 0) - [] = await canon_future_close_readable(U8Type(), task, rfi) + [] = await canon_future_close_readable(U8Type(), task, rfi, 0) return []