Skip to content

Commit

Permalink
hubble: Use netip.Addr instead of net.IP in getter functions
Browse files Browse the repository at this point in the history
Various parts of Cilium now use netip.Addr instead of net.IP or custom IP types
internally, and other parts will likely be migrated soon. This patch updates
Hubble getters to take netip.Addr as an argument instead of net.IP. That will
reduce type conversions needed in Hubble. All getters handle invalid addresses.

Signed-off-by: Anna Kapuscinska <anna@isovalent.com>
  • Loading branch information
lambdanis authored and christarazi committed Mar 9, 2023
1 parent 7eecd10 commit 75c9ce9
Show file tree
Hide file tree
Showing 12 changed files with 131 additions and 124 deletions.
49 changes: 16 additions & 33 deletions daemon/cmd/hubble.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package cmd
import (
"context"
"fmt"
"net"
"net/netip"
"strings"
"time"

Expand Down Expand Up @@ -39,7 +39,6 @@ import (
"github.com/cilium/cilium/pkg/hubble/server"
"github.com/cilium/cilium/pkg/hubble/server/serveroption"
"github.com/cilium/cilium/pkg/identity"
ippkg "github.com/cilium/cilium/pkg/ip"
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/loadbalancer"
"github.com/cilium/cilium/pkg/logging"
Expand Down Expand Up @@ -297,8 +296,6 @@ func (d *Daemon) launchHubble() {

// GetIdentity looks up identity by ID from Cilium's identity cache. Hubble uses the identity info
// to populate source and destination labels of flows.
//
// - IdentityGetter: https://github.com/cilium/hubble/blob/04ab72591faca62a305ce0715108876167182e04/pkg/parser/getters/getters.go#L40
func (d *Daemon) GetIdentity(securityIdentity uint32) (*identity.Identity, error) {
ident := d.identityAllocator.LookupIdentityByID(context.Background(), identity.NumericIdentity(securityIdentity))
if ident == nil {
Expand All @@ -309,14 +306,11 @@ func (d *Daemon) GetIdentity(securityIdentity uint32) (*identity.Identity, error

// GetEndpointInfo returns endpoint info for a given IP address. Hubble uses this function to populate
// fields like namespace and pod name for local endpoints.
//
// - EndpointGetter: https://github.com/cilium/hubble/blob/04ab72591faca62a305ce0715108876167182e04/pkg/parser/getters/getters.go#L34
func (d *Daemon) GetEndpointInfo(ip net.IP) (endpoint v1.EndpointInfo, ok bool) {
addr, ok := ippkg.AddrFromIP(ip)
if !ok {
func (d *Daemon) GetEndpointInfo(ip netip.Addr) (endpoint v1.EndpointInfo, ok bool) {
if !ip.IsValid() {
return nil, false
}
ep := d.endpointManager.LookupIP(addr)
ep := d.endpointManager.LookupIP(ip)
if ep == nil {
return nil, false
}
Expand All @@ -338,19 +332,16 @@ func (d *Daemon) GetEndpoints() map[policy.Endpoint]struct{} {

// GetNamesOf implements DNSGetter.GetNamesOf. It looks up DNS names of a given IP from the
// FQDN cache of an endpoint specified by sourceEpID.
//
// - DNSGetter: https://github.com/cilium/hubble/blob/04ab72591faca62a305ce0715108876167182e04/pkg/parser/getters/getters.go#L27
func (d *Daemon) GetNamesOf(sourceEpID uint32, ip net.IP) []string {
func (d *Daemon) GetNamesOf(sourceEpID uint32, ip netip.Addr) []string {
ep := d.endpointManager.LookupCiliumID(uint16(sourceEpID))
if ep == nil {
return nil
}

addr, ok := ippkg.AddrFromIP(ip)
if !ok {
if !ip.IsValid() {
return nil
}
names := ep.DNSHistory.LookupIP(addr)
names := ep.DNSHistory.LookupIP(ip)

for i := range names {
names[i] = strings.TrimSuffix(names[i], ".")
Expand All @@ -361,13 +352,11 @@ func (d *Daemon) GetNamesOf(sourceEpID uint32, ip net.IP) []string {

// GetServiceByAddr looks up service by IP/port. Hubble uses this function to annotate flows
// with service information.
//
// - ServiceGetter: https://github.com/cilium/hubble/blob/04ab72591faca62a305ce0715108876167182e04/pkg/parser/getters/getters.go#L52
func (d *Daemon) GetServiceByAddr(ip net.IP, port uint16) *flowpb.Service {
addrCluster, ok := cmtypes.AddrClusterFromIP(ip)
if !ok {
func (d *Daemon) GetServiceByAddr(ip netip.Addr, port uint16) *flowpb.Service {
if !ip.IsValid() {
return nil
}
addrCluster := cmtypes.AddrClusterFrom(ip, 0)
addr := loadbalancer.L3n4Addr{
AddrCluster: addrCluster,
L4Addr: loadbalancer.L4Addr{
Expand All @@ -386,8 +375,8 @@ func (d *Daemon) GetServiceByAddr(ip net.IP, port uint16) *flowpb.Service {

// GetK8sMetadata returns the Kubernetes metadata for the given IP address.
// It implements hubble parser's IPGetter.GetK8sMetadata.
func (d *Daemon) GetK8sMetadata(ip net.IP) *ipcache.K8sMetadata {
if ip == nil {
func (d *Daemon) GetK8sMetadata(ip netip.Addr) *ipcache.K8sMetadata {
if !ip.IsValid() {
return nil
}
return d.ipcache.GetK8sMetadata(ip.String())
Expand All @@ -396,8 +385,8 @@ func (d *Daemon) GetK8sMetadata(ip net.IP) *ipcache.K8sMetadata {
// LookupSecIDByIP returns the security ID for the given IP. If the security ID
// cannot be found, ok is false.
// It implements hubble parser's IPGetter.LookupSecIDByIP.
func (d *Daemon) LookupSecIDByIP(ip net.IP) (id ipcache.Identity, ok bool) {
if ip == nil {
func (d *Daemon) LookupSecIDByIP(ip netip.Addr) (id ipcache.Identity, ok bool) {
if !ip.IsValid() {
return ipcache.Identity{}, false
}

Expand All @@ -407,20 +396,14 @@ func (d *Daemon) LookupSecIDByIP(ip net.IP) (id ipcache.Identity, ok bool) {

ipv6Prefixes, ipv4Prefixes := d.GetCIDRPrefixLengths()
prefixes := ipv4Prefixes
bits := net.IPv4len * 8
if ip.To4() == nil {
if ip.Is6() {
prefixes = ipv6Prefixes
bits = net.IPv6len * 8
}
for _, prefixLen := range prefixes {
// note: we perform a lookup even when `prefixLen == bits`, as some
// entries derived by a single address cidr-range will not have been
// found by the above lookup
mask := net.CIDRMask(prefixLen, bits)
cidr := net.IPNet{
IP: ip.Mask(mask),
Mask: mask,
}
cidr, _ := ip.Prefix(prefixLen)
if id, ok = d.ipcache.LookupByPrefix(cidr.String()); ok {
return id, ok
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/fqdn/dnsproxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"math"
"net"
"net/netip"
"regexp"
"strconv"
"strings"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/cilium/cilium/pkg/fqdn/re"
"github.com/cilium/cilium/pkg/fqdn/restore"
"github.com/cilium/cilium/pkg/identity"
ippkg "github.com/cilium/cilium/pkg/ip"
"github.com/cilium/cilium/pkg/ipcache"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/logging"
Expand Down Expand Up @@ -473,7 +475,7 @@ type LookupEndpointIDByIPFunc func(ip net.IP) (endpoint *endpoint.Endpoint, err
// LookupSecIDByIPFunc Func wraps logic to lookup an IP's security ID from the
// ipcache.
// See DNSProxy.LookupSecIDByIP for usage.
type LookupSecIDByIPFunc func(ip net.IP) (secID ipcache.Identity, exists bool)
type LookupSecIDByIPFunc func(ip netip.Addr) (secID ipcache.Identity, exists bool)

// LookupIPsBySecIDFunc Func wraps logic to lookup an IPs by security ID from the
// ipcache.
Expand Down Expand Up @@ -809,7 +811,9 @@ func (p *DNSProxy) ServeDNS(w dns.ResponseWriter, request *dns.Msg) {
}

targetServerID := identity.ReservedIdentityWorld
if serverSecID, exists := p.LookupSecIDByIP(targetServerIP); !exists {
// Ignore invalid IP - getter will handle invalid value.
targetServerAddr, _ := ippkg.AddrFromIP(targetServerIP)
if serverSecID, exists := p.LookupSecIDByIP(targetServerAddr); !exists {
scopedLog.WithField("server", targetServerAddrStr).Debug("cannot find server ip in ipcache, defaulting to WORLD")
} else {
targetServerID = serverSecID.ID
Expand Down
2 changes: 1 addition & 1 deletion pkg/fqdn/dnsproxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (s *DNSProxyTestSuite) SetUpTest(c *C) {
return endpoint.NewEndpointWithState(s, s, testipcache.NewMockIPCache(), &endpoint.FakeEndpointProxy{}, testidentity.NewMockIdentityAllocator(nil), uint16(epID1), endpoint.StateReady), nil
},
// LookupSecIDByIP
func(ip net.IP) (ipcache.Identity, bool) {
func(ip netip.Addr) (ipcache.Identity, bool) {
DNSServerListenerAddr := (s.dnsServer.Listener.Addr()).(*net.TCPAddr)
switch {
case ip.String() == DNSServerListenerAddr.IP.String():
Expand Down
4 changes: 2 additions & 2 deletions pkg/hubble/parser/common/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package common

import (
"net"
"net/netip"

"github.com/sirupsen/logrus"

Expand Down Expand Up @@ -36,7 +36,7 @@ func NewEndpointResolver(
}
}

func (r *EndpointResolver) ResolveEndpoint(ip net.IP, datapathSecurityIdentity uint32) *pb.Endpoint {
func (r *EndpointResolver) ResolveEndpoint(ip netip.Addr, datapathSecurityIdentity uint32) *pb.Endpoint {
// The datapathSecurityIdentity parameter is the numeric security identity
// obtained from the datapath.
// The numeric identity from the datapath can differ from the one we obtain
Expand Down
12 changes: 6 additions & 6 deletions pkg/hubble/parser/getters/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
package getters

import (
"net"
"net/netip"

"k8s.io/client-go/tools/cache"

Expand All @@ -20,13 +20,13 @@ type DNSGetter interface {
// GetNamesOf fetches FQDNs of a given IP from the perspective of
// the endpoint with ID sourceEpID. The returned names must not have
// trailing dots.
GetNamesOf(sourceEpID uint32, ip net.IP) (names []string)
GetNamesOf(sourceEpID uint32, ip netip.Addr) (names []string)
}

// EndpointGetter ...
type EndpointGetter interface {
// GetEndpointInfo looks up endpoint by IP address.
GetEndpointInfo(ip net.IP) (endpoint v1.EndpointInfo, ok bool)
GetEndpointInfo(ip netip.Addr) (endpoint v1.EndpointInfo, ok bool)
// GetEndpointInfo looks up endpoint by id
GetEndpointInfoByID(id uint16) (endpoint v1.EndpointInfo, ok bool)
}
Expand All @@ -40,15 +40,15 @@ type IdentityGetter interface {
// IPGetter fetches per-IP metadata
type IPGetter interface {
// GetK8sMetadata returns Kubernetes metadata for the given IP address.
GetK8sMetadata(ip net.IP) *ipcache.K8sMetadata
GetK8sMetadata(ip netip.Addr) *ipcache.K8sMetadata
// LookupSecIDByIP returns the corresponding security identity that
// endpoint IP maps to as well as if the corresponding entry exists.
LookupSecIDByIP(ip net.IP) (ipcache.Identity, bool)
LookupSecIDByIP(ip netip.Addr) (ipcache.Identity, bool)
}

// ServiceGetter fetches service metadata.
type ServiceGetter interface {
GetServiceByAddr(ip net.IP, port uint16) *flowpb.Service
GetServiceByAddr(ip netip.Addr, port uint16) *flowpb.Service
}

// StoreGetter ...
Expand Down
34 changes: 17 additions & 17 deletions pkg/hubble/parser/seven/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
package seven

import (
"net"
"net/http"
"net/netip"
"net/url"
"testing"
"time"
Expand Down Expand Up @@ -51,7 +51,7 @@ func TestDecodeL7HTTPRequest(t *testing.T) {
lr.DestinationEndpoint.Port = 80

dnsGetter := &testutils.FakeFQDNCache{
OnGetNamesOf: func(epID uint32, ip net.IP) (names []string) {
OnGetNamesOf: func(epID uint32, ip netip.Addr) (names []string) {
ipStr := ip.String()
switch {
case epID == uint32(fakeSourceEndpoint.ID) && ipStr == fakeDestinationEndpoint.IPv4:
Expand All @@ -63,8 +63,8 @@ func TestDecodeL7HTTPRequest(t *testing.T) {
},
}
IPGetter := &testutils.FakeIPGetter{
OnGetK8sMetadata: func(ip net.IP) *ipcache.K8sMetadata {
if ip.String() == fakeDestinationEndpoint.IPv4 {
OnGetK8sMetadata: func(ip netip.Addr) *ipcache.K8sMetadata {
if ip == netip.MustParseAddr(fakeDestinationEndpoint.IPv4) {
return &ipcache.K8sMetadata{
Namespace: "default",
PodName: "pod-1234",
Expand All @@ -74,8 +74,8 @@ func TestDecodeL7HTTPRequest(t *testing.T) {
},
}
serviceGetter := &testutils.FakeServiceGetter{
OnGetServiceByAddr: func(ip net.IP, port uint16) *flowpb.Service {
if ip.Equal(net.ParseIP(fakeDestinationEndpoint.IPv4)) && (port == fakeDestinationEndpoint.Port) {
OnGetServiceByAddr: func(ip netip.Addr, port uint16) *flowpb.Service {
if ip == netip.MustParseAddr(fakeDestinationEndpoint.IPv4) && (port == fakeDestinationEndpoint.Port) {
return &flowpb.Service{
Name: "service-1234",
Namespace: "default",
Expand All @@ -85,13 +85,13 @@ func TestDecodeL7HTTPRequest(t *testing.T) {
},
}
endpointGetter := &testutils.FakeEndpointGetter{
OnGetEndpointInfo: func(ip net.IP) (endpoint v1.EndpointInfo, ok bool) {
OnGetEndpointInfo: func(ip netip.Addr) (endpoint v1.EndpointInfo, ok bool) {
switch {
case ip.Equal(net.ParseIP(fakeSourceEndpoint.IPv4)):
case ip == netip.MustParseAddr(fakeSourceEndpoint.IPv4):
return &testutils.FakeEndpointInfo{
ID: fakeSourceEndpoint.ID,
}, true
case ip.Equal(net.ParseIP(fakeDestinationEndpoint.IPv4)):
case ip == netip.MustParseAddr(fakeDestinationEndpoint.IPv4):
return &testutils.FakeEndpointInfo{
ID: fakeDestinationEndpoint.ID,
}, true
Expand Down Expand Up @@ -166,7 +166,7 @@ func TestDecodeL7HTTPRecordResponse(t *testing.T) {
lr.DestinationEndpoint.Port = 56789

dnsGetter := &testutils.FakeFQDNCache{
OnGetNamesOf: func(epID uint32, ip net.IP) (names []string) {
OnGetNamesOf: func(epID uint32, ip netip.Addr) (names []string) {
ipStr := ip.String()
switch {
case epID == uint32(fakeSourceEndpoint.ID) && ipStr == fakeDestinationEndpoint.IPv4:
Expand All @@ -178,8 +178,8 @@ func TestDecodeL7HTTPRecordResponse(t *testing.T) {
},
}
IPGetter := &testutils.FakeIPGetter{
OnGetK8sMetadata: func(ip net.IP) *ipcache.K8sMetadata {
if ip.String() == fakeDestinationEndpoint.IPv4 {
OnGetK8sMetadata: func(ip netip.Addr) *ipcache.K8sMetadata {
if ip == netip.MustParseAddr(fakeDestinationEndpoint.IPv4) {
return &ipcache.K8sMetadata{
Namespace: "default",
PodName: "pod-1234",
Expand All @@ -189,8 +189,8 @@ func TestDecodeL7HTTPRecordResponse(t *testing.T) {
},
}
serviceGetter := &testutils.FakeServiceGetter{
OnGetServiceByAddr: func(ip net.IP, port uint16) *flowpb.Service {
if ip.Equal(net.ParseIP(fakeDestinationEndpoint.IPv4)) && (port == fakeDestinationEndpoint.Port) {
OnGetServiceByAddr: func(ip netip.Addr, port uint16) *flowpb.Service {
if ip == netip.MustParseAddr(fakeDestinationEndpoint.IPv4) && (port == fakeDestinationEndpoint.Port) {
return &flowpb.Service{
Name: "service-1234",
Namespace: "default",
Expand All @@ -200,13 +200,13 @@ func TestDecodeL7HTTPRecordResponse(t *testing.T) {
},
}
endpointGetter := &testutils.FakeEndpointGetter{
OnGetEndpointInfo: func(ip net.IP) (endpoint v1.EndpointInfo, ok bool) {
OnGetEndpointInfo: func(ip netip.Addr) (endpoint v1.EndpointInfo, ok bool) {
switch {
case ip.Equal(net.ParseIP(fakeSourceEndpoint.IPv4)):
case ip.String() == fakeSourceEndpoint.IPv4:
return &testutils.FakeEndpointInfo{
ID: fakeSourceEndpoint.ID,
}, true
case ip.Equal(net.ParseIP(fakeDestinationEndpoint.IPv4)):
case ip.String() == fakeDestinationEndpoint.IPv4:
return &testutils.FakeEndpointInfo{
ID: fakeDestinationEndpoint.ID,
}, true
Expand Down
11 changes: 7 additions & 4 deletions pkg/hubble/parser/seven/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package seven

import (
"fmt"
"net"
"net/netip"
"sort"
"time"

Expand Down Expand Up @@ -89,8 +89,11 @@ func (p *Parser) Decode(r *accesslog.LogRecord, decoded *flowpb.Flow) error {

ip := decodeIP(r.IPVersion, r.SourceEndpoint, r.DestinationEndpoint)

sourceIP := net.ParseIP(ip.Source)
destinationIP := net.ParseIP(ip.Destination)
// Ignore IP parsing errors as IPs can be empty. Getters will handle invalid values.
// Flows with empty IPs have been observed in practice, but it was not clear what kind of flows
// those are - errors handling here should be revisited once it's clear.
sourceIP, _ := netip.ParseAddr(ip.Source)
destinationIP, _ := netip.ParseAddr(ip.Destination)
var sourceNames, destinationNames []string
var sourceNamespace, sourcePod, destinationNamespace, destinationPod string
if p.dnsGetter != nil {
Expand Down Expand Up @@ -206,7 +209,7 @@ func (p *Parser) computeResponseTime(r *accesslog.LogRecord, timestamp time.Time
return 0
}

func (p *Parser) updateEndpointWorkloads(ip net.IP, endpoint *flowpb.Endpoint) {
func (p *Parser) updateEndpointWorkloads(ip netip.Addr, endpoint *flowpb.Endpoint) {
if ep, ok := p.endpointGetter.GetEndpointInfo(ip); ok {
if pod := ep.GetPod(); pod != nil {
workload, workloadTypeMeta, ok := utils.GetWorkloadMetaFromPod(pod)
Expand Down

0 comments on commit 75c9ce9

Please sign in to comment.