Skip to content

Commit

Permalink
Add flowType field for Flow Exporter
Browse files Browse the repository at this point in the history
In this PR, we implemented the logic of flowType value assignment.
We distinguished Pod-To-Pod flows and Pod-To-External flows using the
podCIDRs of all nodes in the k8s cluster.
  • Loading branch information
dreamtalen committed Apr 8, 2021
1 parent e027c7b commit 55b6c7e
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 14 deletions.
5 changes: 4 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ func run(o *Options) error {
if features.DefaultFeatureGate.Enabled(features.FlowExporter) {
v4Enabled := config.IsIPv4Enabled(nodeConfig, networkConfig.TrafficEncapMode)
v6Enabled := config.IsIPv6Enabled(nodeConfig, networkConfig.TrafficEncapMode)
isNetworkPolicyOnly := networkConfig.TrafficEncapMode.IsNetworkPolicyOnly()

flowRecords := flowrecords.NewFlowRecords()
connStore := connections.NewConnectionStore(
Expand All @@ -376,7 +377,9 @@ func run(o *Options) error {
o.config.EnableTLSToFlowAggregator,
v4Enabled,
v6Enabled,
k8sClient)
k8sClient,
nodeRouteController,
isNetworkPolicyOnly)
if err != nil {
return fmt.Errorf("error when creating IPFIX flow exporter: %v", err)
}
Expand Down
42 changes: 41 additions & 1 deletion pkg/agent/controller/noderoute/node_route_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ const (
ovsExternalIDNodeName = "node-name"

nodeRouteInfoPodCIDRIndexName = "podCIDR"

IPv4BitLen = net.IPv4len * 8
IPv6BitLen = net.IPv6len * 8
)

// Controller is responsible for setting up necessary IP routes and Openflow entries for inter-node traffic.
Expand Down Expand Up @@ -97,7 +100,8 @@ func NewNodeRouteController(
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "noderoute"),
installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc})}
installedNodes: cache.NewIndexer(nodeRouteInfoKeyFunc, cache.Indexers{nodeRouteInfoPodCIDRIndexName: nodeRouteInfoPodCIDRIndexFunc}),
}
nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: func(cur interface{}) {
Expand Down Expand Up @@ -615,3 +619,39 @@ func GetNodeAddr(node *corev1.Node) (net.IP, error) {
}
return ipAddr, nil
}

func (c *Controller) IPInPodSubnets(ip net.IP) bool {
var ipCIDR *net.IPNet
var curNodeCIDRStr string
if ip.To4() != nil {
var podIPv4CIDRMaskSize int
if c.nodeConfig.PodIPv4CIDR != nil {
curNodeCIDRStr = c.nodeConfig.PodIPv4CIDR.String()
podIPv4CIDRMaskSize, _ = c.nodeConfig.PodIPv4CIDR.Mask.Size()
} else {
return false
}
v4Mask := net.CIDRMask(podIPv4CIDRMaskSize, IPv4BitLen)
ipCIDR = &net.IPNet{
IP: ip.Mask(v4Mask),
Mask: v4Mask,
}

} else {
var podIPv6CIDRMaskSize int
if c.nodeConfig.PodIPv6CIDR != nil {
curNodeCIDRStr = c.nodeConfig.PodIPv6CIDR.String()
podIPv6CIDRMaskSize, _ = c.nodeConfig.PodIPv6CIDR.Mask.Size()
} else {
return false
}
v6Mask := net.CIDRMask(podIPv6CIDRMaskSize, IPv6BitLen)
ipCIDR = &net.IPNet{
IP: ip.Mask(v6Mask),
Mask: v6Mask,
}
}
ipCIDRStr := ipCIDR.String()
nodeInCluster, _ := c.installedNodes.ByIndex(nodeRouteInfoPodCIDRIndexName, ipCIDRStr)
return len(nodeInCluster) > 0 || ipCIDRStr == curNodeCIDRStr
}
80 changes: 75 additions & 5 deletions pkg/agent/controller/noderoute/node_route_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/containernetworking/plugins/pkg/ip"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
Expand All @@ -35,11 +36,13 @@ import (
)

var (
gatewayMAC, _ = net.ParseMAC("00:00:00:00:00:01")
_, podCIDR, _ = net.ParseCIDR("1.1.1.0/24")
podCIDRGateway = ip.NextIP(podCIDR.IP)
nodeIP1 = net.ParseIP("10.10.10.10")
nodeIP2 = net.ParseIP("10.10.10.11")
gatewayMAC, _ = net.ParseMAC("00:00:00:00:00:01")
_, podCIDR, _ = net.ParseCIDR("1.1.1.0/24")
_, podCIDR2, _ = net.ParseCIDR("1.1.2.0/24")
podCIDRGateway = ip.NextIP(podCIDR.IP)
podCIDR2Gateway = ip.NextIP(podCIDR2.IP)
nodeIP1 = net.ParseIP("10.10.10.10")
nodeIP2 = net.ParseIP("10.10.10.11")
)

