From 0db2d3eb8c47b71b30da7b98d5398c01570b53b7 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Mon, 15 Aug 2022 15:42:16 +0200 Subject: [PATCH] rtmp: improve performance reuse existing structs instead of allocating them during every read() --- internal/rtmp/conn.go | 6 +- internal/rtmp/conn_test.go | 30 +++++++ internal/rtmp/rawmessage/reader.go | 121 ++++++++++++++--------------- 3 files changed, 91 insertions(+), 66 deletions(-) diff --git a/internal/rtmp/conn.go b/internal/rtmp/conn.go index b65240020cc..ef407cd4607 100644 --- a/internal/rtmp/conn.go +++ b/internal/rtmp/conn.go @@ -3,7 +3,7 @@ package rtmp import ( "errors" "fmt" - "net" + "io" "net/url" "strings" "time" @@ -112,9 +112,9 @@ type Conn struct { } // NewConn initializes a connection. -func NewConn(nconn net.Conn) *Conn { +func NewConn(rw io.ReadWriter) *Conn { c := &Conn{} - c.bc = bytecounter.NewReadWriter(nconn) + c.bc = bytecounter.NewReadWriter(rw) c.mrw = message.NewReadWriter(c.bc, false) return c } diff --git a/internal/rtmp/conn_test.go b/internal/rtmp/conn_test.go index cdec6108939..d50c5d6ed41 100644 --- a/internal/rtmp/conn_test.go +++ b/internal/rtmp/conn_test.go @@ -1,6 +1,7 @@ package rtmp import ( + "bytes" "net" "net/url" "testing" @@ -1210,3 +1211,32 @@ func TestWriteTracks(t *testing.T) { Payload: []byte{0x12, 0x10}, }, msg) } + +func BenchmarkRead(b *testing.B) { + var buf bytes.Buffer + + for n := 0; n < b.N; n++ { + buf.Write([]byte{ + 7, 0, 0, 23, 0, 0, 98, 8, + 0, 0, 0, 64, 175, 1, 1, 2, + 3, 4, 1, 2, 3, 4, 1, 2, + 3, 4, 1, 2, 3, 4, 1, 2, + 3, 4, 1, 2, 3, 4, 1, 2, + 3, 4, 1, 2, 3, 4, 1, 2, + 3, 4, 1, 2, 3, 4, 1, 2, + 3, 4, 1, 2, 3, 4, 1, 2, + 3, 4, 1, 2, 3, 4, 1, 2, + 3, 4, 1, 2, 3, 4, 1, 2, + 3, 4, 1, 2, 3, 4, 1, 2, + 3, 4, 1, 2, 3, 4, 1, 2, + 3, 4, 1, 2, 3, 4, 1, 2, + 3, 4, 1, 2, 3, 4, + }) + } + + conn := NewConn(&buf) + + for n := 0; n < b.N; n++ { + conn.ReadMessage() + } +} diff --git a/internal/rtmp/rawmessage/reader.go b/internal/rtmp/rawmessage/reader.go index f7b293b919b..df31576947e 100644 --- a/internal/rtmp/rawmessage/reader.go +++ b/internal/rtmp/rawmessage/reader.go @@ -17,7 +17,7 @@ type readerChunkStream struct { curType *chunk.MessageType curMessageStreamID *uint32 curBodyLen *uint32 - curBody *[]byte + curBody []byte curTimestampDelta *uint32 } @@ -52,33 +52,31 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) { return nil, fmt.Errorf("received type 0 chunk but expected type 3 chunk") } - var c0 chunk.Chunk0 - err := rc.readChunk(&c0, rc.mr.chunkSize) + err := rc.readChunk(&rc.mr.c0, rc.mr.chunkSize) if err != nil { return nil, err } - v1 := c0.MessageStreamID + v1 := rc.mr.c0.MessageStreamID rc.curMessageStreamID = &v1 - v2 := c0.Type + v2 := rc.mr.c0.Type rc.curType = &v2 - v3 := c0.Timestamp + v3 := rc.mr.c0.Timestamp rc.curTimestamp = &v3 - v4 := c0.BodyLen + v4 := rc.mr.c0.BodyLen rc.curBodyLen = &v4 rc.curTimestampDelta = nil - if c0.BodyLen != uint32(len(c0.Body)) { - rc.curBody = &c0.Body + if rc.mr.c0.BodyLen != uint32(len(rc.mr.c0.Body)) { + rc.curBody = rc.mr.c0.Body return nil, errMoreChunksNeeded } - return &Message{ - Timestamp: time.Duration(c0.Timestamp) * time.Millisecond, - Type: c0.Type, - MessageStreamID: c0.MessageStreamID, - Body: c0.Body, - }, nil + rc.mr.msg.Timestamp = time.Duration(rc.mr.c0.Timestamp) * time.Millisecond + rc.mr.msg.Type = rc.mr.c0.Type + rc.mr.msg.MessageStreamID = rc.mr.c0.MessageStreamID + rc.mr.msg.Body = rc.mr.c0.Body + return &rc.mr.msg, nil case 1: if rc.curTimestamp == nil { @@ -89,32 +87,30 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) { return nil, fmt.Errorf("received type 1 chunk but expected type 3 chunk") } - var c1 chunk.Chunk1 - err := rc.readChunk(&c1, rc.mr.chunkSize) + err := rc.readChunk(&rc.mr.c1, rc.mr.chunkSize) if err != nil { return nil, err } - v2 := c1.Type + v2 := rc.mr.c1.Type rc.curType = &v2 - v3 := *rc.curTimestamp + c1.TimestampDelta + v3 := *rc.curTimestamp + rc.mr.c1.TimestampDelta rc.curTimestamp = &v3 - v4 := c1.BodyLen + v4 := rc.mr.c1.BodyLen rc.curBodyLen = &v4 - v5 := c1.TimestampDelta + v5 := rc.mr.c1.TimestampDelta rc.curTimestampDelta = &v5 - if c1.BodyLen != uint32(len(c1.Body)) { - rc.curBody = &c1.Body + if rc.mr.c1.BodyLen != uint32(len(rc.mr.c1.Body)) { + rc.curBody = rc.mr.c1.Body return nil, errMoreChunksNeeded } - return &Message{ - Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond, - Type: c1.Type, - MessageStreamID: *rc.curMessageStreamID, - Body: c1.Body, - }, nil + rc.mr.msg.Timestamp = time.Duration(*rc.curTimestamp) * time.Millisecond + rc.mr.msg.Type = rc.mr.c1.Type + rc.mr.msg.MessageStreamID = *rc.curMessageStreamID + rc.mr.msg.Body = rc.mr.c1.Body + return &rc.mr.msg, nil case 2: if rc.curTimestamp == nil { @@ -130,28 +126,26 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) { chunkBodyLen = rc.mr.chunkSize } - var c2 chunk.Chunk2 - err := rc.readChunk(&c2, chunkBodyLen) + err := rc.readChunk(&rc.mr.c2, chunkBodyLen) if err != nil { return nil, err } - v1 := *rc.curTimestamp + c2.TimestampDelta + v1 := *rc.curTimestamp + rc.mr.c2.TimestampDelta rc.curTimestamp = &v1 - v2 := c2.TimestampDelta + v2 := rc.mr.c2.TimestampDelta rc.curTimestampDelta = &v2 - if *rc.curBodyLen != uint32(len(c2.Body)) { - rc.curBody = &c2.Body + if *rc.curBodyLen != uint32(len(rc.mr.c2.Body)) { + rc.curBody = rc.mr.c2.Body return nil, errMoreChunksNeeded } - return &Message{ - Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond, - Type: *rc.curType, - MessageStreamID: *rc.curMessageStreamID, - Body: c2.Body, - }, nil + rc.mr.msg.Timestamp = time.Duration(*rc.curTimestamp) * time.Millisecond + rc.mr.msg.Type = *rc.curType + rc.mr.msg.MessageStreamID = *rc.curMessageStreamID + rc.mr.msg.Body = rc.mr.c2.Body + return &rc.mr.msg, nil default: // 3 if rc.curBody == nil && rc.curTimestampDelta == nil { @@ -159,32 +153,30 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) { } if rc.curBody != nil { - chunkBodyLen := (*rc.curBodyLen) - uint32(len(*rc.curBody)) + chunkBodyLen := (*rc.curBodyLen) - uint32(len(rc.curBody)) if chunkBodyLen > rc.mr.chunkSize { chunkBodyLen = rc.mr.chunkSize } - var c3 chunk.Chunk3 - err := rc.readChunk(&c3, chunkBodyLen) + err := rc.readChunk(&rc.mr.c3, chunkBodyLen) if err != nil { return nil, err } - *rc.curBody = append(*rc.curBody, c3.Body...) + rc.curBody = append(rc.curBody, rc.mr.c3.Body...) - if *rc.curBodyLen != uint32(len(*rc.curBody)) { + if *rc.curBodyLen != uint32(len(rc.curBody)) { return nil, errMoreChunksNeeded } - body := *rc.curBody + body := rc.curBody rc.curBody = nil - return &Message{ - Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond, - Type: *rc.curType, - MessageStreamID: *rc.curMessageStreamID, - Body: body, - }, nil + rc.mr.msg.Timestamp = time.Duration(*rc.curTimestamp) * time.Millisecond + rc.mr.msg.Type = *rc.curType + rc.mr.msg.MessageStreamID = *rc.curMessageStreamID + rc.mr.msg.Body = body + return &rc.mr.msg, nil } chunkBodyLen := (*rc.curBodyLen) @@ -192,8 +184,7 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) { chunkBodyLen = rc.mr.chunkSize } - var c3 chunk.Chunk3 - err := rc.readChunk(&c3, chunkBodyLen) + err := rc.readChunk(&rc.mr.c3, chunkBodyLen) if err != nil { return nil, err } @@ -201,17 +192,16 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) { v1 := *rc.curTimestamp + *rc.curTimestampDelta rc.curTimestamp = &v1 - if *rc.curBodyLen != uint32(len(c3.Body)) { - rc.curBody = &c3.Body + if *rc.curBodyLen != uint32(len(rc.mr.c3.Body)) { + rc.curBody = rc.mr.c3.Body return nil, errMoreChunksNeeded } - return &Message{ - Timestamp: time.Duration(*rc.curTimestamp) * time.Millisecond, - Type: *rc.curType, - MessageStreamID: *rc.curMessageStreamID, - Body: c3.Body, - }, nil + rc.mr.msg.Timestamp = time.Duration(*rc.curTimestamp) * time.Millisecond + rc.mr.msg.Type = *rc.curType + rc.mr.msg.MessageStreamID = *rc.curMessageStreamID + rc.mr.msg.Body = rc.mr.c3.Body + return &rc.mr.msg, nil } } @@ -223,6 +213,11 @@ type Reader struct { chunkSize uint32 ackWindowSize uint32 lastAckCount uint32 + msg Message + c0 chunk.Chunk0 + c1 chunk.Chunk1 + c2 chunk.Chunk2 + c3 chunk.Chunk3 chunkStreams map[byte]*readerChunkStream }