diff --git a/packet_buffer.go b/packet_buffer.go index 409cd9c..eeae5ae 100644 --- a/packet_buffer.go +++ b/packet_buffer.go @@ -4,16 +4,27 @@ import ( "fmt" "io" + "math" + "sync" + "github.com/pkg/errors" ) // packetBuffer represents a packet buffer type packetBuffer struct { - b []*Packet + b []byte + items []*packetBufferItem packetSize int r io.Reader } +// packetBufferItem represents a packet buffer item +type packetBufferItem struct { + err error + p *Packet + wg sync.WaitGroup +} + // newPacketBuffer creates a new packet buffer func newPacketBuffer(r io.Reader, packetSize int) (pb *packetBuffer, err error) { // Init @@ -30,6 +41,7 @@ func newPacketBuffer(r io.Reader, packetSize int) (pb *packetBuffer, err error) return } } + pb.b = make([]byte, 10000*pb.packetSize) return } @@ -91,21 +103,49 @@ func rewind(r io.Reader) (n int64, err error) { // next fetches the next packet from the buffer func (pb *packetBuffer) next() (p *Packet, err error) { + // Check items + if p, err = pb.first(); p != nil || err != nil { + return + } + // Read - var b = make([]byte, pb.packetSize) - if _, err = io.ReadFull(pb.r, b); err != nil { - if err == io.EOF || err == io.ErrUnexpectedEOF { + var n int + if n, err = io.ReadFull(pb.r, pb.b); err != nil && err != io.ErrUnexpectedEOF { + if err == io.EOF { err = ErrNoMorePackets } else { - err = errors.Wrapf(err, "astits: reading %d bytes failed", pb.packetSize) + err = errors.Wrapf(err, "astits: reading %d bytes failed", len(pb.b)) } return } - // Parse packet - if p, err = parsePacket(b); err != nil { - err = errors.Wrap(err, "astits: building packet failed") - return + // Loop through packets + for i := 0; i < int(math.Ceil(float64(n)/float64(pb.packetSize))); i++ { + var item = &packetBufferItem{} + item.wg.Add(1) + pb.items = append(pb.items, item) + go func(i int) { + defer item.wg.Done() + var b = make([]byte, pb.packetSize) + copy(b, pb.b[i*pb.packetSize:(i+1)*pb.packetSize]) + if item.p, item.err = parsePacket(b); err != nil { + item.err = errors.Wrap(item.err, "astits: building packet failed") + return + } + }(i) + } + + // Get first packet + p, err = pb.first() + return +} + +// first returns the first packet of the buffer +func (pb *packetBuffer) first() (p *Packet, err error) { + if len(pb.items) > 0 { + pb.items[0].wg.Wait() + p, err = pb.items[0].p, pb.items[0].err + pb.items = pb.items[1:] } return }