Skip to content

Commit

Permalink
move much of the low level packet processing into the packetData stru…
Browse files Browse the repository at this point in the history
…ct and associated methods
  • Loading branch information
Philip committed Apr 18, 2016
1 parent 1a7f7a4 commit 8cd9075
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 97 deletions.
244 changes: 164 additions & 80 deletions main.go
Expand Up @@ -7,6 +7,7 @@ import "time"
import "net"
import "os"
import "io"
import "errors"
import "runtime/pprof"
import "encoding/binary"

Expand Down Expand Up @@ -49,9 +50,139 @@ type tcpDataStruct struct {
or it can be 'flush' or 'stop' to signal packet handling threads
*/
type packetData struct {
Packet gopacket.Packet
Tcpdata tcpDataStruct
Type string
packet gopacket.Packet
tcpdata tcpDataStruct
datatype string

foundLayerTypes []gopacket.LayerType

ethLayer *layers.Ethernet
ipLayer *layers.IPv4
udpLayer *layers.UDP
tcpLayer *layers.TCP
dns *layers.DNS
payload *gopacket.Payload
}

func NewTcpData(tcpdata tcpDataStruct) *packetData {
var pd packetData

pd.datatype="tcp"

pd.tcpdata = tcpdata

return &pd
}

func NewPacketData(packet gopacket.Packet) *packetData {

var pd packetData

pd.datatype="packet"

pd.packet = packet

return &pd

}

func (pd *packetData) Parse() error {

if pd.datatype == "tcp" {
pd.dns = &layers.DNS{}
pd.payload = &gopacket.Payload{}
//for parsing the reassembled TCP streams
dnsParser := gopacket.NewDecodingLayerParser(
layers.LayerTypeDNS,
pd.dns,
pd.payload,
)

dnsParser.DecodeLayers(pd.tcpdata.DnsData, &pd.foundLayerTypes)

return nil
}else if pd.datatype == "packet" {
pd.ethLayer = &layers.Ethernet{}
pd.ipLayer = &layers.IPv4{}
pd.udpLayer = &layers.UDP{}
pd.tcpLayer = &layers.TCP{}
pd.dns = &layers.DNS{}
pd.payload = &gopacket.Payload{}
//we're constraining the set of layer decoders that gopacket will apply
//to this traffic. this MASSIVELY speeds up the parsing phase
parser := gopacket.NewDecodingLayerParser(
layers.LayerTypeEthernet,
pd.ethLayer,
pd.ipLayer,
pd.udpLayer,
pd.tcpLayer,
pd.dns,
pd.payload,
)


parser.DecodeLayers(pd.packet.Data(), &pd.foundLayerTypes)

return nil

}else{
return errors.New("Bad packet type: "+pd.datatype)
}
}

func (pd *packetData) GetSrcIP() net.IP{
if pd.ipLayer != nil {
return pd.ipLayer.SrcIP
} else {
return net.IP(pd.tcpdata.IpLayer.Src().Raw())
}

}

func (pd *packetData) GetDstIP() net.IP{
if pd.ipLayer != nil {
return pd.ipLayer.DstIP
} else {
return net.IP(pd.tcpdata.IpLayer.Dst().Raw())
}
}



func (pd *packetData) IsTCPStream() bool {
return pd.datatype == "tcp"
}

func (pd *packetData) GetTCPLayer() *layers.TCP {
return pd.tcpLayer
}

func (pd *packetData) GetIPLayer() *layers.IPv4 {
return pd.ipLayer
}

func (pd *packetData) GetDNSLayer() *layers.DNS {
return pd.dns
}

func (pd *packetData) HasTCPLayer() bool {
return foundLayerType(layers.LayerTypeTCP, pd.foundLayerTypes)
}

func (pd *packetData) HasIPLayer() bool {
return foundLayerType(layers.LayerTypeIPv4, pd.foundLayerTypes)
}

func (pd *packetData) HasDNSLayer() bool {
return foundLayerType(layers.LayerTypeDNS, pd.foundLayerTypes)
}

func (pd *packetData) GetTimestamp() *time.Time {
if pd.datatype == "packet" {
return &pd.packet.Metadata().Timestamp
} else {
return nil
}
}

/*
Expand Down Expand Up @@ -186,7 +317,7 @@ func cleanDnsCache(conntable *map[uint16]dnsMapEntry, maxAge time.Duration, inte
}
}

func handleDns(conntable *map[uint16]dnsMapEntry, dns layers.DNS, logC chan dnsLogEntry,
func handleDns(conntable *map[uint16]dnsMapEntry, dns *layers.DNS, logC chan dnsLogEntry,
srcIP net.IP, dstIP net.IP) {
//skip non-query stuff (Updates, AXFRs, etc)
if dns.OpCode != layers.DNSOpCodeQuery {
Expand All @@ -210,11 +341,11 @@ func handleDns(conntable *map[uint16]dnsMapEntry, dns layers.DNS, logC chan dnsL
//if we just got the reply
if dns.QR {
log.Debug("Got 'answer' leg of query ID: " + strconv.Itoa(int(dns.ID)))
initLogEntry(srcIP, dstIP, item.entry, dns, &logs)
initLogEntry(srcIP, dstIP, item.entry, *dns, &logs)
} else {
//we just got the question, so we should already have the reply
log.Debug("Got the 'question' leg of query ID " + strconv.Itoa(int(dns.ID)))
initLogEntry(srcIP, dstIP, dns, item.entry, &logs)
initLogEntry(srcIP, dstIP, *dns, item.entry, &logs)
}
delete(*conntable, dns.ID)

Expand All @@ -228,7 +359,7 @@ func handleDns(conntable *map[uint16]dnsMapEntry, dns layers.DNS, logC chan dnsL
//This is the initial query. save it for later.
log.Debug("Got a leg of query ID " + strconv.Itoa(int(dns.ID)))
mapEntry := dnsMapEntry{
entry: dns,
entry: *dns,
inserted: time.Now(),
}
(*conntable)[dns.ID] = mapEntry
Expand All @@ -241,7 +372,7 @@ func handleDns(conntable *map[uint16]dnsMapEntry, dns layers.DNS, logC chan dnsL
we pass packet by value here because we turned on ZeroCopy for the capture, which reuses the capture buffer
*/
func handlePacket(packets chan packetData, logC chan dnsLogEntry,
func handlePacket(packets chan *packetData, logC chan dnsLogEntry,
gcInterval time.Duration, gcAge time.Duration, threadNum int) {

//DNS IDs are stored as uint16s by the gopacket DNS layer
Expand All @@ -250,34 +381,6 @@ func handlePacket(packets chan packetData, logC chan dnsLogEntry,
//setup garbage collection for this map
go cleanDnsCache(&conntable, gcAge, gcInterval, threadNum)

var ethLayer layers.Ethernet
var ipLayer layers.IPv4
var udpLayer layers.UDP
var tcpLayer layers.TCP
var dns layers.DNS
var payload gopacket.Payload

//we're constraining the set of layer decoders that gopacket will apply
//to this traffic. this MASSIVELY speeds up the parsing phase
parser := gopacket.NewDecodingLayerParser(
layers.LayerTypeEthernet,
&ethLayer,
&ipLayer,
&udpLayer,
&tcpLayer,
&dns,
&payload,
)

//for parsing the reassembled TCP streams
dnsParser := gopacket.NewDecodingLayerParser(
layers.LayerTypeDNS,
&dns,
&payload,
)

foundLayerTypes := []gopacket.LayerType{}

//TCP reassembly init
streamFactory := &dnsStreamFactory{}
streamPool := tcpassembly.NewStreamPool(streamFactory)
Expand All @@ -291,43 +394,30 @@ func handlePacket(packets chan packetData, logC chan dnsLogEntry,
//used for clean shutdowns
if !more {
return
}else if packet.Type == "flush" {
count:=assembler.FlushAll()
log.Debug("(thread "+strconv.Itoa(threadNum)+") flushed "+strconv.Itoa(count)+" connections")
continue
}

//we're intentionally ignoring the errors that DecodeLayers will
//return if it can't parse an entire packet. We check the list of
//discovered layers to work through a couple of possible error states.

var srcIP net.IP
var dstIP net.IP
err := packet.Parse()

if packet.Type == "packet" {
parser.DecodeLayers(packet.Packet.Data(), &foundLayerTypes)
srcIP = ipLayer.SrcIP
dstIP = ipLayer.DstIP
}else if packet.Type == "tcp" {
if len(packet.Tcpdata.DnsData) != packet.Tcpdata.Length {
log.Debugf("Got TCP data of length %d, expecting %d", len(packet.Tcpdata.DnsData), packet.Tcpdata.Length)
}
dnsParser.DecodeLayers(packet.Tcpdata.DnsData, &foundLayerTypes)
srcIP = net.IP(packet.Tcpdata.IpLayer.Src().Raw())
dstIP = net.IP(packet.Tcpdata.IpLayer.Dst().Raw())
}else{
log.Debug("Got a channel entry with no data!")
if err != nil {
log.Debugf("Error parsing packet: %s", err)
continue
}

srcIP := packet.GetSrcIP()
dstIP := packet.GetDstIP()

//All TCP goes to reassemble. This is first because a single packet DNS request will parse as DNS
//But that will leave the connection hanging around in memory, because the inital handshake won't
//parse as DNS, nor will the connection closing.
if foundLayerType(layers.LayerTypeTCP, foundLayerTypes) {
assembler.AssembleWithTimestamp(ipLayer.NetworkFlow(), &tcpLayer, packet.Packet.Metadata().Timestamp)

if packet.IsTCPStream() {
handleDns(&conntable, packet.GetDNSLayer(), logC, srcIP, dstIP)
}else if packet.HasTCPLayer() {
assembler.AssembleWithTimestamp(packet.GetIPLayer().NetworkFlow(),
packet.GetTCPLayer(), *packet.GetTimestamp())
continue
}else if foundLayerType(layers.LayerTypeDNS, foundLayerTypes){
handleDns(&conntable, dns, logC, srcIP, dstIP)
}else if packet.HasDNSLayer(){
handleDns(&conntable, packet.GetDNSLayer(), logC, srcIP, dstIP)
}else{
//UDP and doesn't parse as DNS?
log.Debug("Missing a DNS layer?")
Expand Down Expand Up @@ -403,9 +493,9 @@ func doCapture(handle *pcap.Handle, logChan chan dnsLogEntry,
reassembleChan = reChan

/* init channels for the packet handlers and kick off handler threads */
var channels []chan packetData
var channels []chan *packetData
for i := 0; i < numprocs; i++ {
channels = append(channels, make(chan packetData, 100))
channels = append(channels, make(chan *packetData, 100))
}

for i := 0; i < numprocs; i++ {
Expand Down Expand Up @@ -441,22 +531,18 @@ func doCapture(handle *pcap.Handle, logChan chan dnsLogEntry,

foundLayerTypes := []gopacket.LayerType{}

channelData := packetData{}

CAPTURE:
for {
select{
case reassembledTcp := <- reChan:
channelData.Tcpdata = reassembledTcp
channelData.Type = "tcp"
channels[int(reassembledTcp.IpLayer.FastHash()) & (numprocs-1)] <- channelData
pd := NewTcpData(reassembledTcp)
channels[int(reassembledTcp.IpLayer.FastHash()) & (numprocs-1)] <- pd
case packet := <- packetSource.Packets():
if packet != nil{
parser.DecodeLayers(packet.Data(), &foundLayerTypes)
channelData.Packet = packet
channelData.Type = "packet"
if foundLayerType(layers.LayerTypeIPv4, foundLayerTypes) {
channels[int(ipLayer.NetworkFlow().FastHash()) & (numprocs-1)] <- channelData
pd := NewPacketData(packet)
channels[int(ipLayer.NetworkFlow().FastHash()) & (numprocs-1)] <- pd
}
} else{
log.Debug("packetSource returned nil.")
Expand All @@ -469,25 +555,23 @@ CAPTURE:

}

func gracefulShutdown(channels []chan packetData, reChan chan tcpDataStruct, logChan chan dnsLogEntry) {
func gracefulShutdown(channels []chan *packetData, reChan chan tcpDataStruct, logChan chan dnsLogEntry) {

var wait_time int = 3
channelData := packetData{}
var numprocs int = len(channels)

log.Debug("Flushing channels...")
/*log.Debug("Flushing channels...")
for i := 0; i < numprocs; i++ {
channels[i] <- packetData{Type:"flush"}
}
}*/
log.Debug("Draining TCP data...")

OUTER:
for {
select{
case reassembledTcp := <- reChan:
channelData.Tcpdata = reassembledTcp
channelData.Type = "tcp"
channels[int(reassembledTcp.IpLayer.FastHash()) & (numprocs-1)] <- channelData
pd := NewTcpData(reassembledTcp)
channels[int(reassembledTcp.IpLayer.FastHash()) & (numprocs-1)] <- pd
case <- time.After(3*time.Second):
break OUTER
}
Expand Down

0 comments on commit 8cd9075

Please sign in to comment.