From 3517a63b2beb41686b0f70cad0a5b0eaa1d8c869 Mon Sep 17 00:00:00 2001 From: Roman Volosatovs Date: Fri, 10 May 2024 19:27:16 +0200 Subject: [PATCH 1/2] refactor(go): refactor `Serve` and subscription API Signed-off-by: Roman Volosatovs --- crates/wit-bindgen-go/src/interface.rs | 37 +- .../hello/handler/bindings.wrpc.go | 11 +- .../wrpc/keyvalue/store/bindings.wrpc.go | 174 +--- go/{reader.go => chan_reader.go} | 0 go/client.go | 76 -- go/codec.go | 963 ------------------ go/future.go | 46 + .../http/outgoing_handler/bindings.go | 2 +- go/interface/http/types/bindings.go | 38 +- go/list.go | 89 ++ go/nats/client.go | 99 +- go/option.go | 87 ++ go/primitive.go | 187 ++++ go/result.go | 87 ++ go/stream.go | 186 ++++ go/subscriber.go | 22 - go/transmitter.go | 22 - go/tuple.go | 244 +++++ go/writer.go | 24 - go/wrpc.go | 142 +++ 20 files changed, 1125 insertions(+), 1411 deletions(-) rename go/{reader.go => chan_reader.go} (100%) delete mode 100644 go/client.go delete mode 100644 go/codec.go create mode 100644 go/future.go create mode 100644 go/list.go create mode 100644 go/option.go create mode 100644 go/primitive.go create mode 100644 go/result.go create mode 100644 go/stream.go delete mode 100644 go/subscriber.go delete mode 100644 go/transmitter.go create mode 100644 go/tuple.go delete mode 100644 go/writer.go create mode 100644 go/wrpc.go diff --git a/crates/wit-bindgen-go/src/interface.rs b/crates/wit-bindgen-go/src/interface.rs index d6e430776..7ab8934a3 100644 --- a/crates/wit-bindgen-go/src/interface.rs +++ b/crates/wit-bindgen-go/src/interface.rs @@ -1337,38 +1337,8 @@ impl InterfaceGenerator<'_> { let wrpc = self.deps.wrpc(); uwriteln!( self.src, - r#"stop{i}, err := c.Serve("{instance}", "{name}", func(ctx {context}.Context, buffer []byte, tx {wrpc}.Transmitter, inv {wrpc}.IncomingInvocation) error {{"#, + r#"stop{i}, err := c.Serve("{instance}", "{name}", func(ctx {context}.Context, w {wrpc}.IndexWriter, r {wrpc}.IndexReader, errCh <-chan error) error {{"#, ); - if !params.is_empty() { - // TODO: Handle async parameters - uwriteln!( - self.src, - r#"{slog}.DebugContext(ctx, "subscribing for `{instance}.{name}` parameters") - - payload := make(chan []byte) - stop, err := inv.Subscribe(func(ctx {context}.Context, buf []byte) {{ - payload <- buf - }}) - if err != nil {{ - return err - }} - defer func() {{ - if err := stop(); err != nil {{ - {slog}.ErrorContext(ctx, "failed to stop parameter subscription", "err", err) - }} - }}()"#, - ); - } - uwriteln!( - self.src, - r#"{slog}.DebugContext(ctx, "accepting handshake") - if err := inv.Accept(ctx, nil); err != nil {{ - return {fmt}.Errorf("failed to complete handshake: %w", err) - }}"#, - ); - if !params.is_empty() { - uwriteln!(self.src, "r := {wrpc}.NewChanReader(ctx, payload, buffer)"); - } for (i, (_, ty)) in params.iter().enumerate() { uwrite!( self.src, @@ -1425,8 +1395,9 @@ impl InterfaceGenerator<'_> { ); uwriteln!( self.src, - r#"if err := tx.Transmit({context}.Background(), buf.Bytes()); err != nil {{ - return {fmt}.Errorf("failed to transmit result: %w", err) + r#"_, err = w.Write(buf.Bytes()) + if err != nil {{ + return {fmt}.Errorf("failed to write result: %w", err) }}"#, ); self.push_str("return nil\n"); diff --git a/examples/go/hello-server/bindings/exports/wrpc_examples/hello/handler/bindings.wrpc.go b/examples/go/hello-server/bindings/exports/wrpc_examples/hello/handler/bindings.wrpc.go index 0c6061c72..245c6eb90 100644 --- a/examples/go/hello-server/bindings/exports/wrpc_examples/hello/handler/bindings.wrpc.go +++ b/examples/go/hello-server/bindings/exports/wrpc_examples/hello/handler/bindings.wrpc.go @@ -25,11 +25,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { } return nil } - stop0, err := c.Serve("wrpc-examples:hello/handler", "hello", func(ctx context.Context, buffer []byte, tx wrpc.Transmitter, inv wrpc.IncomingInvocation) error { - slog.DebugContext(ctx, "accepting handshake") - if err := inv.Accept(ctx, nil); err != nil { - return fmt.Errorf("failed to complete handshake: %w", err) - } + stop0, err := c.Serve("wrpc-examples:hello/handler", "hello", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReader, errCh <-chan error) error { slog.DebugContext(ctx, "calling `wrpc-examples:hello/handler.hello` handler") r0, err := h.Hello(ctx) if err != nil { @@ -61,8 +57,9 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return fmt.Errorf("failed to write result value 0: %w", err) } slog.DebugContext(ctx, "transmitting `wrpc-examples:hello/handler.hello` result") - if err := tx.Transmit(context.Background(), buf.Bytes()); err != nil { - return fmt.Errorf("failed to transmit result: %w", err) + _, err = w.Write(buf.Bytes()) + if err != nil { + return fmt.Errorf("failed to write result: %w", err) } return nil }) diff --git a/examples/go/keyvalue-server/bindings/exports/wrpc/keyvalue/store/bindings.wrpc.go b/examples/go/keyvalue-server/bindings/exports/wrpc/keyvalue/store/bindings.wrpc.go index 12ac8b291..c7375be30 100644 --- a/examples/go/keyvalue-server/bindings/exports/wrpc/keyvalue/store/bindings.wrpc.go +++ b/examples/go/keyvalue-server/bindings/exports/wrpc/keyvalue/store/bindings.wrpc.go @@ -96,7 +96,7 @@ func (v *Error) WriteTo(w wrpc.ByteWriter) error { if err := func(v uint8, w wrpc.ByteWriter) error { b := make([]byte, 2) i := binary.PutUvarint(b, uint64(v)) - slog.Debug("writing u8") + slog.Debug("writing u8 discriminant") _, err := w.Write(b[:i]) return err }(uint8(v.discriminant), w); err != nil { @@ -144,24 +144,24 @@ func ReadError(r wrpc.ByteReader) (*Error, error) { var x uint8 var s uint for i := 0; i < 2; i++ { - slog.Debug("reading `uint8` byte", "i", i) + slog.Debug("reading u8 discriminant byte", "i", i) b, err := r.ReadByte() if err != nil { if i > 0 && err == io.EOF { err = io.ErrUnexpectedEOF } - return x, fmt.Errorf("failed to read `uint8` byte: %w", err) + return x, fmt.Errorf("failed to read u8 discriminant byte: %w", err) } if b < 0x80 { if i == 2 && b > 1 { - return x, errors.New("varint overflows a 8-bit integer") + return x, errors.New("discriminant overflows a 8-bit integer") } return x | uint8(b)< 0 && err == io.EOF { err = io.ErrUnexpectedEOF } - return x, fmt.Errorf("failed to read `uint64` byte: %w", err) + return x, fmt.Errorf("failed to read u64 byte: %w", err) } if b < 0x80 { if i == 9 && b > 1 { @@ -499,26 +499,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { } return nil } - stop0, err := c.Serve("wrpc:keyvalue/store@0.2.0-draft", "get", func(ctx context.Context, buffer []byte, tx wrpc.Transmitter, inv wrpc.IncomingInvocation) error { - slog.DebugContext(ctx, "subscribing for `wrpc:keyvalue/store@0.2.0-draft.get` parameters") - - payload := make(chan []byte) - stop, err := inv.Subscribe(func(ctx context.Context, buf []byte) { - payload <- buf - }) - if err != nil { - return err - } - defer func() { - if err := stop(); err != nil { - slog.ErrorContext(ctx, "failed to stop parameter subscription", "err", err) - } - }() - slog.DebugContext(ctx, "accepting handshake") - if err := inv.Accept(ctx, nil); err != nil { - return fmt.Errorf("failed to complete handshake: %w", err) - } - r := wrpc.NewChanReader(ctx, payload, buffer) + stop0, err := c.Serve("wrpc:keyvalue/store@0.2.0-draft", "get", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReader, errCh <-chan error) error { slog.DebugContext(ctx, "reading parameter", "i", 0) p0, err := func(r wrpc.ByteReader) (string, error) { var x uint32 @@ -554,7 +535,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return "", errors.New("string length overflows a 32-bit integer") }(r) if err != nil { - return fmt.Errorf("failed to read parameter 0") + return fmt.Errorf("failed to read parameter 0: %w", err) } slog.DebugContext(ctx, "reading parameter", "i", 1) p1, err := func(r wrpc.ByteReader) (string, error) { @@ -591,7 +572,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return "", errors.New("string length overflows a 32-bit integer") }(r) if err != nil { - return fmt.Errorf("failed to read parameter 1") + return fmt.Errorf("failed to read parameter 1: %w", err) } slog.DebugContext(ctx, "calling `wrpc:keyvalue/store@0.2.0-draft.get` handler") r0, err := h.Get(ctx, p0, p1) @@ -673,8 +654,9 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return fmt.Errorf("failed to write result value 0: %w", err) } slog.DebugContext(ctx, "transmitting `wrpc:keyvalue/store@0.2.0-draft.get` result") - if err := tx.Transmit(context.Background(), buf.Bytes()); err != nil { - return fmt.Errorf("failed to transmit result: %w", err) + _, err = w.Write(buf.Bytes()) + if err != nil { + return fmt.Errorf("failed to write result: %w", err) } return nil }) @@ -682,26 +664,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return nil, fmt.Errorf("failed to serve `wrpc:keyvalue/store@0.2.0-draft.get`: %w", err) } stops = append(stops, stop0) - stop1, err := c.Serve("wrpc:keyvalue/store@0.2.0-draft", "set", func(ctx context.Context, buffer []byte, tx wrpc.Transmitter, inv wrpc.IncomingInvocation) error { - slog.DebugContext(ctx, "subscribing for `wrpc:keyvalue/store@0.2.0-draft.set` parameters") - - payload := make(chan []byte) - stop, err := inv.Subscribe(func(ctx context.Context, buf []byte) { - payload <- buf - }) - if err != nil { - return err - } - defer func() { - if err := stop(); err != nil { - slog.ErrorContext(ctx, "failed to stop parameter subscription", "err", err) - } - }() - slog.DebugContext(ctx, "accepting handshake") - if err := inv.Accept(ctx, nil); err != nil { - return fmt.Errorf("failed to complete handshake: %w", err) - } - r := wrpc.NewChanReader(ctx, payload, buffer) + stop1, err := c.Serve("wrpc:keyvalue/store@0.2.0-draft", "set", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReader, errCh <-chan error) error { slog.DebugContext(ctx, "reading parameter", "i", 0) p0, err := func(r wrpc.ByteReader) (string, error) { var x uint32 @@ -737,7 +700,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return "", errors.New("string length overflows a 32-bit integer") }(r) if err != nil { - return fmt.Errorf("failed to read parameter 0") + return fmt.Errorf("failed to read parameter 0: %w", err) } slog.DebugContext(ctx, "reading parameter", "i", 1) p1, err := func(r wrpc.ByteReader) (string, error) { @@ -774,7 +737,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return "", errors.New("string length overflows a 32-bit integer") }(r) if err != nil { - return fmt.Errorf("failed to read parameter 1") + return fmt.Errorf("failed to read parameter 1: %w", err) } slog.DebugContext(ctx, "reading parameter", "i", 2) p2, err := func(r wrpc.ByteReader) ([]uint8, error) { @@ -798,10 +761,10 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { for i := range vs { slog.Debug("reading list element", "i", i) vs[i], err = func(r wrpc.ByteReader) (uint8, error) { - slog.Debug("reading `u8` byte") + slog.Debug("reading u8 byte") v, err := r.ReadByte() if err != nil { - return 0, fmt.Errorf("failed to read `u8` byte: %w", err) + return 0, fmt.Errorf("failed to read u8 byte: %w", err) } return v, nil }(r) @@ -817,7 +780,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return nil, errors.New("list length overflows a 32-bit integer") }(r) if err != nil { - return fmt.Errorf("failed to read parameter 2") + return fmt.Errorf("failed to read parameter 2: %w", err) } slog.DebugContext(ctx, "calling `wrpc:keyvalue/store@0.2.0-draft.set` handler") r0, err := h.Set(ctx, p0, p1, p2) @@ -853,8 +816,9 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return fmt.Errorf("failed to write result value 0: %w", err) } slog.DebugContext(ctx, "transmitting `wrpc:keyvalue/store@0.2.0-draft.set` result") - if err := tx.Transmit(context.Background(), buf.Bytes()); err != nil { - return fmt.Errorf("failed to transmit result: %w", err) + _, err = w.Write(buf.Bytes()) + if err != nil { + return fmt.Errorf("failed to write result: %w", err) } return nil }) @@ -862,26 +826,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return nil, fmt.Errorf("failed to serve `wrpc:keyvalue/store@0.2.0-draft.set`: %w", err) } stops = append(stops, stop1) - stop2, err := c.Serve("wrpc:keyvalue/store@0.2.0-draft", "delete", func(ctx context.Context, buffer []byte, tx wrpc.Transmitter, inv wrpc.IncomingInvocation) error { - slog.DebugContext(ctx, "subscribing for `wrpc:keyvalue/store@0.2.0-draft.delete` parameters") - - payload := make(chan []byte) - stop, err := inv.Subscribe(func(ctx context.Context, buf []byte) { - payload <- buf - }) - if err != nil { - return err - } - defer func() { - if err := stop(); err != nil { - slog.ErrorContext(ctx, "failed to stop parameter subscription", "err", err) - } - }() - slog.DebugContext(ctx, "accepting handshake") - if err := inv.Accept(ctx, nil); err != nil { - return fmt.Errorf("failed to complete handshake: %w", err) - } - r := wrpc.NewChanReader(ctx, payload, buffer) + stop2, err := c.Serve("wrpc:keyvalue/store@0.2.0-draft", "delete", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReader, errCh <-chan error) error { slog.DebugContext(ctx, "reading parameter", "i", 0) p0, err := func(r wrpc.ByteReader) (string, error) { var x uint32 @@ -917,7 +862,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return "", errors.New("string length overflows a 32-bit integer") }(r) if err != nil { - return fmt.Errorf("failed to read parameter 0") + return fmt.Errorf("failed to read parameter 0: %w", err) } slog.DebugContext(ctx, "reading parameter", "i", 1) p1, err := func(r wrpc.ByteReader) (string, error) { @@ -954,7 +899,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return "", errors.New("string length overflows a 32-bit integer") }(r) if err != nil { - return fmt.Errorf("failed to read parameter 1") + return fmt.Errorf("failed to read parameter 1: %w", err) } slog.DebugContext(ctx, "calling `wrpc:keyvalue/store@0.2.0-draft.delete` handler") r0, err := h.Delete(ctx, p0, p1) @@ -990,8 +935,9 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return fmt.Errorf("failed to write result value 0: %w", err) } slog.DebugContext(ctx, "transmitting `wrpc:keyvalue/store@0.2.0-draft.delete` result") - if err := tx.Transmit(context.Background(), buf.Bytes()); err != nil { - return fmt.Errorf("failed to transmit result: %w", err) + _, err = w.Write(buf.Bytes()) + if err != nil { + return fmt.Errorf("failed to write result: %w", err) } return nil }) @@ -999,26 +945,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return nil, fmt.Errorf("failed to serve `wrpc:keyvalue/store@0.2.0-draft.delete`: %w", err) } stops = append(stops, stop2) - stop3, err := c.Serve("wrpc:keyvalue/store@0.2.0-draft", "exists", func(ctx context.Context, buffer []byte, tx wrpc.Transmitter, inv wrpc.IncomingInvocation) error { - slog.DebugContext(ctx, "subscribing for `wrpc:keyvalue/store@0.2.0-draft.exists` parameters") - - payload := make(chan []byte) - stop, err := inv.Subscribe(func(ctx context.Context, buf []byte) { - payload <- buf - }) - if err != nil { - return err - } - defer func() { - if err := stop(); err != nil { - slog.ErrorContext(ctx, "failed to stop parameter subscription", "err", err) - } - }() - slog.DebugContext(ctx, "accepting handshake") - if err := inv.Accept(ctx, nil); err != nil { - return fmt.Errorf("failed to complete handshake: %w", err) - } - r := wrpc.NewChanReader(ctx, payload, buffer) + stop3, err := c.Serve("wrpc:keyvalue/store@0.2.0-draft", "exists", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReader, errCh <-chan error) error { slog.DebugContext(ctx, "reading parameter", "i", 0) p0, err := func(r wrpc.ByteReader) (string, error) { var x uint32 @@ -1054,7 +981,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return "", errors.New("string length overflows a 32-bit integer") }(r) if err != nil { - return fmt.Errorf("failed to read parameter 0") + return fmt.Errorf("failed to read parameter 0: %w", err) } slog.DebugContext(ctx, "reading parameter", "i", 1) p1, err := func(r wrpc.ByteReader) (string, error) { @@ -1091,7 +1018,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return "", errors.New("string length overflows a 32-bit integer") }(r) if err != nil { - return fmt.Errorf("failed to read parameter 1") + return fmt.Errorf("failed to read parameter 1: %w", err) } slog.DebugContext(ctx, "calling `wrpc:keyvalue/store@0.2.0-draft.exists` handler") r0, err := h.Exists(ctx, p0, p1) @@ -1138,8 +1065,9 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return fmt.Errorf("failed to write result value 0: %w", err) } slog.DebugContext(ctx, "transmitting `wrpc:keyvalue/store@0.2.0-draft.exists` result") - if err := tx.Transmit(context.Background(), buf.Bytes()); err != nil { - return fmt.Errorf("failed to transmit result: %w", err) + _, err = w.Write(buf.Bytes()) + if err != nil { + return fmt.Errorf("failed to write result: %w", err) } return nil }) @@ -1147,26 +1075,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return nil, fmt.Errorf("failed to serve `wrpc:keyvalue/store@0.2.0-draft.exists`: %w", err) } stops = append(stops, stop3) - stop4, err := c.Serve("wrpc:keyvalue/store@0.2.0-draft", "list-keys", func(ctx context.Context, buffer []byte, tx wrpc.Transmitter, inv wrpc.IncomingInvocation) error { - slog.DebugContext(ctx, "subscribing for `wrpc:keyvalue/store@0.2.0-draft.list-keys` parameters") - - payload := make(chan []byte) - stop, err := inv.Subscribe(func(ctx context.Context, buf []byte) { - payload <- buf - }) - if err != nil { - return err - } - defer func() { - if err := stop(); err != nil { - slog.ErrorContext(ctx, "failed to stop parameter subscription", "err", err) - } - }() - slog.DebugContext(ctx, "accepting handshake") - if err := inv.Accept(ctx, nil); err != nil { - return fmt.Errorf("failed to complete handshake: %w", err) - } - r := wrpc.NewChanReader(ctx, payload, buffer) + stop4, err := c.Serve("wrpc:keyvalue/store@0.2.0-draft", "list-keys", func(ctx context.Context, w wrpc.IndexWriter, r wrpc.IndexReader, errCh <-chan error) error { slog.DebugContext(ctx, "reading parameter", "i", 0) p0, err := func(r wrpc.ByteReader) (string, error) { var x uint32 @@ -1202,7 +1111,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return "", errors.New("string length overflows a 32-bit integer") }(r) if err != nil { - return fmt.Errorf("failed to read parameter 0") + return fmt.Errorf("failed to read parameter 0: %w", err) } slog.DebugContext(ctx, "reading parameter", "i", 1) p1, err := func(r wrpc.ByteReader) (*uint64, error) { @@ -1220,13 +1129,13 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { var x uint64 var s uint for i := 0; i < 10; i++ { - slog.Debug("reading `uint64` byte", "i", i) + slog.Debug("reading u64 byte", "i", i) b, err := r.ReadByte() if err != nil { if i > 0 && err == io.EOF { err = io.ErrUnexpectedEOF } - return x, fmt.Errorf("failed to read `uint64` byte: %w", err) + return x, fmt.Errorf("failed to read u64 byte: %w", err) } if b < 0x80 { if i == 9 && b > 1 { @@ -1248,7 +1157,7 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { } }(r) if err != nil { - return fmt.Errorf("failed to read parameter 1") + return fmt.Errorf("failed to read parameter 1: %w", err) } slog.DebugContext(ctx, "calling `wrpc:keyvalue/store@0.2.0-draft.list-keys` handler") r0, err := h.ListKeys(ctx, p0, p1) @@ -1288,8 +1197,9 @@ func ServeInterface(c wrpc.Client, h Handler) (stop func() error, err error) { return fmt.Errorf("failed to write result value 0: %w", err) } slog.DebugContext(ctx, "transmitting `wrpc:keyvalue/store@0.2.0-draft.list-keys` result") - if err := tx.Transmit(context.Background(), buf.Bytes()); err != nil { - return fmt.Errorf("failed to transmit result: %w", err) + _, err = w.Write(buf.Bytes()) + if err != nil { + return fmt.Errorf("failed to write result: %w", err) } return nil }) diff --git a/go/reader.go b/go/chan_reader.go similarity index 100% rename from go/reader.go rename to go/chan_reader.go diff --git a/go/client.go b/go/client.go deleted file mode 100644 index 7e20005ea..000000000 --- a/go/client.go +++ /dev/null @@ -1,76 +0,0 @@ -package wrpc - -import ( - "context" - "io" -) - -type ErrorSubscriber interface { - SubscribeError(func(context.Context, []byte)) (func() error, error) -} - -type IncomingInvocation interface { - Subscriber - ErrorSubscriber - - Accept(context.Context, []byte) error -} - -type SubscribePath []*uint32 - -func NewSubscribePath(ps ...*uint32) SubscribePath { - return SubscribePath(ps) -} - -func (p SubscribePath) push(v *uint32) SubscribePath { - return SubscribePath(append(append(make(SubscribePath, 0, len(p)+1), p...), v)) -} - -func (p SubscribePath) Index(i uint32) SubscribePath { - return p.push(&i) -} - -func (p SubscribePath) Wildcard() SubscribePath { - return p.push(nil) -} - -func (p SubscribePath) Parent() (SubscribePath, bool) { - n := len(p) - if n == 0 { - return nil, false - } - return SubscribePath(p[:n-1]), true -} - -type Index[T any] interface { - Index(path ...uint32) (T, error) -} - -type IndexReader interface { - io.Reader - io.ByteReader - - Index[IndexReader] -} - -type ReaderFromIndex interface { - ReadFromIndex(IndexReader) error -} - -type IndexWriter interface { - io.Writer - io.ByteWriter - - Index[IndexWriter] -} - -type WriterToIndex interface { - WriteToIndex(IndexWriter) error -} - -type Client interface { - Invoke(ctx context.Context, instance string, name string, f func(IndexWriter, IndexReader, <-chan error) error, subs ...SubscribePath) error - ServeIndex(instance string, name string, f func(context.Context, IndexWriter, IndexReader, <-chan error) error, subs ...SubscribePath) (func() error, error) - - Serve(instance string, name string, f func(context.Context, []byte, Transmitter, IncomingInvocation) error) (func() error, error) -} diff --git a/go/codec.go b/go/codec.go deleted file mode 100644 index 77e607c11..000000000 --- a/go/codec.go +++ /dev/null @@ -1,963 +0,0 @@ -package wrpc - -import ( - "bytes" - "context" - "encoding/binary" - "errors" - "fmt" - "io" - "log/slog" - "math" -) - -func Slice[T any](v []T) *[]T { - if v == nil { - return nil - } - return &v -} - -type Tuple2[T0, T1 any] struct { - V0 T0 - V1 T1 -} - -func (v *Tuple2[T0, T1]) WriteTo(w ByteWriter, f0 func(T0, ByteWriter) error, f1 func(T1, ByteWriter) error) error { - slog.Debug("writing tuple element 0") - if err := f0(v.V0, w); err != nil { - return fmt.Errorf("failed to write tuple element 0: %w", err) - } - slog.Debug("writing tuple element 1") - if err := f1(v.V1, w); err != nil { - return fmt.Errorf("failed to write tuple element 1: %w", err) - } - return nil -} - -type Tuple3[T0, T1, T2 any] struct { - V0 T0 - V1 T1 - V2 T2 -} - -func (v *Tuple3[T0, T1, T2]) WriteTo(w ByteWriter, f0 func(T0, ByteWriter) error, f1 func(T1, ByteWriter) error, f2 func(T2, ByteWriter) error) error { - slog.Debug("writing tuple element 0") - if err := f0(v.V0, w); err != nil { - return fmt.Errorf("failed to write tuple element 0: %w", err) - } - slog.Debug("writing tuple element 1") - if err := f1(v.V1, w); err != nil { - return fmt.Errorf("failed to write tuple element 1: %w", err) - } - slog.Debug("writing tuple element 2") - if err := f2(v.V2, w); err != nil { - return fmt.Errorf("failed to write tuple element 2: %w", err) - } - return nil -} - -type Tuple4[T0, T1, T2, T3 any] struct { - V0 T0 - V1 T1 - V2 T2 - V3 T3 -} - -type Tuple5[T0, T1, T2, T3, T4 any] struct { - V0 T0 - V1 T1 - V2 T2 - V3 T3 - V4 T4 -} - -type Tuple6[T0, T1, T2, T3, T4, T5 any] struct { - V0 T0 - V1 T1 - V2 T2 - V3 T3 - V4 T4 - V5 T5 -} - -type Tuple7[T0, T1, T2, T3, T4, T5, T6 any] struct { - V0 T0 - V1 T1 - V2 T2 - V3 T3 - V4 T4 - V5 T5 - V6 T6 -} - -type Tuple8[T0, T1, T2, T3, T4, T5, T6, T7 any] struct { - V0 T0 - V1 T1 - V2 T2 - V3 T3 - V4 T4 - V5 T5 - V6 T6 - V7 T7 -} - -type Tuple9[T0, T1, T2, T3, T4, T5, T6, T7, T8 any] struct { - V0 T0 - V1 T1 - V2 T2 - V3 T3 - V4 T4 - V5 T5 - V6 T6 - V7 T7 - V8 T8 -} - -type Tuple10[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 any] struct { - V0 T0 - V1 T1 - V2 T2 - V3 T3 - V4 T4 - V5 T5 - V6 T6 - V7 T7 - V8 T8 - V9 T9 -} - -type Tuple11[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10 any] struct { - V0 T0 - V1 T1 - V2 T2 - V3 T3 - V4 T4 - V5 T5 - V6 T6 - V7 T7 - V8 T8 - V9 T9 - V10 T10 -} - -type Tuple12[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11 any] struct { - V0 T0 - V1 T1 - V2 T2 - V3 T3 - V4 T4 - V5 T5 - V6 T6 - V7 T7 - V8 T8 - V9 T9 - V10 T10 - V11 T11 -} - -type Tuple13[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12 any] struct { - V0 T0 - V1 T1 - V2 T2 - V3 T3 - V4 T4 - V5 T5 - V6 T6 - V7 T7 - V8 T8 - V9 T9 - V10 T10 - V11 T11 - V12 T12 -} - -type Tuple14[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13 any] struct { - V0 T0 - V1 T1 - V2 T2 - V3 T3 - V4 T4 - V5 T5 - V6 T6 - V7 T7 - V8 T8 - V9 T9 - V10 T10 - V11 T11 - V12 T12 - V13 T13 -} - -type Tuple15[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14 any] struct { - V0 T0 - V1 T1 - V2 T2 - V3 T3 - V4 T4 - V5 T5 - V6 T6 - V7 T7 - V8 T8 - V9 T9 - V10 T10 - V11 T11 - V12 T12 - V13 T13 - V14 T14 -} - -type Tuple16[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15 any] struct { - V0 T0 - V1 T1 - V2 T2 - V3 T3 - V4 T4 - V5 T5 - V6 T6 - V7 T7 - V8 T8 - V9 T9 - V10 T10 - V11 T11 - V12 T12 - V13 T13 - V14 T14 - V15 T15 -} - -type Result[Ok, Err any] struct { - Ok *Ok - Err *Err -} - -func Ok[Err, Ok any](v Ok) *Result[Ok, Err] { - return &Result[Ok, Err]{Ok: &v} -} - -func Err[Ok, Err any](v Err) *Result[Ok, Err] { - return &Result[Ok, Err]{Err: &v} -} - -func (v *Result[Ok, Err]) WriteTo(w ByteWriter, fOk func(*Ok, ByteWriter) error, fErr func(*Err, ByteWriter) error) error { - switch { - case v.Ok == nil && v.Err == nil: - return errors.New("both result variants cannot be nil") - case v.Ok != nil && v.Err != nil: - return errors.New("exactly one result variant must non-nil") - case v.Ok != nil: - slog.Debug("writing `result::ok` status byte") - if err := w.WriteByte(0); err != nil { - return fmt.Errorf("failed to write `result::ok` status byte: %w", err) - } - slog.Debug("writing `result::ok` payload") - if err := fOk(v.Ok, w); err != nil { - return fmt.Errorf("failed to write `result::ok` payload: %w", err) - } - return nil - default: - slog.Debug("writing `result::err` status byte") - if err := w.WriteByte(1); err != nil { - return fmt.Errorf("failed to write `result::err` status byte: %w", err) - } - slog.Debug("writing `result::err` payload") - if err := fErr(v.Err, w); err != nil { - return fmt.Errorf("failed to write `result::err` payload: %w", err) - } - return nil - } -} - -type ByteStreamWriter struct { - r io.Reader - chunk []byte -} - -func (v *ByteStreamWriter) WriteTo(w ByteWriter) error { - if len(v.chunk) == 0 { - v.chunk = make([]byte, 8096) - } - for { - var end bool - slog.Debug("reading pending byte stream contents") - n, err := v.r.Read(v.chunk) - if err == io.EOF { - end = true - slog.Debug("pending byte stream reached EOF") - } else if err != nil { - return fmt.Errorf("failed to read pending byte stream chunk: %w", err) - } - if n > math.MaxUint32 { - return fmt.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) - } - slog.Debug("writing pending byte stream chunk length", "len", n) - if err := WriteUint32(uint32(n), w); err != nil { - return fmt.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) - } - _, err = w.Write(v.chunk[:n]) - if err != nil { - return fmt.Errorf("failed to write pending byte stream chunk contents: %w", err) - } - if end { - if err := w.WriteByte(0); err != nil { - return fmt.Errorf("failed to write pending byte stream end byte: %w", err) - } - return nil - } - } -} - -type ByteWriter interface { - io.ByteWriter - io.Writer -} - -type ByteReader interface { - io.ByteReader - io.Reader -} - -type Ready interface { - Ready() bool -} - -type Receiver[T any] interface { - Receive() (T, error) -} - -type ReadyReceiver[T any] interface { - Receiver[T] - Ready -} - -type ReadyReader interface { - io.Reader - Ready -} - -type ReadyByteReader interface { - ByteReader - Ready -} - -type byteReader struct { - *bytes.Reader -} - -func (*byteReader) Ready() bool { - return true -} - -type PendingByteReader struct { - ByteReader -} - -func (*PendingByteReader) Ready() bool { - return false -} - -func NewPendingByteReader(r ByteReader) *PendingByteReader { - return &PendingByteReader{r} -} - -type ready[T any] struct { - v T -} - -func (r *ready[T]) Receive() (T, error) { - return r.v, nil -} - -func (*ready[T]) Ready() bool { - return true -} - -type byteStreamReceiver struct { - ByteReader - buffered uint32 -} - -func (r *byteStreamReceiver) Read(p []byte) (int, error) { - n := r.buffered - if n == 0 { - slog.Debug("reading pending byte stream chunk length") - var err error - n, err = ReadUint32(r) - if err != nil { - return 0, fmt.Errorf("failed to read pending byte stream chunk length: %w", err) - } - if n == 0 { - return 0, io.EOF - } - } - if len(p) > int(n) { - p = p[:n] - } - slog.Debug("reading pending byte stream chunk contents", "len", n) - rn, err := r.Read(p) - if err != nil { - return rn, fmt.Errorf("failed to read pending stream chunk bytes: %w", err) - } - if rn > int(n) { - return rn, errors.New("read more bytes than requested") - } - r.buffered = n - uint32(rn) - return rn, nil -} - -func (*byteStreamReceiver) Ready() bool { - return false -} - -type decodeReceiver[T any] struct { - r ByteReader - decode func(ByteReader) (T, error) -} - -func (r *decodeReceiver[T]) Receive() (T, error) { - return r.decode(r.r) -} - -func (*decodeReceiver[T]) Ready() bool { - return false -} - -func PutUint16(buf []byte, x uint16) { - binary.PutUvarint(buf, uint64(x)) -} - -func AppendUint16(buf []byte, x uint16) []byte { - return binary.AppendUvarint(buf, uint64(x)) -} - -func PutUint32(buf []byte, x uint32) { - binary.PutUvarint(buf, uint64(x)) -} - -func AppendUint32(buf []byte, x uint32) []byte { - return binary.AppendUvarint(buf, uint64(x)) -} - -func PutUint64(buf []byte, x uint64) { - binary.PutUvarint(buf, x) -} - -func AppendUint64(buf []byte, x uint64) []byte { - return binary.AppendUvarint(buf, x) -} - -func AppendFloat32(buf []byte, x float32) []byte { - return binary.LittleEndian.AppendUint32(buf, math.Float32bits(x)) -} - -func PutFloat32(buf []byte, x float32) { - binary.LittleEndian.PutUint32(buf, math.Float32bits(x)) -} - -func AppendFloat64(buf []byte, x float64) []byte { - return binary.LittleEndian.AppendUint64(buf, math.Float64bits(x)) -} - -func PutFloat64(buf []byte, x float64) { - binary.LittleEndian.PutUint64(buf, math.Float64bits(x)) -} - -func WriteUint8(v uint8, w ByteWriter) error { - return w.WriteByte(v) -} - -func WriteUint16(v uint16, w ByteWriter) error { - b := make([]byte, binary.MaxVarintLen16) - i := binary.PutUvarint(b, uint64(v)) - _, err := w.Write(b[:i]) - return err -} - -func WriteUint32(v uint32, w ByteWriter) error { - b := make([]byte, binary.MaxVarintLen32) - i := binary.PutUvarint(b, uint64(v)) - _, err := w.Write(b[:i]) - return err -} - -func WriteUint64(v uint64, w ByteWriter) error { - b := make([]byte, binary.MaxVarintLen64) - i := binary.PutUvarint(b, uint64(v)) - _, err := w.Write(b[:i]) - return err -} - -func WriteString(v string, w ByteWriter) error { - n := len(v) - if n > math.MaxUint32 { - return fmt.Errorf("string byte length of %d overflows a 32-bit integer", n) - } - slog.Debug("writing string byte length", "len", n) - if err := WriteUint32(uint32(n), w); err != nil { - return fmt.Errorf("failed to write string length of %d: %w", n, err) - } - slog.Debug("writing string bytes") - _, err := w.Write([]byte(v)) - if err != nil { - return fmt.Errorf("failed to write string bytes: %w", err) - } - return nil -} - -func WriteByteList(v []byte, w ByteWriter) error { - n := len(v) - if n > math.MaxUint32 { - return fmt.Errorf("byte list length of %d overflows a 32-bit integer", n) - } - slog.Debug("writing byte list length", "len", n) - if err := WriteUint32(uint32(n), w); err != nil { - return fmt.Errorf("failed to write list length of %d: %w", n, err) - } - slog.Debug("writing byte list contents") - _, err := w.Write(v) - if err != nil { - return fmt.Errorf("failed to write byte list contents: %w", err) - } - return nil -} - -func WriteList[T any](v []T, w ByteWriter, f func(T, ByteWriter) error) error { - n := len(v) - if n > math.MaxUint32 { - return fmt.Errorf("list length of %d overflows a 32-bit integer", n) - } - slog.Debug("writing list length", "len", n) - if err := WriteUint32(uint32(n), w); err != nil { - return fmt.Errorf("failed to write list length of %d: %w", n, err) - } - for i := range v { - slog.Debug("writing list element", "index", i) - if err := f(v[i], w); err != nil { - return fmt.Errorf("failed to write list element %d: %w", i, err) - } - } - return nil -} - -func WriteOption[T any](v *T, w ByteWriter, f func(T, ByteWriter) error) error { - if v == nil { - slog.Debug("writing `option::none` status byte") - if err := w.WriteByte(0); err != nil { - return fmt.Errorf("failed to write `option::none` byte: %w", err) - } - return nil - } - slog.Debug("writing `option::some` status byte") - if err := w.WriteByte(1); err != nil { - return fmt.Errorf("failed to write `option::some` status byte: %w", err) - } - slog.Debug("writing `option::some` payload") - if err := f(*v, w); err != nil { - return fmt.Errorf("failed to write `option::some` payload: %w", err) - } - return nil -} - -func WriteByteStream(r ReadyReader, w ByteWriter, chunk []byte, path ...uint32) (*ByteStreamWriter, error) { - if r.Ready() { - slog.Debug("writing byte stream `stream::ready` status byte") - if err := w.WriteByte(1); err != nil { - return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) - } - var buf bytes.Buffer - slog.Debug("reading ready byte stream contents") - n, err := io.CopyBuffer(&buf, r, chunk) - if err != nil { - return nil, fmt.Errorf("failed to read ready byte stream contents: %w", err) - } - slog.Debug("writing ready byte stream contents", "len", n) - return nil, WriteByteList(buf.Bytes(), w) - } - slog.Debug("writing byte stream `stream::pending` status byte") - if err := w.WriteByte(0); err != nil { - return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) - } - return &ByteStreamWriter{r, chunk}, nil -} - -// ReadUint64 reads an encoded uint64 from r and returns it. -// The error is [io.EOF] only if no bytes were read. -// If an [io.EOF] happens after reading some but not all the bytes, -// ReadUvarint returns [io.ErrUnexpectedEOF]. -func ReadUint64(r ByteReader) (uint64, error) { - return binary.ReadUvarint(r) -} - -// ReadOptionStatus reads a single byte from `r` and returns: -// - `true` for `option::some` -// - `false` for `option::none` -func ReadOptionStatus(r ByteReader) (bool, error) { - status, err := r.ReadByte() - if err != nil { - return false, fmt.Errorf("failed to read `option` status byte: %w", err) - } - switch status { - case 0: - return false, nil - case 1: - return true, nil - default: - return false, fmt.Errorf("invalid `option` status byte %d", status) - } -} - -// ReadOption reads an option from `r` -func ReadOption[T any](r ByteReader, f func(ByteReader) (T, error)) (*T, error) { - slog.Debug("reading option status byte") - ok, err := ReadOptionStatus(r) - if err != nil { - return nil, err - } - if !ok { - return nil, nil - } - slog.Debug("reading `option::some` payload") - v, err := f(r) - if err != nil { - return nil, fmt.Errorf("failed to read `option::some` value: %w", err) - } - return &v, nil -} - -// ReadFlatOption reads an option from `r` without pointer indirection -func ReadFlatOption[T any](r ByteReader, f func(ByteReader) (T, error)) (v T, err error) { - slog.Debug("reading option status byte") - ok, err := ReadOptionStatus(r) - if err != nil { - return v, err - } - if !ok { - return v, err - } - slog.Debug("reading `option::some` payload") - v, err = f(r) - if err != nil { - return v, fmt.Errorf("failed to read `option::some` value: %w", err) - } - return v, nil -} - -func FlattenOption[T any](v **T) *T { - if v == nil { - return nil - } else { - return *v - } -} - -// ReadResultStatus reads a single byte from `r` and returns: -// - `true` for `result::ok` -// - `false` for `result::err` -func ReadResultStatus(r ByteReader) (bool, error) { - status, err := r.ReadByte() - if err != nil { - return false, fmt.Errorf("failed to read `result` status byte: %w", err) - } - switch status { - case 0: - return true, nil - case 1: - return false, nil - default: - return false, fmt.Errorf("invalid `result` status byte %d", status) - } -} - -// ReadResult reads a single byte from `r` -func ReadResult[T, U any](r ByteReader, fOk func(ByteReader) (T, error), fErr func(ByteReader) (U, error)) (*Result[T, U], error) { - ok, err := ReadResultStatus(r) - if err != nil { - return nil, err - } - if !ok { - v, err := fErr(r) - if err != nil { - return nil, fmt.Errorf("failed to read `result::err` value: %w", err) - } - return &Result[T, U]{Err: &v}, nil - } - v, err := fOk(r) - if err != nil { - return nil, fmt.Errorf("failed to read `result::ok` value: %w", err) - } - return &Result[T, U]{Ok: &v}, nil -} - -// ReadString reads a string from `r` and returns it -func ReadString(r ByteReader) (string, error) { - slog.Debug("reading string length") - n, err := ReadUint32(r) - if err != nil { - return "", fmt.Errorf("failed to read string length: %w", err) - } - - b := make([]byte, int(n)) - slog.Debug("reading string bytes", "len", n) - rn, err := r.Read(b) - if err != nil { - return "", fmt.Errorf("failed to read string: %w", err) - } - if rn > int(n) { - return "", fmt.Errorf("invalid amount of string bytes read, expected %d, got %d", n, rn) - } - slog.Debug("read string bytes", "buf", b) - return string(b), nil -} - -// ReadByteList reads a []byte from `r` and returns it -func ReadByteList(r ByteReader) ([]byte, error) { - slog.Debug("reading byte list length") - n, err := ReadUint32(r) - if err != nil { - return nil, fmt.Errorf("failed to read list length: %w", err) - } - - b := make([]byte, n) - slog.Debug("reading bytes", "len", n) - rn, err := r.Read(b) - if err != nil { - return nil, fmt.Errorf("failed to read list bytes: %w", err) - } - if rn > int(n) { - return nil, fmt.Errorf("invalid amount of list bytes read, expected %d, got %d", n, rn) - } - return b, nil -} - -// ReadList reads a list from `r` and returns it -func ReadList[T any](r ByteReader, f func(ByteReader) (T, error)) ([]T, error) { - slog.Debug("reading list length") - n, err := ReadUint32(r) - if err != nil { - return nil, fmt.Errorf("failed to read list length: %w", err) - } - vs := make([]T, n) - slog.Debug("reading list elements", "len", n) - for i := range vs { - slog.Debug("reading list element", "index", i) - v, err := f(r) - if err != nil { - return nil, fmt.Errorf("failed to read list element %d: %w", i, err) - } - vs[i] = v - } - return vs, nil -} - -func ReadTuple2[T0, T1 any](r ByteReader, f0 func(ByteReader) (T0, error), f1 func(ByteReader) (T1, error)) (*Tuple2[T0, T1], error, -) { - v0, err := f0(r) - if err != nil { - return nil, fmt.Errorf("failed to read tuple element 0: %w", err) - } - v1, err := f1(r) - if err != nil { - return nil, fmt.Errorf("failed to read tuple element 1: %w", err) - } - return &Tuple2[T0, T1]{v0, v1}, nil -} - -func ReadTuple3[T0, T1, T2 any](r ByteReader, f0 func(ByteReader) (T0, error), f1 func(ByteReader) (T1, error), f2 func(ByteReader) (T2, error)) (*Tuple3[T0, T1, T2], error, -) { - v0, err := f0(r) - if err != nil { - return nil, fmt.Errorf("failed to read tuple element 0: %w", err) - } - v1, err := f1(r) - if err != nil { - return nil, fmt.Errorf("failed to read tuple element 1: %w", err) - } - v2, err := f2(r) - if err != nil { - return nil, fmt.Errorf("failed to read tuple element 2: %w", err) - } - return &Tuple3[T0, T1, T2]{v0, v1, v2}, nil -} - -// ReadStreamStatus reads a single byte from `r` and returns: -// - `true` if stream is "ready" -// - `false` if stream is "pending" -func ReadStreamStatus(r ByteReader) (bool, error) { - status, err := r.ReadByte() - if err != nil { - return false, fmt.Errorf("failed to read `stream` status byte: %w", err) - } - switch status { - case 0: - return false, nil - case 1: - return true, nil - default: - return false, fmt.Errorf("invalid `stream` status byte %d", status) - } -} - -// ReadByteStream reads a stream of bytes from `r` and `ch` -func ReadByteStream(r IndexReader, path ...uint32) (ReadyReader, error) { - slog.Debug("reading byte stream status byte") - ok, err := ReadStreamStatus(r) - if err != nil { - return nil, err - } - if !ok { - r, err = r.Index(path...) - if err != nil { - return nil, fmt.Errorf("failed to get byte stream reader: %w", err) - } - return &byteStreamReceiver{r, 0}, nil - } - slog.Debug("reading ready byte stream") - buf, err := ReadByteList(r) - if err != nil { - return nil, fmt.Errorf("failed to read bytes: %w", err) - } - slog.Debug("read ready byte stream", "len", len(buf)) - return &byteReader{bytes.NewReader(buf)}, nil -} - -// ReadStream reads a stream from `r` and `ch` -func ReadStream[T any](ctx context.Context, r ByteReader, ch <-chan []byte, f func(ByteReader) (T, error)) (ReadyReceiver[[]T], error) { - slog.DebugContext(ctx, "reading stream status byte") - ok, err := ReadStreamStatus(r) - if err != nil { - return nil, err - } - if !ok { - return &decodeReceiver[[]T]{&ChanReader{ctx, ch, nil}, func(r ByteReader) ([]T, error) { - n, err := ReadUint32(r) - if err != nil { - return nil, fmt.Errorf("failed to read pending stream chunk length: %w", err) - } - if n == 0 { - return nil, io.EOF - } - vs := make([]T, n) - for i := range vs { - v, err := f(r) - if err != nil { - return nil, fmt.Errorf("failed to read pending stream chunk element %d: %w", i, err) - } - vs[i] = v - } - return vs, nil - }}, nil - } - slog.DebugContext(ctx, "reading ready stream") - vs, err := ReadList(r, f) - if err != nil { - return nil, fmt.Errorf("failed to read ready stream: %w", err) - } - slog.DebugContext(ctx, "read ready stream", "len", len(vs)) - return &ready[[]T]{vs}, nil -} - -// ReadFutureStatus reads a single byte from `r` and returns: -// - `true` if future is "ready" -// - `false` if future is "pending" -func ReadFutureStatus(r ByteReader) (bool, error) { - status, err := r.ReadByte() - if err != nil { - return false, fmt.Errorf("failed to read `future` status byte: %w", err) - } - switch status { - case 0: - return false, nil - case 1: - return true, nil - default: - return false, fmt.Errorf("invalid `future` status byte %d", status) - } -} - -// ReadFuture reads a future from `r` and `ch` -func ReadFuture[T any](r IndexReader, f func(ByteReader) (T, error), path ...uint32) (ReadyReceiver[T], error) { - slog.Debug("reading future status byte") - ok, err := ReadFutureStatus(r) - if err != nil { - return nil, err - } - if !ok { - r, err = r.Index(path...) - if err != nil { - return nil, fmt.Errorf("failed to get future reader: %w", err) - } - return &decodeReceiver[T]{r, f}, nil - } - slog.Debug("reading ready future") - v, err := f(r) - if err != nil { - return nil, err - } - return &ready[T]{v}, nil -} - -// NOTE: Below is adapted from https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/encoding/binary/varint.go;l=128-153 - -// maxVarintLenN is the maximum length of a varint-encoded N-bit integer. -const ( - maxVarintLen16 = 3 - maxVarintLen32 = 5 -) - -var errOverflow16 = errors.New("wrpc: varint overflows a 16-bit integer") - -// ReadUint16 reads an encoded uint16 from r and returns it. -// The error is [io.EOF] only if no bytes were read. -// If an [io.EOF] happens after reading some but not all the bytes, -// ReadUvarint returns [io.ErrUnexpectedEOF]. -func ReadUint16(r ByteReader) (uint16, error) { - var x uint16 - var s uint - for i := 0; i < maxVarintLen16; i++ { - b, err := r.ReadByte() - if err != nil { - if i > 0 && err == io.EOF { - err = io.ErrUnexpectedEOF - } - return x, err - } - if b < 0x80 { - if i == maxVarintLen16-1 && b > 1 { - return x, errOverflow16 - } - return x | uint16(b)< 0 && err == io.EOF { - err = io.ErrUnexpectedEOF - } - return x, err - } - if b < 0x80 { - if i == maxVarintLen32-1 && b > 1 { - return x, errOverflow32 - } - return x | uint32(b)< math.MaxUint32 { + return fmt.Errorf("byte list length of %d overflows a 32-bit integer", n) + } + slog.Debug("writing byte list length", "len", n) + if err := WriteUint32(uint32(n), w); err != nil { + return fmt.Errorf("failed to write list length of %d: %w", n, err) + } + slog.Debug("writing byte list contents") + _, err := w.Write(v) + if err != nil { + return fmt.Errorf("failed to write byte list contents: %w", err) + } + return nil +} + +func WriteList[T any](v []T, w ByteWriter, f func(T, ByteWriter) error) error { + n := len(v) + if n > math.MaxUint32 { + return fmt.Errorf("list length of %d overflows a 32-bit integer", n) + } + slog.Debug("writing list length", "len", n) + if err := WriteUint32(uint32(n), w); err != nil { + return fmt.Errorf("failed to write list length of %d: %w", n, err) + } + for i := range v { + slog.Debug("writing list element", "index", i) + if err := f(v[i], w); err != nil { + return fmt.Errorf("failed to write list element %d: %w", i, err) + } + } + return nil +} + +// ReadByteList reads a []byte from `r` and returns it +func ReadByteList(r ByteReader) ([]byte, error) { + slog.Debug("reading byte list length") + n, err := ReadUint32(r) + if err != nil { + return nil, fmt.Errorf("failed to read list length: %w", err) + } + + b := make([]byte, n) + slog.Debug("reading bytes", "len", n) + rn, err := r.Read(b) + if err != nil { + return nil, fmt.Errorf("failed to read list bytes: %w", err) + } + if rn > int(n) { + return nil, fmt.Errorf("invalid amount of list bytes read, expected %d, got %d", n, rn) + } + return b, nil +} + +// ReadList reads a list from `r` and returns it +func ReadList[T any](r ByteReader, f func(ByteReader) (T, error)) ([]T, error) { + slog.Debug("reading list length") + n, err := ReadUint32(r) + if err != nil { + return nil, fmt.Errorf("failed to read list length: %w", err) + } + vs := make([]T, n) + slog.Debug("reading list elements", "len", n) + for i := range vs { + slog.Debug("reading list element", "index", i) + v, err := f(r) + if err != nil { + return nil, fmt.Errorf("failed to read list element %d: %w", i, err) + } + vs[i] = v + } + return vs, nil +} diff --git a/go/nats/client.go b/go/nats/client.go index 0275696a7..c6373bc2c 100644 --- a/go/nats/client.go +++ b/go/nats/client.go @@ -89,58 +89,6 @@ func transmit(ctx context.Context, conn *nats.Conn, subject string, reply string return nil } -type invocation struct { - conn *nats.Conn - rx string - tx string -} - -func (inv *invocation) SubscribeError(f func(context.Context, []byte)) (func() error, error) { - sub, err := inv.conn.Subscribe(fmt.Sprintf("%s.error", inv.rx), func(m *nats.Msg) { - ctx := context.Background() - ctx = ContextWithHeader(ctx, m.Header) - f(ctx, m.Data) - }) - if err != nil { - return nil, fmt.Errorf("failed to subscribe for error: %w", err) - } - return sub.Unsubscribe, nil -} - -type IncomingInvocation struct{ invocation } - -func (inv *IncomingInvocation) Subscribe(f func(context.Context, []byte), path ...uint32) (func() error, error) { - sub, err := subscribe(inv.conn, paramSubject(inv.rx), f, path...) - if err != nil { - return nil, fmt.Errorf("failed to subscribe for parameters: %w", err) - } - return sub.Unsubscribe, nil -} - -func (inv *IncomingInvocation) SubscribeError(f func(context.Context, []byte)) (func() error, error) { - return inv.invocation.SubscribeError(f) -} - -func (inv *IncomingInvocation) Accept(ctx context.Context, buf []byte) error { - if err := transmit(ctx, inv.conn, inv.tx, inv.rx, buf); err != nil { - return fmt.Errorf("failed to transmit accept: %w", err) - } - return nil -} - -type Transmitter struct { - conn *nats.Conn - subject string -} - -func (tx *Transmitter) Transmit(ctx context.Context, buf []byte, path ...uint32) error { - subject := tx.subject - for _, p := range path { - subject = fmt.Sprintf("%s.%d", subject, p) - } - return transmit(ctx, tx.conn, subject, "", buf) -} - type Client struct { conn *nats.Conn prefix string @@ -150,51 +98,6 @@ func NewClient(conn *nats.Conn, prefix string) *Client { return &Client{conn, prefix} } -func (c *Client) Serve(instance string, name string, f func(context.Context, []byte, wrpc.Transmitter, wrpc.IncomingInvocation) error) (stop func() error, err error) { - sub, err := c.conn.Subscribe(invocationSubject(c.prefix, instance, name), func(m *nats.Msg) { - slog.Debug("received invocation", "instance", instance, "name", name) - if m.Reply == "" { - slog.Warn("peer did not specify a reply subject") - return - } - ctx := context.Background() - ctx = ContextWithHeader(ctx, m.Header) - slog.Debug("calling server handler") - if err := f(ctx, m.Data, &Transmitter{ - conn: c.conn, - subject: resultSubject(m.Reply), - }, &IncomingInvocation{ - invocation: invocation{ - conn: c.conn, - rx: nats.NewInbox(), - tx: m.Reply, - }, - }); err != nil { - var buf bytes.Buffer - slog.Warn("failed to handle `handle`", "err", err) - if err = wrpc.WriteString(fmt.Sprintf("%s", err), &buf); err != nil { - slog.Warn("failed to encode `handle` handling error", "err", err) - // Encoding the error failed, let's try encoding the encoding error - if err = wrpc.WriteString(fmt.Sprintf("failed to encode error: %s", err), &buf); err != nil { - slog.Warn("failed to encode `handle` handling error encoding error", "err", err) - // Well, we're out of luck at this point, let's just send an empty string - buf.Reset() - } - } - slog.Debug("transmitting error") - if err = transmit(context.Background(), c.conn, fmt.Sprintf("%s.error", m.Reply), "", buf.Bytes()); err != nil { - slog.Warn("failed to send error to client", "err", err) - } - return - } - slog.Debug("successfully finished serving invocation") - }) - if err != nil { - return nil, fmt.Errorf("failed to serve `%s` for instance `%s`: %w", name, instance, err) - } - return sub.Unsubscribe, nil -} - type paramWriter struct { ctx context.Context nc *nats.Conn @@ -533,7 +436,7 @@ func (w *resultWriter) Index(path ...uint32) (wrpc.IndexWriter, error) { return &resultWriter{nc: w.nc, tx: indexPath(w.tx, path...)}, nil } -func (c *Client) ServeIndex(instance string, name string, f func(context.Context, wrpc.IndexWriter, wrpc.IndexReader, <-chan error) error, subs ...wrpc.SubscribePath) (stop func() error, err error) { +func (c *Client) Serve(instance string, name string, f func(context.Context, wrpc.IndexWriter, wrpc.IndexReader, <-chan error) error, subs ...wrpc.SubscribePath) (stop func() error, err error) { sub, err := c.conn.Subscribe(invocationSubject(c.prefix, instance, name), func(m *nats.Msg) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) diff --git a/go/option.go b/go/option.go new file mode 100644 index 000000000..e2fc5cfa8 --- /dev/null +++ b/go/option.go @@ -0,0 +1,87 @@ +package wrpc + +import ( + "fmt" + "log/slog" +) + +func FlattenOption[T any](v **T) *T { + if v == nil { + return nil + } else { + return *v + } +} + +// ReadOptionStatus reads a single byte from `r` and returns: +// - `true` for `option::some` +// - `false` for `option::none` +func ReadOptionStatus(r ByteReader) (bool, error) { + status, err := r.ReadByte() + if err != nil { + return false, fmt.Errorf("failed to read `option` status byte: %w", err) + } + switch status { + case 0: + return false, nil + case 1: + return true, nil + default: + return false, fmt.Errorf("invalid `option` status byte %d", status) + } +} + +// ReadOption reads an option from `r` +func ReadOption[T any](r ByteReader, f func(ByteReader) (T, error)) (*T, error) { + slog.Debug("reading option status byte") + ok, err := ReadOptionStatus(r) + if err != nil { + return nil, err + } + if !ok { + return nil, nil + } + slog.Debug("reading `option::some` payload") + v, err := f(r) + if err != nil { + return nil, fmt.Errorf("failed to read `option::some` value: %w", err) + } + return &v, nil +} + +// ReadFlatOption reads an option from `r` without pointer indirection +func ReadFlatOption[T any](r ByteReader, f func(ByteReader) (T, error)) (v T, err error) { + slog.Debug("reading option status byte") + ok, err := ReadOptionStatus(r) + if err != nil { + return v, err + } + if !ok { + return v, err + } + slog.Debug("reading `option::some` payload") + v, err = f(r) + if err != nil { + return v, fmt.Errorf("failed to read `option::some` value: %w", err) + } + return v, nil +} + +func WriteOption[T any](v *T, w ByteWriter, f func(T, ByteWriter) error) error { + if v == nil { + slog.Debug("writing `option::none` status byte") + if err := w.WriteByte(0); err != nil { + return fmt.Errorf("failed to write `option::none` byte: %w", err) + } + return nil + } + slog.Debug("writing `option::some` status byte") + if err := w.WriteByte(1); err != nil { + return fmt.Errorf("failed to write `option::some` status byte: %w", err) + } + slog.Debug("writing `option::some` payload") + if err := f(*v, w); err != nil { + return fmt.Errorf("failed to write `option::some` payload: %w", err) + } + return nil +} diff --git a/go/primitive.go b/go/primitive.go new file mode 100644 index 000000000..ce6490da7 --- /dev/null +++ b/go/primitive.go @@ -0,0 +1,187 @@ +package wrpc + +import ( + "encoding/binary" + "errors" + "fmt" + "io" + "log/slog" + "math" +) + +func PutUint16(buf []byte, x uint16) { + binary.PutUvarint(buf, uint64(x)) +} + +func AppendUint16(buf []byte, x uint16) []byte { + return binary.AppendUvarint(buf, uint64(x)) +} + +func PutUint32(buf []byte, x uint32) { + binary.PutUvarint(buf, uint64(x)) +} + +func AppendUint32(buf []byte, x uint32) []byte { + return binary.AppendUvarint(buf, uint64(x)) +} + +func PutUint64(buf []byte, x uint64) { + binary.PutUvarint(buf, x) +} + +func AppendUint64(buf []byte, x uint64) []byte { + return binary.AppendUvarint(buf, x) +} + +func AppendFloat32(buf []byte, x float32) []byte { + return binary.LittleEndian.AppendUint32(buf, math.Float32bits(x)) +} + +func PutFloat32(buf []byte, x float32) { + binary.LittleEndian.PutUint32(buf, math.Float32bits(x)) +} + +func AppendFloat64(buf []byte, x float64) []byte { + return binary.LittleEndian.AppendUint64(buf, math.Float64bits(x)) +} + +func PutFloat64(buf []byte, x float64) { + binary.LittleEndian.PutUint64(buf, math.Float64bits(x)) +} + +func WriteUint8(v uint8, w ByteWriter) error { + return w.WriteByte(v) +} + +func WriteUint16(v uint16, w ByteWriter) error { + b := make([]byte, binary.MaxVarintLen16) + i := binary.PutUvarint(b, uint64(v)) + _, err := w.Write(b[:i]) + return err +} + +func WriteUint32(v uint32, w ByteWriter) error { + b := make([]byte, binary.MaxVarintLen32) + i := binary.PutUvarint(b, uint64(v)) + _, err := w.Write(b[:i]) + return err +} + +func WriteUint64(v uint64, w ByteWriter) error { + b := make([]byte, binary.MaxVarintLen64) + i := binary.PutUvarint(b, uint64(v)) + _, err := w.Write(b[:i]) + return err +} + +func WriteString(v string, w ByteWriter) error { + n := len(v) + if n > math.MaxUint32 { + return fmt.Errorf("string byte length of %d overflows a 32-bit integer", n) + } + slog.Debug("writing string byte length", "len", n) + if err := WriteUint32(uint32(n), w); err != nil { + return fmt.Errorf("failed to write string length of %d: %w", n, err) + } + slog.Debug("writing string bytes") + _, err := w.Write([]byte(v)) + if err != nil { + return fmt.Errorf("failed to write string bytes: %w", err) + } + return nil +} + +// ReadString reads a string from `r` and returns it +func ReadString(r ByteReader) (string, error) { + slog.Debug("reading string length") + n, err := ReadUint32(r) + if err != nil { + return "", fmt.Errorf("failed to read string length: %w", err) + } + + b := make([]byte, int(n)) + slog.Debug("reading string bytes", "len", n) + rn, err := r.Read(b) + if err != nil { + return "", fmt.Errorf("failed to read string: %w", err) + } + if rn > int(n) { + return "", fmt.Errorf("invalid amount of string bytes read, expected %d, got %d", n, rn) + } + slog.Debug("read string bytes", "buf", b) + return string(b), nil +} + +// ReadUint64 reads an encoded uint64 from r and returns it. +// The error is [io.EOF] only if no bytes were read. +// If an [io.EOF] happens after reading some but not all the bytes, +// ReadUvarint returns [io.ErrUnexpectedEOF]. +func ReadUint64(r ByteReader) (uint64, error) { + return binary.ReadUvarint(r) +} + +// NOTE: Below is adapted from https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/encoding/binary/varint.go;l=128-153 + +// maxVarintLenN is the maximum length of a varint-encoded N-bit integer. +const ( + maxVarintLen16 = 3 + maxVarintLen32 = 5 +) + +var errOverflow16 = errors.New("wrpc: varint overflows a 16-bit integer") + +// ReadUint16 reads an encoded uint16 from r and returns it. +// The error is [io.EOF] only if no bytes were read. +// If an [io.EOF] happens after reading some but not all the bytes, +// ReadUvarint returns [io.ErrUnexpectedEOF]. +func ReadUint16(r ByteReader) (uint16, error) { + var x uint16 + var s uint + for i := 0; i < maxVarintLen16; i++ { + b, err := r.ReadByte() + if err != nil { + if i > 0 && err == io.EOF { + err = io.ErrUnexpectedEOF + } + return x, err + } + if b < 0x80 { + if i == maxVarintLen16-1 && b > 1 { + return x, errOverflow16 + } + return x | uint16(b)< 0 && err == io.EOF { + err = io.ErrUnexpectedEOF + } + return x, err + } + if b < 0x80 { + if i == maxVarintLen32-1 && b > 1 { + return x, errOverflow32 + } + return x | uint32(b)< math.MaxUint32 { + return fmt.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) + } + slog.Debug("writing pending byte stream chunk length", "len", n) + if err := WriteUint32(uint32(n), w); err != nil { + return fmt.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) + } + _, err = w.Write(v.chunk[:n]) + if err != nil { + return fmt.Errorf("failed to write pending byte stream chunk contents: %w", err) + } + if end { + if err := w.WriteByte(0); err != nil { + return fmt.Errorf("failed to write pending byte stream end byte: %w", err) + } + return nil + } + } +} + +type byteStreamReceiver struct { + ByteReader + buffered uint32 +} + +func (r *byteStreamReceiver) Read(p []byte) (int, error) { + n := r.buffered + if n == 0 { + slog.Debug("reading pending byte stream chunk length") + var err error + n, err = ReadUint32(r) + if err != nil { + return 0, fmt.Errorf("failed to read pending byte stream chunk length: %w", err) + } + if n == 0 { + return 0, io.EOF + } + } + if len(p) > int(n) { + p = p[:n] + } + slog.Debug("reading pending byte stream chunk contents", "len", n) + rn, err := r.Read(p) + if err != nil { + return rn, fmt.Errorf("failed to read pending stream chunk bytes: %w", err) + } + if rn > int(n) { + return rn, errors.New("read more bytes than requested") + } + r.buffered = n - uint32(rn) + return rn, nil +} + +func (*byteStreamReceiver) Ready() bool { + return false +} + +// ReadStreamStatus reads a single byte from `r` and returns: +// - `true` if stream is "ready" +// - `false` if stream is "pending" +func ReadStreamStatus(r ByteReader) (bool, error) { + status, err := r.ReadByte() + if err != nil { + return false, fmt.Errorf("failed to read `stream` status byte: %w", err) + } + switch status { + case 0: + return false, nil + case 1: + return true, nil + default: + return false, fmt.Errorf("invalid `stream` status byte %d", status) + } +} + +// ReadByteStream reads a stream of bytes from `r` and `ch` +func ReadByteStream(r IndexReader, path ...uint32) (ReadyReader, error) { + slog.Debug("reading byte stream status byte") + ok, err := ReadStreamStatus(r) + if err != nil { + return nil, err + } + if !ok { + r, err = r.Index(path...) + if err != nil { + return nil, fmt.Errorf("failed to get byte stream reader: %w", err) + } + return &byteStreamReceiver{r, 0}, nil + } + slog.Debug("reading ready byte stream") + buf, err := ReadByteList(r) + if err != nil { + return nil, fmt.Errorf("failed to read bytes: %w", err) + } + slog.Debug("read ready byte stream", "len", len(buf)) + return &byteReader{bytes.NewReader(buf)}, nil +} + +// ReadStream reads a stream from `r` and `ch` +func ReadStream[T any](ctx context.Context, r ByteReader, ch <-chan []byte, f func(ByteReader) (T, error)) (ReadyReceiver[[]T], error) { + slog.DebugContext(ctx, "reading stream status byte") + ok, err := ReadStreamStatus(r) + if err != nil { + return nil, err + } + if !ok { + return &decodeReceiver[[]T]{&ChanReader{ctx, ch, nil}, func(r ByteReader) ([]T, error) { + n, err := ReadUint32(r) + if err != nil { + return nil, fmt.Errorf("failed to read pending stream chunk length: %w", err) + } + if n == 0 { + return nil, io.EOF + } + vs := make([]T, n) + for i := range vs { + v, err := f(r) + if err != nil { + return nil, fmt.Errorf("failed to read pending stream chunk element %d: %w", i, err) + } + vs[i] = v + } + return vs, nil + }}, nil + } + slog.DebugContext(ctx, "reading ready stream") + vs, err := ReadList(r, f) + if err != nil { + return nil, fmt.Errorf("failed to read ready stream: %w", err) + } + slog.DebugContext(ctx, "read ready stream", "len", len(vs)) + return &ready[[]T]{vs}, nil +} + +func WriteByteStream(r ReadyReader, w ByteWriter, chunk []byte, path ...uint32) (*ByteStreamWriter, error) { + if r.Ready() { + slog.Debug("writing byte stream `stream::ready` status byte") + if err := w.WriteByte(1); err != nil { + return nil, fmt.Errorf("failed to write `stream::ready` byte: %w", err) + } + var buf bytes.Buffer + slog.Debug("reading ready byte stream contents") + n, err := io.CopyBuffer(&buf, r, chunk) + if err != nil { + return nil, fmt.Errorf("failed to read ready byte stream contents: %w", err) + } + slog.Debug("writing ready byte stream contents", "len", n) + return nil, WriteByteList(buf.Bytes(), w) + } + slog.Debug("writing byte stream `stream::pending` status byte") + if err := w.WriteByte(0); err != nil { + return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) + } + return &ByteStreamWriter{r, chunk}, nil +} diff --git a/go/subscriber.go b/go/subscriber.go deleted file mode 100644 index 3c5785c5f..000000000 --- a/go/subscriber.go +++ /dev/null @@ -1,22 +0,0 @@ -package wrpc - -import ( - "context" -) - -type Subscriber interface { - Subscribe(func(context.Context, []byte), ...uint32) (func() error, error) -} - -type NestedSubscriber struct { - Subscriber - Path []uint32 -} - -func (sub *NestedSubscriber) Subscribe(f func(context.Context, []byte), path ...uint32) (func() error, error) { - return sub.Subscriber.Subscribe(f, append(sub.Path, path...)...) -} - -func NewNestedSubscriber(sub Subscriber, path ...uint32) *NestedSubscriber { - return &NestedSubscriber{sub, path} -} diff --git a/go/transmitter.go b/go/transmitter.go deleted file mode 100644 index 033b9e0c9..000000000 --- a/go/transmitter.go +++ /dev/null @@ -1,22 +0,0 @@ -package wrpc - -import ( - "context" -) - -type Transmitter interface { - Transmit(context.Context, []byte, ...uint32) error -} - -type NestedTransmitter struct { - Transmitter - Path []uint32 -} - -func (tx *NestedTransmitter) Transmit(ctx context.Context, b []byte, path ...uint32) error { - return tx.Transmitter.Transmit(ctx, b, append(tx.Path, path...)...) -} - -func NewNestedTransmitter(tx Transmitter, path ...uint32) *NestedTransmitter { - return &NestedTransmitter{tx, path} -} diff --git a/go/tuple.go b/go/tuple.go new file mode 100644 index 000000000..edf0f7ff4 --- /dev/null +++ b/go/tuple.go @@ -0,0 +1,244 @@ +package wrpc + +import ( + "fmt" + "log/slog" +) + +type Tuple2[T0, T1 any] struct { + V0 T0 + V1 T1 +} + +func ReadTuple2[T0, T1 any](r ByteReader, f0 func(ByteReader) (T0, error), f1 func(ByteReader) (T1, error)) (*Tuple2[T0, T1], error, +) { + v0, err := f0(r) + if err != nil { + return nil, fmt.Errorf("failed to read tuple element 0: %w", err) + } + v1, err := f1(r) + if err != nil { + return nil, fmt.Errorf("failed to read tuple element 1: %w", err) + } + return &Tuple2[T0, T1]{v0, v1}, nil +} + +func (v *Tuple2[T0, T1]) WriteTo(w ByteWriter, f0 func(T0, ByteWriter) error, f1 func(T1, ByteWriter) error) error { + slog.Debug("writing tuple element 0") + if err := f0(v.V0, w); err != nil { + return fmt.Errorf("failed to write tuple element 0: %w", err) + } + slog.Debug("writing tuple element 1") + if err := f1(v.V1, w); err != nil { + return fmt.Errorf("failed to write tuple element 1: %w", err) + } + return nil +} + +type Tuple3[T0, T1, T2 any] struct { + V0 T0 + V1 T1 + V2 T2 +} + +func ReadTuple3[T0, T1, T2 any](r ByteReader, f0 func(ByteReader) (T0, error), f1 func(ByteReader) (T1, error), f2 func(ByteReader) (T2, error)) (*Tuple3[T0, T1, T2], error, +) { + v0, err := f0(r) + if err != nil { + return nil, fmt.Errorf("failed to read tuple element 0: %w", err) + } + v1, err := f1(r) + if err != nil { + return nil, fmt.Errorf("failed to read tuple element 1: %w", err) + } + v2, err := f2(r) + if err != nil { + return nil, fmt.Errorf("failed to read tuple element 2: %w", err) + } + return &Tuple3[T0, T1, T2]{v0, v1, v2}, nil +} + +func (v *Tuple3[T0, T1, T2]) WriteTo(w ByteWriter, f0 func(T0, ByteWriter) error, f1 func(T1, ByteWriter) error, f2 func(T2, ByteWriter) error) error { + slog.Debug("writing tuple element 0") + if err := f0(v.V0, w); err != nil { + return fmt.Errorf("failed to write tuple element 0: %w", err) + } + slog.Debug("writing tuple element 1") + if err := f1(v.V1, w); err != nil { + return fmt.Errorf("failed to write tuple element 1: %w", err) + } + slog.Debug("writing tuple element 2") + if err := f2(v.V2, w); err != nil { + return fmt.Errorf("failed to write tuple element 2: %w", err) + } + return nil +} + +type Tuple4[T0, T1, T2, T3 any] struct { + V0 T0 + V1 T1 + V2 T2 + V3 T3 +} + +type Tuple5[T0, T1, T2, T3, T4 any] struct { + V0 T0 + V1 T1 + V2 T2 + V3 T3 + V4 T4 +} + +type Tuple6[T0, T1, T2, T3, T4, T5 any] struct { + V0 T0 + V1 T1 + V2 T2 + V3 T3 + V4 T4 + V5 T5 +} + +type Tuple7[T0, T1, T2, T3, T4, T5, T6 any] struct { + V0 T0 + V1 T1 + V2 T2 + V3 T3 + V4 T4 + V5 T5 + V6 T6 +} + +type Tuple8[T0, T1, T2, T3, T4, T5, T6, T7 any] struct { + V0 T0 + V1 T1 + V2 T2 + V3 T3 + V4 T4 + V5 T5 + V6 T6 + V7 T7 +} + +type Tuple9[T0, T1, T2, T3, T4, T5, T6, T7, T8 any] struct { + V0 T0 + V1 T1 + V2 T2 + V3 T3 + V4 T4 + V5 T5 + V6 T6 + V7 T7 + V8 T8 +} + +type Tuple10[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 any] struct { + V0 T0 + V1 T1 + V2 T2 + V3 T3 + V4 T4 + V5 T5 + V6 T6 + V7 T7 + V8 T8 + V9 T9 +} + +type Tuple11[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10 any] struct { + V0 T0 + V1 T1 + V2 T2 + V3 T3 + V4 T4 + V5 T5 + V6 T6 + V7 T7 + V8 T8 + V9 T9 + V10 T10 +} + +type Tuple12[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11 any] struct { + V0 T0 + V1 T1 + V2 T2 + V3 T3 + V4 T4 + V5 T5 + V6 T6 + V7 T7 + V8 T8 + V9 T9 + V10 T10 + V11 T11 +} + +type Tuple13[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12 any] struct { + V0 T0 + V1 T1 + V2 T2 + V3 T3 + V4 T4 + V5 T5 + V6 T6 + V7 T7 + V8 T8 + V9 T9 + V10 T10 + V11 T11 + V12 T12 +} + +type Tuple14[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13 any] struct { + V0 T0 + V1 T1 + V2 T2 + V3 T3 + V4 T4 + V5 T5 + V6 T6 + V7 T7 + V8 T8 + V9 T9 + V10 T10 + V11 T11 + V12 T12 + V13 T13 +} + +type Tuple15[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14 any] struct { + V0 T0 + V1 T1 + V2 T2 + V3 T3 + V4 T4 + V5 T5 + V6 T6 + V7 T7 + V8 T8 + V9 T9 + V10 T10 + V11 T11 + V12 T12 + V13 T13 + V14 T14 +} + +type Tuple16[T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15 any] struct { + V0 T0 + V1 T1 + V2 T2 + V3 T3 + V4 T4 + V5 T5 + V6 T6 + V7 T7 + V8 T8 + V9 T9 + V10 T10 + V11 T11 + V12 T12 + V13 T13 + V14 T14 + V15 T15 +} diff --git a/go/writer.go b/go/writer.go deleted file mode 100644 index 548e4c258..000000000 --- a/go/writer.go +++ /dev/null @@ -1,24 +0,0 @@ -package wrpc - -import ( - "context" -) - -type TransmitWriter struct { - ctx context.Context - tx Transmitter - path []uint32 -} - -func NewTransmitWriter(ctx context.Context, tx Transmitter, path ...uint32) *TransmitWriter { - return &TransmitWriter{ - ctx, tx, path, - } -} - -func (w *TransmitWriter) Write(b []byte) (int, error) { - if err := w.tx.Transmit(w.ctx, b, w.path...); err != nil { - return 0, err - } - return len(b), nil -} diff --git a/go/wrpc.go b/go/wrpc.go new file mode 100644 index 000000000..a9d0e408d --- /dev/null +++ b/go/wrpc.go @@ -0,0 +1,142 @@ +package wrpc + +import ( + "bytes" + "context" + "io" +) + +type SubscribePath []*uint32 + +func NewSubscribePath(ps ...*uint32) SubscribePath { + return SubscribePath(ps) +} + +func (p SubscribePath) push(v *uint32) SubscribePath { + return SubscribePath(append(append(make(SubscribePath, 0, len(p)+1), p...), v)) +} + +func (p SubscribePath) Index(i uint32) SubscribePath { + return p.push(&i) +} + +func (p SubscribePath) Wildcard() SubscribePath { + return p.push(nil) +} + +func (p SubscribePath) Parent() (SubscribePath, bool) { + n := len(p) + if n == 0 { + return nil, false + } + return SubscribePath(p[:n-1]), true +} + +type Index[T any] interface { + Index(path ...uint32) (T, error) +} + +type IndexReader interface { + io.Reader + io.ByteReader + + Index[IndexReader] +} + +type ReaderFromIndex interface { + ReadFromIndex(IndexReader) error +} + +type IndexWriter interface { + io.Writer + io.ByteWriter + + Index[IndexWriter] +} + +type WriterToIndex interface { + WriteToIndex(IndexWriter) error +} + +type Client interface { + Invoke(ctx context.Context, instance string, name string, f func(IndexWriter, IndexReader, <-chan error) error, subs ...SubscribePath) error + Serve(instance string, name string, f func(context.Context, IndexWriter, IndexReader, <-chan error) error, subs ...SubscribePath) (func() error, error) +} + +type ByteWriter interface { + io.ByteWriter + io.Writer +} + +type ByteReader interface { + io.ByteReader + io.Reader +} + +type Ready interface { + Ready() bool +} + +type Receiver[T any] interface { + Receive() (T, error) +} + +type ReadyReceiver[T any] interface { + Receiver[T] + Ready +} + +type ReadyReader interface { + io.Reader + Ready +} + +type ReadyByteReader interface { + ByteReader + Ready +} + +type byteReader struct { + *bytes.Reader +} + +func (*byteReader) Ready() bool { + return true +} + +type PendingByteReader struct { + ByteReader +} + +func (*PendingByteReader) Ready() bool { + return false +} + +func NewPendingByteReader(r ByteReader) *PendingByteReader { + return &PendingByteReader{r} +} + +type ready[T any] struct { + v T +} + +func (r *ready[T]) Receive() (T, error) { + return r.v, nil +} + +func (*ready[T]) Ready() bool { + return true +} + +type decodeReceiver[T any] struct { + r ByteReader + decode func(ByteReader) (T, error) +} + +func (r *decodeReceiver[T]) Receive() (T, error) { + return r.decode(r.r) +} + +func (*decodeReceiver[T]) Ready() bool { + return false +} From e9fcf598e7cce0fac35afa0d9c9967705f6d7463 Mon Sep 17 00:00:00 2001 From: Roman Volosatovs Date: Fri, 10 May 2024 19:56:40 +0200 Subject: [PATCH 2/2] refactor(go): cleanup NATS implementation Signed-off-by: Roman Volosatovs --- go/nats/client.go | 139 +++++++++++++++++++--------------------------- 1 file changed, 58 insertions(+), 81 deletions(-) diff --git a/go/nats/client.go b/go/nats/client.go index c6373bc2c..de8aa207e 100644 --- a/go/nats/client.go +++ b/go/nats/client.go @@ -35,6 +35,33 @@ func resultSubject(prefix string) string { return fmt.Sprintf("%s.results", prefix) } +func indexPath(prefix string, path ...uint32) string { + s := prefix + for _, p := range path { + if s != "" { + s = fmt.Sprintf("%s.%d", s, p) + } else { + s = fmt.Sprintf("%d", p) + } + } + return s +} + +func subscribePath(prefix string, path wrpc.SubscribePath) string { + s := prefix + for _, p := range path { + if s != "" { + s = fmt.Sprintf("%s.", s) + } + if p == nil { + s = fmt.Sprintf("%s*", s) + } else { + s = fmt.Sprintf("%s%d", s, *p) + } + } + return s +} + func invocationSubject(prefix string, instance string, name string) string { subject := fmt.Sprintf("wrpc.0.0.1.%s.%s", instance, name) if prefix != "" { @@ -55,35 +82,30 @@ func subscribe(conn *nats.Conn, prefix string, f func(context.Context, []byte), }) } -func transmit(ctx context.Context, conn *nats.Conn, subject string, reply string, buf []byte) error { - header, hasHeader := HeaderFromContext(ctx) - m := nats.NewMsg(subject) - m.Reply = reply - if hasHeader { - m.Header = header - } - - maxPayload := conn.MaxPayload() - mSize := int64(m.Size()) - if mSize > maxPayload { - return fmt.Errorf("message size %d is larger than maximum allowed payload size %d", mSize, maxPayload) +func transmitError(nc *nats.Conn, subject string, err error) error { + var buf bytes.Buffer + if err := wrpc.WriteString(fmt.Sprintf("%s", err), &buf); err != nil { + slog.Warn("failed to encode handling error", "err", err) + if err := wrpc.WriteString(fmt.Sprintf("failed to encode error: %s", err), &buf); err != nil { + slog.Warn("failed to encode handling error encoding error", "err", err) + buf.Reset() + } } - maxPayload -= mSize - maxPayload = min(maxPayload, int64(len(buf))) - m.Data, buf = buf[:maxPayload], buf[maxPayload:] - if err := conn.PublishMsg(m); err != nil { - return fmt.Errorf("failed to send initial payload chunk: %w", err) + maxPayload := nc.MaxPayload() + maxPayload = min(maxPayload, int64(buf.Len())) + b := buf.Bytes() + var tail []byte + b, tail = b[:maxPayload], b[maxPayload:] + slog.Debug("transmitting initial error chunk") + if err := nc.Publish(subject, b); err != nil { + return fmt.Errorf("failed to send initial error chunk: %w", err) } - for len(buf) > 0 { - m := nats.NewMsg(subject) - m.Reply = reply - if hasHeader { - m.Header = header - } - maxPayload = min(maxPayload, int64(len(buf))) - m.Data, buf = buf[:maxPayload], buf[maxPayload:] - if err := conn.PublishMsg(m); err != nil { - return fmt.Errorf("failed to send payload chunk: %w", err) + for len(tail) > 0 { + maxPayload = min(maxPayload, int64(len(tail))) + b, tail = b[:maxPayload], b[maxPayload:] + slog.Debug("transmitting error chunk") + if err := nc.Publish(subject, b); err != nil { + return fmt.Errorf("failed to send error chunk: %w", err) } } return nil @@ -141,7 +163,7 @@ func (w *paramWriter) publish(p []byte) (int, error) { if m.Reply == "" { return n, errors.New("peer did not specify a reply subject") } - w.tx = fmt.Sprintf("%s.params", m.Reply) + w.tx = paramSubject(m.Reply) w.init = true } buf := p @@ -177,7 +199,7 @@ func (c *Client) Invoke(ctx context.Context, instance string, name string, f fun rx := nats.NewInbox() - resultRx := fmt.Sprintf("%s.results", rx) + resultRx := resultSubject(rx) resultSub, err := c.conn.SubscribeSync(resultRx) if err != nil { return fmt.Errorf("failed to subscribe on result subject `%s`: %w", resultRx, err) @@ -192,7 +214,7 @@ func (c *Client) Invoke(ctx context.Context, instance string, name string, f fun } }() - errRx := fmt.Sprintf("%s.error", rx) + errRx := errorSubject(rx) errPayloadCh := make(chan []byte, 1) errSub, err := c.conn.Subscribe(errRx, func(m *nats.Msg) { errPayloadCh <- m.Data @@ -239,16 +261,7 @@ func (c *Client) Invoke(ctx context.Context, instance string, name string, f fun root: resultSub, buf: nil, }, errCh); err != nil && w.init { - var buf bytes.Buffer - if err := wrpc.WriteString(fmt.Sprintf("%s", err), &buf); err != nil { - slog.Warn("failed to encode handling error", "err", err) - if err := wrpc.WriteString(fmt.Sprintf("failed to encode error: %s", err), &buf); err != nil { - slog.Warn("failed to encode handling error encoding error", "err", err) - buf.Reset() - } - } - slog.Debug("transmitting error") - if err := transmit(context.Background(), c.conn, fmt.Sprintf("%s.error", w.tx), "", buf.Bytes()); err != nil { + if err := transmitError(c.conn, errorSubject(w.tx), err); err != nil { slog.Warn("failed to send error to client", "err", err) } } @@ -258,9 +271,9 @@ func (c *Client) Invoke(ctx context.Context, instance string, name string, f fun type streamReader struct { ctx context.Context root *nats.Subscription - nestMu sync.Mutex nest map[string]*nats.Subscription buf []byte + nestMu sync.Mutex } type indexedStreamReader struct { @@ -270,33 +283,6 @@ type indexedStreamReader struct { buf []byte } -func indexPath(prefix string, path ...uint32) string { - s := prefix - for _, p := range path { - if s != "" { - s = fmt.Sprintf("%s.%d", s, p) - } else { - s = fmt.Sprintf("%d", p) - } - } - return s -} - -func subscribePath(prefix string, path wrpc.SubscribePath) string { - s := prefix - for _, p := range path { - if s != "" { - s = fmt.Sprintf("%s.", s) - } - if p == nil { - s = fmt.Sprintf("%s*", s) - } else { - s = fmt.Sprintf("%s%d", s, *p) - } - } - return s -} - func (r *streamReader) Read(p []byte) (int, error) { if len(r.buf) > 0 { n := copy(p, r.buf) @@ -451,7 +437,7 @@ func (c *Client) Serve(instance string, name string, f func(context.Context, wrp rx := nats.NewInbox() - paramRx := fmt.Sprintf("%s.params", rx) + paramRx := paramSubject(rx) paramSub, err := c.conn.SubscribeSync(paramRx) if err != nil { slog.Warn("failed to subscribe on parameter subject", "subject", paramRx, "err", err) @@ -463,7 +449,7 @@ func (c *Client) Serve(instance string, name string, f func(context.Context, wrp } }() - errRx := fmt.Sprintf("%s.error", rx) + errRx := errorSubject(rx) errPayloadCh := make(chan []byte, 1) errSub, err := c.conn.Subscribe(errRx, func(m *nats.Msg) { errPayloadCh <- m.Data @@ -518,7 +504,7 @@ func (c *Client) Serve(instance string, name string, f func(context.Context, wrp slog.Debug("calling server handler") if err := f(ctx, &resultWriter{ nc: c.conn, - tx: fmt.Sprintf("%s.results", m.Reply), + tx: resultSubject(m.Reply), }, &streamReader{ ctx: ctx, root: paramSub, @@ -526,16 +512,7 @@ func (c *Client) Serve(instance string, name string, f func(context.Context, wrp buf: m.Data, }, errCh); err != nil { slog.Warn("failed to handle invocation", "err", err) - var buf bytes.Buffer - if err = wrpc.WriteString(fmt.Sprintf("%s", err), &buf); err != nil { - slog.Warn("failed to encode handling error", "err", err) - if err = wrpc.WriteString(fmt.Sprintf("failed to encode error: %s", err), &buf); err != nil { - slog.Warn("failed to encode handling error encoding error", "err", err) - buf.Reset() - } - } - slog.Debug("transmitting error") - if err = transmit(context.Background(), c.conn, fmt.Sprintf("%s.error", m.Reply), "", buf.Bytes()); err != nil { + if err := transmitError(c.conn, errorSubject(m.Reply), err); err != nil { slog.Warn("failed to send error to client", "err", err) } return