Skip to content

Commit

Permalink
rtmp: fix parsing error caused by extended timestamps (#2393) (#2556) (
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Dec 14, 2023
1 parent 22414bb commit f50cc05
Show file tree
Hide file tree
Showing 14 changed files with 430 additions and 228 deletions.
4 changes: 2 additions & 2 deletions internal/protocols/rtmp/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ import (

// Chunk is a chunk.
type Chunk interface {
Read(io.Reader, uint32) error
Marshal() ([]byte, error)
Read(r io.Reader, bodyLen uint32, hasExtendedTimestamp bool) error
Marshal(hasExtendedTimestamp bool) ([]byte, error)
}
74 changes: 56 additions & 18 deletions internal/protocols/rtmp/chunk/chunk0.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ import (
type Chunk0 struct {
ChunkStreamID byte
Timestamp uint32
BodyLen uint32
Type uint8
MessageStreamID uint32
BodyLen uint32
Body []byte
}

// Read reads the chunk.
func (c *Chunk0) Read(r io.Reader, chunkMaxBodyLen uint32) error {
func (c *Chunk0) Read(r io.Reader, maxBodyLen uint32, _ bool) error {
header := make([]byte, 12)
_, err := io.ReadFull(r, header)
if err != nil {
Expand All @@ -31,31 +31,69 @@ func (c *Chunk0) Read(r io.Reader, chunkMaxBodyLen uint32) error {
c.Type = header[7]
c.MessageStreamID = uint32(header[8])<<24 | uint32(header[9])<<16 | uint32(header[10])<<8 | uint32(header[11])

if c.Timestamp >= 0xFFFFFF {
_, err := io.ReadFull(r, header[:4])
if err != nil {
return err
}

Check warning on line 38 in internal/protocols/rtmp/chunk/chunk0.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/rtmp/chunk/chunk0.go#L37-L38

Added lines #L37 - L38 were not covered by tests

c.Timestamp = uint32(header[0])<<24 | uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3])
}

chunkBodyLen := c.BodyLen
if chunkBodyLen > chunkMaxBodyLen {
chunkBodyLen = chunkMaxBodyLen
if chunkBodyLen > maxBodyLen {
chunkBodyLen = maxBodyLen
}

c.Body = make([]byte, chunkBodyLen)
_, err = io.ReadFull(r, c.Body)
return err
}

func (c Chunk0) marshalSize() int {
n := 12 + len(c.Body)
if c.Timestamp >= 0xFFFFFF {
n += 4
}
return n
}

// Marshal writes the chunk.
func (c Chunk0) Marshal() ([]byte, error) {
buf := make([]byte, 12+len(c.Body))
func (c Chunk0) Marshal(_ bool) ([]byte, error) {
buf := make([]byte, c.marshalSize())
buf[0] = c.ChunkStreamID
buf[1] = byte(c.Timestamp >> 16)
buf[2] = byte(c.Timestamp >> 8)
buf[3] = byte(c.Timestamp)
buf[4] = byte(c.BodyLen >> 16)
buf[5] = byte(c.BodyLen >> 8)
buf[6] = byte(c.BodyLen)
buf[7] = c.Type
buf[8] = byte(c.MessageStreamID >> 24)
buf[9] = byte(c.MessageStreamID >> 16)
buf[10] = byte(c.MessageStreamID >> 8)
buf[11] = byte(c.MessageStreamID)
copy(buf[12:], c.Body)

if c.Timestamp >= 0xFFFFFF {
buf[1] = 0xFF
buf[2] = 0xFF
buf[3] = 0xFF
buf[4] = byte(c.BodyLen >> 16)
buf[5] = byte(c.BodyLen >> 8)
buf[6] = byte(c.BodyLen)
buf[7] = c.Type
buf[8] = byte(c.MessageStreamID >> 24)
buf[9] = byte(c.MessageStreamID >> 16)
buf[10] = byte(c.MessageStreamID >> 8)
buf[11] = byte(c.MessageStreamID)
buf[12] = byte(c.Timestamp >> 24)
buf[13] = byte(c.Timestamp >> 16)
buf[14] = byte(c.Timestamp >> 8)
buf[15] = byte(c.Timestamp)
copy(buf[16:], c.Body)
} else {
buf[1] = byte(c.Timestamp >> 16)
buf[2] = byte(c.Timestamp >> 8)
buf[3] = byte(c.Timestamp)
buf[4] = byte(c.BodyLen >> 16)
buf[5] = byte(c.BodyLen >> 8)
buf[6] = byte(c.BodyLen)
buf[7] = c.Type
buf[8] = byte(c.MessageStreamID >> 24)
buf[9] = byte(c.MessageStreamID >> 16)
buf[10] = byte(c.MessageStreamID >> 8)
buf[11] = byte(c.MessageStreamID)
copy(buf[12:], c.Body)
}

return buf, nil
}
35 changes: 0 additions & 35 deletions internal/protocols/rtmp/chunk/chunk0_test.go

This file was deleted.

62 changes: 48 additions & 14 deletions internal/protocols/rtmp/chunk/chunk1.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ import (
type Chunk1 struct {
ChunkStreamID byte
TimestampDelta uint32
Type uint8
BodyLen uint32
Type uint8
Body []byte
}

// Read reads the chunk.
func (c *Chunk1) Read(r io.Reader, chunkMaxBodyLen uint32) error {
func (c *Chunk1) Read(r io.Reader, maxBodyLen uint32, _ bool) error {
header := make([]byte, 8)
_, err := io.ReadFull(r, header)
if err != nil {
Expand All @@ -31,27 +31,61 @@ func (c *Chunk1) Read(r io.Reader, chunkMaxBodyLen uint32) error {
c.BodyLen = uint32(header[4])<<16 | uint32(header[5])<<8 | uint32(header[6])
c.Type = header[7]

if c.TimestampDelta >= 0xFFFFFF {
_, err = io.ReadFull(r, header[:4])
if err != nil {
return err
}

Check warning on line 38 in internal/protocols/rtmp/chunk/chunk1.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/rtmp/chunk/chunk1.go#L37-L38

Added lines #L37 - L38 were not covered by tests

c.TimestampDelta = uint32(header[0])<<24 | uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3])
}

chunkBodyLen := (c.BodyLen)
if chunkBodyLen > chunkMaxBodyLen {
chunkBodyLen = chunkMaxBodyLen
if chunkBodyLen > maxBodyLen {
chunkBodyLen = maxBodyLen
}

c.Body = make([]byte, chunkBodyLen)
_, err = io.ReadFull(r, c.Body)
return err
}

func (c Chunk1) marshalSize() int {
n := 8 + len(c.Body)
if c.TimestampDelta >= 0xFFFFFF {
n += 4
}
return n
}

// Marshal writes the chunk.
func (c Chunk1) Marshal() ([]byte, error) {
buf := make([]byte, 8+len(c.Body))
func (c Chunk1) Marshal(_ bool) ([]byte, error) {
buf := make([]byte, c.marshalSize())
buf[0] = 1<<6 | c.ChunkStreamID
buf[1] = byte(c.TimestampDelta >> 16)
buf[2] = byte(c.TimestampDelta >> 8)
buf[3] = byte(c.TimestampDelta)
buf[4] = byte(c.BodyLen >> 16)
buf[5] = byte(c.BodyLen >> 8)
buf[6] = byte(c.BodyLen)
buf[7] = c.Type
copy(buf[8:], c.Body)

if c.TimestampDelta >= 0xFFFFFF {
buf[1] = 0xFF
buf[2] = 0xFF
buf[3] = 0xFF
buf[4] = byte(c.BodyLen >> 16)
buf[5] = byte(c.BodyLen >> 8)
buf[6] = byte(c.BodyLen)
buf[7] = c.Type
buf[8] = byte(c.TimestampDelta >> 24)
buf[9] = byte(c.TimestampDelta >> 16)
buf[10] = byte(c.TimestampDelta >> 8)
buf[11] = byte(c.TimestampDelta)
copy(buf[12:], c.Body)
} else {
buf[1] = byte(c.TimestampDelta >> 16)
buf[2] = byte(c.TimestampDelta >> 8)
buf[3] = byte(c.TimestampDelta)
buf[4] = byte(c.BodyLen >> 16)
buf[5] = byte(c.BodyLen >> 8)
buf[6] = byte(c.BodyLen)
buf[7] = c.Type
copy(buf[8:], c.Body)
}

return buf, nil
}
34 changes: 0 additions & 34 deletions internal/protocols/rtmp/chunk/chunk1_test.go

This file was deleted.

46 changes: 38 additions & 8 deletions internal/protocols/rtmp/chunk/chunk2.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type Chunk2 struct {
}

// Read reads the chunk.
func (c *Chunk2) Read(r io.Reader, chunkBodyLen uint32) error {
func (c *Chunk2) Read(r io.Reader, bodyLen uint32, _ bool) error {
header := make([]byte, 4)
_, err := io.ReadFull(r, header)
if err != nil {
Expand All @@ -25,18 +25,48 @@ func (c *Chunk2) Read(r io.Reader, chunkBodyLen uint32) error {
c.ChunkStreamID = header[0] & 0x3F
c.TimestampDelta = uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3])

c.Body = make([]byte, chunkBodyLen)
if c.TimestampDelta >= 0xFFFFFF {
_, err = io.ReadFull(r, header[:4])
if err != nil {
return err
}

Check warning on line 32 in internal/protocols/rtmp/chunk/chunk2.go

View check run for this annotation

Codecov / codecov/patch

internal/protocols/rtmp/chunk/chunk2.go#L31-L32

Added lines #L31 - L32 were not covered by tests

c.TimestampDelta = uint32(header[0])<<24 | uint32(header[1])<<16 | uint32(header[2])<<8 | uint32(header[3])
}

c.Body = make([]byte, bodyLen)
_, err = io.ReadFull(r, c.Body)
return err
}

func (c Chunk2) marshalSize() int {
n := 4 + len(c.Body)
if c.TimestampDelta >= 0xFFFFFF {
n += 4
}
return n
}

// Marshal writes the chunk.
func (c Chunk2) Marshal() ([]byte, error) {
buf := make([]byte, 4+len(c.Body))
func (c Chunk2) Marshal(_ bool) ([]byte, error) {
buf := make([]byte, c.marshalSize())
buf[0] = 2<<6 | c.ChunkStreamID
buf[1] = byte(c.TimestampDelta >> 16)
buf[2] = byte(c.TimestampDelta >> 8)
buf[3] = byte(c.TimestampDelta)
copy(buf[4:], c.Body)

if c.TimestampDelta >= 0xFFFFFF {
buf[1] = 0xFF
buf[2] = 0xFF
buf[3] = 0xFF
buf[4] = byte(c.TimestampDelta >> 24)
buf[5] = byte(c.TimestampDelta >> 16)
buf[6] = byte(c.TimestampDelta >> 8)
buf[7] = byte(c.TimestampDelta)
copy(buf[8:], c.Body)
} else {
buf[1] = byte(c.TimestampDelta >> 16)
buf[2] = byte(c.TimestampDelta >> 8)
buf[3] = byte(c.TimestampDelta)
copy(buf[4:], c.Body)
}

return buf, nil
}
31 changes: 0 additions & 31 deletions internal/protocols/rtmp/chunk/chunk2_test.go

This file was deleted.

0 comments on commit f50cc05

Please sign in to comment.