Skip to content

Commit

Permalink
Make the CSub GetCollectEntries() RPC response streaming (#1865)
Browse files Browse the repository at this point in the history
* Fix Makefile proto target

- The deps.dev API proto file was removed in a6c67d3

Signed-off-by: Narsimham Chelluri (Narsa) <narsa@kusari.dev>

* Make GetCollectEntries() rpc have a streaming response

- also fix typo in CollectSubscriberServiceClient name

Signed-off-by: Narsimham Chelluri (Narsa) <narsa@kusari.dev>

* Bump up response message size

- In my testing the message size gets too big at ~55K pURLs.
- So I bumped message size up from 1K entries to 15K entries.
  This should allow plenty of room for large data source
  entries and also still result in far fewer roundtrips.

Signed-off-by: Narsimham Chelluri (Narsa) <narsa@kusari.dev>

* Add missing header comments

Signed-off-by: Narsimham Chelluri (Narsa) <narsa@kusari.dev>

* Make cb error more descriptive

Signed-off-by: Narsimham Chelluri (Narsa) <narsa@kusari.dev>

---------

Signed-off-by: Narsimham Chelluri (Narsa) <narsa@kusari.dev>
  • Loading branch information
nchelluri committed Apr 29, 2024
1 parent 46e8893 commit 6100427
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 111 deletions.
3 changes: 0 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ proto:
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
pkg/collectsub/collectsub/collectsub.proto
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
pkg/handler/collector/deps_dev/internal/api.proto

# Remove temporary files
.PHONY: clean
Expand Down
19 changes: 15 additions & 4 deletions pkg/collectsub/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"

pb "github.com/guacsec/guac/pkg/collectsub/collectsub"
"google.golang.org/grpc"
Expand All @@ -34,7 +35,7 @@ type Client interface {
}

type client struct {
client pb.ColectSubscriberServiceClient
client pb.CollectSubscriberServiceClient
conn *grpc.ClientConn
}

Expand Down Expand Up @@ -72,7 +73,7 @@ func NewClient(opts CsubClientOptions) (Client, error) {
if err != nil {
return nil, err
}
c := pb.NewColectSubscriberServiceClient(conn)
c := pb.NewCollectSubscriberServiceClient(conn)

return &client{
client: c,
Expand All @@ -92,7 +93,7 @@ func (c *client) AddCollectEntries(ctx context.Context, entries []*pb.CollectEnt
return err
}
if !res.Success {
return fmt.Errorf("add collect entry unsuccessful")
return fmt.Errorf("add collect entries unsuccessful")
}
return nil
}
Expand All @@ -105,5 +106,15 @@ func (c *client) GetCollectEntries(ctx context.Context, filters []*pb.CollectEnt
return nil, err
}

return res.Entries, nil
var allEntries []*pb.CollectEntry
for {
entries, err := res.Recv()
if err == io.EOF {
return allEntries, nil
} else if err != nil {
return nil, err
}

allEntries = append(allEntries, entries.Entries...)
}
}
66 changes: 33 additions & 33 deletions pkg/collectsub/collectsub/collectsub.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions pkg/collectsub/collectsub/collectsub.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ message CollectEntry {
int64 since_time = 3;
}

// rpc AddCollectEntry
// rpc AddCollectEntries
message AddCollectEntriesRequest {
repeated CollectEntry entries = 1;
}
Expand All @@ -42,7 +42,6 @@ message AddCollectEntriesResponse {
bool success = 1;
}


// rpc GetCollectEntries
message CollectEntryFilter {
CollectDataType type = 1;
Expand All @@ -59,7 +58,7 @@ message GetCollectEntriesResponse {
repeated CollectEntry entries = 1;
}

service ColectSubscriberService {
service CollectSubscriberService {
rpc AddCollectEntries(AddCollectEntriesRequest) returns (AddCollectEntriesResponse);
rpc GetCollectEntries (GetCollectEntriesRequest) returns (GetCollectEntriesResponse);
rpc GetCollectEntries (GetCollectEntriesRequest) returns (stream GetCollectEntriesResponse);
}
Loading

0 comments on commit 6100427

Please sign in to comment.