Skip to content
Permalink
Browse files

collector cleanup and comments

  • Loading branch information...
dreadl0ck committed Jan 17, 2019
1 parent 9290320 commit 8923641b12dea912e6ca4c1ce8ad137b341fd620
Showing with 51 additions and 34 deletions.
  1. +7 −5 collector/batch.go
  2. +7 −5 collector/collector.go
  3. +6 −0 collector/pcapUtils.go
  4. +31 −24 collector/worker.go
@@ -80,11 +80,13 @@ func (c *Collector) InitBatching(maxSize int, bpf string, in string) ([]BatchInf
}()

// get channels for all layer encoders
for _, e := range encoder.LayerEncoders {
chans = append(chans, BatchInfo{
Type: e.Type,
Chan: e.GetChan(),
})
for _, encoders := range encoder.LayerEncoders {
for _, e := range encoders {
chans = append(chans, BatchInfo{
Type: e.Type,
Chan: e.GetChan(),
})
}
}

// get channels for all custom encoders
@@ -121,11 +121,13 @@ func (c *Collector) cleanup() {
c.closePcapFiles()

// flush all buffers
for _, e := range encoder.LayerEncoders {
name, size := e.Destroy()
if size != 0 {
c.totalBytesWritten += size
c.files[name] = humanize.Bytes(uint64(size))
for _, encoders := range encoder.LayerEncoders {
for _, e := range encoders {
name, size := e.Destroy()
if size != 0 {
c.totalBytesWritten += size
c.files[name] = humanize.Bytes(uint64(size))
}
}
}

@@ -27,6 +27,7 @@ import (

// close errors.pcap and unknown.pcap
func (c *Collector) closePcapFiles() error {

// unknown.pcap

err := c.unkownPcapWriterBuffered.Flush()
@@ -84,7 +85,9 @@ func (c *Collector) closePcapFiles() error {

// create unknown.pcap file for packets with unknown layers.
func (c *Collector) createUnknownPcap() error {

var err error

// Open output pcap file and write header
c.unknownPcapFile, err = os.Create(filepath.Join(c.config.EncoderConfig.Out, "unknown.pcap"))
if err != nil {
@@ -104,7 +107,9 @@ func (c *Collector) createUnknownPcap() error {

// create errors.pcap file for errors
func (c *Collector) createErrorsPcap() error {

var err error

// Open output pcap file and write header
c.errorsPcapFile, err = os.Create(filepath.Join(c.config.EncoderConfig.Out, "errors.pcap"))
if err != nil {
@@ -133,6 +138,7 @@ func (c *Collector) writePacketToUnknownPcap(p gopacket.Packet) error {

// logPacketError handles an error when decoding a packet.
func (c *Collector) logPacketError(p gopacket.Packet, err string) error {

// increment errorMap stats
c.errorMap.Inc(err)

@@ -21,6 +21,7 @@ import (
// worker spawns a new worker goroutine
// and returns a channel for receiving input packets.
func (c *Collector) worker() chan gopacket.Packet {

// init channel to receive input packets
chanInput := make(chan gopacket.Packet, c.config.PacketBufferSize)

@@ -35,43 +36,46 @@ func (c *Collector) worker() chan gopacket.Packet {
return
}

// // experiment: only decode loaded layers
// for lt, le := range encoder.LayerEncoders {

// layer := p.Layer(lt)
// if layer != nil {
// c.allProtosAtomic.Inc(lt.String())
// err := le.Encode(layer, p.Metadata().Timestamp)
// if err != nil {
// c.logPacketError(p, "Layer Encoder Error: "+lt.String()+": "+err.Error())
// goto done
// }
// }
// }

// iterate over all layers
for _, layer := range p.Layers() {

// increment counter for layer type
c.allProtosAtomic.Inc(layer.LayerType().String())

// check if packet contains an unknown layer
switch layer.LayerType().String() {
case "Unknown": // not known to gopacket
switch layer.LayerType() {
case gopacket.LayerTypeZero: // not known to gopacket

// increase counter
c.unknownProtosAtomic.Inc(layer.LayerType().String())

// write to unknown.pcap file
c.writePacketToUnknownPcap(p)
case "DecodeFailure":

// call custom decoders
goto done
case gopacket.LayerTypeDecodeFailure:
// call custom decoders
goto done
}

// pick encoder from the encoderMap by looking up the layer type
if d, ok := encoder.LayerEncoders[layer.LayerType()]; ok {
err := d.Encode(layer, p.Metadata().Timestamp)
if err != nil {
c.logPacketError(p, "Layer Encoder Error: "+layer.LayerType().String()+": "+err.Error())
goto done
// pick encoders from the encoderMap by looking up the layer type
if encoders, ok := encoder.LayerEncoders[layer.LayerType()]; ok {
for _, e := range encoders {
err := e.Encode(layer, p.Metadata().Timestamp)
if err != nil {
c.logPacketError(p, "Layer Encoder Error: "+layer.LayerType().String()+": "+err.Error())
goto done
}
}
} else { // no netcap encoder implemented

// increment unknown layer type counter
c.unknownProtosAtomic.Inc(layer.LayerType().String())
if layer.LayerType().String() != "Payload" {

// if its not a payload layer, write to unknown .pcap file
// TODO make this configurable?
if layer.LayerType() != gopacket.LayerTypePayload {
c.writePacketToUnknownPcap(p)
goto done
}
@@ -89,10 +93,13 @@ func (c *Collector) worker() chan gopacket.Packet {
}

// Check for errors after decoding all layers
// if an error has occured while decoding the packet
// it will be logged and written into the errors.pcap file
if errLayer := p.ErrorLayer(); errLayer != nil {
c.logPacketError(p, errLayer.Error().Error())
}
}

c.wg.Done()
continue
}

0 comments on commit 8923641

Please sign in to comment.
You can’t perform that action at this time.