type fakeController struct {
Expand Down Expand Up @@ -158,3 +161,70 @@ func TestControllerWithDuplicatePodCIDR(t *testing.T) {
case <-finishCh:
}
}

func TestIPInPodSubnets(t *testing.T) {
c, closeFn := newController(t)
defer closeFn()
defer c.queue.ShutDown()

stopCh := make(chan struct{})
defer close(stopCh)
c.informerFactory.Start(stopCh)
// Must wait for cache sync, otherwise resource creation events will be missing if the resources are created
// in-between list and watch call of an informer. This is because fake clientset doesn't support watching with
// resourceVersion. A watcher of fake clientset only gets events that happen after the watcher is created.
c.informerFactory.WaitForCacheSync(stopCh)
c.Controller.nodeConfig.PodIPv4CIDR = podCIDR

node1 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
},
Spec: corev1.NodeSpec{
PodCIDR: podCIDR.String(),
PodCIDRs: []string{podCIDR.String()},
},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{
Type: corev1.NodeInternalIP,
Address: nodeIP1.String(),
},
},
},
}
node2 := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node2",
},
Spec: corev1.NodeSpec{
PodCIDR: podCIDR2.String(),
PodCIDRs: []string{podCIDR2.String()},
},
Status: corev1.NodeStatus{
Addresses: []corev1.NodeAddress{
{
Type: corev1.NodeInternalIP,
Address: nodeIP2.String(),
},
},
},
}

c.clientset.CoreV1().Nodes().Create(context.TODO(), node1, metav1.CreateOptions{})
// The 2nd argument is Any() because the argument is unpredictable when it uses pointer as the key of map.
// The argument type is map[*net.IPNet]net.IP.
c.ofClient.EXPECT().InstallNodeFlows("node1", gomock.Any(), nodeIP1, uint32(0)).Times(1)
c.routeClient.EXPECT().AddRoutes(podCIDR, "node1", nodeIP1, podCIDRGateway).Times(1)
c.processNextWorkItem()

c.clientset.CoreV1().Nodes().Create(context.TODO(), node2, metav1.CreateOptions{})
c.ofClient.EXPECT().InstallNodeFlows("node2", gomock.Any(), nodeIP2, uint32(0)).Times(1)
c.routeClient.EXPECT().AddRoutes(podCIDR2, "node2", nodeIP2, podCIDR2Gateway).Times(1)
c.processNextWorkItem()

assert.Equal(t, true, c.Controller.IPInPodSubnets(net.ParseIP("1.1.1.1")))
assert.Equal(t, true, c.Controller.IPInPodSubnets(net.ParseIP("1.1.2.1")))
assert.Equal(t, false, c.Controller.IPInPodSubnets(net.ParseIP("10.10.10.10")))
assert.Equal(t, false, c.Controller.IPInPodSubnets(net.ParseIP("8.8.8.8")))
}
46 changes: 39 additions & 7 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/klog"

"github.com/vmware-tanzu/antrea/pkg/agent/controller/noderoute"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/connections"
"github.com/vmware-tanzu/antrea/pkg/agent/flowexporter/flowrecords"
"github.com/vmware-tanzu/antrea/pkg/agent/openflow"
"github.com/vmware-tanzu/antrea/pkg/ipfix"
"github.com/vmware-tanzu/antrea/pkg/util/env"
)
Expand Down Expand Up @@ -94,6 +96,8 @@ type flowExporter struct {
idleFlowTimeout time.Duration
enableTLSToFlowAggregator bool
k8sClient kubernetes.Interface
nodeRouteController *noderoute.Controller
isNetworkPolicyOnly bool
}

