From 5eeee9cb6b6a08a69e55fa1bbbcce876bca6fd1a Mon Sep 17 00:00:00 2001 From: Daniel Kristjansson Date: Thu, 16 Feb 2012 14:24:58 -0500 Subject: [PATCH] Adds RTPDataPacket/RTPFECPacket/RTPTSPacket stubs. This adds some implementation code as well, but it's really just there to help me flesh out the interfaces. --- mythtv/libs/libmythtv/dtvrecorder.h | 2 +- mythtv/libs/libmythtv/iptvchannel.cpp | 26 +++- mythtv/libs/libmythtv/iptvchannel.h | 14 +- mythtv/libs/libmythtv/iptvrecorder.cpp | 26 +++- mythtv/libs/libmythtv/iptvrecorder.h | 8 +- mythtv/libs/libmythtv/iptvstreamhandler.cpp | 140 ++++++++++++++---- mythtv/libs/libmythtv/iptvstreamhandler.h | 48 +++++- mythtv/libs/libmythtv/libmythtv.pro | 4 + mythtv/libs/libmythtv/rtp/rtpdatapacket.h | 115 ++++++++++++++ mythtv/libs/libmythtv/rtp/rtpfecpacket.h | 23 +++ mythtv/libs/libmythtv/rtp/rtpfixedheader.h | 63 -------- mythtv/libs/libmythtv/rtp/rtppacketbuffer.cpp | 110 ++++++++++++++ mythtv/libs/libmythtv/rtp/rtppacketbuffer.h | 53 +++++-- mythtv/libs/libmythtv/rtp/rtptsdatapacket.h | 39 +++++ mythtv/libs/libmythtv/rtp/udppacket.h | 43 ++++++ 15 files changed, 582 insertions(+), 132 deletions(-) create mode 100644 mythtv/libs/libmythtv/rtp/rtpdatapacket.h create mode 100644 mythtv/libs/libmythtv/rtp/rtpfecpacket.h delete mode 100644 mythtv/libs/libmythtv/rtp/rtpfixedheader.h create mode 100644 mythtv/libs/libmythtv/rtp/rtppacketbuffer.cpp create mode 100644 mythtv/libs/libmythtv/rtp/rtptsdatapacket.h create mode 100644 mythtv/libs/libmythtv/rtp/udppacket.h diff --git a/mythtv/libs/libmythtv/dtvrecorder.h b/mythtv/libs/libmythtv/dtvrecorder.h index e2749bf0944..5f0c28257db 100644 --- a/mythtv/libs/libmythtv/dtvrecorder.h +++ b/mythtv/libs/libmythtv/dtvrecorder.h @@ -52,7 +52,7 @@ class DTVRecorder : virtual void SetNextRecording(const ProgramInfo*, RingBuffer*); virtual void SetStreamData(void); - void SetStreamData(MPEGStreamData* sd); + virtual void SetStreamData(MPEGStreamData* sd); MPEGStreamData *GetStreamData(void) const { return _stream_data; } virtual void Reset(void); diff --git a/mythtv/libs/libmythtv/iptvchannel.cpp b/mythtv/libs/libmythtv/iptvchannel.cpp index dca8958520a..30df769d911 100644 --- a/mythtv/libs/libmythtv/iptvchannel.cpp +++ b/mythtv/libs/libmythtv/iptvchannel.cpp @@ -8,14 +8,15 @@ // MythTV headers #include "iptvstreamhandler.h" +#include "iptvrecorder.h" #include "iptvchannel.h" #include "mythlogging.h" #include "mythdb.h" #define LOC QString("IPTVChan(%1): ").arg(GetCardID()) -IPTVChannel::IPTVChannel(TVRec *rec) : - DTVChannel(rec), m_open(false) +IPTVChannel::IPTVChannel(TVRec *rec, const QString&) : + DTVChannel(rec), m_open(false), m_recorder(NULL) { LOG(VB_CHANNEL, LOG_INFO, LOC + "ctor"); } @@ -47,6 +48,16 @@ bool IPTVChannel::Open(void) return m_open; } +void IPTVChannel::SetRecorder(IPTVRecorder *rec) +{ + QMutexLocker locker(&m_lock); + if (m_recorder && m_stream_handler && m_recorder->GetStreamData()) + m_stream_handler->RemoveListener(m_recorder->GetStreamData()); + m_recorder = rec; + if (m_recorder && m_stream_handler && m_recorder->GetStreamData()) + m_stream_handler->AddListener(m_recorder->GetStreamData()); +} + void IPTVChannel::Close(void) { LOG(VB_GENERAL, LOG_INFO, LOC + "Close()"); @@ -72,13 +83,12 @@ bool IPTVChannel::Tune(const QString &freqid, int finetune) QHostAddress addr(QHostAddress::Any); - int ports[3]; - ports[0] = 5555; - ports[1] = -1; - ports[2] = -1; + int ports[3] = { 5555, -1, -1, }; + int bitrate = 5000000; - QString channel_id = QString("%1!%2!%3!%4") - .arg(addr.toString()).arg(ports[0]).arg(ports[1]).arg(ports[2]); + QString channel_id = QString("%1!%2!%3!%4!%5") + .arg(addr.toString()).arg(ports[0]).arg(ports[1]).arg(ports[2]) + .arg(bitrate); if (m_stream_handler) IPTVStreamHandler::Return(m_stream_handler); diff --git a/mythtv/libs/libmythtv/iptvchannel.h b/mythtv/libs/libmythtv/iptvchannel.h index 28bc2ea9d32..47383388c35 100644 --- a/mythtv/libs/libmythtv/iptvchannel.h +++ b/mythtv/libs/libmythtv/iptvchannel.h @@ -16,17 +16,22 @@ #include "dtvchannel.h" class IPTVStreamHandler; +class IPTVRecorder; class IPTVChannel : public DTVChannel { public: - IPTVChannel(TVRec*); + IPTVChannel(TVRec*, const QString&); ~IPTVChannel(); // Commands - bool Open(void); - void Close(void); - bool Tune(const QString &freqid, int finetune); + virtual bool Open(void); + virtual void Close(void); + virtual bool Tune(const QString &freqid, int finetune); + virtual bool Tune(const DTVMultiplex&, QString) { return false; } + + // Sets + void SetRecorder(IPTVRecorder*); // Gets bool IsOpen(void) const; @@ -36,6 +41,7 @@ class IPTVChannel : public DTVChannel volatile bool m_open; QString m_last_channel_id; IPTVStreamHandler *m_stream_handler; + IPTVRecorder *m_recorder; }; #endif // _IPTV_CHANNEL_H_ diff --git a/mythtv/libs/libmythtv/iptvrecorder.cpp b/mythtv/libs/libmythtv/iptvrecorder.cpp index 68d27aa3259..b0ee83cfc54 100644 --- a/mythtv/libs/libmythtv/iptvrecorder.cpp +++ b/mythtv/libs/libmythtv/iptvrecorder.cpp @@ -7,8 +7,9 @@ */ // MythTV headers -#include "iptvrecorder.h" #include "mpegstreamdata.h" +#include "iptvrecorder.h" +#include "iptvchannel.h" #define LOC QString("IPTVRec: ") @@ -34,6 +35,8 @@ bool IPTVRecorder::Open(void) LOG(VB_RECORD, LOG_INFO, LOC + "opened successfully"); + m_channel->SetRecorder(this); + return true; } @@ -45,9 +48,19 @@ bool IPTVRecorder::IsOpen(void) const void IPTVRecorder::Close(void) { LOG(VB_RECORD, LOG_INFO, LOC + "Close()"); + + m_channel->SetRecorder(NULL); + m_open = false; } +void IPTVRecorder::SetStreamData(MPEGStreamData *data) +{ + DTVRecorder::SetStreamData(data); + if (m_open) + m_channel->SetRecorder(this); +} + void IPTVRecorder::run(void) { LOG(VB_RECORD, LOG_INFO, LOC + "run -- begin"); @@ -84,6 +97,14 @@ void IPTVRecorder::run(void) if (!IsRecordingRequested()) break; + { // sleep 100 milliseconds unless StopRecording() or Unpause() + // is called, just to avoid running this too often. + QMutexLocker locker(&pauseLock); + if (!request_recording || request_pause) + continue; + unpauseWait.wait(&pauseLock, 100); + } + if (!_input_pmt) { LOG(VB_GENERAL, LOG_WARNING, LOC + @@ -91,9 +112,6 @@ void IPTVRecorder::run(void) usleep(5000); continue; } - - // TODO IMPLEMENT -- RTP/UDP reading.. - usleep(100 * 1000); } LOG(VB_RECORD, LOG_INFO, LOC + "run -- ending..."); diff --git a/mythtv/libs/libmythtv/iptvrecorder.h b/mythtv/libs/libmythtv/iptvrecorder.h index 70195689d4b..a9def81b740 100644 --- a/mythtv/libs/libmythtv/iptvrecorder.h +++ b/mythtv/libs/libmythtv/iptvrecorder.h @@ -21,11 +21,13 @@ class IPTVRecorder : public DTVRecorder IPTVRecorder(TVRec*, IPTVChannel*); ~IPTVRecorder(); - bool Open(void); + virtual bool Open(void); // RecorderBase + virtual void Close(void); // RecorderBase bool IsOpen(void) const; - void Close(void); - virtual void run(void); + virtual void SetStreamData(MPEGStreamData*); // DTVRecorder + + virtual void run(void); // QRunnable private: IPTVChannel *m_channel; diff --git a/mythtv/libs/libmythtv/iptvstreamhandler.cpp b/mythtv/libs/libmythtv/iptvstreamhandler.cpp index a7bee580912..52687200e87 100644 --- a/mythtv/libs/libmythtv/iptvstreamhandler.cpp +++ b/mythtv/libs/libmythtv/iptvstreamhandler.cpp @@ -5,28 +5,32 @@ // MythTV headers #include "iptvstreamhandler.h" +#include "rtppacketbuffer.h" +#include "rtptsdatapacket.h" +#include "rtpdatapacket.h" +#include "rtpfecpacket.h" #include "mythlogging.h" -#define LOC QString("IPTVSH(%1): ").arg(_device) +#define LOC QString("IPTVSH(%1): ").arg(_device) -QMap IPTVStreamHandler::_handlers; -QMap IPTVStreamHandler::_handlers_refcnt; -QMutex IPTVStreamHandler::_handlers_lock; +QMap IPTVStreamHandler::s_handlers; +QMap IPTVStreamHandler::s_handlers_refcnt; +QMutex IPTVStreamHandler::s_handlers_lock; IPTVStreamHandler *IPTVStreamHandler::Get(const QString &devname) { - QMutexLocker locker(&_handlers_lock); + QMutexLocker locker(&s_handlers_lock); QString devkey = devname.toUpper(); - QMap::iterator it = _handlers.find(devkey); + QMap::iterator it = s_handlers.find(devkey); - if (it == _handlers.end()) + if (it == s_handlers.end()) { IPTVStreamHandler *newhandler = new IPTVStreamHandler(devkey); newhandler->Open(); - _handlers[devkey] = newhandler; - _handlers_refcnt[devkey] = 1; + s_handlers[devkey] = newhandler; + s_handlers_refcnt[devkey] = 1; LOG(VB_RECORD, LOG_INFO, QString("IPTVSH: Creating new stream handler %1 for %2") @@ -34,25 +38,25 @@ IPTVStreamHandler *IPTVStreamHandler::Get(const QString &devname) } else { - _handlers_refcnt[devkey]++; - uint rcount = _handlers_refcnt[devkey]; + s_handlers_refcnt[devkey]++; + uint rcount = s_handlers_refcnt[devkey]; LOG(VB_RECORD, LOG_INFO, QString("IPTVSH: Using existing stream handler %1 for %2") .arg(devkey) .arg(devname) + QString(" (%1 in use)").arg(rcount)); } - return _handlers[devkey]; + return s_handlers[devkey]; } void IPTVStreamHandler::Return(IPTVStreamHandler * & ref) { - QMutexLocker locker(&_handlers_lock); + QMutexLocker locker(&s_handlers_lock); QString devname = ref->_device; - QMap::iterator rit = _handlers_refcnt.find(devname); - if (rit == _handlers_refcnt.end()) + QMap::iterator rit = s_handlers_refcnt.find(devname); + if (rit == s_handlers_refcnt.end()) return; if (*rit > 1) @@ -62,14 +66,14 @@ void IPTVStreamHandler::Return(IPTVStreamHandler * & ref) return; } - QMap::iterator it = _handlers.find(devname); - if ((it != _handlers.end()) && (*it == ref)) + QMap::iterator it = s_handlers.find(devname); + if ((it != s_handlers.end()) && (*it == ref)) { LOG(VB_RECORD, LOG_INFO, QString("IPTVSH: Closing handler for %1") .arg(devname)); ref->Close(); delete *it; - _handlers.erase(it); + s_handlers.erase(it); } else { @@ -78,7 +82,7 @@ void IPTVStreamHandler::Return(IPTVStreamHandler * & ref) .arg(devname)); } - _handlers_refcnt.erase(rit); + s_handlers_refcnt.erase(rit); ref = NULL; } @@ -86,18 +90,20 @@ IPTVStreamHandler::IPTVStreamHandler(const QString &device) : StreamHandler(device) { QStringList parts = device.split("!"); - if (parts.size() >= 4) + if (parts.size() >= 5) { m_addr = QHostAddress(parts[0]); m_ports[0] = parts[1].toInt(); m_ports[1] = parts[2].toInt(); m_ports[2] = parts[3].toInt(); + m_bitrate = parts[4].toInt(); } else { m_ports[0] = -1; m_ports[1] = -1; m_ports[2] = -1; + m_bitrate = -1; } } @@ -105,18 +111,22 @@ void IPTVStreamHandler::run(void) { RunProlog(); - // Open our ports... // TODO Error handling.. + + // Setup for (uint i = 0; i < sizeof(m_ports)/sizeof(int); i++) { if (m_ports[i] >= 0) { m_sockets[i] = new QUdpSocket(); - m_helpers[i] = new IPTVStreamHandlerHelper(this, m_sockets[i]); + m_read_helpers[i] = new IPTVStreamHandlerReadHelper(this, m_sockets[i], i); m_sockets[i]->bind(m_addr, m_ports[i]); } } + m_buffer = new RTPPacketBuffer(m_bitrate); + m_write_helper = new IPTVStreamHandlerWriteHelper(this); + // Enter event loop exec(); // Clean up @@ -126,25 +136,91 @@ void IPTVStreamHandler::run(void) { delete m_sockets[i]; m_sockets[i] = NULL; - delete m_helpers[i]; - m_helpers[i] = NULL; + delete m_read_helpers[i]; + m_read_helpers[i] = NULL; } } + delete m_buffer; + m_buffer = NULL; + delete m_write_helper; + m_write_helper = NULL; RunEpilog(); } -void IPTVStreamHandlerHelper::ReadPending(void) +void IPTVStreamHandlerReadHelper::ReadPending(void) { - QByteArray datagram; QHostAddress sender; quint16 senderPort; - - while (m_socket->hasPendingDatagrams()) + + if (0 == m_stream) + { + while (m_socket->hasPendingDatagrams()) + { + RTPDataPacket packet(m_parent->m_buffer->GetEmptyPacket()); + QByteArray &data = packet.GetDataReference(); + data.resize(m_socket->pendingDatagramSize()); + m_socket->readDatagram(data.data(), data.size(), + &sender, &senderPort); + m_parent->m_buffer->PushDataPacket(packet); + } + } + else + { + while (m_socket->hasPendingDatagrams()) + { + RTPFECPacket packet(m_parent->m_buffer->GetEmptyPacket()); + QByteArray &data = packet.GetDataReference(); + data.resize(m_socket->pendingDatagramSize()); + m_socket->readDatagram(data.data(), data.size(), + &sender, &senderPort); + m_parent->m_buffer->PushFECPacket(packet, m_stream - 1); + } + } +} + +#define LOC_WH QString("IPTVSH(%1): ").arg(m_parent->_device) + +void IPTVStreamHandlerWriteHelper::timerEvent(QTimerEvent*) +{ + if (!m_parent->m_buffer->HasAvailablePacket()) + return; + + while (true) { - datagram.resize(m_socket->pendingDatagramSize()); - m_socket->readDatagram(datagram.data(), datagram.size(), - &sender, &senderPort); - // TODO actually do something with the data.. + RTPDataPacket packet(m_parent->m_buffer->PopDataPacket()); + + if (!packet.IsValid()) + break; + + if (packet.GetPayloadType() == RTPDataPacket::kPayLoadTypeTS) + { + RTPTSDataPacket ts_packet(packet); + + if (!ts_packet.IsValid()) + { + m_parent->m_buffer->FreePacket(packet); + continue; + } + + m_parent->_listener_lock.lock(); + + int remainder = 0; + IPTVStreamHandler::StreamDataList::const_iterator sit; + sit = m_parent->_stream_data_list.begin(); + for (; sit != m_parent->_stream_data_list.end(); ++sit) + remainder = sit.key()->ProcessData(ts_packet.GetTSData(), ts_packet.GetTSDataSize()); + + m_parent->_listener_lock.unlock(); + + if (remainder != 0) + { + LOG(VB_RECORD, LOG_INFO, LOC_WH + + QString("RunTS(): data_length = %1 remainder = %2") + .arg(ts_packet.GetTSDataSize()).arg(remainder)); + } + } + + m_parent->m_buffer->FreePacket(packet); } } diff --git a/mythtv/libs/libmythtv/iptvstreamhandler.h b/mythtv/libs/libmythtv/iptvstreamhandler.h index a9ed036e475..90e61108497 100644 --- a/mythtv/libs/libmythtv/iptvstreamhandler.h +++ b/mythtv/libs/libmythtv/iptvstreamhandler.h @@ -19,13 +19,14 @@ using namespace std; class IPTVStreamHandler; class DTVSignalMonitor; +class RTPPacketBuffer; class IPTVChannel; -class IPTVStreamHandlerHelper : QObject +class IPTVStreamHandlerReadHelper : QObject { public: - IPTVStreamHandlerHelper(IPTVStreamHandler *p, QUdpSocket *s) : - m_parent(p), m_socket(s) + IPTVStreamHandlerReadHelper(IPTVStreamHandler *p, QUdpSocket *s, uint stream) : + m_parent(p), m_socket(s), m_stream(stream) { connect(m_socket, SIGNAL(readyRead()), this, SLOT(ReadPending())); @@ -37,10 +38,37 @@ class IPTVStreamHandlerHelper : QObject private: IPTVStreamHandler *m_parent; QUdpSocket *m_socket; + uint m_stream; +}; + +class IPTVStreamHandlerWriteHelper : QObject +{ + public: + IPTVStreamHandlerWriteHelper(IPTVStreamHandler *p) : m_parent(p) { } + ~IPTVStreamHandlerWriteHelper() + { + killTimer(m_timer); + m_timer = 0; + m_parent = NULL; + } + + void Start(void) + { + m_timer = startTimer(200); + } + + private: + void timerEvent(QTimerEvent*); + + private: + IPTVStreamHandler *m_parent; + int m_timer; }; class IPTVStreamHandler : public StreamHandler { + friend class IPTVStreamHandlerReadHelper; + friend class IPTVStreamHandlerWriteHelper; public: static IPTVStreamHandler *Get(const QString &devicename); static void Return(IPTVStreamHandler * & ref); @@ -53,6 +81,8 @@ class IPTVStreamHandler : public StreamHandler StreamHandler::AddListener(data, false, false, output_file); } // StreamHandler + int GetBitrate(void) const { return m_bitrate; } + private: IPTVStreamHandler(const QString &); @@ -62,17 +92,19 @@ class IPTVStreamHandler : public StreamHandler virtual void run(void); // MThread private: - mutable QMutex m_lock; // TODO should we care about who is broadcasting to us? QHostAddress m_addr; int m_ports[3]; + int m_bitrate; QUdpSocket *m_sockets[3]; - IPTVStreamHandlerHelper *m_helpers[3]; + IPTVStreamHandlerReadHelper *m_read_helpers[3]; + IPTVStreamHandlerWriteHelper *m_write_helper; + RTPPacketBuffer *m_buffer; // for implementing Get & Return - static QMutex _handlers_lock; - static QMap _handlers; - static QMap _handlers_refcnt; + static QMutex s_handlers_lock; + static QMap s_handlers; + static QMap s_handlers_refcnt; }; #endif // _IPTVSTREAMHANDLER_H_ diff --git a/mythtv/libs/libmythtv/libmythtv.pro b/mythtv/libs/libmythtv/libmythtv.pro index ad233eca496..4c1281bef49 100644 --- a/mythtv/libs/libmythtv/libmythtv.pro +++ b/mythtv/libs/libmythtv/libmythtv.pro @@ -569,9 +569,13 @@ using_backend { # Support for RTP/UDP streams HEADERS += iptvchannel.h iptvrecorder.h HEADERS += iptvsignalmonitor.h iptvstreamhandler.h + HEADERS += rtppacketbuffer.h udppacket.h + HEADERS += rtpdatapacket.h rtpfecpacket.h SOURCES += iptvchannel.cpp iptvrecorder.cpp SOURCES += iptvsignalmonitor.cpp iptvstreamhandler.cpp + SOURCES += rtppacketbuffer.cpp + DEFINES += USING_IPTV # Support for HDHomeRun box using_hdhomerun { diff --git a/mythtv/libs/libmythtv/rtp/rtpdatapacket.h b/mythtv/libs/libmythtv/rtp/rtpdatapacket.h new file mode 100644 index 00000000000..5ee84509512 --- /dev/null +++ b/mythtv/libs/libmythtv/rtp/rtpdatapacket.h @@ -0,0 +1,115 @@ +/* -*- Mode: c++ -*- + * Copyright (c) 2012 Digital Nirvana, Inc. + * Distributed as part of MythTV under GPL v2 and later. + */ + +#ifndef _RTP_DATA_PACKET_H_ +#define _RTP_DATA_PACKET_H_ + +#include // for ntohs()/ntohl() + +#include "udppacket.h" + +/** \brief RTP Data Packet + * + * The RTP Header exists for all RTP packets, it contains a payload + * type, timestamp and a sequence number for packet reordering. + * + * Different RTP Data Packet types have their own sub-classes for + * accessing the data portion of the packet. + * + * The data is stored in a QByteArray which is a reference counted + * shared data container, so an RTPDataPacket can be assigned to a + * subclass efficiently. + */ +class RTPDataPacket : public UDPPacket +{ + public: + RTPDataPacket(const RTPDataPacket &o) : UDPPacket(o), m_off(o.m_off) { } + RTPDataPacket(const UDPPacket &o) : UDPPacket(o) { } + RTPDataPacket(uint64_t key) : UDPPacket(key), m_off(0) { } + RTPDataPacket(void) : UDPPacket(0ULL), m_off(0) { } + + bool IsValid(void) const + { + if (m_data.size() < 12) + return false; + + if (2 != GetVersion()) + return false; + + if (HasPadding() && (m_data.size() < 1328)) + return false; + + int off = 12 + 4 * GetCSRCCount(); + if (off > m_data.size()) + return false; + + if (HasExtension()) + { + uint ext_size = m_data[off+2] << 8 | m_data[off+3]; + off += 4 * (1 + ext_size); + } + if (off > m_data.size()) + return false; + + m_off = off; + + return true; + } + + uint GetVersion(void) const { return (m_data[0] >> 6); } + bool HasPadding(void) const { return (m_data[0] >> 5) & 0x1; } + bool HasExtension(void) const { return (m_data[0] >> 4) & 0x1; } + uint GetCSRCCount(void) const { return m_data[0] & 0xf; } + + enum { + kPayLoadTypePCMAudio = 8, + kPayLoadTypeMPEGAudio = 12, + kPayLoadTypeH261Video = 31, + kPayLoadTypeMPEG2Video = 32, + kPayLoadTypeTS = 33, + kPayLoadTypeH263Video = 34, + }; + + uint GetPayloadType(void) const + { + return m_data[1] & 0x7f; + } + + uint GetSequenceNumber(void) const + { + return ntohs(*reinterpret_cast(m_data.data()+2)); + } + + uint GetTimeStamp(void) const + { + return ntohl(*reinterpret_cast(m_data.data()+4)); + } + + uint GetSynchronizationSource(void) const + { + return ntohl(*reinterpret_cast(m_data.data()+8)); + } + + uint GetContributingSource(uint i) const + { + const uint32_t tmp = + *reinterpret_cast(m_data.data() + 12 + 4 * i); + return ntohl(tmp); + } + + uint GetPayloadOffset(void) const { return m_off; } + + uint GetPaddingSize(void) const + { + if (!HasPadding()) + return 0; + return m_data[1328]; + } + + protected: + mutable uint m_off; +}; + +#endif // _RTP_DATA_PACKET_H_ diff --git a/mythtv/libs/libmythtv/rtp/rtpfecpacket.h b/mythtv/libs/libmythtv/rtp/rtpfecpacket.h new file mode 100644 index 00000000000..8b94f0f672e --- /dev/null +++ b/mythtv/libs/libmythtv/rtp/rtpfecpacket.h @@ -0,0 +1,23 @@ +/* -*- Mode: c++ -*- + * Copyright (c) 2012 Digital Nirvana, Inc. + * Distributed as part of MythTV under GPL v2 and later. + */ + +#include "udppacket.h" + +#ifndef _RTP_FEC_PACKET_H_ +#define _RTP_FEC_PACKET_H_ + +/** \brief RTP FEC Packet + */ +class RTPFECPacket : public UDPPacket +{ + public: + RTPFECPacket(const UDPPacket &o) : UDPPacket(o) { } + RTPFECPacket(uint64_t key) : UDPPacket(key) { } + RTPFECPacket(void) : UDPPacket(0ULL) { } + + // TODO +}; + +#endif // _RTP_FEC_PACKET_H_ diff --git a/mythtv/libs/libmythtv/rtp/rtpfixedheader.h b/mythtv/libs/libmythtv/rtp/rtpfixedheader.h deleted file mode 100644 index 5c52092f881..00000000000 --- a/mythtv/libs/libmythtv/rtp/rtpfixedheader.h +++ /dev/null @@ -1,63 +0,0 @@ -/* -*- Mode: c++ -*- - * Copyright (c) 2012 Digital Nirvana, Inc. - * Distributed as part of MythTV under GPL v2 and later. - */ - -/** \brief RTP Fixed Header for all packets. - * - * The RTP Header exists for all RTP packets, it contains a payload - * type, timestamp and a sequence number for packet reordering. - */ -class RTPFixedHeader -{ - public: - RTPFixedHeader(const char *data, uint sz) : - m_data(data) - { - // TODO verify packet is big enough to contain the data - m_valid = 2 == GetVersion(); - } - - bool IsValid(void) const { return m_valid; } - uint GetVersion(void) const { return (m_data[0] >> 6); } - uint GetExtension(void) const { return (m_data[0] >> 4) & 0x1; } - uint GetCSRCCount(void) const { return m_data[0] & 0xf; } - - enum { - kPayLoadTypePCMAudio = 8, - kPayLoadTypeMPEGAudio = 12, - kPayLoadTypeH261Video = 31, - kPayLoadTypeMPEG2Video = 32, - kPayLoadTypeTS = 33, - kPayLoadTypeH263Video = 34, - }; - - uint GetPayloadType(void) const - { - return m_data[1] & 0x7f; - } - - uint GetSequenceNumber(void) const - { - return 0; // TODO -- return data and ensure proper byte order. - } - - uint GetTimeStamp(void) const - { - return 0; // TODO -- return data and ensure proper byte order. - } - - uint GetSynchronizationSource(void) const - { - return 0; // TODO -- return data and ensure proper byte order. - } - - uint GetContributingSource(uint i) const - { - return 0; // TODO -- return data and ensure proper byte order. - } - - private: - unsigned char *m_data; - bool m_valid; -}; diff --git a/mythtv/libs/libmythtv/rtp/rtppacketbuffer.cpp b/mythtv/libs/libmythtv/rtp/rtppacketbuffer.cpp new file mode 100644 index 00000000000..085795b2743 --- /dev/null +++ b/mythtv/libs/libmythtv/rtp/rtppacketbuffer.cpp @@ -0,0 +1,110 @@ +/* -*- Mode: c++ -*- + * RTPPacketBuffer + * Copyright (c) 2012 Digital Nirvana, Inc. + * Distributed as part of MythTV under GPL v2 and later. + */ + +#include +using namespace std; + +#include "rtppacketbuffer.h" +#include "rtpdatapacket.h" +#include "rtpfecpacket.h" + +RTPPacketBuffer::RTPPacketBuffer(unsigned int bitrate) : + m_bitrate(bitrate), + m_next_empty_packet_key(0ULL), + m_large_sequence_number_seen_recently(0), + m_current_sequence(0ULL) +{ + while (!m_next_empty_packet_key) + { + m_next_empty_packet_key = + (random() << 24) ^ (random() << 16) ^ + (random() << 8) ^ random(); + } +} + +void RTPPacketBuffer::PushDataPacket(const RTPDataPacket &packet) +{ + uint key = packet.GetSequenceNumber(); + + bool large_was_seen_recently = m_large_sequence_number_seen_recently > 0; + m_large_sequence_number_seen_recently = + (key > (1U>>31)) ? 500 : m_large_sequence_number_seen_recently - 1; + m_large_sequence_number_seen_recently = + max(m_large_sequence_number_seen_recently, 0); + + if (m_large_sequence_number_seen_recently > 0) + { + if (key < (1U>>20)) + key += 1ULL<<32; + } + else if (large_was_seen_recently) + { + m_current_sequence += 1ULL<<32; + } + + key += m_current_sequence; + + m_unordered_packets[key] = packet; + + // TODO pushing packets onto the ordered list should be based on + // the bitrate and the M+N of the FEC.. but for now... + const int kHighWaterMark = 500; + const int kLowWaterMark = 100; + if (m_unordered_packets.size() > kHighWaterMark) + { + while (m_unordered_packets.size() > kLowWaterMark) + { + QMap::iterator it = m_unordered_packets.begin(); + m_available_packets.push_back(*it); + m_unordered_packets.erase(it); + } + } +} + +void RTPPacketBuffer::PushFECPacket(const RTPFECPacket &packet, uint fec_stream_num) +{ + (void) fec_stream_num; + // TODO IMPLEMENT + // for now just free the packet for immediate reuse + FreePacket(packet); +} + +bool RTPPacketBuffer::HasAvailablePacket(void) const +{ + return !m_available_packets.empty(); +} + +RTPDataPacket RTPPacketBuffer::PopDataPacket(void) +{ + if (m_available_packets.empty()) + return RTPDataPacket(0); + + RTPDataPacket packet(m_available_packets.front()); + m_available_packets.pop_front(); + + return packet; +} + +UDPPacket RTPPacketBuffer::GetEmptyPacket(void) +{ + QMap::iterator it = m_empty_packets.begin(); + if (it == m_empty_packets.end()) + { + return UDPPacket(m_next_empty_packet_key++); + } + + UDPPacket packet(*it); + m_empty_packets.erase(it); + + return packet; +} + +void RTPPacketBuffer::FreePacket(const UDPPacket &packet) +{ + uint64_t top = packet.GetKey() & (0xFFFFFFFFULL<<32); + if (top == (m_next_empty_packet_key & (0xFFFFFFFFULL<<32))) + m_empty_packets[packet.GetKey()] = packet; +} diff --git a/mythtv/libs/libmythtv/rtp/rtppacketbuffer.h b/mythtv/libs/libmythtv/rtp/rtppacketbuffer.h index dc807848cc2..2b0fae64e06 100644 --- a/mythtv/libs/libmythtv/rtp/rtppacketbuffer.h +++ b/mythtv/libs/libmythtv/rtp/rtppacketbuffer.h @@ -4,23 +4,58 @@ * Distributed as part of MythTV under GPL v2 and later. */ +#ifndef _RTP_PACKET_BUFFER_H_ +#define _RTP_PACKET_BUFFER_H_ + +#include +#include + +#include "udppacket.h" + +class RTPDataPacket; +class RTPFECPacket; + class RTPPacketBuffer { - RTPPacketBuffer(uint bitrate); - ~RTPPacketBuffer(); + public: + RTPPacketBuffer(unsigned int bitrate); /// Adds RFC 3550 RTP data packet - void PushDataPacket(unsigned char *data, uint size); + void PushDataPacket(const RTPDataPacket&); /// Adds SMPTE 2022 Forward Error Correction Stream packet - void PushFECPacket(unsigned char *data, uint size, int fec_stream_num); + void PushFECPacket(const RTPFECPacket&, unsigned int fec_stream_num); + + /// Return true if there are ordered packets ready for processing + bool HasAvailablePacket(void) const; - /// Returns number of re-ordered packets ready for reading - uint AvailablePackets(void) const; + /// Fetches an RTP Data Packet for processing. + /// Call FreePacket() when done with the packet. + RTPDataPacket PopDataPacket(void); - /// Fetches an RTP Data Packet for processing - RTPDataPacket *PopDataPacket(void); + /// Gets a packet for use in PushDataPacket/PushFECPacket + UDPPacket GetEmptyPacket(void); /// Frees an RTPDataPacket returned by PopDataPacket. - void FreeDataPacket(RTPDataPacket*); + void FreePacket(const UDPPacket &); + + private: + uint m_bitrate; + + /// Packets key to use for next empty packet + uint64_t m_next_empty_packet_key; + + /// Packets ready for reuse + QMap m_empty_packets; + + /// Ordered list of available packets + QList m_available_packets; + + int m_large_sequence_number_seen_recently; + uint64_t m_current_sequence; + + /// The key is the RTP sequence number + sequence if applicable + QMap m_unordered_packets; }; + +#endif // _RTP_PACKET_BUFFER_H_ diff --git a/mythtv/libs/libmythtv/rtp/rtptsdatapacket.h b/mythtv/libs/libmythtv/rtp/rtptsdatapacket.h new file mode 100644 index 00000000000..3eaeb781aa4 --- /dev/null +++ b/mythtv/libs/libmythtv/rtp/rtptsdatapacket.h @@ -0,0 +1,39 @@ +/* -*- Mode: c++ -*- + * Copyright (c) 2012 Digital Nirvana, Inc. + * Distributed as part of MythTV under GPL v2 and later. + */ + +#ifndef _RTP_TS_DATA_PACKET_H_ +#define _RTP_TS_DATA_PACKET_H_ + +#include +using namespace std; + +#include "rtpdatapacket.h" + +/** \brief RTP Transport Stream Data Packet + * + */ +class RTPTSDataPacket : public RTPDataPacket +{ + public: + RTPTSDataPacket(const RTPDataPacket &o) : RTPDataPacket(o) { } + RTPTSDataPacket(const UDPPacket &o) : RTPDataPacket(o) { } + RTPTSDataPacket(uint64_t key) : RTPDataPacket(key) { } + RTPTSDataPacket(void) : RTPDataPacket(0ULL) { } + + const unsigned char *GetTSData(void) const + { + return reinterpret_cast(m_data.data()) + GetTSOffset(); + } + + unsigned int GetTSDataSize(void) const + { + return max(1328 - (int)GetTSOffset() - (int)GetPaddingSize(), 0); + } + + private: + uint GetTSOffset(void) const { return 0; /* TODO */ } +}; + +#endif // _RTP_TS_DATA_PACKET_H_ diff --git a/mythtv/libs/libmythtv/rtp/udppacket.h b/mythtv/libs/libmythtv/rtp/udppacket.h new file mode 100644 index 00000000000..633af07b860 --- /dev/null +++ b/mythtv/libs/libmythtv/rtp/udppacket.h @@ -0,0 +1,43 @@ +/* -*- Mode: c++ -*- + * Copyright (c) 2012 Digital Nirvana, Inc. + * Distributed as part of MythTV under GPL v2 and later. + */ + +#ifndef _UDP_PACKET_H_ +#define _UDP_PACKET_H_ + +#include + +#include + +/** UDP Packet + * + * The data is stored in a QByteArray which is a reference counted + * shared data container, so an UDPPacket can be assigned to a + * subclass efficiently. + * + */ +class UDPPacket +{ + public: + UDPPacket(const UDPPacket &o) : m_key(o.m_key), m_data(o.m_data) { } + UDPPacket(uint64_t key) : m_key(key) { } + UDPPacket(void) : m_key(0ULL) { } + virtual ~UDPPacket() {} + + /// IsValid() must return true before any data access methods are called, + /// other than GetDataReference() and GetData() + virtual bool IsValid(void) const { return true; } + + uint64_t GetKey(void) const { return m_key; } + + QByteArray &GetDataReference(void) { return m_data; } + QByteArray GetData(void) const { return m_data; } + + protected: + /// Key used to ensure we avoid extra memory allocation in m_data QByteArray + uint64_t m_key; + QByteArray m_data; +}; + +#endif // _UDP_PACKET_H_