Skip to content

Commit

Permalink
Add port for logging
Browse files Browse the repository at this point in the history
Signed-off-by: Qiyue Yao <yaoq@vmware.com>

add UT

Signed-off-by: Qiyue Yao <yaoq@vmware.com>
  • Loading branch information
qiyueyao committed Feb 10, 2022
1 parent f7f353a commit 85beb21
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 149 deletions.
8 changes: 4 additions & 4 deletions docs/antrea-network-policy.md
Expand Up @@ -502,13 +502,13 @@ deduplication is applied to simplify multiple logs. Duplication buffer length is
The rules are logged in the following format:

```text
<yyyy/mm/dd> <time> <ovs-table-name> <antrea-native-policy-reference> <action> <openflow-priority> <source-ip> <destination-ip> <packet-length> <protocol>
<yyyy/mm/dd> <time> <ovs-table-name> <antrea-native-policy-reference> <action> <openflow-priority> <source-ip> <source-port> <destination-ip> <destination-port> <protocol> <packet-length>
Deduplication:
<yyyy/mm/dd> <time> <ovs-table-name> <antrea-native-policy-reference> <action> <openflow-priority> <source-ip> <destination-ip> <packet-length> <protocol> [<num of packets> packets in <duplicate duration>]
<yyyy/mm/dd> <time> <ovs-table-name> <antrea-native-policy-reference> <action> <openflow-priority> <source-ip> <source-port> <destination-ip> <destination-port> <protocol> <packet-length> [<num of packets> packets in <duplicate duration>]
Example:
2020/11/02 22:21:21.148395 AntreaPolicyAppTierIngressRule AntreaNetworkPolicy:default/test-anp Allow 61800 10.0.0.4 10.0.0.5 60 TCP
2021/06/24 23:56:41.346165 AntreaPolicyEgressRule AntreaNetworkPolicy:default/test-anp Drop 44900 10.0.0.5 10.0.0.4 60 TCP [3 packets in 1.011379442s]
2020/11/02 22:21:21.148395 AntreaPolicyAppTierIngressRule AntreaNetworkPolicy:default/test-anp Allow 61800 10.10.1.65 35402 10.0.0.5 80 TCP 60
2021/06/24 23:56:41.346165 AntreaPolicyEgressRule AntreaNetworkPolicy:default/test-anp Drop 44900 10.10.1.65 35402 10.0.0.5 80 TCP 60 [3 packets in 1.011379442s]
```

**`appliedTo` per rule**: A ClusterNetworkPolicy ingress or egress rule may
Expand Down
69 changes: 68 additions & 1 deletion pkg/agent/controller/networkpolicy/audit_logging.go
Expand Up @@ -19,13 +19,17 @@ import (
"log"
"os"
"path/filepath"
"strconv"
"sync"
"time"

"antrea.io/ofnet/ofctrl"
"gopkg.in/natefinch/lumberjack.v2"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/openflow"
binding "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/logdir"
)

