Skip to content

Commit

Permalink
rtmp: improve performance
Browse files Browse the repository at this point in the history
reuse existing structs instead of allocating them during every read()
  • Loading branch information
aler9 committed Aug 15, 2022
1 parent 4f023b2 commit 0db2d3e
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 66 deletions.
6 changes: 3 additions & 3 deletions internal/rtmp/conn.go
Expand Up @@ -3,7 +3,7 @@ package rtmp
import (
"errors"
"fmt"
"net"
"io"
"net/url"
"strings"
"time"
Expand Down Expand Up @@ -112,9 +112,9 @@ type Conn struct {
}

// NewConn initializes a connection.
func NewConn(nconn net.Conn) *Conn {
func NewConn(rw io.ReadWriter) *Conn {
c := &Conn{}
c.bc = bytecounter.NewReadWriter(nconn)
c.bc = bytecounter.NewReadWriter(rw)
c.mrw = message.NewReadWriter(c.bc, false)
return c
}
Expand Down
30 changes: 30 additions & 0 deletions internal/rtmp/conn_test.go
@@ -1,6 +1,7 @@
package rtmp

import (
"bytes"
"net"
"net/url"
"testing"
Expand Down Expand Up @@ -1210,3 +1211,32 @@ func TestWriteTracks(t *testing.T) {
Payload: []byte{0x12, 0x10},
}, msg)
}

func BenchmarkRead(b *testing.B) {
var buf bytes.Buffer

for n := 0; n < b.N; n++ {
buf.Write([]byte{
7, 0, 0, 23, 0, 0, 98, 8,
0, 0, 0, 64, 175, 1, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4, 1, 2,
3, 4, 1, 2, 3, 4,
})
}

conn := NewConn(&buf)

for n := 0; n < b.N; n++ {
conn.ReadMessage()
}
}
121 changes: 58 additions & 63 deletions internal/rtmp/rawmessage/reader.go
Expand Up @@ -17,7 +17,7 @@ type readerChunkStream struct {
curType *chunk.MessageType
curMessageStreamID *uint32
curBodyLen *uint32
curBody *[]byte
curBody []byte
curTimestampDelta *uint32
}

Expand Down Expand Up @@ -52,33 +52,31 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
return nil, fmt.Errorf("received type 0 chunk but expected type 3 chunk")
}

var c0 chunk.Chunk0
err := rc.readChunk(&c0, rc.mr.chunkSize)
err := rc.readChunk(&rc.mr.c0, rc.mr.chunkSize)
if err != nil {
return nil, err
}

v1 := c0.MessageStreamID
v1 := rc.mr.c0.MessageStreamID
rc.curMessageStreamID = &v1
v2 := c0.Type
v2 := rc.mr.c0.Type
rc.curType = &v2
v3 := c0.Timestamp
v3 := rc.mr.c0.Timestamp
rc.curTimestamp = &v3
v4 := c0.BodyLen
v4 := rc.mr.c0.BodyLen
rc.curBodyLen = &v4
rc.curTimestampDelta = nil

