From b169f94cce1ecbab50248f25ee3b33dd40602fe1 Mon Sep 17 00:00:00 2001 From: xiongziliang <771730766@qq.com> Date: Tue, 7 Apr 2020 13:03:53 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=A7=E5=B9=85=E6=8F=90=E9=AB=98rtsp?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=99=A8=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 3rdpart/ZLToolKit | 2 +- src/Rtsp/RtpMultiCaster.cpp | 17 +++-- src/Rtsp/RtspMediaSource.h | 122 ++++++++++++++++++++++++++++++++++-- src/Rtsp/RtspPusher.cpp | 38 +++++++---- src/Rtsp/RtspPusher.h | 2 +- src/Rtsp/RtspSession.cpp | 37 +++++++---- src/Rtsp/RtspSession.h | 4 +- 7 files changed, 181 insertions(+), 41 deletions(-) diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index 58a74f8c5a..681be205ef 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit 58a74f8c5ab802a0dd9fdcdcc0fe4c5a3d841964 +Subproject commit 681be205ef164db08effd83f925bb750eb1fe149 diff --git a/src/Rtsp/RtpMultiCaster.cpp b/src/Rtsp/RtpMultiCaster.cpp index 49ac2945f4..88a382448f 100644 --- a/src/Rtsp/RtpMultiCaster.cpp +++ b/src/Rtsp/RtpMultiCaster.cpp @@ -109,13 +109,18 @@ RtpMultiCaster::RtpMultiCaster(const EventPoller::Ptr &poller,const string &strL _apUdpSock[i]->setSendPeerAddr((struct sockaddr *)&peerAddr); } _pReader = src->getRing()->attach(poller); - _pReader->setReadCB([this](const RtpPacket::Ptr &pkt){ - int i = (int)(pkt->type); - auto &pSock = _apUdpSock[i]; - auto &peerAddr = _aPeerUdpAddr[i]; - BufferRtp::Ptr buffer(new BufferRtp(pkt,4)); - pSock->send(buffer); + _pReader->setReadCB([this](const RtspMediaSource::RingDataType &pkt){ + int i = 0; + int size = pkt->size(); + pkt->for_each([&](const RtpPacket::Ptr &rtp) { + int i = (int) (rtp->type); + auto &pSock = _apUdpSock[i]; + auto &peerAddr = _aPeerUdpAddr[i]; + BufferRtp::Ptr buffer(new BufferRtp(rtp, 4)); + pSock->send(buffer, nullptr, 0, ++i == size); + }); }); + _pReader->setDetachCB([this](){ unordered_map _mapDetach_copy; { diff --git a/src/Rtsp/RtspMediaSource.h b/src/Rtsp/RtspMediaSource.h index 47ff76d72d..11ff43e74d 100644 --- a/src/Rtsp/RtspMediaSource.h +++ b/src/Rtsp/RtspMediaSource.h @@ -27,10 +27,95 @@ #include "Thread/ThreadPool.h" using namespace std; using namespace toolkit; +#define RTP_GOP_SIZE 512 +namespace mediakit { -#define RTP_GOP_SIZE 2048 +class RtpVideoCache { +public: -namespace mediakit { + RtpVideoCache() { + _cache = std::make_shared >(); + } + + virtual ~RtpVideoCache() = default; + + void inputVideoRtp(const RtpPacket::Ptr &rtp, bool key_pos) { + if (_last_rtp_stamp != rtp->timeStamp) { + //时间戳发生变化了 + flushAll(); + } else if (_cache->size() > RTP_GOP_SIZE) { + //这个逻辑用于避免时间戳异常的流导致的内存暴增问题 + flushAll(); + } + + //追加数据到最后 + _cache->emplace_back(rtp); + _last_rtp_stamp = rtp->timeStamp; + if (key_pos) { + _key_pos = key_pos; + } + } + + virtual void onFlushVideoRtp(std::shared_ptr > &, bool key_pos) = 0; + +private: + + void flushAll() { + if (_cache->empty()) { + return; + } + onFlushVideoRtp(_cache, _key_pos); + _cache = std::make_shared >(); + _key_pos = false; + } + +private: + + std::shared_ptr > _cache; + uint32_t _last_rtp_stamp = 0; + bool _key_pos = false; +}; + +class RtpAudioCache { +public: + + RtpAudioCache() { + _cache = std::make_shared >(); + } + + virtual ~RtpAudioCache() = default; + + void inputAudioRtp(const RtpPacket::Ptr &rtp) { + if (rtp->timeStamp > _last_rtp_stamp + 100) { + //累积了100ms的音频数据 + flushAll(); + } else if (_cache->size() > 10) { + //或者audio rtp缓存超过10个 + flushAll(); + } + + //追加数据到最后 + _cache->emplace_back(rtp); + _last_rtp_stamp = rtp->timeStamp; + } + + virtual void onFlushAudioRtp(std::shared_ptr > &) = 0; + +private: + + void flushAll() { + if (_cache->empty()) { + return; + } + onFlushAudioRtp(_cache); + _cache = std::make_shared >(); + } + +private: + + std::shared_ptr > _cache; + uint32_t _last_rtp_stamp = 0; +}; /** * rtsp媒体源的数据抽象 @@ -38,11 +123,12 @@ namespace mediakit { * 只要生成了这两要素,那么要实现rtsp推流、rtsp服务器就很简单了 * rtsp推拉流协议中,先传递sdp,然后再协商传输方式(tcp/udp/组播),最后一直传递rtp */ -class RtspMediaSource : public MediaSource, public RingDelegate { +class RtspMediaSource : public MediaSource, public RingDelegate, public RtpVideoCache, public RtpAudioCache { public: typedef ResourcePool PoolType; typedef std::shared_ptr Ptr; - typedef RingBuffer RingType; + typedef std::shared_ptr > RingDataType; + typedef RingBuffer RingType; /** * 构造函数 @@ -173,10 +259,34 @@ class RtspMediaSource : public MediaSource, public RingDelegate regist(); } } - //不存在视频,为了减少缓存延时,那么关闭GOP缓存 - _ring->write(rtp, _have_video ? keyPos : true); + + if(rtp->type == TrackVideo){ + RtpVideoCache::inputVideoRtp(rtp, keyPos); + }else{ + RtpAudioCache::inputAudioRtp(rtp); + } } + private: + + /** + * 批量flush时间戳相同的视频rtp包时触发该函数 + * @param rtp_list 时间戳相同的rtp包列表 + * @param key_pos 是否包含关键帧 + */ + void onFlushVideoRtp(std::shared_ptr > &rtp_list, bool key_pos) override { + _ring->write(rtp_list, key_pos); + } + + /** + * 批量flush一定数量的音频rtp包时触发该函数 + * @param rtp_list rtp包列表 + */ + void onFlushAudioRtp(std::shared_ptr > &rtp_list) override{ + //只有音频的话,就不存在gop缓存的意义 + _ring->write(rtp_list, !_have_video); + } + /** * 每次增减消费者都会触发该函数 */ diff --git a/src/Rtsp/RtspPusher.cpp b/src/Rtsp/RtspPusher.cpp index e59c86675e..922a553f50 100644 --- a/src/Rtsp/RtspPusher.cpp +++ b/src/Rtsp/RtspPusher.cpp @@ -301,23 +301,36 @@ void RtspPusher::sendOptions() { sendRtspRequest("OPTIONS",_strContentBase); } -inline void RtspPusher::sendRtpPacket(const RtpPacket::Ptr & pkt) { +inline void RtspPusher::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) { //InfoL<<(int)pkt.Interleaved; switch (_eType) { case Rtsp::RTP_TCP: { - BufferRtp::Ptr buffer(new BufferRtp(pkt)); - send(buffer); + int i = 0; + int size = pkt->size(); + setSendFlushFlag(false); + pkt->for_each([&](const RtpPacket::Ptr &rtp) { + if (++i == size) { + setSendFlushFlag(true); + } + BufferRtp::Ptr buffer(new BufferRtp(rtp)); + send(buffer); + }); } break; case Rtsp::RTP_UDP: { - int iTrackIndex = getTrackIndexByTrackType(pkt->type); - auto &pSock = _apUdpSock[iTrackIndex]; - if (!pSock) { - shutdown(SockException(Err_shutdown,"udp sock not opened yet")); - return; - } - BufferRtp::Ptr buffer(new BufferRtp(pkt,4)); - pSock->send(buffer); + int i = 0; + int size = pkt->size(); + pkt->for_each([&](const RtpPacket::Ptr &rtp) { + int iTrackIndex = getTrackIndexByTrackType(rtp->type); + auto &pSock = _apUdpSock[iTrackIndex]; + if (!pSock) { + shutdown(SockException(Err_shutdown,"udp sock not opened yet")); + return; + } + + BufferRtp::Ptr buffer(new BufferRtp(rtp,4)); + pSock->send(buffer, nullptr, 0, ++i == size); + }); } break; default: @@ -337,7 +350,6 @@ inline int RtspPusher::getTrackIndexByTrackType(TrackType type) { return -1; } - void RtspPusher::sendRecord() { _onHandshake = [this](const Parser& parser){ auto src = _pMediaSrc.lock(); @@ -347,7 +359,7 @@ void RtspPusher::sendRecord() { _pRtspReader = src->getRing()->attach(getPoller()); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); - _pRtspReader->setReadCB([weakSelf](const RtpPacket::Ptr &pkt){ + _pRtspReader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pkt){ auto strongSelf = weakSelf.lock(); if(!strongSelf) { return; diff --git a/src/Rtsp/RtspPusher.h b/src/Rtsp/RtspPusher.h index 6c1e8c7803..ccc4c76fd3 100644 --- a/src/Rtsp/RtspPusher.h +++ b/src/Rtsp/RtspPusher.h @@ -67,7 +67,7 @@ class RtspPusher : public TcpClient, public RtspSplitter, public PusherBase { inline int getTrackIndexByTrackType(TrackType type); - void sendRtpPacket(const RtpPacket::Ptr & pkt) ; + void sendRtpPacket(const RtspMediaSource::RingDataType & pkt) ; void sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap(),const string &sdp = "" ); void sendRtspRequest(const string &cmd, const string &url ,const std::initializer_list &header,const string &sdp = ""); diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 0f2dc6023d..a64a640d10 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -796,7 +796,7 @@ void RtspSession::handleReq_Play(const Parser &parser) { } strongSelf->shutdown(SockException(Err_shutdown,"rtsp ring buffer detached")); }); - _pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) { + _pRtpReader->setReadCB([weakSelf](const RtspMediaSource::RingDataType &pack) { auto strongSelf = weakSelf.lock(); if(!strongSelf) { return; @@ -1123,23 +1123,36 @@ int RtspSession::totalReaderCount(MediaSource &sender) { return _pushSrc ? _pushSrc->totalReaderCount() : sender.readerCount(); } -void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) { +void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) { //InfoP(this) <<(int)pkt.Interleaved; switch (_rtpType) { case Rtsp::RTP_TCP: { - send(pkt); + int i = 0; + int size = pkt->size(); + setSendFlushFlag(false); + pkt->for_each([&](const RtpPacket::Ptr &rtp) { + if (++i == size) { + setSendFlushFlag(true); + } + send(rtp); + }); } break; case Rtsp::RTP_UDP: { - int iTrackIndex = getTrackIndexByTrackType(pkt->type); - auto &pSock = _apRtpSock[iTrackIndex]; - if (!pSock) { - shutdown(SockException(Err_shutdown,"udp sock not opened yet")); - return; - } - BufferRtp::Ptr buffer(new BufferRtp(pkt,4)); - _ui64TotalBytes += buffer->size(); - pSock->send(buffer); + int i = 0; + int size = pkt->size(); + pkt->for_each([&](const RtpPacket::Ptr &rtp) { + int iTrackIndex = getTrackIndexByTrackType(rtp->type); + auto &pSock = _apRtpSock[iTrackIndex]; + if (!pSock) { + shutdown(SockException(Err_shutdown, "udp sock not opened yet")); + return; + } + + BufferRtp::Ptr buffer(new BufferRtp(rtp, 4)); + _ui64TotalBytes += buffer->size(); + pSock->send(buffer, nullptr, 0, ++i == size); + }); } break; default: diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index 29e7319cea..eadafdff26 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -162,7 +162,7 @@ class RtspSession: public TcpSession, public RtspSplitter, public RtpReceiver , void onAuthDigest(const string &realm,const string &strMd5); //发送rtp给客户端 - void sendRtpPacket(const RtpPacket::Ptr &pkt); + void sendRtpPacket(const RtspMediaSource::RingDataType &pkt); //回复客户端 bool sendRtspResponse(const string &res_code,const std::initializer_list &header, const string &sdp = "" , const char *protocol = "RTSP/1.0"); bool sendRtspResponse(const string &res_code,const StrCaseMap &header = StrCaseMap(), const string &sdp = "",const char *protocol = "RTSP/1.0"); @@ -186,7 +186,7 @@ class RtspSession: public TcpSession, public RtspSplitter, public RtpReceiver , //rtsp播放器绑定的直播源 std::weak_ptr _pMediaSrc; //直播源读取器 - RingBuffer::RingReader::Ptr _pRtpReader; + RtspMediaSource::RingType::RingReader::Ptr _pRtpReader; //推流或拉流客户端采用的rtp传输方式 Rtsp::eRtpType _rtpType = Rtsp::RTP_Invalid; //sdp里面有效的track,包含音频或视频