Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hubble: Populate traffic direction for trace and drop events #11062

Merged
merged 2 commits into from
Apr 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 38 additions & 3 deletions pkg/hubble/parser/threefour/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (p *Parser) Decode(payload *pb.Payload, decoded *pb.Flow) error {
decoded.DestinationNames = p.resolveNames(srcEndpoint.ID, dstIP)
decoded.L7 = nil
decoded.Reply = decodeIsReply(tn)
decoded.TrafficDirection = decodeTrafficDirection(pvn)
decoded.TrafficDirection = decodeTrafficDirection(srcEndpoint.ID, dn, tn, pvn)
decoded.EventType = decodeCiliumEventType(eventType, eventSubType)
decoded.SourceService = sourceService
decoded.DestinationService = destinationService
Expand Down Expand Up @@ -413,7 +413,7 @@ func decodeICMPv6(icmp *layers.ICMPv6) *pb.Layer4 {
}

func decodeIsReply(tn *monitor.TraceNotify) bool {
return tn != nil && tn.Reason == monitor.TraceReasonCtReply
return tn != nil && tn.Reason & ^monitor.TraceReasonEncryptMask == monitor.TraceReasonCtReply
}

func decodeCiliumEventType(eventType, eventSubType uint8) *pb.CiliumEventType {
Expand Down Expand Up @@ -444,7 +444,42 @@ func decodeSecurityIdentities(dn *monitor.DropNotify, tn *monitor.TraceNotify, p
return
}

func decodeTrafficDirection(pvn *monitor.PolicyVerdictNotify) pb.TrafficDirection {
func decodeTrafficDirection(srcEP uint64, dn *monitor.DropNotify, tn *monitor.TraceNotify, pvn *monitor.PolicyVerdictNotify) pb.TrafficDirection {
if dn != nil && dn.Source != 0 {
// If the local endpoint at which the drop occurred is the same as the
// source of the dropped packet, we assume it was an egress flow. This
// implies that we also assume that dropped packets are not dropped
// reply packets of an ongoing connection.
if dn.Source == uint16(srcEP) {
return pb.TrafficDirection_EGRESS
}
return pb.TrafficDirection_INGRESS
}
if tn != nil && tn.Source != 0 {
// For trace events, we assume that packets may be reply packets of an
// ongoing connection. Therefore, we want to access the connection
// tracking result from the `Reason` field to invert the direction for
// reply packets. The datapath currently populates the `Reason` field
// with CT information in TRACE_TO_{LXC,PROXY,HOST,STACK} events.
switch tn.ObsPoint {
case monitorAPI.TraceToLxc,
monitorAPI.TraceToProxy,
monitorAPI.TraceToHost,
monitorAPI.TraceToStack:

// true if the traffic source is the local endpoint, i.e. egress
isSourceEP := tn.Source == uint16(srcEP)
// true if the packet is a reply, i.e. reverse direction
isReply := decodeIsReply(tn)

// isSourceEP != isReply ==
// (isSourceEP && !isReply) || (!isSourceEP && isReply)
if isSourceEP != isReply {
return pb.TrafficDirection_EGRESS
}
return pb.TrafficDirection_INGRESS
}
}
if pvn != nil {
if pvn.IsTrafficIngress() {
return pb.TrafficDirection_INGRESS
Expand Down
141 changes: 141 additions & 0 deletions pkg/hubble/parser/threefour/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,147 @@ func TestDecodeLocalIdentity(t *testing.T) {
assert.Equal(t, []string{"cidr:1.2.3.4/12", "some=label"}, f.GetDestination().GetLabels())
}

func TestDecodeTrafficDirection(t *testing.T) {
localIP := net.ParseIP("1.2.3.4")
localEP := uint16(1234)
remoteIP := net.ParseIP("5.6.7.8")

endpointGetter := &testutils.FakeEndpointGetter{
OnGetEndpointInfo: func(ip net.IP) (endpoint v1.EndpointInfo, ok bool) {
if ip.Equal(localIP) {
return &v1.Endpoint{
ID: uint64(localEP),
}, true
}
return nil, false
},
}

parser, err := New(endpointGetter, nil, nil, nil, nil)
require.NoError(t, err)
parseFlow := func(event interface{}, srcIPv4, dstIPv4 net.IP) *pb.Flow {
data, err := testutils.CreateL3L4Payload(event,
&layers.Ethernet{
SrcMAC: net.HardwareAddr{1, 2, 3, 4, 5, 6},
DstMAC: net.HardwareAddr{7, 8, 9, 0, 1, 2},
EthernetType: layers.EthernetTypeIPv4,
},
&layers.IPv4{SrcIP: srcIPv4, DstIP: dstIPv4})
require.NoError(t, err)
f := &pb.Flow{}
err = parser.Decode(&pb.Payload{Data: data}, f)
require.NoError(t, err)
return f
}

// DROP at unknown endpoint
dn := monitor.DropNotify{
Type: byte(api.MessageTypeDrop),
}
f := parseFlow(dn, localIP, remoteIP)
assert.Equal(t, pb.TrafficDirection_TRAFFIC_DIRECTION_UNKNOWN, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetSource().GetID())

// DROP Egress
dn = monitor.DropNotify{
Type: byte(api.MessageTypeDrop),
Source: localEP,
}
f = parseFlow(dn, localIP, remoteIP)
assert.Equal(t, pb.TrafficDirection_EGRESS, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetSource().GetID())

// DROP Ingress
dn = monitor.DropNotify{
Type: byte(api.MessageTypeDrop),
Source: localEP,
}
f = parseFlow(dn, remoteIP, localIP)
assert.Equal(t, pb.TrafficDirection_INGRESS, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetDestination().GetID())

// TRACE_TO_LXC at unknown endpoint
tn := monitor.TraceNotifyV0{
Type: byte(api.MessageTypeTrace),
ObsPoint: api.TraceToLxc,
}
f = parseFlow(tn, localIP, remoteIP)
assert.Equal(t, pb.TrafficDirection_TRAFFIC_DIRECTION_UNKNOWN, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetSource().GetID())

// TRACE_TO_LXC Egress
tn = monitor.TraceNotifyV0{
Type: byte(api.MessageTypeTrace),
Source: localEP,
ObsPoint: api.TraceToLxc,
}
f = parseFlow(tn, localIP, remoteIP)
assert.Equal(t, pb.TrafficDirection_EGRESS, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetSource().GetID())

// TRACE_TO_LXC Egress, reversed by CT_REPLY
tn = monitor.TraceNotifyV0{
Type: byte(api.MessageTypeTrace),
Source: localEP,
ObsPoint: api.TraceToLxc,
Reason: monitor.TraceReasonCtReply,
}
f = parseFlow(tn, localIP, remoteIP)
assert.Equal(t, pb.TrafficDirection_INGRESS, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetSource().GetID())

// TRACE_TO_HOST Ingress
tn = monitor.TraceNotifyV0{
Type: byte(api.MessageTypeTrace),
Source: localEP,
ObsPoint: api.TraceToHost,
}
f = parseFlow(tn, remoteIP, localIP)
assert.Equal(t, pb.TrafficDirection_INGRESS, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetDestination().GetID())

// TRACE_TO_HOST Ingress, reversed by CT_REPLY
tn = monitor.TraceNotifyV0{
Type: byte(api.MessageTypeTrace),
Source: localEP,
ObsPoint: api.TraceToHost,
Reason: monitor.TraceReasonCtReply,
}
f = parseFlow(tn, remoteIP, localIP)
assert.Equal(t, pb.TrafficDirection_EGRESS, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetDestination().GetID())

// TRACE_FROM_LXC (traffic direction not supported)
tn = monitor.TraceNotifyV0{
Type: byte(api.MessageTypeTrace),
Source: localEP,
ObsPoint: api.TraceFromLxc,
}
f = parseFlow(tn, localIP, remoteIP)
assert.Equal(t, pb.TrafficDirection_TRAFFIC_DIRECTION_UNKNOWN, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetSource().GetID())

// PolicyVerdictNotify Egress
pvn := monitor.PolicyVerdictNotify{
Type: byte(api.MessageTypePolicyVerdict),
Source: localEP,
Flags: api.PolicyEgress,
}
f = parseFlow(pvn, localIP, remoteIP)
assert.Equal(t, pb.TrafficDirection_EGRESS, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetSource().GetID())

// PolicyVerdictNotify Ingress
pvn = monitor.PolicyVerdictNotify{
Type: byte(api.MessageTypePolicyVerdict),
Source: localEP,
Flags: api.PolicyIngress,
}
f = parseFlow(pvn, remoteIP, localIP)
assert.Equal(t, pb.TrafficDirection_INGRESS, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetDestination().GetID())
}

func Test_filterCidrLabels(t *testing.T) {
type args struct {
labels []string
Expand Down