Expand Down Expand Up @@ -65,7 +69,9 @@ type logInfo struct {
disposition string // Allow/Drop of the rule sending packetin
ofPriority string // openflow priority of the flow sending packetin
srcIP string // source IP of the traffic logged
srcPort string // source port of the traffic logged
destIP string // destination IP of the traffic logged
destPort string // destination port of the traffic logged
pktLength uint16 // packet length of packetin
protocolStr string // protocol of the traffic logged
}
Expand Down Expand Up @@ -127,7 +133,7 @@ func (l *AntreaPolicyLogger) updateLogKey(logMsg string, bufferLength time.Durat
// LogDedupPacket logs information in ob based on disposition and duplication conditions.
func (l *AntreaPolicyLogger) LogDedupPacket(ob *logInfo) {
// Deduplicate non-Allow packet log.
logMsg := fmt.Sprintf("%s %s %s %s %s %s %d %s", ob.tableName, ob.npRef, ob.disposition, ob.ofPriority, ob.srcIP, ob.destIP, ob.pktLength, ob.protocolStr)
logMsg := fmt.Sprintf("%s %s %s %s %s %s %s %s %s %d", ob.tableName, ob.npRef, ob.disposition, ob.ofPriority, ob.srcIP, ob.srcPort, ob.destIP, ob.destPort, ob.protocolStr, ob.pktLength)
if ob.disposition == openflow.DispositionToString[openflow.DispositionAllow] {
l.anpLogger.Printf(logMsg)
} else {
Expand Down Expand Up @@ -170,3 +176,64 @@ func newAntreaPolicyLogger() (*AntreaPolicyLogger, error) {
klog.InfoS("Initialized Antrea-native Policy Logger for audit logging", "logFile", logFile)
return antreaPolicyLogger, nil
}

// getNetworkPolicyInfo fills in tableName, npName, ofPriority, disposition of logInfo ob.
func getNetworkPolicyInfo(pktIn *ofctrl.PacketIn, c *Controller, ob *logInfo) error {
matchers := pktIn.GetMatches()
var match *ofctrl.MatchField
// Get table name
tableID := pktIn.TableId
ob.tableName = openflow.GetFlowTableName(tableID)

// Get disposition Allow or Drop
match = getMatchRegField(matchers, openflow.APDispositionField)
info, err := getInfoInReg(match, openflow.APDispositionField.GetRange().ToNXRange())
if err != nil {
return fmt.Errorf("received error while unloading disposition from reg: %v", err)
}
ob.disposition = openflow.DispositionToString[info]

// Set match to corresponding ingress/egress reg according to disposition
match = getMatch(matchers, tableID, info)

// Get Network Policy full name and OF priority of the conjunction
info, err = getInfoInReg(match, nil)
if err != nil {
return fmt.Errorf("received error while unloading conjunction id from reg: %v", err)
}
ob.npRef, ob.ofPriority = c.ofClient.GetPolicyInfoFromConjunction(info)

return nil
}

// logPacket retrieves information from openflow reg, controller cache, packet-in
// packet to log. Log is deduplicated for non-Allow packets from record in logDeduplication.
// Deduplication is safe guarded by logRecordDedupMap mutex.
func (c *Controller) logPacket(pktIn *ofctrl.PacketIn) error {
ob := new(logInfo)
packet, err := binding.ParsePacketIn(pktIn)
if err != nil {
return fmt.Errorf("received error while parsing packetin: %v", err)
}

// Get Network Policy log info
err = getNetworkPolicyInfo(pktIn, c, ob)
if err != nil {
return fmt.Errorf("received error while retrieving NetworkPolicy info: %v", err)
}
// Set packet log info
ob.srcIP = packet.SourceIP.String()
ob.destIP = packet.DestinationIP.String()
ob.pktLength = packet.IPLength
ob.protocolStr = ip.IPProtocolNumberToString(packet.IPProto, "UnknownProtocol")
if ob.protocolStr == "TCP" || ob.protocolStr == "UDP" {
ob.srcPort = strconv.Itoa(int(packet.SourcePort))
ob.destPort = strconv.Itoa(int(packet.DestinationPort))
} else {
ob.srcPort, ob.destPort = "<nil>", "<nil>"
}

// Log the ob info to corresponding file w/ deduplication
c.antreaPolicyLogger.LogDedupPacket(ob)
return nil
}
13 changes: 9 additions & 4 deletions pkg/agent/controller/networkpolicy/audit_logging_test.go
Expand Up @@ -112,17 +112,22 @@ func newTestAntreaPolicyLogger(bufferLength time.Duration, clock Clock) (*Antrea
}

func newLogInfo(disposition string) (*logInfo, string) {
expected := fmt.Sprintf("AntreaPolicyIngressRule AntreaNetworkPolicy:default/test %s 0 0.0.0.0 1.1.1.1 60 TCP", disposition)
return &logInfo{
testLogInfo := &logInfo{
tableName: "AntreaPolicyIngressRule",
npRef: "AntreaNetworkPolicy:default/test",
ofPriority: "0",
disposition: disposition,
srcIP: "0.0.0.0",
srcPort: "35402",
destIP: "1.1.1.1",
pktLength: 60,
destPort: "80",
protocolStr: "TCP",
}, expected
pktLength: 60,
}
expected := fmt.Sprintf("%s %s %s %s %s %s %s %s %s %d", testLogInfo.tableName, testLogInfo.npRef, testLogInfo.disposition,
testLogInfo.ofPriority, testLogInfo.srcIP, testLogInfo.srcPort, testLogInfo.destIP, testLogInfo.destPort,
testLogInfo.protocolStr, testLogInfo.pktLength)
return testLogInfo, expected
}

func sendMultiplePackets(antreaLogger *AntreaPolicyLogger, ob *logInfo, numPackets int, sendInterval time.Duration) {
Expand Down
76 changes: 0 additions & 76 deletions pkg/agent/controller/networkpolicy/packetin.go
Expand Up @@ -20,15 +20,13 @@ import (
"time"

"antrea.io/libOpenflow/openflow13"
"antrea.io/libOpenflow/protocol"
"antrea.io/ofnet/ofctrl"
"github.com/vmware/go-ipfix/pkg/registry"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/flowexporter"
"antrea.io/antrea/pkg/agent/openflow"
binding "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/antrea/pkg/util/ip"
)

// HandlePacketIn is the packetin handler registered to openflow by Antrea network
Expand Down Expand Up @@ -107,80 +105,6 @@ func getInfoInReg(regMatch *ofctrl.MatchField, rng *openflow13.NXRange) (uint32,
return regValue.Data, nil
}

// getNetworkPolicyInfo fills in tableName, npName, ofPriority, disposition of logInfo ob.
func getNetworkPolicyInfo(pktIn *ofctrl.PacketIn, c *Controller, ob *logInfo) error {
matchers := pktIn.GetMatches()
var match *ofctrl.MatchField
// Get table name
tableID := pktIn.TableId
ob.tableName = openflow.GetFlowTableName(tableID)

// Get disposition Allow or Drop
match = getMatchRegField(matchers, openflow.APDispositionField)
info, err := getInfoInReg(match, openflow.APDispositionField.GetRange().ToNXRange())
if err != nil {
return fmt.Errorf("received error while unloading disposition from reg: %v", err)
}
ob.disposition = openflow.DispositionToString[info]

// Set match to corresponding ingress/egress reg according to disposition
match = getMatch(matchers, tableID, info)

// Get Network Policy full name and OF priority of the conjunction
info, err = getInfoInReg(match, nil)
if err != nil {
return fmt.Errorf("received error while unloading conjunction id from reg: %v", err)
}
ob.npRef, ob.ofPriority = c.ofClient.GetPolicyInfoFromConjunction(info)

return nil
}

// getPacketInfo fills in srcIP, destIP, pktLength, protocol of logInfo ob.
func getPacketInfo(pktIn *ofctrl.PacketIn, ob *logInfo) error {
var prot uint8
switch ipPkt := pktIn.Data.Data.(type) {
case *protocol.IPv4:
ob.srcIP = ipPkt.NWSrc.String()
ob.destIP = ipPkt.NWDst.String()
ob.pktLength = ipPkt.Length
prot = ipPkt.Protocol
case *protocol.IPv6:
ob.srcIP = ipPkt.NWSrc.String()
ob.destIP = ipPkt.NWDst.String()
ob.pktLength = ipPkt.Length
prot = ipPkt.NextHeader
default:
return errors.New("unsupported packet-in: should be a valid IPv4 or IPv6 packet")
}

ob.protocolStr = ip.IPProtocolNumberToString(prot, "UnknownProtocol")

return nil
}

// logPacket retrieves information from openflow reg, controller cache, packet-in
// packet to log. Log is deduplicated for non-Allow packets from record in logDeduplication.
// Deduplication is safe guarded by logRecordDedupMap mutex.
func (c *Controller) logPacket(pktIn *ofctrl.PacketIn) error {
ob := new(logInfo)

// Get Network Policy log info
err := getNetworkPolicyInfo(pktIn, c, ob)
if err != nil {
return fmt.Errorf("received error while retrieving NetworkPolicy info: %v", err)
}
// Get packet log info
err = getPacketInfo(pktIn, ob)
if err != nil {
return fmt.Errorf("received error while retrieving NetworkPolicy info: %v", err)
}

// Log the ob info to corresponding file w/ deduplication
c.antreaPolicyLogger.LogDedupPacket(ob)
return nil
}

func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error {
packet, err := binding.ParsePacketIn(pktIn)
if err != nil {
Expand Down
61 changes: 0 additions & 61 deletions pkg/agent/controller/networkpolicy/packetin_test.go

This file was deleted.

0 comments on commit 85beb21

Please sign in to comment.