Skip to content

Commit

Permalink
hubble/parser: Populate traffic direction for trace and drop events
Browse files Browse the repository at this point in the history
Where possible, populate the TrafficDirection field from the trace
and drop notifications. This is done by comparing the `Source`
endpoint field of the trace/drop event with source address of the
captured IP packet.

For drop notifications, we assume there are no ongoing connections
which this packet belongs to and just compare the source address. For
trace notifications however, we do assume that there might an ongoing
connection. Therefore, we rely on the connection tracking state to
revert the traffic direction for reply packages. For trace observation
points without connection tracking, we do not assign any traffic
direction.

Signed-off-by: Sebastian Wicki <sebastian@isovalent.com>
  • Loading branch information
gandro committed Apr 22, 2020
1 parent b3a76db commit fa7f33e
Show file tree
Hide file tree
Showing 2 changed files with 178 additions and 2 deletions.
39 changes: 37 additions & 2 deletions pkg/hubble/parser/threefour/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (p *Parser) Decode(payload *pb.Payload, decoded *pb.Flow) error {
decoded.DestinationNames = p.resolveNames(srcEndpoint.ID, dstIP)
decoded.L7 = nil
decoded.Reply = decodeIsReply(tn)
decoded.TrafficDirection = decodeTrafficDirection(pvn)
decoded.TrafficDirection = decodeTrafficDirection(srcEndpoint.ID, dn, tn, pvn)
decoded.EventType = decodeCiliumEventType(eventType, eventSubType)
decoded.SourceService = sourceService
decoded.DestinationService = destinationService
Expand Down Expand Up @@ -444,7 +444,42 @@ func decodeSecurityIdentities(dn *monitor.DropNotify, tn *monitor.TraceNotify, p
return
}

func decodeTrafficDirection(pvn *monitor.PolicyVerdictNotify) pb.TrafficDirection {
func decodeTrafficDirection(srcEP uint64, dn *monitor.DropNotify, tn *monitor.TraceNotify, pvn *monitor.PolicyVerdictNotify) pb.TrafficDirection {
if dn != nil && dn.Source != 0 {
// If the local endpoint at which the drop occurred is the same as the
// source of the dropped packet, we assume it was an egress flow. This
// implies that we also assume that dropped packets are not dropped
// reply packets of an ongoing connection.
if dn.Source == uint16(srcEP) {
return pb.TrafficDirection_EGRESS
}
return pb.TrafficDirection_INGRESS
}
if tn != nil && tn.Source != 0 {
// For trace events, we assume that packets may be reply packets of an
// ongoing connection. Therefore, we want to access the connection
// tracking result from the `Reason` field to invert the direction for
// reply packets. The datapath currently populates the `Reason` field
// with CT information in TRACE_TO_{LXC,PROXY,HOST,STACK} events.
switch tn.ObsPoint {
case monitorAPI.TraceToLxc,
monitorAPI.TraceToProxy,
monitorAPI.TraceToHost,
monitorAPI.TraceToStack:

// true if the traffic source is the local endpoint, i.e. egress
isSourceEP := tn.Source == uint16(srcEP)
// true if the packet is a reply, i.e. reverse direction
isReply := decodeIsReply(tn)

// isSourceEP != isReply ==
// (isSourceEP && !isReply) || (!isSourceEP && isReply)
if isSourceEP != isReply {
return pb.TrafficDirection_EGRESS
}
return pb.TrafficDirection_INGRESS
}
}
if pvn != nil {
if pvn.IsTrafficIngress() {
return pb.TrafficDirection_INGRESS
Expand Down
141 changes: 141 additions & 0 deletions pkg/hubble/parser/threefour/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,147 @@ func TestDecodeLocalIdentity(t *testing.T) {
assert.Equal(t, []string{"cidr:1.2.3.4/12", "some=label"}, f.GetDestination().GetLabels())
}

func TestDecodeTrafficDirection(t *testing.T) {
localIP := net.ParseIP("1.2.3.4")
localEP := uint16(1234)
remoteIP := net.ParseIP("5.6.7.8")

endpointGetter := &testutils.FakeEndpointGetter{
OnGetEndpointInfo: func(ip net.IP) (endpoint v1.EndpointInfo, ok bool) {
if ip.Equal(localIP) {
return &v1.Endpoint{
ID: uint64(localEP),
}, true
}
return nil, false
},
}

parser, err := New(endpointGetter, nil, nil, nil, nil)
require.NoError(t, err)
parseFlow := func(event interface{}, srcIPv4, dstIPv4 net.IP) *pb.Flow {
data, err := testutils.CreateL3L4Payload(event,
&layers.Ethernet{
SrcMAC: net.HardwareAddr{1, 2, 3, 4, 5, 6},
DstMAC: net.HardwareAddr{7, 8, 9, 0, 1, 2},
EthernetType: layers.EthernetTypeIPv4,
},
&layers.IPv4{SrcIP: srcIPv4, DstIP: dstIPv4})
require.NoError(t, err)
f := &pb.Flow{}
err = parser.Decode(&pb.Payload{Data: data}, f)
require.NoError(t, err)
return f
}

// DROP at unknown endpoint
dn := monitor.DropNotify{
Type: byte(api.MessageTypeDrop),
}
f := parseFlow(dn, localIP, remoteIP)
assert.Equal(t, pb.TrafficDirection_TRAFFIC_DIRECTION_UNKNOWN, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetSource().GetID())

// DROP Egress
dn = monitor.DropNotify{
Type: byte(api.MessageTypeDrop),
Source: localEP,
}
f = parseFlow(dn, localIP, remoteIP)
assert.Equal(t, pb.TrafficDirection_EGRESS, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetSource().GetID())

// DROP Ingress
dn = monitor.DropNotify{
Type: byte(api.MessageTypeDrop),
Source: localEP,
}
f = parseFlow(dn, remoteIP, localIP)
assert.Equal(t, pb.TrafficDirection_INGRESS, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetDestination().GetID())

// TRACE_TO_LXC at unknown endpoint
tn := monitor.TraceNotifyV0{
Type: byte(api.MessageTypeTrace),
ObsPoint: api.TraceToLxc,
}
f = parseFlow(tn, localIP, remoteIP)
assert.Equal(t, pb.TrafficDirection_TRAFFIC_DIRECTION_UNKNOWN, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetSource().GetID())

// TRACE_TO_LXC Egress
tn = monitor.TraceNotifyV0{
Type: byte(api.MessageTypeTrace),
Source: localEP,
ObsPoint: api.TraceToLxc,
}
f = parseFlow(tn, localIP, remoteIP)
assert.Equal(t, pb.TrafficDirection_EGRESS, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetSource().GetID())

// TRACE_TO_LXC Egress, reversed by CT_REPLY
tn = monitor.TraceNotifyV0{
Type: byte(api.MessageTypeTrace),
Source: localEP,
ObsPoint: api.TraceToLxc,
Reason: monitor.TraceReasonCtReply,
}
f = parseFlow(tn, localIP, remoteIP)
assert.Equal(t, pb.TrafficDirection_INGRESS, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetSource().GetID())

// TRACE_TO_HOST Ingress
tn = monitor.TraceNotifyV0{
Type: byte(api.MessageTypeTrace),
Source: localEP,
ObsPoint: api.TraceToHost,
}
f = parseFlow(tn, remoteIP, localIP)
assert.Equal(t, pb.TrafficDirection_INGRESS, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetDestination().GetID())

// TRACE_TO_HOST Ingress, reversed by CT_REPLY
tn = monitor.TraceNotifyV0{
Type: byte(api.MessageTypeTrace),
Source: localEP,
ObsPoint: api.TraceToHost,
Reason: monitor.TraceReasonCtReply,
}
f = parseFlow(tn, remoteIP, localIP)
assert.Equal(t, pb.TrafficDirection_EGRESS, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetDestination().GetID())

// TRACE_FROM_LXC (traffic direction not supported)
tn = monitor.TraceNotifyV0{
Type: byte(api.MessageTypeTrace),
Source: localEP,
ObsPoint: api.TraceFromLxc,
}
f = parseFlow(tn, localIP, remoteIP)
assert.Equal(t, pb.TrafficDirection_TRAFFIC_DIRECTION_UNKNOWN, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetSource().GetID())

// PolicyVerdictNotify Egress
pvn := monitor.PolicyVerdictNotify{
Type: byte(api.MessageTypePolicyVerdict),
Source: localEP,
Flags: api.PolicyEgress,
}
f = parseFlow(pvn, localIP, remoteIP)
assert.Equal(t, pb.TrafficDirection_EGRESS, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetSource().GetID())

// PolicyVerdictNotify Ingress
pvn = monitor.PolicyVerdictNotify{
Type: byte(api.MessageTypePolicyVerdict),
Source: localEP,
Flags: api.PolicyIngress,
}
f = parseFlow(pvn, remoteIP, localIP)
assert.Equal(t, pb.TrafficDirection_INGRESS, f.GetTrafficDirection())
assert.Equal(t, uint64(localEP), f.GetDestination().GetID())
}

func Test_filterCidrLabels(t *testing.T) {
type args struct {
labels []string
Expand Down

0 comments on commit fa7f33e

Please sign in to comment.