Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Hubble memory usage and performance on decoding events #17482

Merged
merged 9 commits into from
Sep 29, 2021
8 changes: 4 additions & 4 deletions daemon/cmd/hubble.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (d *Daemon) GetNamesOf(sourceEpID uint32, ip net.IP) []string {
// with service information.
//
// - ServiceGetter: https://github.com/cilium/hubble/blob/04ab72591faca62a305ce0715108876167182e04/pkg/parser/getters/getters.go#L52
func (d *Daemon) GetServiceByAddr(ip net.IP, port uint16) (flowpb.Service, bool) {
func (d *Daemon) GetServiceByAddr(ip net.IP, port uint16) *flowpb.Service {
addr := loadbalancer.L3n4Addr{
IP: ip,
L4Addr: loadbalancer.L4Addr{
Expand All @@ -335,12 +335,12 @@ func (d *Daemon) GetServiceByAddr(ip net.IP, port uint16) (flowpb.Service, bool)
}
namespace, name, ok := d.svc.GetServiceNameByAddr(addr)
if !ok {
return flowpb.Service{}, false
return nil
}
return flowpb.Service{
return &flowpb.Service{
Namespace: namespace,
Name: name,
}, true
}
}

// GetK8sMetadata returns the Kubernetes metadata for the given IP address.
Expand Down
43 changes: 21 additions & 22 deletions pkg/hubble/parser/agent/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

flowpb "github.com/cilium/cilium/api/v1/flow"
"github.com/cilium/cilium/pkg/hubble/parser/agent"
"github.com/cilium/cilium/pkg/monitor/api"
monitorAPI "github.com/cilium/cilium/pkg/monitor/api"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -62,49 +61,49 @@ func TestDecodeAgentEvent(t *testing.T) {

tt := []struct {
name string
msg api.AgentNotifyMessage
msg monitorAPI.AgentNotifyMessage
ev *flowpb.AgentEvent
}{
{
name: "empty",
msg: api.AgentNotifyMessage{},
msg: monitorAPI.AgentNotifyMessage{},
ev: &flowpb.AgentEvent{
Type: flowpb.AgentEventType_AGENT_EVENT_UNKNOWN,
Notification: &flowpb.AgentEvent_Unknown{
Unknown: &flowpb.AgentEventUnknown{
Type: fmt.Sprintf("%d", api.AgentNotifyUnspec),
Type: fmt.Sprintf("%d", monitorAPI.AgentNotifyUnspec),
Notification: "null",
},
},
},
},
{
name: "unspecified",
msg: api.AgentNotifyMessage{
Type: api.AgentNotifyUnspec,
msg: monitorAPI.AgentNotifyMessage{
Type: monitorAPI.AgentNotifyUnspec,
Notification: unknownNotification,
},
ev: &flowpb.AgentEvent{
Type: flowpb.AgentEventType_AGENT_EVENT_UNKNOWN,
Notification: &flowpb.AgentEvent_Unknown{
Unknown: &flowpb.AgentEventUnknown{
Type: fmt.Sprintf("%d", api.AgentNotifyUnspec),
Type: fmt.Sprintf("%d", monitorAPI.AgentNotifyUnspec),
Notification: string(unknownNotificationJSON),
},
},
},
},
{
name: "type and notification type mismatch",
msg: api.AgentNotifyMessage{
Type: api.AgentNotifyStart,
msg: monitorAPI.AgentNotifyMessage{
Type: monitorAPI.AgentNotifyStart,
Notification: unknownNotification,
},
ev: &flowpb.AgentEvent{
Type: flowpb.AgentEventType_AGENT_EVENT_UNKNOWN,
Notification: &flowpb.AgentEvent_Unknown{
Unknown: &flowpb.AgentEventUnknown{
Type: fmt.Sprintf("%d", api.AgentNotifyStart),
Type: fmt.Sprintf("%d", monitorAPI.AgentNotifyStart),
Notification: string(unknownNotificationJSON),
},
},
Expand All @@ -113,7 +112,7 @@ func TestDecodeAgentEvent(t *testing.T) {

{
name: "StartMessage",
msg: api.StartMessage(agentStartTS),
msg: monitorAPI.StartMessage(agentStartTS),
ev: &flowpb.AgentEvent{
Type: flowpb.AgentEventType_AGENT_STARTED,
Notification: &flowpb.AgentEvent_AgentStart{
Expand All @@ -125,7 +124,7 @@ func TestDecodeAgentEvent(t *testing.T) {
},
{
name: "PolicyUpdateMessage",
msg: api.PolicyUpdateMessage(42, []string{"hubble=rocks", "cilium=too"}, 7),
msg: monitorAPI.PolicyUpdateMessage(42, []string{"hubble=rocks", "cilium=too"}, 7),
ev: &flowpb.AgentEvent{
Type: flowpb.AgentEventType_POLICY_UPDATED,
Notification: &flowpb.AgentEvent_PolicyUpdate{
Expand All @@ -139,7 +138,7 @@ func TestDecodeAgentEvent(t *testing.T) {
},
{
name: "PolicyDeleteMessage",
msg: api.PolicyDeleteMessage(23, []string{"foo=bar"}, 255),
msg: monitorAPI.PolicyDeleteMessage(23, []string{"foo=bar"}, 255),
ev: &flowpb.AgentEvent{
Type: flowpb.AgentEventType_POLICY_DELETED,
Notification: &flowpb.AgentEvent_PolicyUpdate{
Expand All @@ -153,7 +152,7 @@ func TestDecodeAgentEvent(t *testing.T) {
},
{
name: "EndpointRegenMessage success",
msg: api.EndpointRegenMessage(mockEP, nil),
msg: monitorAPI.EndpointRegenMessage(mockEP, nil),
ev: &flowpb.AgentEvent{
Type: flowpb.AgentEventType_ENDPOINT_REGENERATE_SUCCESS,
Notification: &flowpb.AgentEvent_EndpointRegenerate{
Expand All @@ -167,7 +166,7 @@ func TestDecodeAgentEvent(t *testing.T) {
},
{
name: "EndpointRegenMessage failure",
msg: api.EndpointRegenMessage(mockEP, errors.New("error regenerating endpoint")),
msg: monitorAPI.EndpointRegenMessage(mockEP, errors.New("error regenerating endpoint")),
ev: &flowpb.AgentEvent{
Type: flowpb.AgentEventType_ENDPOINT_REGENERATE_FAILURE,
Notification: &flowpb.AgentEvent_EndpointRegenerate{
Expand All @@ -181,7 +180,7 @@ func TestDecodeAgentEvent(t *testing.T) {
},
{
name: "EndpointCreateMessage",
msg: api.EndpointCreateMessage(mockEP),
msg: monitorAPI.EndpointCreateMessage(mockEP),
ev: &flowpb.AgentEvent{
Type: flowpb.AgentEventType_ENDPOINT_CREATED,
Notification: &flowpb.AgentEvent_EndpointUpdate{
Expand All @@ -197,7 +196,7 @@ func TestDecodeAgentEvent(t *testing.T) {
},
{
name: "EndpointDeleteMessage",
msg: api.EndpointDeleteMessage(mockEP),
msg: monitorAPI.EndpointDeleteMessage(mockEP),
ev: &flowpb.AgentEvent{
Type: flowpb.AgentEventType_ENDPOINT_DELETED,
Notification: &flowpb.AgentEvent_EndpointUpdate{
Expand All @@ -213,7 +212,7 @@ func TestDecodeAgentEvent(t *testing.T) {
},
{
name: "IPCacheUpsertedMessage (insert)",
msg: api.IPCacheUpsertedMessage("10.0.1.42/32", 1023, nil, net.ParseIP("10.1.5.4"), nil, 0xff, "default", "foobar"),
msg: monitorAPI.IPCacheUpsertedMessage("10.0.1.42/32", 1023, nil, net.ParseIP("10.1.5.4"), nil, 0xff, "default", "foobar"),
ev: &flowpb.AgentEvent{
Type: flowpb.AgentEventType_IPCACHE_UPSERTED,
Notification: &flowpb.AgentEvent_IpcacheUpdate{
Expand All @@ -232,7 +231,7 @@ func TestDecodeAgentEvent(t *testing.T) {
},
{
name: "IPCacheUpsertedMessage (update)",
msg: api.IPCacheUpsertedMessage("192.168.10.11/32", 1023, &oldID, net.ParseIP("10.1.5.4"), net.ParseIP("10.2.6.11"), 5, "hubble", "podmcpodface"),
msg: monitorAPI.IPCacheUpsertedMessage("192.168.10.11/32", 1023, &oldID, net.ParseIP("10.1.5.4"), net.ParseIP("10.2.6.11"), 5, "hubble", "podmcpodface"),
ev: &flowpb.AgentEvent{
Type: flowpb.AgentEventType_IPCACHE_UPSERTED,
Notification: &flowpb.AgentEvent_IpcacheUpdate{
Expand All @@ -253,7 +252,7 @@ func TestDecodeAgentEvent(t *testing.T) {
},
{
name: "IPCacheDeletedMessage",
msg: api.IPCacheDeletedMessage("192.168.10.0/24", 6048, nil, net.ParseIP("10.1.5.4"), nil, 0, "", ""),
msg: monitorAPI.IPCacheDeletedMessage("192.168.10.0/24", 6048, nil, net.ParseIP("10.1.5.4"), nil, 0, "", ""),
ev: &flowpb.AgentEvent{
Type: flowpb.AgentEventType_IPCACHE_DELETED,
Notification: &flowpb.AgentEvent_IpcacheUpdate{
Expand All @@ -272,7 +271,7 @@ func TestDecodeAgentEvent(t *testing.T) {
},
{
name: "ServiceUpsertMessage",
msg: api.ServiceUpsertMessage(
msg: monitorAPI.ServiceUpsertMessage(
214,
monitorAPI.ServiceUpsertNotificationAddr{
IP: net.ParseIP("10.240.12.1"),
Expand Down Expand Up @@ -322,7 +321,7 @@ func TestDecodeAgentEvent(t *testing.T) {
},
{
name: "ServiceDeleteMessage",
msg: api.ServiceDeleteMessage(1048575),
msg: monitorAPI.ServiceDeleteMessage(1048575),
ev: &flowpb.AgentEvent{
Type: flowpb.AgentEventType_SERVICE_DELETED,
Notification: &flowpb.AgentEvent_ServiceDelete{
Expand Down
2 changes: 1 addition & 1 deletion pkg/hubble/parser/debug/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func New(log logrus.FieldLogger, endpointGetter getters.EndpointGetter) (*Parser
// Decode takes the a debug event payload obtained from the perf event ring
// buffer and decodes it
func (p *Parser) Decode(data []byte, cpu int) (*flowpb.DebugEvent, error) {
if data == nil || len(data) == 0 {
if len(data) == 0 {
return nil, errors.ErrEmptyData
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/hubble/parser/getters/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type IPGetter interface {

// ServiceGetter fetches service metadata.
type ServiceGetter interface {
GetServiceByAddr(ip net.IP, port uint16) (service flowpb.Service, ok bool)
GetServiceByAddr(ip net.IP, port uint16) *flowpb.Service
}

// StoreGetter ...
Expand Down
2 changes: 1 addition & 1 deletion pkg/hubble/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (p *Parser) Decode(monitorEvent *observerTypes.MonitorEvent) (*v1.Event, er

switch payload := monitorEvent.Payload.(type) {
case *observerTypes.PerfEvent:
if payload.Data == nil || len(payload.Data) == 0 {
if len(payload.Data) == 0 {
return nil, errors.ErrEmptyData
} else if payload.Data[0] == monitorAPI.MessageTypeDebug {
// Debug is currently the only perf ring buffer event without any
Expand Down
8 changes: 2 additions & 6 deletions pkg/hubble/parser/seven/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,8 @@ func (p *Parser) Decode(r *accesslog.LogRecord, decoded *pb.Flow) error {
l4, sourcePort, destinationPort := decodeLayer4(r.TransportProtocol, sourceEndpoint, destinationEndpoint)
var sourceService, destinationService *pb.Service
if p.serviceGetter != nil {
if srcService, ok := p.serviceGetter.GetServiceByAddr(sourceIP, sourcePort); ok {
sourceService = &srcService
}
if dstService, ok := p.serviceGetter.GetServiceByAddr(destinationIP, destinationPort); ok {
destinationService = &dstService
}
sourceService = p.serviceGetter.GetServiceByAddr(sourceIP, sourcePort)
destinationService = p.serviceGetter.GetServiceByAddr(destinationIP, destinationPort)
}

decoded.Time = pbTimestamp
Expand Down
8 changes: 4 additions & 4 deletions pkg/hubble/parser/seven/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ func TestDecodeL7HTTPRecord(t *testing.T) {
},
}
serviceGetter := &testutils.FakeServiceGetter{
OnGetServiceByAddr: func(ip net.IP, port uint16) (service pb.Service, ok bool) {
OnGetServiceByAddr: func(ip net.IP, port uint16) *pb.Service {
if ip.Equal(net.ParseIP(fakeDestinationEndpoint.IPv4)) && (port == fakeDestinationEndpoint.Port) {
return pb.Service{
return &pb.Service{
Name: "service-1234",
Namespace: "default",
}, true
}
}
return
return nil
},
}

Expand Down
10 changes: 3 additions & 7 deletions pkg/hubble/parser/threefour/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func New(

// Decode decodes the data from 'data' into 'decoded'
func (p *Parser) Decode(data []byte, decoded *pb.Flow) error {
if data == nil || len(data) == 0 {
if len(data) == 0 {
return errors.ErrEmptyData
}

Expand Down Expand Up @@ -180,12 +180,8 @@ func (p *Parser) Decode(data []byte, decoded *pb.Flow) error {
dstEndpoint := p.resolveEndpoint(dstIP, dstLabelID)
var sourceService, destinationService *pb.Service
if p.serviceGetter != nil {
if srcService, ok := p.serviceGetter.GetServiceByAddr(srcIP, srcPort); ok {
sourceService = &srcService
}
if dstService, ok := p.serviceGetter.GetServiceByAddr(dstIP, dstPort); ok {
destinationService = &dstService
}
sourceService = p.serviceGetter.GetServiceByAddr(srcIP, srcPort)
destinationService = p.serviceGetter.GetServiceByAddr(dstIP, dstPort)
}

decoded.Verdict = decodeVerdict(dn, tn, pvn)
Expand Down
24 changes: 18 additions & 6 deletions pkg/hubble/parser/threefour/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cilium/cilium/pkg/byteorder"
"github.com/cilium/cilium/pkg/datapath/link"
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
"github.com/cilium/cilium/pkg/hubble/parser/errors"
"github.com/cilium/cilium/pkg/hubble/testutils"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/ipcache"
Expand All @@ -45,6 +46,17 @@ func init() {
log.SetOutput(io.Discard)
}

func TestL34DecodeEmpty(t *testing.T) {
parser, err := New(log, &testutils.NoopEndpointGetter, &testutils.NoopIdentityGetter,
&testutils.NoopDNSGetter, &testutils.NoopIPGetter, &testutils.NoopServiceGetter)
require.NoError(t, err)

var d []byte
f := &flowpb.Flow{}
err = parser.Decode(d, f)
assert.Equal(t, err, errors.ErrEmptyData)
}

func TestL34Decode(t *testing.T) {
//SOURCE DESTINATION TYPE SUMMARY
//192.168.33.11:6443(sun-sr-https) 10.16.236.178:54222 L3/4 TCP Flags: ACK
Expand Down Expand Up @@ -120,20 +132,20 @@ func TestL34Decode(t *testing.T) {
},
}
serviceGetter := &testutils.FakeServiceGetter{
OnGetServiceByAddr: func(ip net.IP, port uint16) (service flowpb.Service, ok bool) {
OnGetServiceByAddr: func(ip net.IP, port uint16) *flowpb.Service {
if ip.Equal(net.ParseIP("192.168.33.11")) && (port == 6443) {
return flowpb.Service{
return &flowpb.Service{
Name: "service-1234",
Namespace: "remote",
}, true
}
}
if ip.Equal(net.ParseIP("10.16.236.178")) && (port == 54222) {
return flowpb.Service{
return &flowpb.Service{
Name: "service-4321",
Namespace: "default",
}, true
}
}
return
return nil
},
}
identityCache := &testutils.NoopIdentityGetter
Expand Down
8 changes: 4 additions & 4 deletions pkg/hubble/testutils/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,11 @@ var NoopIPGetter = FakeIPGetter{

// FakeServiceGetter is used for unit tests that need ServiceGetter.
type FakeServiceGetter struct {
OnGetServiceByAddr func(ip net.IP, port uint16) (service flowpb.Service, ok bool)
OnGetServiceByAddr func(ip net.IP, port uint16) *flowpb.Service
}

// GetServiceByAddr implements FakeServiceGetter.GetServiceByAddr.
func (f *FakeServiceGetter) GetServiceByAddr(ip net.IP, port uint16) (service flowpb.Service, ok bool) {
func (f *FakeServiceGetter) GetServiceByAddr(ip net.IP, port uint16) *flowpb.Service {
if f.OnGetServiceByAddr != nil {
return f.OnGetServiceByAddr(ip, port)
}
Expand All @@ -373,8 +373,8 @@ func (f *FakeServiceGetter) GetServiceByAddr(ip net.IP, port uint16) (service fl

// NoopServiceGetter always returns an empty response.
var NoopServiceGetter = FakeServiceGetter{
OnGetServiceByAddr: func(ip net.IP, port uint16) (service flowpb.Service, ok bool) {
return flowpb.Service{}, false
OnGetServiceByAddr: func(ip net.IP, port uint16) *flowpb.Service {
return nil
},
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/identity/model/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func NewIdentityFromModel(base *models.Identity) *identity.Identity {

id := &identity.Identity{
ID: identity.NumericIdentity(base.ID),
Labels: make(labels.Labels),
Labels: make(labels.Labels, len(base.Labels)),
kkourt marked this conversation as resolved.
Show resolved Hide resolved
}
for _, v := range base.Labels {
lbl := labels.ParseLabel(v)
Expand All @@ -34,13 +34,12 @@ func CreateModel(id *identity.Identity) *models.Identity {

ret := &models.Identity{
ID: int64(id.ID),
Labels: []string{},
LabelsSHA256: "",
Labels: make([]string, 0, len(id.Labels)),
LabelsSHA256: id.GetLabelsSHA256(),
}

for _, v := range id.Labels {
ret.Labels = append(ret.Labels, v.String())
}
ret.LabelsSHA256 = id.GetLabelsSHA256()
return ret
}