func genObservationID() (uint32, error) {
Expand Down Expand Up @@ -123,7 +127,8 @@ func prepareExporterInputArgs(collectorAddr, collectorProto string) (exporter.Ex

func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords.FlowRecords,
collectorAddr string, collectorProto string, activeFlowTimeout time.Duration, idleFlowTimeout time.Duration,
enableTLSToFlowAggregator bool, v4Enabled bool, v6Enabled bool, k8sClient kubernetes.Interface) (*flowExporter, error) {
enableTLSToFlowAggregator bool, v4Enabled bool, v6Enabled bool, k8sClient kubernetes.Interface,
nodeRouteController *noderoute.Controller, isNetworkPolicyOnly bool) (*flowExporter, error) {
// Initialize IPFIX registry
registry := ipfix.NewIPFIXRegistry()
registry.LoadRegistry()
Expand All @@ -133,6 +138,7 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords
if err != nil {
return nil, err
}

return &flowExporter{
connStore: connStore,
flowRecords: records,
Expand All @@ -145,6 +151,8 @@ func NewFlowExporter(connStore connections.ConnectionStore, records *flowrecords
ipfixSet: ipfix.NewSet(false),
enableTLSToFlowAggregator: enableTLSToFlowAggregator,
k8sClient: k8sClient,
nodeRouteController: nodeRouteController,
isNetworkPolicyOnly: isNetworkPolicyOnly,
}, nil
}

Expand Down Expand Up @@ -516,12 +524,7 @@ func (exp *flowExporter) addRecordToSet(record flowexporter.FlowRecord) error {
case "tcpState":
ie.Value = record.Conn.TCPState
case "flowType":
// TODO: assign flow type to support Pod-to-External flows
if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" {
ie.Value = ipfixregistry.InterNode
} else {
ie.Value = ipfixregistry.IntraNode
}
ie.Value = exp.findFlowType(record)
}
}

Expand All @@ -544,3 +547,32 @@ func (exp *flowExporter) sendDataSet() (int, error) {
klog.V(4).Infof("Data set sent successfully. Bytes sent: %d", sentBytes)
return sentBytes, nil
}

func (exp *flowExporter) findFlowType(record flowexporter.FlowRecord) uint8 {
// TODO: support Pod-To-External flows in network policy only mode.
if exp.isNetworkPolicyOnly {
if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" {
return ipfixregistry.InterNode
}
return ipfixregistry.IntraNode
}

if exp.nodeRouteController == nil {
klog.Warningf("Can't find flowType without nodeRouteController")
return 0
}
if exp.nodeRouteController.IPInPodSubnets(record.Conn.TupleOrig.SourceAddress) {
if record.Conn.Mark == openflow.ServiceCTMark || exp.nodeRouteController.IPInPodSubnets(record.Conn.TupleOrig.DestinationAddress) {
if record.Conn.SourcePodName == "" || record.Conn.DestinationPodName == "" {
return ipfixregistry.InterNode
}
return ipfixregistry.IntraNode
} else {
return ipfixregistry.ToExternal
}
} else {
// We do not support External-To-Pod flows for now.
klog.Warningf("Source IP: %s doesn't exist in PodCIDRs", record.Conn.TupleOrig.SourceAddress.String())
return 0
}
}
10 changes: 10 additions & 0 deletions test/e2e/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ipfixregistry "github.com/vmware/go-ipfix/pkg/registry"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
Expand All @@ -40,6 +41,7 @@ DATA SET:
DATA RECORD-0:
flowStartSeconds: 1608338066
flowEndSeconds: 1608338072
flowEndReason: 2
sourceTransportPort: 43600
destinationTransportPort: 5201
protocolIdentifier: 6
Expand All @@ -65,6 +67,7 @@ DATA SET:
ingressNetworkPolicyNamespace: antrea-test
egressNetworkPolicyName: test-flow-aggregator-networkpolicy-egress
egressNetworkPolicyNamespace: antrea-test
flowType: 1
destinationClusterIPv4: 0.0.0.0
originalExporterIPv4Address: 10.10.0.1
originalObservationDomainId: 2134708971
Expand Down Expand Up @@ -242,8 +245,10 @@ func checkRecordsForFlows(t *testing.T, data *TestData, srcIP string, dstIP stri
// Check if record has both Pod name of source and destination pod.
if isIntraNode {
checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-b", controlPlaneNodeName())
checkFlowType(t, record, ipfixregistry.IntraNode)
} else {
checkPodAndNodeData(t, record, "perftest-a", controlPlaneNodeName(), "perftest-c", workerNodeName(1))
checkFlowType(t, record, ipfixregistry.InterNode)
}

if checkService {
Expand Down Expand Up @@ -336,6 +341,11 @@ func checkBandwidthFromRecord(t *testing.T, record, bandwidth string) {
}
}

// TODO: Add a test that checks the functionality of Pod-To-External flow.
func checkFlowType(t *testing.T, record string, flowType uint8) {
assert.Containsf(t, record, fmt.Sprintf("%s: %d", "flowType", flowType), "Record does not have correct flowType")
}

func getRecordsFromOutput(output string) []string {
re := regexp.MustCompile("(?m)^.*" + "#" + ".*$[\r\n]+")
output = re.ReplaceAllString(output, "")
Expand Down

0 comments on commit 55b6c7e

Please sign in to comment.