Skip to content

Commit

Permalink
api: add "created" field to RTSP sessions, RTMP connections, HLS muxers
Browse files Browse the repository at this point in the history
  • Loading branch information
aler9 committed Aug 5, 2022
1 parent 055e08a commit 092a2be
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 31 deletions.
6 changes: 6 additions & 0 deletions apidocs/openapi.yaml
Expand Up @@ -295,6 +295,8 @@ components:
RTSPSession:
type: object
properties:
created:
type: string
remoteAddr:
type: string
state:
Expand All @@ -313,6 +315,8 @@ components:
RTMPConn:
type: object
properties:
created:
type: string
remoteAddr:
type: string
state:
Expand All @@ -322,6 +326,8 @@ components:
HLSMuxer:
type: object
properties:
created:
type: string
lastRequest:
type: string

Expand Down
3 changes: 3 additions & 0 deletions internal/core/hls_muxer.go
Expand Up @@ -127,6 +127,7 @@ type hlsMuxer struct {

ctx context.Context
ctxCancel func()
created time.Time
path *path
ringBuffer *ringbuffer.RingBuffer
lastRequestTime *int64
Expand Down Expand Up @@ -173,6 +174,7 @@ func newHLSMuxer(
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
created: time.Now(),
lastRequestTime: func() *int64 {
v := time.Now().Unix()
return &v
Expand Down Expand Up @@ -240,6 +242,7 @@ func (m *hlsMuxer) run() {

case req := <-m.chAPIHLSMuxersList:
req.data.Items[m.name] = hlsServerAPIMuxersListItem{
Created: m.created,
LastRequest: time.Unix(atomic.LoadInt64(m.lastRequestTime), 0).String(),
}
close(req.res)
Expand Down
4 changes: 3 additions & 1 deletion internal/core/hls_server.go
Expand Up @@ -12,6 +12,7 @@ import (
gopath "path"
"strings"
"sync"
"time"

"github.com/gin-gonic/gin"

Expand All @@ -27,7 +28,8 @@ func (nilWriter) Write(p []byte) (int, error) {
}

type hlsServerAPIMuxersListItem struct {
LastRequest string `json:"lastRequest"`
Created time.Time `json:"created"`
LastRequest string `json:"lastRequest"`
}

type hlsServerAPIMuxersListData struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/core/path.go
Expand Up @@ -67,7 +67,7 @@ type pathParent interface {
}

type pathRTSPSession interface {
IsRTSPSession()
isRTSPSession()
}

type pathReaderState int
Expand Down
11 changes: 3 additions & 8 deletions internal/core/rtmp_conn.go
Expand Up @@ -74,6 +74,7 @@ type rtmpConn struct {

ctx context.Context
ctxCancel func()
created time.Time
path *path
ringBuffer *ringbuffer.RingBuffer // read
state rtmpConnState
Expand Down Expand Up @@ -115,6 +116,7 @@ func newRTMPConn(
parent: parent,
ctx: ctx,
ctxCancel: ctxCancel,
created: time.Now(),
}

c.log(logger.Info, "opened")
Expand All @@ -125,18 +127,11 @@ func newRTMPConn(
return c
}

// Close closes a Conn.
func (c *rtmpConn) close() {
c.ctxCancel()
}

// ID returns the ID of the Conn.
func (c *rtmpConn) ID() string {
return c.id
}

// RemoteAddr returns the remote address of the Conn.
func (c *rtmpConn) RemoteAddr() net.Addr {
func (c *rtmpConn) remoteAddr() net.Addr {
return c.nconn.RemoteAddr()
}

Expand Down
15 changes: 9 additions & 6 deletions internal/core/rtmp_server.go
Expand Up @@ -7,15 +7,17 @@ import (
"net"
"strconv"
"sync"
"time"

"github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/externalcmd"
"github.com/aler9/rtsp-simple-server/internal/logger"
)

type rtmpServerAPIConnsListItem struct {
RemoteAddr string `json:"remoteAddr"`
State string `json:"state"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
State string `json:"state"`
}

type rtmpServerAPIConnsListData struct {
Expand Down Expand Up @@ -202,8 +204,9 @@ outer:
}

for c := range s.conns {
data.Items[c.ID()] = rtmpServerAPIConnsListItem{
RemoteAddr: c.RemoteAddr().String(),
data.Items[c.id] = rtmpServerAPIConnsListItem{
Created: c.created,
RemoteAddr: c.remoteAddr().String(),
State: func() string {
switch c.safeState() {
case rtmpConnStateRead:
Expand All @@ -222,7 +225,7 @@ outer:
case req := <-s.chAPIConnsKick:
res := func() bool {
for c := range s.conns {
if c.ID() == req.id {
if c.id == req.id {
delete(s.conns, c)
c.close()
return true
Expand Down Expand Up @@ -266,7 +269,7 @@ func (s *rtmpServer) newConnID() (string, error) {

alreadyPresent := func() bool {
for c := range s.conns {
if c.ID() == id {
if c.id == id {
return true
}
}
Expand Down
14 changes: 8 additions & 6 deletions internal/core/rtsp_server.go
Expand Up @@ -21,8 +21,9 @@ import (
)

type rtspServerAPISessionsListItem struct {
RemoteAddr string `json:"remoteAddr"`
State string `json:"state"`
Created time.Time `json:"created"`
RemoteAddr string `json:"remoteAddr"`
State string `json:"state"`
}

type rtspServerAPISessionsListData struct {
Expand Down Expand Up @@ -242,7 +243,7 @@ func (s *rtspServer) newSessionID() (string, error) {

alreadyPresent := func() bool {
for _, s := range s.sessions {
if s.ID() == id {
if s.id == id {
return true
}
}
Expand Down Expand Up @@ -408,8 +409,9 @@ func (s *rtspServer) apiSessionsList(req rtspServerAPISessionsListReq) rtspServe
}

for _, s := range s.sessions {
data.Items[s.ID()] = rtspServerAPISessionsListItem{
RemoteAddr: s.RemoteAddr().String(),
data.Items[s.id] = rtspServerAPISessionsListItem{
Created: s.created,
RemoteAddr: s.remoteAddr().String(),
State: func() string {
switch s.safeState() {
case gortsplib.ServerSessionStatePrePlay,
Expand Down Expand Up @@ -440,7 +442,7 @@ func (s *rtspServer) apiSessionsKick(req rtspServerAPISessionsKickReq) rtspServe
defer s.mutex.RUnlock()

for key, se := range s.sessions {
if se.ID() == req.id {
if se.id == req.id {
se.close()
delete(s.sessions, key)
se.onClose(liberrors.ErrServerTerminated{})
Expand Down
14 changes: 5 additions & 9 deletions internal/core/rtsp_session.go
Expand Up @@ -38,6 +38,7 @@ type rtspSession struct {
pathManager rtspSessionPathManager
parent rtspSessionParent

created time.Time
path *path
state gortsplib.ServerSessionState
stateMutex sync.Mutex
Expand Down Expand Up @@ -65,6 +66,7 @@ func newRTSPSession(
externalCmdPool: externalCmdPool,
pathManager: pathManager,
parent: parent,
created: time.Now(),
}

s.log(logger.Info, "created by %v", s.author.NetConn().RemoteAddr())
Expand All @@ -77,22 +79,16 @@ func (s *rtspSession) close() {
s.ss.Close()
}

// IsRTSPSession implements pathRTSPSession.
func (s *rtspSession) IsRTSPSession() {}

// ID returns the public ID of the session.
func (s *rtspSession) ID() string {
return s.id
}
// isRTSPSession implements pathRTSPSession.
func (s *rtspSession) isRTSPSession() {}

func (s *rtspSession) safeState() gortsplib.ServerSessionState {
s.stateMutex.Lock()
defer s.stateMutex.Unlock()
return s.state
}

// RemoteAddr returns the remote address of the author of the session.
func (s *rtspSession) RemoteAddr() net.Addr {
func (s *rtspSession) remoteAddr() net.Addr {
return s.author.NetConn().RemoteAddr()
}

Expand Down

0 comments on commit 092a2be

Please sign in to comment.