Skip to content

Commit

Permalink
[Fleet] Support download rate in string with unit format (#3677)
Browse files Browse the repository at this point in the history
(cherry picked from commit b04157a)
  • Loading branch information
nchaulet authored and mergify[bot] committed Jul 4, 2024
1 parent d00da29 commit bd2c6eb
Show file tree
Hide file tree
Showing 7 changed files with 484 additions and 277 deletions.
402 changes: 201 additions & 201 deletions NOTICE.txt

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions changelog/fragments/1720021515-fix_download_rate.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Fix Download rate parsing

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: Fix checkin API to API to temporarly support download_rate as string with unit per second (example 123MBps) as elastic-agent is currently sending this.

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: fleet-server

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/fleet-server/pull/3677
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/Pallinder/go-randomdata v1.2.0
github.com/dgraph-io/ristretto v0.1.1
github.com/docker/go-units v0.5.0
github.com/elastic/elastic-agent-client/v7 v7.13.0
github.com/elastic/elastic-agent-libs v0.9.13
github.com/elastic/elastic-agent-system-metrics v0.10.3
Expand Down Expand Up @@ -48,7 +49,6 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/elastic/go-structform v0.0.10 // indirect
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/api/handleCheckin.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ func (ct *CheckinT) validateRequest(zlog zerolog.Logger, w http.ResponseWriter,
// Compare local_metadata content and update if different
rawMeta, err := parseMeta(zlog, agent, &req)
if err != nil {
return val, err
return val, &BadRequestErr{msg: "unable to parse meta", nextErr: err}
}

// Compare agent_components content and update if different
Expand Down
187 changes: 113 additions & 74 deletions internal/pkg/api/handleCheckin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,80 +512,119 @@ func TestProcessUpgradeDetails(t *testing.T) {
return mCache
},
err: nil,
}, {
name: "upgrade downloading action in cache no metadata",
agent: &model.Agent{ESDocument: esd, Agent: &model.AgentMetadata{ID: "test-agent"}},
details: &UpgradeDetails{
ActionId: "test-action",
State: UpgradeDetailsStateUPGDOWNLOADING,
},
bulk: func() *ftesting.MockBulk {
mBulk := ftesting.NewMockBulk()
mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil)
return mBulk
},
cache: func() *testcache.MockCache {
mCache := testcache.NewMockCache()
mCache.On("GetAction", "test-action").Return(model.Action{}, true)
return mCache
},
err: nil,
}, {
name: "upgrade downloading action in cache wrong metadata attribute present",
agent: &model.Agent{ESDocument: esd, Agent: &model.AgentMetadata{ID: "test-agent"}},
details: &UpgradeDetails{
ActionId: "test-action",
State: UpgradeDetailsStateUPGDOWNLOADING,
Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"scheduled_at":"2023-01-02T12:00:00Z"}`)},
},
bulk: func() *ftesting.MockBulk {
mBulk := ftesting.NewMockBulk()
mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil)
return mBulk
},
cache: func() *testcache.MockCache {
mCache := testcache.NewMockCache()
mCache.On("GetAction", "test-action").Return(model.Action{}, true)
return mCache
},
err: nil,
}, {
name: "upgrade failed action in cache",
agent: &model.Agent{ESDocument: esd, Agent: &model.AgentMetadata{ID: "test-agent"}},
details: &UpgradeDetails{
ActionId: "test-action",
State: UpgradeDetailsStateUPGFAILED,
Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"error_msg":"failed"}`)},
},
bulk: func() *ftesting.MockBulk {
mBulk := ftesting.NewMockBulk()
mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil)
return mBulk
},
cache: func() *testcache.MockCache {
mCache := testcache.NewMockCache()
mCache.On("GetAction", "test-action").Return(model.Action{}, true)
return mCache
},
err: nil,
}, {
name: "upgrade failed action in cache empty error_msg",
agent: &model.Agent{ESDocument: esd, Agent: &model.AgentMetadata{ID: "test-agent"}},
details: &UpgradeDetails{
ActionId: "test-action",
State: UpgradeDetailsStateUPGFAILED,
Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"error_msg":""}`)},
},
bulk: func() *ftesting.MockBulk {
return ftesting.NewMockBulk()
},
cache: func() *testcache.MockCache {
mCache := testcache.NewMockCache()
mCache.On("GetAction", "test-action").Return(model.Action{}, true)
return mCache
},
err: ErrInvalidUpgradeMetadata,
}}
},
{
name: "upgrade downloading action in cache, download rate in bytes",
agent: &model.Agent{ESDocument: esd, Agent: &model.AgentMetadata{ID: "test-agent"}},
details: &UpgradeDetails{
ActionId: "test-action",
State: UpgradeDetailsStateUPGDOWNLOADING,
Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"download_percent":12.3, "download_rate": 1000000}`)},
},
bulk: func() *ftesting.MockBulk {
mBulk := ftesting.NewMockBulk()
mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil)
return mBulk
},
cache: func() *testcache.MockCache {
mCache := testcache.NewMockCache()
mCache.On("GetAction", "test-action").Return(model.Action{}, true)
return mCache
},
err: nil,
}, {
name: "upgrade downloading action in cache, download rate in Human MB",
agent: &model.Agent{ESDocument: esd, Agent: &model.AgentMetadata{ID: "test-agent"}},
details: &UpgradeDetails{
ActionId: "test-action",
State: UpgradeDetailsStateUPGDOWNLOADING,
Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"download_percent":12.3, "download_rate": "1MBps"}`)},
},
bulk: func() *ftesting.MockBulk {
mBulk := ftesting.NewMockBulk()
mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil)
return mBulk
},
cache: func() *testcache.MockCache {
mCache := testcache.NewMockCache()
mCache.On("GetAction", "test-action").Return(model.Action{}, true)
return mCache
},
err: nil,
}, {
name: "upgrade downloading action in cache no metadata",
agent: &model.Agent{ESDocument: esd, Agent: &model.AgentMetadata{ID: "test-agent"}},
details: &UpgradeDetails{
ActionId: "test-action",
State: UpgradeDetailsStateUPGDOWNLOADING,
},
bulk: func() *ftesting.MockBulk {
mBulk := ftesting.NewMockBulk()
mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil)
return mBulk
},
cache: func() *testcache.MockCache {
mCache := testcache.NewMockCache()
mCache.On("GetAction", "test-action").Return(model.Action{}, true)
return mCache
},
err: nil,
}, {
name: "upgrade downloading action in cache wrong metadata attribute present",
agent: &model.Agent{ESDocument: esd, Agent: &model.AgentMetadata{ID: "test-agent"}},
details: &UpgradeDetails{
ActionId: "test-action",
State: UpgradeDetailsStateUPGDOWNLOADING,
Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"scheduled_at":"2023-01-02T12:00:00Z"}`)},
},
bulk: func() *ftesting.MockBulk {
mBulk := ftesting.NewMockBulk()
mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil)
return mBulk
},
cache: func() *testcache.MockCache {
mCache := testcache.NewMockCache()
mCache.On("GetAction", "test-action").Return(model.Action{}, true)
return mCache
},
err: nil,
}, {
name: "upgrade failed action in cache",
agent: &model.Agent{ESDocument: esd, Agent: &model.AgentMetadata{ID: "test-agent"}},
details: &UpgradeDetails{
ActionId: "test-action",
State: UpgradeDetailsStateUPGFAILED,
Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"error_msg":"failed"}`)},
},
bulk: func() *ftesting.MockBulk {
mBulk := ftesting.NewMockBulk()
mBulk.On("Update", mock.Anything, dl.FleetAgents, "doc-ID", mock.Anything, mock.Anything, mock.Anything).Return(nil)
return mBulk
},
cache: func() *testcache.MockCache {
mCache := testcache.NewMockCache()
mCache.On("GetAction", "test-action").Return(model.Action{}, true)
return mCache
},
err: nil,
}, {
name: "upgrade failed action in cache empty error_msg",
agent: &model.Agent{ESDocument: esd, Agent: &model.AgentMetadata{ID: "test-agent"}},
details: &UpgradeDetails{
ActionId: "test-action",
State: UpgradeDetailsStateUPGFAILED,
Metadata: &UpgradeDetails_Metadata{json.RawMessage(`{"error_msg":""}`)},
},
bulk: func() *ftesting.MockBulk {
return ftesting.NewMockBulk()
},
cache: func() *testcache.MockCache {
mCache := testcache.NewMockCache()
mCache.On("GetAction", "test-action").Return(model.Action{}, true)
return mCache
},
err: ErrInvalidUpgradeMetadata,
}}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
Expand Down
81 changes: 81 additions & 0 deletions internal/pkg/api/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package api

import (
"encoding/json"
"fmt"
"strings"

"github.com/docker/go-units"
)

func parseDownloadRate(jsonDownloadRate json.RawMessage) (*float64, error) {
var fDownloadRate float64
err := json.Unmarshal(jsonDownloadRate, &fDownloadRate)
if err == nil {
return &fDownloadRate, nil
}

// Handle string download_rate with format human_unitps
var rawDownloadRate string
err = json.Unmarshal(jsonDownloadRate, &rawDownloadRate)
if err != nil {
return nil, fmt.Errorf("error unmarshaling download_rate: %w", err)
}
if rawDownloadRate != "" {
downloadRate, err := units.FromHumanSize(strings.TrimSuffix(rawDownloadRate, "ps"))
if err != nil {
return nil, fmt.Errorf("error converting download_rate from human size: %w", err)
}
fDownloadRate := float64(downloadRate)
return &fDownloadRate, nil
}

return nil, nil
}

func (t *UpgradeMetadataDownloading) UnmarshalJSON(b []byte) error {
object := make(map[string]json.RawMessage)
err := json.Unmarshal(b, &object)
if err != nil {
return err
}

if raw, found := object["download_rate"]; found {
downloadRate, err := parseDownloadRate(raw)
if err != nil {
return err
}
t.DownloadRate = downloadRate
delete(object, "download_rate")
}

if raw, found := object["download_percent"]; found {
err = json.Unmarshal(raw, &t.DownloadPercent)
if err != nil {
return fmt.Errorf("error reading 'download_percent': %w", err)
}
delete(object, "download_percent")
}

if raw, found := object["retry_error_msg"]; found {
err = json.Unmarshal(raw, &t.RetryErrorMsg)
if err != nil {
return fmt.Errorf("error reading 'retry_error_msg': %w", err)
}
delete(object, "retry_error_msg")
}

if raw, found := object["retry_until"]; found {
err = json.Unmarshal(raw, &t.RetryUntil)
if err != nil {
return fmt.Errorf("error reading 'retry_until': %w", err)
}
delete(object, "retry_until")
}

return err
}
56 changes: 56 additions & 0 deletions internal/pkg/api/metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build !integration

package api

import (
"encoding/json"
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func Test_ParseDownloadRate(t *testing.T) {
tests := []struct {
name string
raw json.RawMessage
expectedErrMsg string
expectedValue float64
}{{
name: "download rate as float",
raw: json.RawMessage(`1000000`),
expectedValue: 1000000.00,
}, {
name: "download rate as MBps",
raw: json.RawMessage(`"1MBps"`),
expectedValue: 1000000.00,
}, {
name: "download only MBps",
raw: json.RawMessage(`"MBps"`),
expectedErrMsg: "error converting download_rate from human size: invalid size: 'MB'",
}, {
name: "download rate random string",
raw: json.RawMessage(`"toto"`),
expectedErrMsg: "error converting download_rate from human size: invalid size: 'toto'",
}}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {

res, err := parseDownloadRate(tc.raw)
if tc.expectedErrMsg != "" {
fmt.Printf("TEST %v+", err)
require.ErrorContains(t, err, tc.expectedErrMsg)
} else {
require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, tc.expectedValue, *res)
}

})
}
}

0 comments on commit bd2c6eb

Please sign in to comment.