Skip to content

Commit

Permalink
Add support for 802.1q in Sflow
Browse files Browse the repository at this point in the history
  • Loading branch information
Oliver Herms committed Apr 23, 2019
1 parent bab0f63 commit b0e351c
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 67 deletions.
9 changes: 5 additions & 4 deletions sflow/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,11 @@ func decodeFlowSample(flowSamplePtr unsafe.Pointer) (*FlowSample, error) {
}

fs := &FlowSample{
FlowSampleHeader: fsh,
RawPacketHeader: rph,
RawPacketHeaderData: rphd,
ExtendedRouterData: erd,
FlowSampleHeader: fsh,
RawPacketHeader: rph,
Data: rphd,
DataLen: rph.OriginalPacketLength,
ExtendedRouterData: erd,
}

return fs, nil
Expand Down
9 changes: 5 additions & 4 deletions sflow/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,11 @@ type headerBottom struct {

// FlowSample is an sflow version 5 flow sample
type FlowSample struct {
FlowSampleHeader *FlowSampleHeader
RawPacketHeader *RawPacketHeader
RawPacketHeaderData unsafe.Pointer
ExtendedRouterData *ExtendedRouterData
FlowSampleHeader *FlowSampleHeader
RawPacketHeader *RawPacketHeader
Data unsafe.Pointer
DataLen uint32
ExtendedRouterData *ExtendedRouterData
}

// FlowSampleHeader is an sflow version 5 flow sample header
Expand Down
139 changes: 80 additions & 59 deletions sfserver/sfserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (sfs *SflowServer) processPacket(agent net.IP, buffer []byte) {
continue
}

if fs.RawPacketHeaderData == nil {
if fs.Data == nil {
log.Infof("Received sflow packet without raw packet header. Skipped.")
continue
}
Expand All @@ -140,11 +140,13 @@ func (sfs *SflowServer) processPacket(agent net.IP, buffer []byte) {
continue
}

ether, err := packet.DecodeEthernet(fs.RawPacketHeaderData, fs.RawPacketHeader.OriginalPacketLength)
ether, err := packet.DecodeEthernet(fs.Data, fs.RawPacketHeader.OriginalPacketLength)
if err != nil {
log.Infof("Unable to decode ether packet: %v", err)
continue
}
fs.Data = unsafe.Pointer(uintptr(fs.Data) - packet.SizeOfEthernetII)
fs.DataLen -= uint32(packet.SizeOfEthernetII)

fl := &netflow.Flow{
Router: agent,
Expand All @@ -159,70 +161,89 @@ func (sfs *SflowServer) processPacket(agent net.IP, buffer []byte) {
// We're updating the sampleCache to allow the forntend to show current sampling rates
sfs.sampleRateCache.Set(agent, uint64(fs.FlowSampleHeader.SamplingRate))

if fs.ExtendedRouterData != nil {
fl.NextHop = fs.ExtendedRouterData.NextHop
}

if ether.EtherType == packet.EtherTypeIPv4 {
fl.Family = 4
ipv4Ptr := unsafe.Pointer(uintptr(fs.RawPacketHeaderData) - packet.SizeOfEthernetII)
ipv4, err := packet.DecodeIPv4(ipv4Ptr, fs.RawPacketHeader.OriginalPacketLength-uint32(packet.SizeOfEthernetII))
if err != nil {
log.Errorf("Unable to decode IPv4 packet: %v", err)
}

fl.SrcAddr = convert.Reverse(ipv4.SrcAddr[:])
fl.DstAddr = convert.Reverse(ipv4.DstAddr[:])
fl.Protocol = uint32(ipv4.Protocol)
switch ipv4.Protocol {
case packet.TCP:
tcpPtr := unsafe.Pointer(uintptr(ipv4Ptr) - packet.SizeOfIPv4Header)
len := fs.RawPacketHeader.OriginalPacketLength - uint32(packet.SizeOfEthernetII) - uint32(packet.SizeOfIPv4Header)
if err := getTCP(tcpPtr, len, fl); err != nil {
log.Errorf("%v", err)
}
case packet.UDP:
udpPtr := unsafe.Pointer(uintptr(ipv4Ptr) - packet.SizeOfIPv4Header)
len := fs.RawPacketHeader.OriginalPacketLength - uint32(packet.SizeOfEthernetII) - uint32(packet.SizeOfIPv4Header)
if err := getUDP(udpPtr, len, fl); err != nil {
log.Errorf("%v", err)
}
}
} else if ether.EtherType == packet.EtherTypeIPv6 {
fl.Family = 6
ipv6Ptr := unsafe.Pointer(uintptr(fs.RawPacketHeaderData) - packet.SizeOfEthernetII)
ipv6, err := packet.DecodeIPv6(ipv6Ptr, fs.RawPacketHeader.OriginalPacketLength-uint32(packet.SizeOfEthernetII))
if err != nil {
log.Errorf("Unable to decode IPv6 packet: %v", err)
}

fl.SrcAddr = convert.Reverse(ipv6.SrcAddr[:])
fl.DstAddr = convert.Reverse(ipv6.DstAddr[:])
fl.Protocol = uint32(ipv6.NextHeader)
switch ipv6.NextHeader {
case packet.TCP:
tcpPtr := unsafe.Pointer(uintptr(ipv6Ptr) - packet.SizeOfIPv6Header)
len := fs.RawPacketHeader.OriginalPacketLength - uint32(packet.SizeOfEthernetII) - uint32(packet.SizeOfIPv6Header)
if err := getTCP(tcpPtr, len, fl); err != nil {
log.Errorf("%v", err)
}
case packet.UDP:
udpPtr := unsafe.Pointer(uintptr(ipv6Ptr) - packet.SizeOfIPv6Header)
len := fs.RawPacketHeader.OriginalPacketLength - uint32(packet.SizeOfEthernetII) - uint32(packet.SizeOfIPv6Header)
if err := getUDP(udpPtr, len, fl); err != nil {
log.Errorf("%v", err)
}
}
} else if ether.EtherType == packet.EtherTypeARP || ether.EtherType == packet.EtherTypeLACP {
if fs.ExtendedRouterData == nil {
continue
} else {
log.Errorf("Unknown EtherType: 0x%x", ether.EtherType)
}
fl.NextHop = fs.ExtendedRouterData.NextHop

sfs.processEthernet(ether.EtherType, fs, fl)
sfs.Output <- fl
}
}

func (sfs *SflowServer) processEthernet(ethType uint16, fs *sflow.FlowSample, fl *netflow.Flow) {
if ethType == packet.EtherTypeIPv4 {
sfs.processIPv4Packet(fs, fl)
} else if ethType == packet.EtherTypeIPv6 {
sfs.processIPv6Packet(fs, fl)
} else if ethType == packet.EtherTypeARP || ethType == packet.EtherTypeLACP {
return
} else if ethType == packet.EtherTypeIEEE8021Q {
sfs.processDot1QPacket(fs, fl)
} else {
log.Errorf("Unknown EtherType: 0x%x", ethType)
}
}

func (sfs *SflowServer) processDot1QPacket(fs *sflow.FlowSample, fl *netflow.Flow) {
dot1q, err := packet.DecodeDot1Q(fs.Data, fs.DataLen)
if err != nil {
log.Errorf("Unable to decode dot1q header: %v", err)
}
fs.Data = unsafe.Pointer(uintptr(fs.Data) - packet.SizeOfDot1Q)
fs.DataLen -= uint32(packet.SizeOfDot1Q)

sfs.processEthernet(dot1q.EtherType, fs, fl)
}

func (sfs *SflowServer) processIPv4Packet(fs *sflow.FlowSample, fl *netflow.Flow) {
fl.Family = 4
ipv4, err := packet.DecodeIPv4(fs.Data, fs.DataLen)
if err != nil {
log.Errorf("Unable to decode IPv4 packet: %v", err)
}
fs.Data = unsafe.Pointer(uintptr(fs.Data) - packet.SizeOfIPv4Header)
fs.DataLen -= uint32(packet.SizeOfIPv4Header)

fl.SrcAddr = convert.Reverse(ipv4.SrcAddr[:])
fl.DstAddr = convert.Reverse(ipv4.DstAddr[:])
fl.Protocol = uint32(ipv4.Protocol)
switch ipv4.Protocol {
case packet.TCP:
if err := getTCP(fs.Data, fs.DataLen, fl); err != nil {
log.Errorf("%v", err)
}
case packet.UDP:
if err := getUDP(fs.Data, fs.DataLen, fl); err != nil {
log.Errorf("%v", err)
}
}
}

func (sfs *SflowServer) processIPv6Packet(fs *sflow.FlowSample, fl *netflow.Flow) {
fl.Family = 6
ipv6, err := packet.DecodeIPv6(fs.Data, fs.DataLen)
if err != nil {
log.Errorf("Unable to decode IPv6 packet: %v", err)
}
fs.Data = unsafe.Pointer(uintptr(fs.Data) - packet.SizeOfIPv6Header)
fs.DataLen -= uint32(packet.SizeOfIPv6Header)

fl.SrcAddr = convert.Reverse(ipv6.SrcAddr[:])
fl.DstAddr = convert.Reverse(ipv6.DstAddr[:])
fl.Protocol = uint32(ipv6.NextHeader)
switch ipv6.NextHeader {
case packet.TCP:
if err := getTCP(fs.Data, fs.DataLen, fl); err != nil {
log.Errorf("%v", err)
}
case packet.UDP:
if err := getUDP(fs.Data, fs.DataLen, fl); err != nil {
log.Errorf("%v", err)
}
}
}

func getUDP(udpPtr unsafe.Pointer, length uint32, fl *netflow.Flow) error {
udp, err := packet.DecodeUDP(udpPtr, length)
if err != nil {
Expand Down

2 comments on commit b0e351c

@corny
Copy link
Collaborator

@corny corny commented on b0e351c Apr 25, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change breaks the tests

@corny
Copy link
Collaborator

@corny corny commented on b0e351c May 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the build as well ....

Please sign in to comment.