Skip to content

Commit

Permalink
[breaking] Eliminate BoardListWatchRequest Interrupt from the grpc …
Browse files Browse the repository at this point in the history
…api and make it server streaming only (#2330)
  • Loading branch information
alessio-perugini committed Sep 25, 2023
1 parent 6ebfb1d commit 5a4f48b
Show file tree
Hide file tree
Showing 12 changed files with 302 additions and 318 deletions.
10 changes: 2 additions & 8 deletions client_example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -657,16 +657,13 @@ func callBoardList(client rpc.ArduinoCoreServiceClient, instance *rpc.Instance)
}

func callBoardListWatch(client rpc.ArduinoCoreServiceClient, instance *rpc.Instance) {
watchClient, err := client.BoardListWatch(context.Background())
req := &rpc.BoardListWatchRequest{Instance: instance}
watchClient, err := client.BoardListWatch(context.Background(), req)
if err != nil {
log.Fatalf("Board list watch error: %s\n", err)
}

// Start the watcher
watchClient.Send(&rpc.BoardListWatchRequest{
Instance: instance,
})

go func() {
for {
res, err := watchClient.Recv()
Expand All @@ -693,9 +690,6 @@ func callBoardListWatch(client rpc.ArduinoCoreServiceClient, instance *rpc.Insta
// Watch for 10 seconds and then interrupts
timer := time.NewTicker(time.Duration(10 * time.Second))
<-timer.C
watchClient.Send(&rpc.BoardListWatchRequest{
Interrupt: true,
})
}

func callPlatformUnInstall(client rpc.ArduinoCoreServiceClient, instance *rpc.Instance) {
Expand Down
11 changes: 5 additions & 6 deletions commands/board/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,23 +257,22 @@ func hasMatchingBoard(b *rpc.DetectedPort, fqbnFilter *cores.FQBN) bool {
}

// Watch returns a channel that receives boards connection and disconnection events.
// It also returns a callback function that must be used to stop and dispose the watch.
func Watch(req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse, func(), error) {
func Watch(ctx context.Context, req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse, error) {
pme, release := commands.GetPackageManagerExplorer(req)
if pme == nil {
return nil, nil, &arduino.InvalidInstanceError{}
return nil, &arduino.InvalidInstanceError{}
}
defer release()
dm := pme.DiscoveryManager()

watcher, err := dm.Watch()
if err != nil {
return nil, nil, err
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
go func() {
<-ctx.Done()
logrus.Trace("closed watch")
watcher.Close()
}()

Expand Down Expand Up @@ -301,5 +300,5 @@ func Watch(req *rpc.BoardListWatchRequest) (<-chan *rpc.BoardListWatchResponse,
}
}()

return outChan, cancel, nil
return outChan, nil
}
38 changes: 4 additions & 34 deletions commands/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,52 +86,22 @@ func (s *ArduinoCoreServerImpl) BoardSearch(ctx context.Context, req *rpc.BoardS
}

// BoardListWatch FIXMEDOC
func (s *ArduinoCoreServerImpl) BoardListWatch(stream rpc.ArduinoCoreService_BoardListWatchServer) error {
func (s *ArduinoCoreServerImpl) BoardListWatch(req *rpc.BoardListWatchRequest, stream rpc.ArduinoCoreService_BoardListWatchServer) error {
syncSend := NewSynchronizedSend(stream.Send)
msg, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}

if msg.Instance == nil {
err = fmt.Errorf(tr("no instance specified"))
if req.Instance == nil {
err := fmt.Errorf(tr("no instance specified"))
syncSend.Send(&rpc.BoardListWatchResponse{
EventType: "error",
Error: err.Error(),
})
return err
}

eventsChan, closeWatcher, err := board.Watch(msg)
eventsChan, err := board.Watch(stream.Context(), req)
if err != nil {
return convertErrorToRPCStatus(err)
}

go func() {
defer closeWatcher()
for {
msg, err := stream.Recv()
// Handle client closing the stream and eventual errors
if err == io.EOF {
logrus.Info("boards watcher stream closed")
return
}
if err != nil {
logrus.Infof("interrupting boards watcher: %v", err)
return
}

// Message received, does the client want to interrupt?
if msg != nil && msg.Interrupt {
logrus.Info("boards watcher interrupted by client")
return
}
}
}()

for event := range eventsChan {
if err := syncSend.Send(event); err != nil {
logrus.Infof("sending board watch message: %v", err)
Expand Down
31 changes: 31 additions & 0 deletions docs/UPGRADING.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,37 @@

Here you can find a list of migration guides to handle breaking changes between releases of the CLI.

## 0.35.0

### The gRPC `cc.arduino.cli.commands.v1.BoardListWatchRequest` command request has been changed.

The gRPC message `BoardListWatchRequest` has been changed from:

```
message BoardListWatchRequest {
// Arduino Core Service instance from the `Init` response.
Instance instance = 1;
// Set this to true to stop the discovery process
bool interrupt = 2;
}
```

to

```
message BoardListWatchRequest {
// Arduino Core Service instance from the `Init` response.
Instance instance = 1;
}
```

### The gRPC `cc.arduino.cli.commands.v1.BoardListWatch` service is now server stream only.

```
rpc BoardListWatch(BoardListWatchRequest)
returns (stream BoardListWatchResponse);
```

## 0.34.0

### The gRPC `cc.arduino.cli.commands.v1.UploadRepsonse` command response has been changed.
Expand Down
4 changes: 2 additions & 2 deletions internal/cli/board/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package board

import (
"context"
"errors"
"fmt"
"os"
Expand Down Expand Up @@ -84,11 +85,10 @@ func runListCommand(watch bool, timeout int64, fqbn string) {
}

func watchList(inst *rpc.Instance) {
eventsChan, closeCB, err := board.Watch(&rpc.BoardListWatchRequest{Instance: inst})
eventsChan, err := board.Watch(context.Background(), &rpc.BoardListWatchRequest{Instance: inst})
if err != nil {
feedback.Fatal(tr("Error detecting boards: %v", err), feedback.ErrNetwork)
}
defer closeCB()

// This is done to avoid printing the header each time a new event is received
if feedback.GetFormat() == feedback.Text {
Expand Down
6 changes: 3 additions & 3 deletions internal/integrationtest/arduino-cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,16 +362,16 @@ func (inst *ArduinoCLIInstance) BoardList(timeout time.Duration) (*commands.Boar
}

// BoardListWatch calls the "BoardListWatch" gRPC method.
func (inst *ArduinoCLIInstance) BoardListWatch() (commands.ArduinoCoreService_BoardListWatchClient, error) {
func (inst *ArduinoCLIInstance) BoardListWatch(ctx context.Context) (commands.ArduinoCoreService_BoardListWatchClient, error) {
boardListWatchReq := &commands.BoardListWatchRequest{
Instance: inst.instance,
}
logCallf(">>> BoardListWatch(%v)\n", boardListWatchReq)
watcher, err := inst.cli.daemonClient.BoardListWatch(context.Background())
watcher, err := inst.cli.daemonClient.BoardListWatch(ctx, boardListWatchReq)
if err != nil {
return watcher, err
}
return watcher, watcher.Send(boardListWatchReq)
return watcher, nil
}

// PlatformInstall calls the "PlatformInstall" gRPC method.
Expand Down
22 changes: 15 additions & 7 deletions internal/integrationtest/daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/arduino/arduino-cli/internal/integrationtest"
"github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1"
"github.com/arduino/go-paths-helper"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/stretchr/testify/require"
)
Expand All @@ -52,28 +54,34 @@ func TestArduinoCliDaemon(t *testing.T) {
require.NoError(t, err)
fmt.Printf("Got boardlist response with %d ports\n", len(boardListResp.GetPorts()))

// When the client closes the connection we expect that the streaming from the server closes.
testWatcher := func() {
// Run watcher
watcher, err := grpcInst.BoardListWatch()
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
watcher, err := grpcInst.BoardListWatch(ctx)
require.NoError(t, err)
watcherCanceldCh := make(chan struct{})
go func() {
defer cancel()
for {
msg, err := watcher.Recv()
if err == io.EOF {
fmt.Println("Watcher EOF")
return
}
require.Empty(t, msg.Error, "Board list watcher returned an error")
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
fmt.Println("Watcher canceled")
watcherCanceldCh <- struct{}{}
return
}
require.NoError(t, err, "BoardListWatch grpc call returned an error")
fmt.Printf("WATCH> %v\n", msg)
require.Empty(t, msg.Error, "Board list watcher returned an error")
fmt.Printf("WATCH> %v %v\n", msg, err)
}
}()
time.Sleep(time.Second)
require.NoError(t, watcher.CloseSend())
cancel()
select {
case <-ctx.Done():
case <-watcherCanceldCh:
// all right!
case <-time.After(time.Second):
require.Fail(t, "BoardListWatch didn't close")
Expand Down
95 changes: 42 additions & 53 deletions rpc/cc/arduino/cli/commands/v1/board.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions rpc/cc/arduino/cli/commands/v1/board.proto
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,6 @@ message BoardListAllResponse {
message BoardListWatchRequest {
// Arduino Core Service instance from the `Init` response.
Instance instance = 1;
// Set this to true to stop the discovery process
bool interrupt = 2;
}

message BoardListWatchResponse {
Expand Down

0 comments on commit 5a4f48b

Please sign in to comment.