[FEATURE] Support client streams #2212

Closed
AuditeMarlow opened this issue Aug 18, 2021 · 4 comments

@AuditeMarlow
Collaborator

AuditeMarlow commented Aug 18, 2021

I've searched the documentation, plugins and existing issues for examples on how to implement client streams, but haven't found a conclusive way to achieve this with the Go Micro framework just yet. Are there examples out in the wild that I can learn from?

For clarification purposes, I think I've got the server implementation down. Below is some boilerplate code I'm using to test with.

proto/helloworld.proto

syntax = "proto3";

package helloworld;

option go_package = "./proto;helloworld";

service Helloworld {
        rpc ClientStream(stream ClientStreamRequest) returns (ClientStreamResponse) {}
}

message ClientStreamRequest {
        int64 stroke = 1;
}

message ClientStreamResponse {
        int64 count = 1;
}

handler/helloworld.go

package handler

import (
    "context"
    "io"

    log "github.com/asim/go-micro/v3/logger"

    pb "helloworld/proto"
)

type Helloworld struct {}

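// ClientStream counts the requests the client sends and replies with the
// total once the client signals the end of its stream.
func (e *Helloworld) ClientStream(ctx context.Context, stream pb.Helloworld_ClientStreamStream) error {
        var count int64
        for {
                req, err := stream.Recv()
                if err == io.EOF {
                        // The client has finished sending: reply with the total.
                        rsp := &pb.ClientStreamResponse{Count: count}
                        log.Infof("Sending response: %v", rsp)
                        return stream.SendMsg(rsp)
                }
                if err != nil {
                        // Any other error aborts the stream.
                        return err
                }
                log.Infof("Got ping %v", req.Stroke)
                count++
        }
}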

I can't seem to figure out how to implement the client as there seems to be no way for the client to send EOF without closing the stream.
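The behavior being asked for here is usually called a half-close: the client ends its send direction, the server sees io.EOF, and the response direction stays open for the reply. The following is a minimal sketch of that idea using plain Go channels. It is not go-micro code; `halfCloseStream`, `serve`, and `countStrokes` are hypothetical names introduced only for illustration.

```go
package main

import (
	"fmt"
	"io"
)

// request and response loosely mirror the proto messages above.
type request struct{ Stroke int64 }
type response struct{ Count int64 }

// halfCloseStream is a hypothetical in-memory stand-in for a client stream.
// It is NOT a go-micro type; it only illustrates half-close semantics.
type halfCloseStream struct {
	reqs chan request
	rsp  chan response
}

func newHalfCloseStream() *halfCloseStream {
	return &halfCloseStream{reqs: make(chan request), rsp: make(chan response, 1)}
}

// Send delivers one request to the server side.
func (s *halfCloseStream) Send(r request) { s.reqs <- r }

// CloseSend closes only the request direction; the response direction stays open.
func (s *halfCloseStream) CloseSend() { close(s.reqs) }

// Recv is the server-side read; it reports io.EOF once CloseSend has been called.
func (s *halfCloseStream) Recv() (request, error) {
	r, ok := <-s.reqs
	if !ok {
		return request{}, io.EOF
	}
	return r, nil
}

// serve counts requests until io.EOF, then sends a single response.
func serve(s *halfCloseStream) {
	var count int64
	for {
		if _, err := s.Recv(); err == io.EOF {
			s.rsp <- response{Count: count}
			return
		}
		count++
	}
}

// countStrokes plays the client: send everything, half-close, read the reply.
func countStrokes(strokes []int64) int64 {
	s := newHalfCloseStream()
	go serve(s)
	for _, v := range strokes {
		s.Send(request{Stroke: v})
	}
	s.CloseSend()
	return (<-s.rsp).Count
}

func main() {
	fmt.Println(countStrokes([]int64{1, 2, 3}))
}
```

The point of the sketch is that "no more requests" and "stream closed" are separate signals. A stream type that exposes only a full Close cannot express the first without the second.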

@xpunch
Member

xpunch commented Aug 26, 2021

github.com/asim/go-micro/client/client.go#L66

// Stream is the interface for a bidirectional synchronous stream
type Stream interface {
	// Context for the stream
	Context() context.Context
	// The request made
	Request() Request
	// The response read
	Response() Response
	// Send will encode and send a request
	Send(interface{}) error
	// Recv will decode and read a response
	Recv(interface{}) error
	// Error returns the stream error
	Error() error
	// Close closes the stream
	Close() error
}

The Stream abstraction does not support CloseSend, so I think there is currently no way to send EOF.

@xpunch
Member

xpunch commented Aug 26, 2021

github.com/grpc/grpc-go/stream.go#L87

// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
type ClientStream interface {
	// Header returns the header metadata received from the server if there
	// is any. It blocks if the metadata is not ready to read.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream. It closes the stream
	// when non-nil error is met. It is also not safe to call CloseSend
	// concurrently with SendMsg.
	CloseSend() error
	// Context returns the context for this stream.
	//
	// It should not be called until after Header or RecvMsg has returned. Once
	// called, subsequent client-side retries are disabled.
	Context() context.Context
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg.
	//
	// SendMsg blocks until:
	//   - There is sufficient flow control to schedule m with the transport, or
	//   - The stream is done, or
	//   - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines. It is also
	// not safe to call CloseSend concurrently with SendMsg.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}

The gRPC ClientStream supports CloseSend, which closes the send direction of the stream (the server then sees EOF).
There are many go-micro client implementations, and some of them may not be able to support CloseSend behavior.
In my project I worked around this by sending an empty object to represent EOF. It is not a very good solution, but it solved my issue.
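One possible reading of this workaround, sketched with plain channels rather than go-micro streams: client and server agree that a zero-valued request marks the end of the stream, so the stream itself never has to be closed. The names `clientStreamRequest`, `isSentinel`, `sendAll`, and `countUntilSentinel` are hypothetical, and the actual project code is not shown in this thread.

```go
package main

import "fmt"

// clientStreamRequest loosely mirrors the ClientStreamRequest proto message.
type clientStreamRequest struct{ Stroke int64 }

// isSentinel reports whether req is the agreed end-of-stream marker.
// Assumption: the zero value is reserved and never sent as real data.
func isSentinel(req clientStreamRequest) bool {
	return req == (clientStreamRequest{})
}

// countUntilSentinel plays the server: it counts requests until the
// sentinel arrives, without relying on the channel being closed.
func countUntilSentinel(in <-chan clientStreamRequest) int64 {
	var count int64
	for req := range in {
		if isSentinel(req) {
			break
		}
		count++
	}
	return count
}

// sendAll plays the client: it sends every stroke, then the sentinel,
// and leaves the channel open.
func sendAll(out chan<- clientStreamRequest, strokes []int64) {
	for _, v := range strokes {
		out <- clientStreamRequest{Stroke: v}
	}
	out <- clientStreamRequest{} // empty object stands in for EOF
}

func main() {
	ch := make(chan clientStreamRequest)
	go sendAll(ch, []int64{4, 5, 6})
	fmt.Println(countUntilSentinel(ch))
}
```

The weakness of this approach is visible in the sketch: a legitimate request with `Stroke` equal to 0 is indistinguishable from the sentinel, so the marker value has to be one that real data never uses.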

@AuditeMarlow AuditeMarlow changed the title gRPC client stream example [FEATURE] Add gRPC client stream Aug 31, 2021
@AuditeMarlow AuditeMarlow changed the title [FEATURE] Add gRPC client stream [FEATURE] Support client streams Sep 3, 2021
@bootfirst

What does "an empty object" mean?
I get "xxx is not type of *bytes.Frame or proto.Message"

@xpunch
Member

xpunch commented Dec 23, 2021

This is already supported in the latest v4.

@xpunch xpunch closed this as completed Dec 23, 2021

3 participants