Skip to content

Commit

Permalink
Merge pull request #5 from crossoverJie/feature/stream-20220114
Browse files Browse the repository at this point in the history
Feature/stream 20220114
  • Loading branch information
crossoverJie committed Feb 13, 2022
2 parents 97ffe9c + afe28c5 commit 7b1bac8
Show file tree
Hide file tree
Showing 11 changed files with 800 additions and 65 deletions.
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -46,7 +46,7 @@ release:
gen-go-proto:
@protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
reflect/gen/user.proto
reflect/gen/test.proto

gen-log-proto:
@protoc --go_out=. --go_opt=paths=source_relative \
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -10,7 +10,7 @@ require (
github.com/golang/protobuf v1.5.2
github.com/jhump/protoreflect v1.10.1
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.5.1 // indirect
github.com/stretchr/testify v1.5.1
github.com/urfave/cli/v2 v2.3.0
google.golang.org/grpc v1.38.0
google.golang.org/protobuf v1.27.1
Expand Down
10 changes: 0 additions & 10 deletions go.sum
Expand Up @@ -2,13 +2,10 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
fyne.io/fyne/v2 v2.1.1 h1:3p39SwQ/rBiYODVYI4ggTuwMufWYmqaRMJvXTFg7jSw=
fyne.io/fyne/v2 v2.1.1/go.mod h1:c1vwI38Ebd0dAdxVa6H1Pj6/+cK1xtDy61+I31g+s14=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v0.4.1 h1:GaI7EiDXDRfa8VshkTj7Fym7ha+y8/XxIgD2okUIjLw=
github.com/BurntSushi/toml v0.4.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/Kodeworks/golang-image-ico v0.0.0-20141118225523-73f0f4cfade9 h1:1ltqoej5GtaWF8jaiA49HwsZD459jqm9YFz9ZtMFpQA=
github.com/Kodeworks/golang-image-ico v0.0.0-20141118225523-73f0f4cfade9/go.mod h1:7uhhqiBaR4CpN0k9rMjOtjpcfGd6DG2m04zQxKnWQ0I=
github.com/VividCortex/ewma v1.1.1 h1:MnEK4VOv6n0RSY4vtRe3h11qjxL3+t0B8yOL8iMXdcM=
github.com/VividCortex/ewma v1.1.1/go.mod h1:2Tkkvm3sRDVXaiyucHiACn4cqf7DpdyLvmxzcbUokwA=
github.com/akavel/rsrc v0.8.0 h1:zjWn7ukO9Kc5Q62DOJCcxGpXC18RawVtYAGdz2aLlfw=
github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkKq+c=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cheggaaa/pb/v3 v3.0.5 h1:lmZOti7CraK9RSjzExsY53+WWfub9Qv13B5m4ptEoPE=
Expand Down Expand Up @@ -37,7 +34,6 @@ github.com/go-gl/gl v0.0.0-20210813123233-e4099ee2221f h1:s0O46d8fPwk9kU4k1jj76w
github.com/go-gl/gl v0.0.0-20210813123233-e4099ee2221f/go.mod h1:wjpnOv6ONl2SuJSxqCPVaPZibGFdSci9HFocT9qtVYM=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20210410170116-ea3d685f79fb h1:T6gaWBvRzJjuOrdCtg8fXXjKai2xSDqWTcKFUPuw8Tw=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20210410170116-ea3d685f79fb/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down Expand Up @@ -67,18 +63,15 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf/go.mod h1:cuNKsD1zp2v6XfE/orVX2QE1LC+i254ceGcVeDT3pTU=
github.com/jackmordaunt/icns v0.0.0-20181231085925-4f16af745526 h1:NfuKjkj/Xc2z1xZIj+EmNCm5p1nKJPyw3F4E20usXvg=
github.com/jackmordaunt/icns v0.0.0-20181231085925-4f16af745526/go.mod h1:UQkeMHVoNcyXYq9otUupF7/h/2tmHlhrS2zw7ZVvUqc=
github.com/jhump/protoreflect v1.10.1 h1:iH+UZfsbRE6vpyZH7asAjTPWJf7RJbpZ9j/N3lDlKs0=
github.com/jhump/protoreflect v1.10.1/go.mod h1:7GcYQDdMU/O/BBrl/cX6PNHpXh6cenjd8pneu5yW7Tg=
github.com/josephspurrier/goversioninfo v0.0.0-20200309025242-14b0ab84c6ca h1:ozPUX9TKQZVek4lZWYRsQo7uS8vJ+q4OOHvRhHiCLfU=
github.com/josephspurrier/goversioninfo v0.0.0-20200309025242-14b0ab84c6ca/go.mod h1:eJTEwMjXb7kZ633hO3Ln9mBUCOjX2+FlTljvpl9SYdE=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lucor/goinfo v0.0.0-20210802170112-c078a2b0f08b h1:tLSDWcFhT0WRlnsFszh4iaFTexWF8mmccGTk88Siq7Q=
github.com/lucor/goinfo v0.0.0-20210802170112-c078a2b0f08b/go.mod h1:PRq09yoB+Q2OJReAmwzKivcYyremnibWGbK7WfftHzc=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.9 h1:sqDoxXbdeALODt0DAeJCVp38ps9ZogZEAXjus69YV3U=
Expand All @@ -89,7 +82,6 @@ github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-runewidth v0.0.7 h1:Ei8KR0497xHyKJPAv59M1dkC+rOZCMBJ+t3fZ+twI54=
github.com/mattn/go-runewidth v0.0.7/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ=
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
Expand Down Expand Up @@ -135,7 +127,6 @@ golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHl
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -181,7 +172,6 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200522201501-cb1345f3a375/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200717024301-6ddee64345a6/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
258 changes: 258 additions & 0 deletions gui/call/stream_call.go
@@ -0,0 +1,258 @@
package call

import (
"context"
"encoding/json"
"fmt"
"fyne.io/fyne/v2"
"fyne.io/fyne/v2/container"
"fyne.io/fyne/v2/theme"
"fyne.io/fyne/v2/widget"
"github.com/crossoverJie/ptg/reflect"
"github.com/golang/protobuf/proto"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/dynamic/grpcdynamic"
"io"
)

type (
Call struct {
parse *reflect.ParseReflect
responseEntry, requestEntry *widget.Entry
processBar *widget.ProgressBarInfinite
mds *desc.MethodDescriptor
stub grpcdynamic.Stub
errorHandle errorHandle
}
errorHandle func(window fyne.Window, processBar *widget.ProgressBarInfinite, err error)
)

func NewCallBuilder() *Call {
return &Call{}
}
func (c *Call) Parse(parse *reflect.ParseReflect) *Call {
c.parse = parse
return c
}
func (c *Call) ResponseEntry(responseEntry *widget.Entry) *Call {
c.responseEntry = responseEntry
return c
}
func (c *Call) Mds(mds *desc.MethodDescriptor) *Call {
c.mds = mds
return c
}
func (c *Call) Stub(stub grpcdynamic.Stub) *Call {
c.stub = stub
return c
}
func (c *Call) RequestEntry(requestEntry *widget.Entry) *Call {
c.requestEntry = requestEntry
return c
}
func (c *Call) ProcessBar(processBar *widget.ProgressBarInfinite) *Call {
c.processBar = processBar
return c
}
func (c *Call) ErrorHandle(e errorHandle) *Call {
c.errorHandle = e
return c
}

func (c *Call) Run(ctx context.Context) (string, error) {
c.responseEntry.SetText("")
mds := c.mds
parse := c.parse
responseEntry := c.responseEntry
stub := c.stub
data := c.requestEntry.Text

if !mds.IsClientStreaming() && !mds.IsServerStreaming() {
// unary RPC
rpc, err := parse.InvokeRpc(ctx, stub, mds, data)
if err != nil {
return "", err
}
viewCallBack := &unaryCallBack{}
return viewCallBack.ViewCallBack(rpc, responseEntry), nil
}

if !mds.IsClientStreaming() && mds.IsServerStreaming() {
// server stream
rpc, err := parse.InvokeServerStreamRpc(ctx, stub, mds, data)
if err != nil {
return "", err
}
viewCallBack := &serverStreamCallBack{
ch: make(chan proto.Message, 10),
responseEntry: responseEntry,
}
viewCallBack.receive()
for {
msg, err := rpc.RecvMsg()
fmt.Printf("stream call %s \n", msg)
if err == io.EOF {
close(viewCallBack.ch)
}
if err != nil {
return "", nil
}
viewCallBack.ViewCallBack(msg, responseEntry)
}

}

if mds.IsClientStreaming() && !mds.IsServerStreaming() {
// client stream
rpc, err := parse.InvokeClientStreamRpc(ctx, stub, mds)
if err != nil {
return "", err
}
w := fyne.CurrentApp().NewWindow("Client stream call")
w.Resize(fyne.NewSize(300, 100))
request := widget.NewMultiLineEntry()
request.SetText(data)
var totalRequest string
requestButton := widget.NewButtonWithIcon("Push", theme.MediaPlayIcon(), func() {
messages, err := reflect.CreatePayloadsFromJSON(mds, request.Text)
if err != nil {
c.errorHandle(w, c.processBar, err)
return
}
rpc.SendMsg(messages[0])
totalRequest += request.Text + "\n"
})
finishButton := widget.NewButtonWithIcon("Finish", theme.ConfirmIcon(), func() {
receive, err := rpc.CloseAndReceive()
if err != nil {
c.errorHandle(w, c.processBar, err)
return
}
marshalIndent, _ := json.MarshalIndent(receive, "", "\t")
c.responseEntry.SetText(string(marshalIndent))
w.Close()
c.requestEntry.SetText(totalRequest)

})
w.SetContent(container.NewVBox(request, requestButton, finishButton))
w.CenterOnScreen()
w.Show()

}
if mds.IsClientStreaming() && mds.IsServerStreaming() {
// bidi stream
rpc, err := parse.InvokeBidiStreamRpc(ctx, stub, mds)
if err != nil {
return "", err
}
w := fyne.CurrentApp().NewWindow("Bidi stream call")
w.Resize(fyne.NewSize(300, 100))
request := widget.NewMultiLineEntry()
request.SetText(data)
var totalRequest string

streamCallBack := bidiStreamCallBack{
ch: make(chan proto.Message, 10),
responseEntry: responseEntry,
}
streamCallBack.receive()

requestButton := widget.NewButtonWithIcon("Push", theme.MediaPlayIcon(), func() {
messages, err := reflect.CreatePayloadsFromJSON(mds, request.Text)
if err != nil {
c.errorHandle(w, c.processBar, err)
return
}
rpc.SendMsg(messages[0])
totalRequest += request.Text + "\n"

receive, _ := rpc.RecvMsg()
streamCallBack.ViewCallBack(receive)
})
finishButton := widget.NewButtonWithIcon("Finish", theme.ConfirmIcon(), func() {
err := rpc.CloseSend()
if err != nil {
c.errorHandle(w, c.processBar, err)
return
}
w.Close()
c.requestEntry.SetText(totalRequest)

})
w.SetContent(container.NewVBox(request, requestButton, finishButton))
w.CenterOnScreen()
w.Show()
}

return "", nil
}

type unaryCallBack struct{}

func (u unaryCallBack) ViewCallBack(message proto.Message, responseEntry *widget.Entry) string {
marshalIndent, _ := json.MarshalIndent(message, "", "\t")
responseEntry.SetText(string(marshalIndent))
return string(marshalIndent)
}

type serverStreamCallBack struct {
ch chan proto.Message
responseEntry *widget.Entry
}

func (s *serverStreamCallBack) ViewCallBack(message proto.Message, responseEntry *widget.Entry) string {
s.ch <- message
return ""
}

func (s *serverStreamCallBack) receive() {
go func() {
for {
select {
case message, ok := <-s.ch:
if !ok {
fmt.Println("break")
return
}
marshalIndent, _ := json.MarshalIndent(message, "", "\t")
s.responseEntry.SetText(s.responseEntry.Text + string(marshalIndent) + "\n")
}
}
}()

}

type clientStreamCallBack struct{}

func (u clientStreamCallBack) ViewCallBack(message proto.Message, responseEntry *widget.Entry) string {
marshalIndent, _ := json.MarshalIndent(message, "", "\t")
responseEntry.SetText(string(marshalIndent))
return string(marshalIndent)
}

type bidiStreamCallBack struct {
ch chan proto.Message
responseEntry *widget.Entry
}

func (b bidiStreamCallBack) ViewCallBack(message proto.Message) string {
b.ch <- message
return ""
}

func (b *bidiStreamCallBack) receive() {
go func() {
for {
select {
case message, ok := <-b.ch:
if !ok {
fmt.Println("break")
return
}
marshalIndent, _ := json.MarshalIndent(message, "", "\t")
b.responseEntry.SetText(b.responseEntry.Text + string(marshalIndent) + "\n")
}
}
}()

}

0 comments on commit 7b1bac8

Please sign in to comment.