Skip to content

Commit

Permalink
Add support send/recv chunk for checkin protocol (#89)
Browse files Browse the repository at this point in the history
* Add support send/recv chunk for checkin.

* Fix current tests.

* Refactor chunking add tests.

* Add receive utils and more tests.

* Code review changes.
  • Loading branch information
blakerouse committed Dec 18, 2023
1 parent 21e4fd8 commit 8a1940e
Show file tree
Hide file tree
Showing 21 changed files with 1,566 additions and 277 deletions.
30 changes: 29 additions & 1 deletion elastic-agent-client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ service ElasticAgent {
//
// Use of the source field allows the input configurations to evolve without needing to modify
// the control protocol itself. In some cases commonly used or important fields are extracted as
// a dedicated message type, but these definitions do not comletely define the contents of the
// a dedicated message type, but these definitions do not completely define the contents of the
// source field which is free to contain additional fields.
rpc CheckinV2(stream CheckinObserved) returns (stream CheckinExpected);

Expand All @@ -62,6 +62,12 @@ service ElasticAgent {
rpc Checkin(stream StateObserved) returns (stream StateExpected);
}

// Features that the connection between the client and the server supports.
enum ConnectionSupports {
// Checkin chunking support.
CheckinChunking = 0;
}

// State codes for the current state.
enum State {
// STARTING is an optional observed state indicating the unit is doing work to start before
Expand Down Expand Up @@ -282,6 +288,13 @@ message CheckinExpected {
// Index or revision of the expected component configuration. When the expected configuration
// changes the agent will increment this number and the Component field will be populated.
uint64 component_idx = 6;

// When a units timestamp is provided then the set of units could not all fit inside this single message
// and it was split across multiple messages. Each message chunk must have the same units timestamp, in
// the case that the client gets a new message with a different timestamp and its newer than the other
// timestamp then it should take that new message chunk as a start of a new message set. To finish the a
// set of messages with the same timestamp, the last chunk should be an empty set of units.
google.protobuf.Timestamp units_timestamp = 7;
}

// Observed status for a unit.
Expand Down Expand Up @@ -338,6 +351,17 @@ message CheckinObserved {

// Index or revision of the currently component configuration.
uint64 component_idx = 6;

// When a units timestamp is provided then the set of units could not all fit inside this single message
// and it was split across multiple messages. Each message chunk must have the same units timestamp, in
// the case that the client gets a new message with a different timestamp and its newer than the other
// timestamp then it should take that new message chunk as a start of a new message set. To finish the a
// set of messages with the same timestamp, the last chunk should be an empty set of units.
google.protobuf.Timestamp units_timestamp = 7;

// Supports provides information to the agent about extra features this client supports. Should always be included
// on first checkin, and not again unless upon reconnect.
repeated ConnectionSupports supports = 8;
}

// A action request is streamed from the Elastic Agent to the application so an action can be performed
Expand Down Expand Up @@ -449,4 +473,8 @@ message ConnInfo {
bytes peer_key = 6;
// Allowed services that spawned process can use. (only used in V2)
repeated ConnInfoServices services = 7;
// Supports provides information to the client about extra features this server supports.
repeated ConnectionSupports supports = 8;
// Maximum message size that the client can use.
uint32 max_message_size = 9;
}
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
module github.com/elastic/elastic-agent-client/v7

go 1.18
go 1.20

require (
github.com/gofrs/uuid v4.2.0+incompatible
github.com/golang/protobuf v1.5.3
github.com/google/go-cmp v0.5.9
github.com/google/pprof v0.0.0-20230426061923-93006964c1fc
github.com/hashicorp/go-multierror v1.1.1
github.com/magefile/mage v1.13.0
github.com/stretchr/testify v1.7.0
golang.org/x/exp v0.0.0-20231127185646-65229373498e
google.golang.org/grpc v1.56.3
google.golang.org/protobuf v1.30.0
)
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20230426061923-93006964c1fc h1:AGDHt781oIcL4EFk7cPnvBUYTwU8BEU6GDTO3ZMn1sE=
github.com/google/pprof v0.0.0-20230426061923-93006964c1fc/go.mod h1:79YE0hCXdHag9sBkw2o+N/YnZtTkXi0UT9Nnixa5eYk=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand All @@ -29,6 +30,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No=
golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/artifact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestArtifact(t *testing.T) {
var errs []error
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client := NewV2(fmt.Sprintf(":%d", srv.Port), token, VersionInfo{}, grpc.WithTransportCredentials(insecure.NewCredentials()))
client := NewV2(fmt.Sprintf(":%d", srv.Port), token, VersionInfo{}, WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())))
storeErrors(ctx, client, &errs, &errsMu)

var unitsMu sync.Mutex
Expand Down
152 changes: 152 additions & 0 deletions pkg/client/chunk/expected.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package chunk

import (
"time"

"golang.org/x/exp/slices"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
protobuf "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
)

// Expected chunks `proto.CheckinExpected` message into multiple chunks to be sent across the protocol.
func Expected(msg *proto.CheckinExpected, maxSize int, opts ...Option) ([]*proto.CheckinExpected, error) {
var options options
options.timestamp = time.Now() // timestamp used for chunk set
for _, opt := range opts {
opt(&options)
}

s := protobuf.Size(msg)
if s <= maxSize || len(msg.Units) <= 1 {
// fits so no chunking needed or has 0 or 1 units which cannot be chunked
return []*proto.CheckinExpected{msg}, nil
}

msgs := make([]*proto.CheckinExpected, 0, 3) // start at 3 minimum

// a single unit is the smallest a chunk can be
// pre-calculate the size and ensure that a single unit is less than the maxSize
bySize := make([]expectedBySize, len(msg.Units))
for i, u := range msg.Units {
bySize[i].unit = u
bySize[i].size = protobuf.Size(u)
// >= is used because even if it's at the maxSize, with overhead
// it will still be too big even if it's at the exact maxSize
if bySize[i].size >= maxSize {
return nil, status.Errorf(
codes.ResourceExhausted,
"unable to chunk proto.CheckinExpected the unit %s is larger than max (%d vs. %d)",
u.Id, bySize[i].size, maxSize)
}
}

// sort the smallest units first, this ensures that the first chunk that includes extra
// fields uses the smallest unit to ensure that it all fits
slices.SortStableFunc(bySize, func(a, b expectedBySize) int {
return a.size - b.size
})

// first message all fields are set; except units is made smaller
m := shallowCopyCheckinExpected(msg)
m.Units = make([]*proto.UnitExpected, 0, 1)
m.Units = append(m.Units, bySize[0].unit)
m.UnitsTimestamp = timestamppb.New(options.timestamp)
s = protobuf.Size(m)
if s >= maxSize {
// not possible even for the first chunk to fit
return nil, status.Errorf(
codes.ResourceExhausted,
"unable to chunk proto.CheckinExpected the first chunk with unit %s is larger than max (%d vs. %d)",
m.Units[0].Id, s, maxSize)
}

// keep adding units until it doesn't fit
for nextUnit := 1; s < maxSize && nextUnit < len(bySize); nextUnit++ {
us := bySize[nextUnit]
if s+us.size < maxSize {
// unit fits add it
m.Units = append(m.Units, us.unit)
s += us.size
} else {
// doesn't fit, create a new chunk
msgs = append(msgs, m)
m = &proto.CheckinExpected{}
m.UnitsTimestamp = timestamppb.New(options.timestamp)
m.Units = make([]*proto.UnitExpected, 0, 1)
m.Units = append(m.Units, us.unit)
s = protobuf.Size(m)
}
}
msgs = append(msgs, m)

// all chunks created, create the empty chunk
m = &proto.CheckinExpected{}
m.UnitsTimestamp = timestamppb.New(options.timestamp)
m.Units = make([]*proto.UnitExpected, 0)
msgs = append(msgs, m)
return msgs, nil
}

// CheckinExpectedReceiver provides a Recv interface to receive proto.CheckinExpected messages.
type CheckinExpectedReceiver interface {
Recv() (*proto.CheckinExpected, error)
}

// RecvExpected handles the receiving of chunked proto.CheckinObjected.
func RecvExpected(recv CheckinExpectedReceiver) (*proto.CheckinExpected, error) {
var first *proto.CheckinExpected
for {
msg, err := recv.Recv()
if err != nil {
return nil, err
}
if msg.UnitsTimestamp == nil {
// all included in a single message
return msg, nil
}
if first == nil {
// first message in batch
first = msg
} else if first.UnitsTimestamp.AsTime() != msg.UnitsTimestamp.AsTime() {
// only used if the new timestamp is newer
if first.UnitsTimestamp.AsTime().After(msg.UnitsTimestamp.AsTime()) {
// not newer so we ignore the message
continue
}
// different batch; restart
first = msg
}
if len(msg.Units) == 0 {
// ending match message
return first, nil
}
if first != msg {
first.Units = append(first.Units, msg.Units...)
}
}
}

func shallowCopyCheckinExpected(msg *proto.CheckinExpected) *proto.CheckinExpected {
return &proto.CheckinExpected{
AgentInfo: msg.AgentInfo,
Features: msg.Features,
FeaturesIdx: msg.FeaturesIdx,
Component: msg.Component,
ComponentIdx: msg.ComponentIdx,
Units: msg.Units,
UnitsTimestamp: msg.UnitsTimestamp,
}
}

type expectedBySize struct {
unit *proto.UnitExpected
size int
}
Loading

0 comments on commit 8a1940e

Please sign in to comment.