Skip to content

Commit

Permalink
Now reading a bigger buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
asticode committed Dec 3, 2017
1 parent de60942 commit 5b7e987
Showing 1 changed file with 49 additions and 9 deletions.
58 changes: 49 additions & 9 deletions packet_buffer.go
Expand Up @@ -4,16 +4,27 @@ import (
"fmt"
"io"

"math"
"sync"

"github.com/pkg/errors"
)

// packetBuffer represents a packet buffer
type packetBuffer struct {
b []*Packet
b []byte
items []*packetBufferItem
packetSize int
r io.Reader
}

// packetBufferItem represents a packet buffer item
type packetBufferItem struct {
err error
p *Packet
wg sync.WaitGroup
}

// newPacketBuffer creates a new packet buffer
func newPacketBuffer(r io.Reader, packetSize int) (pb *packetBuffer, err error) {
// Init
Expand All @@ -30,6 +41,7 @@ func newPacketBuffer(r io.Reader, packetSize int) (pb *packetBuffer, err error)
return
}
}
pb.b = make([]byte, 10000*pb.packetSize)
return
}

Expand Down Expand Up @@ -91,21 +103,49 @@ func rewind(r io.Reader) (n int64, err error) {

// next fetches the next packet from the buffer
func (pb *packetBuffer) next() (p *Packet, err error) {
// Check items
if p, err = pb.first(); p != nil || err != nil {
return
}

// Read
var b = make([]byte, pb.packetSize)
if _, err = io.ReadFull(pb.r, b); err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
var n int
if n, err = io.ReadFull(pb.r, pb.b); err != nil && err != io.ErrUnexpectedEOF {
if err == io.EOF {
err = ErrNoMorePackets
} else {
err = errors.Wrapf(err, "astits: reading %d bytes failed", pb.packetSize)
err = errors.Wrapf(err, "astits: reading %d bytes failed", len(pb.b))
}
return
}

// Parse packet
if p, err = parsePacket(b); err != nil {
err = errors.Wrap(err, "astits: building packet failed")
return
// Loop through packets
for i := 0; i < int(math.Ceil(float64(n)/float64(pb.packetSize))); i++ {
var item = &packetBufferItem{}
item.wg.Add(1)
pb.items = append(pb.items, item)
go func(i int) {
defer item.wg.Done()
var b = make([]byte, pb.packetSize)
copy(b, pb.b[i*pb.packetSize:(i+1)*pb.packetSize])
if item.p, item.err = parsePacket(b); err != nil {
item.err = errors.Wrap(item.err, "astits: building packet failed")
return
}
}(i)
}

// Get first packet
p, err = pb.first()
return
}

// first returns the first packet of the buffer
func (pb *packetBuffer) first() (p *Packet, err error) {
if len(pb.items) > 0 {
pb.items[0].wg.Wait()
p, err = pb.items[0].p, pb.items[0].err
pb.items = pb.items[1:]
}
return
}

0 comments on commit 5b7e987

Please sign in to comment.