if c0.BodyLen != uint32(len(c0.Body)) {
rc.curBody = &c0.Body
if rc.mr.c0.BodyLen != uint32(len(rc.mr.c0.Body)) {
rc.curBody = rc.mr.c0.Body
return nil, errMoreChunksNeeded
}

return &Message{
Timestamp: time.Duration(c0.Timestamp) * time.Millisecond,
Type: c0.Type,
MessageStreamID: c0.MessageStreamID,
Body: c0.Body,
}, nil
rc.mr.msg.Timestamp = time.Duration(rc.mr.c0.Timestamp) * time.Millisecond
rc.mr.msg.Type = rc.mr.c0.Type
rc.mr.msg.MessageStreamID = rc.mr.c0.MessageStreamID
rc.mr.msg.Body = rc.mr.c0.Body
return &rc.mr.msg, nil

case 1:
if rc.curTimestamp == nil {
Expand All @@ -89,32 +87,30 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
return nil, fmt.Errorf("received type 1 chunk but expected type 3 chunk")
}

var c1 chunk.Chunk1
err := rc.readChunk(&c1, rc.mr.chunkSize)
err := rc.readChunk(&rc.mr.c1, rc.mr.chunkSize)
if err != nil {
return nil, err
}

v2 := c1.Type
v2 := rc.mr.c1.Type
rc.curType = &v2
v3 := *rc.curTimestamp + c1.TimestampDelta
v3 := *rc.curTimestamp + rc.mr.c1.TimestampDelta
rc.curTimestamp = &v3
v4 := c1.BodyLen
v4 := rc.mr.c1.BodyLen
rc.curBodyLen = &v4
v5 := c1.TimestampDelta
v5 := rc.mr.c1.TimestampDelta
rc.curTimestampDelta = &v5

if c1.BodyLen != uint32(len(c1.Body)) {
rc.curBody = &c1.Body
if rc.mr.c1.BodyLen != uint32(len(rc.mr.c1.Body)) {
rc.curBody = rc.mr.c1.Body
return nil, errMoreChunksNeeded
}

return &Message{
Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond,
Type: c1.Type,
MessageStreamID: *rc.curMessageStreamID,
Body: c1.Body,
}, nil
rc.mr.msg.Timestamp = time.Duration(*rc.curTimestamp) * time.Millisecond
rc.mr.msg.Type = rc.mr.c1.Type
rc.mr.msg.MessageStreamID = *rc.curMessageStreamID
rc.mr.msg.Body = rc.mr.c1.Body
return &rc.mr.msg, nil

case 2:
if rc.curTimestamp == nil {
Expand All @@ -130,88 +126,82 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
chunkBodyLen = rc.mr.chunkSize
}

var c2 chunk.Chunk2
err := rc.readChunk(&c2, chunkBodyLen)
err := rc.readChunk(&rc.mr.c2, chunkBodyLen)
if err != nil {
return nil, err
}

v1 := *rc.curTimestamp + c2.TimestampDelta
v1 := *rc.curTimestamp + rc.mr.c2.TimestampDelta
rc.curTimestamp = &v1
v2 := c2.TimestampDelta
v2 := rc.mr.c2.TimestampDelta
rc.curTimestampDelta = &v2

if *rc.curBodyLen != uint32(len(c2.Body)) {
rc.curBody = &c2.Body
if *rc.curBodyLen != uint32(len(rc.mr.c2.Body)) {
rc.curBody = rc.mr.c2.Body
return nil, errMoreChunksNeeded
}

return &Message{
Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond,
Type: *rc.curType,
MessageStreamID: *rc.curMessageStreamID,
Body: c2.Body,
}, nil
rc.mr.msg.Timestamp = time.Duration(*rc.curTimestamp) * time.Millisecond
rc.mr.msg.Type = *rc.curType
rc.mr.msg.MessageStreamID = *rc.curMessageStreamID
rc.mr.msg.Body = rc.mr.c2.Body
return &rc.mr.msg, nil

default: // 3
if rc.curBody == nil && rc.curTimestampDelta == nil {
return nil, fmt.Errorf("received type 3 chunk without previous chunk")
}

if rc.curBody != nil {
chunkBodyLen := (*rc.curBodyLen) - uint32(len(*rc.curBody))
chunkBodyLen := (*rc.curBodyLen) - uint32(len(rc.curBody))
if chunkBodyLen > rc.mr.chunkSize {
chunkBodyLen = rc.mr.chunkSize
}

var c3 chunk.Chunk3
err := rc.readChunk(&c3, chunkBodyLen)
err := rc.readChunk(&rc.mr.c3, chunkBodyLen)
if err != nil {
return nil, err
}

*rc.curBody = append(*rc.curBody, c3.Body...)
rc.curBody = append(rc.curBody, rc.mr.c3.Body...)

if *rc.curBodyLen != uint32(len(*rc.curBody)) {
if *rc.curBodyLen != uint32(len(rc.curBody)) {
return nil, errMoreChunksNeeded
}

body := *rc.curBody
body := rc.curBody
rc.curBody = nil

return &Message{
Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond,
Type: *rc.curType,
MessageStreamID: *rc.curMessageStreamID,
Body: body,
}, nil
rc.mr.msg.Timestamp = time.Duration(*rc.curTimestamp) * time.Millisecond
rc.mr.msg.Type = *rc.curType
rc.mr.msg.MessageStreamID = *rc.curMessageStreamID
rc.mr.msg.Body = body
return &rc.mr.msg, nil
}

chunkBodyLen := (*rc.curBodyLen)
if chunkBodyLen > rc.mr.chunkSize {
chunkBodyLen = rc.mr.chunkSize
}

var c3 chunk.Chunk3
err := rc.readChunk(&c3, chunkBodyLen)
err := rc.readChunk(&rc.mr.c3, chunkBodyLen)
if err != nil {
return nil, err
}

v1 := *rc.curTimestamp + *rc.curTimestampDelta
rc.curTimestamp = &v1

if *rc.curBodyLen != uint32(len(c3.Body)) {
rc.curBody = &c3.Body
if *rc.curBodyLen != uint32(len(rc.mr.c3.Body)) {
rc.curBody = rc.mr.c3.Body
return nil, errMoreChunksNeeded
}

return &Message{
Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond,
Type: *rc.curType,
MessageStreamID: *rc.curMessageStreamID,
Body: c3.Body,
}, nil
rc.mr.msg.Timestamp = time.Duration(*rc.curTimestamp) * time.Millisecond
rc.mr.msg.Type = *rc.curType
rc.mr.msg.MessageStreamID = *rc.curMessageStreamID
rc.mr.msg.Body = rc.mr.c3.Body
return &rc.mr.msg, nil
}
}

Expand All @@ -223,6 +213,11 @@ type Reader struct {
chunkSize uint32
ackWindowSize uint32
lastAckCount uint32
msg Message
c0 chunk.Chunk0
c1 chunk.Chunk1
c2 chunk.Chunk2
c3 chunk.Chunk3
chunkStreams map[byte]*readerChunkStream
}

Expand Down

0 comments on commit 0db2d3e

Please sign in to comment.