From 98b1a775cd53aaa9dca1939d4ffa24de7f1add64 Mon Sep 17 00:00:00 2001 From: Lawrence Rust Date: Tue, 19 Jul 2011 14:49:12 +0200 Subject: [PATCH] MythPlayer: Add support for InteractiveTV streams This patch adds functionality to MythPlayer to enable interactive TV content to select altenative streamed media for display. NB this patch needs to be applied together with that for InteractionChannel streaming. Refs #10019 Signed-off-by: Lawrence Rust Signed-off-by: Stuart Morgan --- mythtv/libs/libmythtv/avformatdecoder.cpp | 3 + mythtv/libs/libmythtv/icringbuffer.cpp | 172 +++++ mythtv/libs/libmythtv/icringbuffer.h | 40 + mythtv/libs/libmythtv/libmythtv.pro | 6 + mythtv/libs/libmythtv/mythplayer.cpp | 182 ++++- mythtv/libs/libmythtv/mythplayer.h | 7 + mythtv/libs/libmythtv/netstream.cpp | 900 ++++++++++++++++++++++ mythtv/libs/libmythtv/netstream.h | 148 ++++ mythtv/libs/libmythtv/ringbuffer.h | 4 + 9 files changed, 1451 insertions(+), 11 deletions(-) create mode 100644 mythtv/libs/libmythtv/icringbuffer.cpp create mode 100644 mythtv/libs/libmythtv/icringbuffer.h create mode 100644 mythtv/libs/libmythtv/netstream.cpp create mode 100644 mythtv/libs/libmythtv/netstream.h diff --git a/mythtv/libs/libmythtv/avformatdecoder.cpp b/mythtv/libs/libmythtv/avformatdecoder.cpp index 9439d394b50..e1192f57e6d 100644 --- a/mythtv/libs/libmythtv/avformatdecoder.cpp +++ b/mythtv/libs/libmythtv/avformatdecoder.cpp @@ -2203,6 +2203,9 @@ int AvFormatDecoder::ScanStreams(bool novideo) } } + if ((uint)ic->bit_rate > bitrate) + bitrate = (uint)ic->bit_rate; + if (bitrate > 0) { bitrate = (bitrate + 999) / 1000; diff --git a/mythtv/libs/libmythtv/icringbuffer.cpp b/mythtv/libs/libmythtv/icringbuffer.cpp new file mode 100644 index 00000000000..d23bc3cc374 --- /dev/null +++ b/mythtv/libs/libmythtv/icringbuffer.cpp @@ -0,0 +1,172 @@ +#include "icringbuffer.h" + +#include // SEEK_SET + +#include +#include + +#include "netstream.h" +#include "mythlogging.h" + + +#define LOC QString("ICRingBuf ") + + +ICRingBuffer::ICRingBuffer(const QString &url, RingBuffer *parent) + : RingBuffer(kRingBufferType), m_stream(0), m_parent(parent) +{ + startreadahead = true; + OpenFile(url); +} + +ICRingBuffer::~ICRingBuffer() +{ + delete m_stream; + delete m_parent; +} + +bool ICRingBuffer::IsOpen(void) const +{ + return m_stream ? m_stream->IsOpen() : false; +} + +bool ICRingBuffer::OpenFile(const QString &url, uint retry_ms) +{ + if (!NetStream::IsSupported(url)) + { + LOG(VB_GENERAL, LOG_ERR, LOC + QString("Unsupported URL %1").arg(url) ); + return false; + } + + QScopedPointer stream(new NetStream(url, NetStream::kNeverCache)); + if (!stream || !stream->IsOpen()) + { + LOG(VB_GENERAL, LOG_ERR, LOC + QString("Failed to open %1").arg(url) ); + return false; + } + + if (!stream->WaitTillReady(30000)) + { + LOG(VB_GENERAL, LOG_ERR, LOC + QString("Stream not ready%1").arg(url) ); + return false; + } + + if (m_parent) + m_parent->Pause(); + + QWriteLocker locker(&rwlock); + + safefilename = url; + filename = url; + + delete m_stream; + m_stream = stream.take(); + + // The initial bitrate needs to be set with consideration for low bit rate + // streams (e.g. radio @ 64Kbps) such that fill_min bytes are received + // in a reasonable time period to enable decoders to peek the first few KB + // to determine type & settings. + rawbitrate = 128; // remotefile + CalcReadAheadThresh(); + + locker.unlock(); + Reset(true, false, true); + + LOG(VB_GENERAL, LOG_INFO, LOC + QString("Opened %1").arg(url)); + return true; +} + +long long ICRingBuffer::GetReadPosition(void) const +{ + return m_stream ? m_stream->GetReadPosition() : 0; +} + +long long ICRingBuffer::Seek(long long pos, int whence, bool has_lock) +{ + if (!m_stream) + return -1; + + // lockForWrite takes priority over lockForRead, so this will + // take priority over the lockForRead in the read ahead thread. + if (!has_lock) + rwlock.lockForWrite(); + + poslock.lockForWrite(); + + long long ret; + + // Optimize no-op seeks + if (readaheadrunning && + ((whence == SEEK_SET && pos == readpos) || + (whence == SEEK_CUR && pos == 0))) + { + ret = readpos; + + poslock.unlock(); + if (!has_lock) + rwlock.unlock(); + + return ret; + } + + switch (whence) + { + case SEEK_SET: + break; + case SEEK_CUR: + pos += m_stream->GetReadPosition(); + break; + case SEEK_END: + pos += m_stream->GetSize(); + break; + default: + errno = EINVAL; + ret = -1; + goto err; + } + + ret = m_stream->Seek(pos); + if (ret >= 0) + { + readpos = ret; + + ignorereadpos = -1; + + if (readaheadrunning) + ResetReadAhead(readpos); + + readAdjust = 0; + } + +err: + poslock.unlock(); + + generalWait.wakeAll(); + + if (!has_lock) + rwlock.unlock(); + + return ret; +} + +int ICRingBuffer::safe_read(void *data, uint sz) +{ + return m_stream ? m_stream->safe_read(data, sz, 1000) : (ateof = true, 0); +} + +long long ICRingBuffer::GetRealFileSize(void) const +{ + return m_stream ? m_stream->GetSize() : -1; +} + +// Take ownership of parent RingBuffer +RingBuffer *ICRingBuffer::Take() +{ + RingBuffer *parent = m_parent; + if (parent && IsOpen()) + parent->Unpause(); + m_parent = 0; + return parent; +} + +// End of file diff --git a/mythtv/libs/libmythtv/icringbuffer.h b/mythtv/libs/libmythtv/icringbuffer.h new file mode 100644 index 00000000000..dc7585d2ecf --- /dev/null +++ b/mythtv/libs/libmythtv/icringbuffer.h @@ -0,0 +1,40 @@ +#ifndef ICRINGBUFFER_H +#define ICRINGBUFFER_H + +#include "ringbuffer.h" + +class NetStream; + +class ICRingBuffer : public RingBuffer +{ + public: + static enum RingBufferType const kRingBufferType = kRingBuffer_MHEG; + + ICRingBuffer(const QString &url, RingBuffer *parent = 0); + virtual ~ICRingBuffer(); + + // RingBuffer implementation + virtual bool IsOpen(void) const; + virtual long long GetReadPosition(void) const; + virtual bool OpenFile(const QString &url, + uint retry_ms = kDefaultOpenTimeout); + virtual long long Seek(long long pos, int whence, bool has_lock); + virtual long long GetRealFileSize(void) const; + virtual bool IsStreamed(void) { return false; } + virtual bool IsSeekingAllowed(void) { return true; } + virtual bool IsBookmarkAllowed(void) { return false; } + + protected: + virtual int safe_read(void *data, uint sz); + + // Operations + public: + // Take ownership of parent RingBuffer + RingBuffer *Take(); + + private: + NetStream *m_stream; + RingBuffer *m_parent; // parent RingBuffer +}; + +#endif // ICRINGBUFFER_H diff --git a/mythtv/libs/libmythtv/libmythtv.pro b/mythtv/libs/libmythtv/libmythtv.pro index c815431f0bb..82aa9988b1a 100644 --- a/mythtv/libs/libmythtv/libmythtv.pro +++ b/mythtv/libs/libmythtv/libmythtv.pro @@ -123,6 +123,7 @@ HEADERS += mythsystemevent.h HEADERS += avfringbuffer.h ThreadedFileWriter.h HEADERS += ringbuffer.h fileringbuffer.h HEADERS += streamingringbuffer.h metadataimagehelper.h +HEADERS += icringbuffer.h SOURCES += recordinginfo.cpp SOURCES += dbcheck.cpp @@ -150,6 +151,7 @@ SOURCES += mythsystemevent.cpp SOURCES += avfringbuffer.cpp ThreadedFileWriter.cpp SOURCES += ringbuffer.cpp fileringBuffer.cpp SOURCES += streamingringbuffer.cpp metadataimagehelper.cpp +SOURCES += icringbuffer.cpp # DiSEqC HEADERS += diseqc.h diseqcsettings.h @@ -420,6 +422,10 @@ using_frontend { SOURCES += dsmcc.cpp dsmcccache.cpp SOURCES += dsmccbiop.cpp dsmccobjcarousel.cpp + # MHEG interaction channel + HEADERS += netstream.h + SOURCES += netstream.cpp + # MHEG/MHI stuff HEADERS += interactivetv.h mhi.h SOURCES += interactivetv.cpp mhi.cpp diff --git a/mythtv/libs/libmythtv/mythplayer.cpp b/mythtv/libs/libmythtv/mythplayer.cpp index 9538e9bc471..d2524682bff 100644 --- a/mythtv/libs/libmythtv/mythplayer.cpp +++ b/mythtv/libs/libmythtv/mythplayer.cpp @@ -60,6 +60,7 @@ using namespace std; #include "mythuiimage.h" #include "mythlogging.h" #include "mythmiscutil.h" +#include "icringbuffer.h" extern "C" { #include "vbitext/vbi.h" @@ -897,7 +898,7 @@ int MythPlayer::OpenFile(uint retries) if (!player_ctx || !player_ctx->buffer) return -1; - livetv = player_ctx->tvchain; + livetv = player_ctx->tvchain && player_ctx->buffer->LiveMode(); if (player_ctx->tvchain && player_ctx->tvchain->GetCardType(player_ctx->tvchain->GetCurPos()) == @@ -2048,6 +2049,7 @@ void MythPlayer::DisplayPauseFrame(void) SetBuffering(false); RefreshPauseFrame(); + PreProcessNormalFrame(); // Allow interactiveTV to draw on pause frame osdLock.lock(); videofiltersLock.lock(); @@ -2462,6 +2464,14 @@ void MythPlayer::SwitchToProgram(void) return; } + if (player_ctx->buffer->GetType() == ICRingBuffer::kRingBufferType) + { + // Restore original ringbuffer + ICRingBuffer *ic = dynamic_cast< ICRingBuffer* >(player_ctx->buffer); + player_ctx->buffer = ic->Take(); + delete ic; + } + player_ctx->buffer->OpenFile( pginfo->GetPlaybackURL(), RingBuffer::kLiveTVOpenTimeout); @@ -2596,6 +2606,14 @@ void MythPlayer::JumpToProgram(void) SendMythSystemPlayEvent("PLAY_CHANGED", pginfo); + if (player_ctx->buffer->GetType() == ICRingBuffer::kRingBufferType) + { + // Restore original ringbuffer + ICRingBuffer *ic = dynamic_cast< ICRingBuffer* >(player_ctx->buffer); + player_ctx->buffer = ic->Take(); + delete ic; + } + player_ctx->buffer->OpenFile( pginfo->GetPlaybackURL(), RingBuffer::kLiveTVOpenTimeout); @@ -2796,6 +2814,16 @@ void MythPlayer::EventLoop(void) JumpToProgram(); } + // Change interactive stream if requested + { QMutexLocker locker(&itvLock); + if (!m_newStream.isEmpty()) + { + QString stream = m_newStream; + m_newStream.clear(); + locker.unlock(); + JumpToStream(stream); + }} + // Disable fastforward if we are too close to the end of the buffer if (ffrew_skip > 1 && (CalcMaxFFTime(100, false) < 100)) { @@ -2832,22 +2860,25 @@ void MythPlayer::EventLoop(void) } // Handle end of file - if (GetEof()) + if (GetEof() && !allpaused) { - if (player_ctx->tvchain) +#if 0 && defined USING_MHEG + if (interactiveTV && interactiveTV->StreamStarted(false)) { - if (!allpaused && player_ctx->tvchain->HasNext()) - { - LOG(VB_GENERAL, LOG_NOTICE, LOC + "LiveTV forcing JumpTo 1"); - player_ctx->tvchain->JumpToNext(true, 1); - return; - } + Pause(); + return; } - else if (!allpaused) +#endif + + if (player_ctx->tvchain && player_ctx->tvchain->HasNext()) { - SetPlaying(false); + LOG(VB_GENERAL, LOG_NOTICE, LOC + "LiveTV forcing JumpTo 1"); + player_ctx->tvchain->JumpToNext(true, 1); return; } + + SetPlaying(false); + return; } // Handle rewind @@ -4875,6 +4906,135 @@ bool MythPlayer::SetVideoByComponentTag(int tag) return false; } +static inline double SafeFPS(DecoderBase *decoder) +{ + if (!decoder) + return 25; + double fps = decoder->GetFPS(); + return fps > 0 ? fps : 25.0; +} + +// Called from MHIContext::Begin/End/Stream on the MHIContext::StartMHEGEngine thread +bool MythPlayer::SetStream(const QString &stream) +{ + // The stream name is empty if the stream is closing + LOG(VB_PLAYBACK, LOG_INFO, LOC + QString("SetStream '%1'").arg(stream)); + + QMutexLocker locker(&itvLock); + m_newStream = stream; + m_newStream.detach(); + // Stream will be changed by JumpToStream called from EventLoop + // If successful will call interactiveTV->StreamStarted(); + + if (stream.isEmpty() && player_ctx->tvchain && + player_ctx->buffer->GetType() == ICRingBuffer::kRingBufferType) + { + // Restore livetv + SetEof(true); + player_ctx->tvchain->JumpToNext(false, 1); + player_ctx->tvchain->JumpToNext(true, 1); + } + + return !stream.isEmpty(); +} + +// Called from EventLoop pn the main application thread +void MythPlayer::JumpToStream(const QString &stream) +{ + LOG(VB_PLAYBACK, LOG_INFO, LOC + "JumpToStream - begin"); + + if (stream.isEmpty()) + return; // Shouldn't happen + + Pause(); + ResetCaptions(); + + ProgramInfo pginfo(stream); + SetPlayingInfo(pginfo); + + if (player_ctx->buffer->GetType() != ICRingBuffer::kRingBufferType) + player_ctx->buffer = new ICRingBuffer(stream, player_ctx->buffer); + else + player_ctx->buffer->OpenFile(stream); + + if (!player_ctx->buffer->IsOpen()) + { + LOG(VB_GENERAL, LOG_ERR, LOC + "JumpToStream buffer OpenFile failed"); + SetEof(true); + SetErrored(QObject::tr("Error opening remote stream buffer")); + return; + } + + watchingrecording = false; + totalLength = 0; + totalFrames = 0; + totalDuration = 0; + + if (OpenFile(120) < 0) // 120 retries ~= 60 seconds + { + LOG(VB_GENERAL, LOG_ERR, LOC + "JumpToStream OpenFile failed."); + SetEof(true); + SetErrored(QObject::tr("Error opening remote stream")); + return; + } + + if (totalLength == 0) + { + long long len = player_ctx->buffer->GetRealFileSize(); + totalLength = (int)(len / ((decoder->GetRawBitrate() * 1000) / 8)); + totalFrames = (int)(totalLength * SafeFPS(decoder)); + } + LOG(VB_PLAYBACK, LOG_INFO, LOC + + QString("JumpToStream length %1 bytes @ %2 Kbps = %3 Secs, %4 frames @ %5 fps") + .arg(player_ctx->buffer->GetRealFileSize()).arg(decoder->GetRawBitrate()) + .arg(totalLength).arg(totalFrames).arg(decoder->GetFPS()) ); + + SetEof(false); + + // the bitrate is reset by player_ctx->buffer->OpenFile()... + player_ctx->buffer->UpdateRawBitrate(decoder->GetRawBitrate()); + decoder->SetProgramInfo(pginfo); + + Play(); + ChangeSpeed(); + + player_ctx->SetPlayerChangingBuffers(false); +#if 0 && defined USING_MHEG + if (interactiveTV) interactiveTV->StreamStarted(); +#endif + + LOG(VB_PLAYBACK, LOG_INFO, LOC + "JumpToStream - end"); +} + +long MythPlayer::GetStreamPos() +{ + return (long)((1000 * GetFramesPlayed()) / SafeFPS(decoder)); +} + +long MythPlayer::GetStreamMaxPos() +{ + long maxpos = (long)(1000 * (totalDuration > 0 ? totalDuration : totalLength)); + long pos = GetStreamPos(); + return maxpos > pos ? maxpos : pos; +} + +long MythPlayer::SetStreamPos(long ms) +{ + uint64_t frameNum = (uint64_t)((ms * SafeFPS(decoder)) / 1000); + LOG(VB_PLAYBACK, LOG_INFO, LOC + QString("SetStreamPos %1 mS = frame %2, now=%3") + .arg(ms).arg(frameNum).arg(GetFramesPlayed()) ); + JumpToFrame(frameNum); + return ms; +} + +void MythPlayer::StreamPlay(bool play) +{ + if (play) + Play(); + else + Pause(); +} + /** \fn MythPlayer::SetDecoder(DecoderBase*) * \brief Sets the stream decoder, deleting any existing recorder. */ diff --git a/mythtv/libs/libmythtv/mythplayer.h b/mythtv/libs/libmythtv/mythplayer.h index 5d86d7a0071..2efbab64445 100644 --- a/mythtv/libs/libmythtv/mythplayer.h +++ b/mythtv/libs/libmythtv/mythplayer.h @@ -292,6 +292,11 @@ class MTV_PUBLIC MythPlayer // Public MHEG/MHI stream selection bool SetAudioByComponentTag(int tag); bool SetVideoByComponentTag(int tag); + bool SetStream(const QString &); + long GetStreamPos(); // mS + long GetStreamMaxPos(); // mS + long SetStreamPos(long); // mS + void StreamPlay(bool play = true); // LiveTV public stuff void CheckTVChain(); @@ -578,6 +583,7 @@ class MTV_PUBLIC MythPlayer // Private LiveTV stuff void SwitchToProgram(void); void JumpToProgram(void); + void JumpToStream(const QString&); protected: PlayerFlags playerFlags; @@ -711,6 +717,7 @@ class MTV_PUBLIC MythPlayer InteractiveTV *interactiveTV; bool itvEnabled; QMutex itvLock; + QString m_newStream; // Guarded by itvLock // OSD stuff OSD *osd; diff --git a/mythtv/libs/libmythtv/netstream.cpp b/mythtv/libs/libmythtv/netstream.cpp new file mode 100644 index 00000000000..151e25f0a23 --- /dev/null +++ b/mythtv/libs/libmythtv/netstream.cpp @@ -0,0 +1,900 @@ +/* Network stream + * Copyright 2011 Lawrence Rust + */ +#include "netstream.h" + +// C/C++ lib +#include +using std::getenv; +#include +#include + +// Qt +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include // qRegisterMetaType +#include +#include + +// Myth +#include "mythlogging.h" +#include "mythcorecontext.h" +#include "mythdirs.h" + + +/* + * Constants + */ +#define LOC "[netstream] " + + +/* + * Private data + */ +static QAtomicInt s_nRequest(1); // Unique NetStream request ID +static QMutex s_mtx; // Guard local static data e.g. NAMThread singleton + + +/* + * Private types + */ +// Custom event posted to NAMThread +class NetStreamRequest : public QEvent +{ +public: + static const QEvent::Type kType = QEvent::User; + + NetStreamRequest(int id, const QNetworkRequest &req) : + QEvent(kType), + m_id(id), + m_req(req), + m_bCancelled(false) + { } + + const int m_id; + const QNetworkRequest m_req; + volatile bool m_bCancelled; +}; + +class NetStreamAbort : public QEvent +{ +public: + static const QEvent::Type kType = static_cast< QEvent::Type >(QEvent::User + 1); + + NetStreamAbort(int id, QNetworkReply *reply) : + QEvent(kType), + m_id(id), + m_reply(reply) + { } + + const int m_id; + QNetworkReply * const m_reply; +}; + + +/** + * Network streaming request + */ +NetStream::NetStream(const QUrl &url, EMode mode /*= kPreferCache*/) : + m_id(s_nRequest.fetchAndAddRelaxed(1)), + m_state(kClosed), + m_pending(0), + m_reply(0), + m_nRedirections(0), + m_size(-1), + m_pos(0) +{ + setObjectName("NetStream " + url.toString()); + + m_request.setAttribute(QNetworkRequest::CacheLoadControlAttribute, + mode == kAlwaysCache ? QNetworkRequest::AlwaysCache : + mode == kPreferCache ? QNetworkRequest::PreferCache : + mode == kNeverCache ? QNetworkRequest::AlwaysNetwork : + QNetworkRequest::PreferNetwork ); + + // Receive requestStarted signals from NAMThread when it processes a NetStreamRequest + connect(&NAMThread::manager(), SIGNAL(requestStarted(int, QNetworkReply*)), + this, SLOT(slotRequestStarted(int, QNetworkReply*)), Qt::DirectConnection ); + + QMutexLocker locker(&m_mutex); + + if (Request(url)) + m_state = kPending; +} + +// virtual +NetStream::~NetStream() +{ + Abort(); + + QMutexLocker locker(&m_mutex); + + if (m_reply) + { + m_reply->disconnect(this); + m_reply->deleteLater(); + } +} + +static inline QString Source(const QNetworkRequest &request) +{ + switch (request.attribute(QNetworkRequest::CacheLoadControlAttribute).toInt()) + { + case QNetworkRequest::AlwaysCache: return "cache"; + case QNetworkRequest::PreferCache: return "cache-preferred"; + case QNetworkRequest::PreferNetwork: return "net-preferred"; + case QNetworkRequest::AlwaysNetwork: return "net"; + } + return "unknown"; +} + +static inline QString Source(const QNetworkReply* reply) +{ + return reply->attribute(QNetworkRequest::SourceIsFromCacheAttribute).toBool() ? + "cache" : "host"; +} + +// Send request to the network manager +// Caller must hold m_mutex +bool NetStream::Request(const QUrl& url) +{ + if (!IsSupported(url)) + { + LOG(VB_GENERAL, LOG_WARNING, LOC + + QString("(%1) Request unsupported URL: %2") + .arg(m_id).arg(url.toString()) ); + return false; + } + + if (m_pending) + { + // Cancel the pending request + m_pending->m_bCancelled = true; + m_pending = 0; + } + + if (m_reply) + { + // Abort the current request + // NB the abort method appears to only work if called from NAMThread + m_reply->disconnect(this); + NAMThread::PostEvent(new NetStreamAbort(m_id, m_reply)); + // NAMthread will delete the reply + m_reply = 0; + } + + m_request.setUrl(url); + + const QByteArray ua("User-Agent"); + if (!m_request.hasRawHeader(ua)) + m_request.setRawHeader(ua, "UK-MHEG/2 MYT001/001 MHGGNU/001"); + + if (m_pos > 0 || m_size >= 0) + m_request.setRawHeader("Range", QString("bytes=%1-").arg(m_pos).toAscii()); + +#ifndef QT_NO_OPENSSL +#if 1 // The BBC use a self certified cert so don't verify it + if (m_request.url().scheme() == "https") + { + // TODO use cert from carousel auth.tls. + QSslConfiguration ssl(QSslConfiguration::defaultConfiguration()); + ssl.setPeerVerifyMode(QSslSocket::VerifyNone); + m_request.setSslConfiguration(ssl); + } +#endif +#endif + + LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) Requesting %2 from %3") + .arg(m_id).arg(m_request.url().toString()).arg(Source(m_request)) ); + m_pending = new NetStreamRequest(m_id, m_request); + NAMThread::PostEvent(m_pending); + return true; +} + +// signal from NAMThread manager that a request has been started +void NetStream::slotRequestStarted(int id, QNetworkReply *reply) +{ + QMutexLocker locker(&m_mutex); + + if (m_id != id) + return; + + m_pending = 0; // Event is no longer valid + + if (!m_reply) + { + LOG(VB_FILE, LOG_DEBUG, LOC + QString("(%1) Started %2-").arg(m_id).arg(m_pos) ); + + m_reply = reply; + m_state = kStarted; + + reply->setReadBufferSize(4*1024*1024L); // 0= unlimited, 1MB => 4secs @ 1.5Mbps + + // NB The following signals must be Qt::DirectConnection 'cos this slot + // was connected Qt::DirectConnection so the current thread is NAMThread + + // QNetworkReply signals + connect(reply, SIGNAL(finished()), this, SLOT(slotFinished()), Qt::DirectConnection ); +#ifndef QT_NO_OPENSSL + connect(reply, SIGNAL(sslErrors(const QList &)), this, + SLOT(slotSslErrors(const QList &)), Qt::DirectConnection ); +#endif + // QIODevice signals + connect(reply, SIGNAL(readyRead()), this, SLOT(slotReadyRead()), Qt::DirectConnection ); + } + else + LOG(VB_GENERAL, LOG_ERR, LOC + + QString("(%1) Started but m_reply not NULL").arg(m_id)); +} + +static qlonglong inline ContentLength(const QNetworkReply *reply) +{ + bool ok; + qlonglong len = reply->header(QNetworkRequest::ContentLengthHeader) + .toLongLong(&ok); + return ok ? len : -1; +} + +static qlonglong inline ContentRange(const QNetworkReply *reply, + qlonglong &first, qlonglong &last) +{ + first = last = -1; + + QByteArray range = reply->rawHeader("Content-Range"); + if (range.isEmpty()) + return -1; + + // See RFC 2616 14.16: 'bytes begin-end/size' + qlonglong len; + if (3 != std::sscanf(range.constData(), " bytes %lld - %lld / %lld", &first, &last, &len)) + { + LOG(VB_GENERAL, LOG_ERR, LOC + QString("Invalid Content-Range:'%1'") + .arg(range.constData()) ); + return -1; + } + + return len; +} + +static bool inline RequestRange(const QNetworkRequest &request, + qlonglong &first, qlonglong &last) +{ + first = last = -1; + + QByteArray range = request.rawHeader("Range"); + if (range.isEmpty()) + return false; + + if (1 > std::sscanf(range.constData(), " bytes %lld - %lld", &first, &last)) + { + LOG(VB_GENERAL, LOG_ERR, LOC + QString("Invalid Range:'%1'") + .arg(range.constData()) ); + return false; + } + + return true; +} + +// signal from QNetworkReply +void NetStream::slotReadyRead() +{ + QMutexLocker locker(&m_mutex); + + if (m_reply) + { + LOG(VB_FILE, LOG_DEBUG, LOC + QString("(%1) Ready %2 bytes") + .arg(m_id).arg(m_reply->bytesAvailable()) ); + + if (m_size < 0) + { + qlonglong first, last, len = ContentRange(m_reply, first, last); + if (len >= 0) + { + m_size = len; + LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) range %2-%3/%4") + .arg(m_id).arg(first).arg(last).arg(len) ); + } + else + { + m_size = ContentLength(m_reply); + LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) content length %2") + .arg(m_id).arg(m_size) ); + } + } + + if (m_state < kReady) + m_state = kReady; + + locker.unlock(); + emit ReadyRead(this); + locker.relock(); + + m_ready.wakeAll(); + } + else + LOG(VB_GENERAL, LOG_ERR, LOC + + QString("(%1) ReadyRead but m_reply = NULL").arg(m_id)); +} + +// signal from QNetworkReply +void NetStream::slotFinished() +{ + QMutexLocker locker(&m_mutex); + + if (m_reply) + { + QNetworkReply::NetworkError error = m_reply->error(); + if (QNetworkReply::NoError == error) + { + // Check for a re-direct + QUrl url = m_reply->attribute( + QNetworkRequest::RedirectionTargetAttribute).toUrl(); + if (!url.isValid()) + { + m_state = kFinished; + } + else if (m_nRedirections++ > 0) + { + LOG(VB_FILE, LOG_WARNING, LOC + QString("(%1) Too many redirections") + .arg(m_id)); + m_state = kFinished; + } + else if ((url = m_request.url().resolved(url)) == m_request.url()) + { + LOG(VB_FILE, LOG_WARNING, LOC + QString("(%1) Redirection loop to %2") + .arg(m_id).arg(url.toString()) ); + m_state = kFinished; + } + else + { + LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) Redirecting").arg(m_id)); + m_state = Request(url) ? kPending : kFinished; + } + } + else + { + LOG(VB_FILE, LOG_WARNING, LOC + QString("(%1): %2") + .arg(m_id).arg(m_reply->errorString()) ); + m_state = kFinished; + } + + if (m_state == kFinished) + { + LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) Finished %2/%3 bytes from %4") + .arg(m_id).arg(m_pos).arg(m_size).arg(Source(m_reply)) ); + + locker.unlock(); + emit Finished(this); + locker.relock(); + + m_finished.wakeAll(); + } + } + else + LOG(VB_GENERAL, LOG_ERR, LOC + QString("(%1) Finished but m_reply = NULL") + .arg(m_id)); +} + +#ifndef QT_NO_OPENSSL +// signal from QNetworkReply +void NetStream::slotSslErrors(const QList &errors) +{ + QMutexLocker locker(&m_mutex); + + if (m_reply) + { + bool bIgnore = true; + Q_FOREACH(const QSslError &e, errors) + { + LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) SSL error %2: ") + .arg(m_id).arg(e.error()) + e.errorString() ); + switch (e.error()) + { +#if 1 // The BBC use a self certified cert + case QSslError::SelfSignedCertificateInChain: + break; +#endif + default: + bIgnore = false; + break; + } + } + + if (bIgnore) + { + LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) SSL errors ignored").arg(m_id)); + m_reply->ignoreSslErrors(errors); + } + } + else + LOG(VB_GENERAL, LOG_ERR, LOC + + QString("(%1) SSL error but m_reply = NULL").arg(m_id) ); +} +#endif + + +/** + * RingBuffer interface + */ +// static +bool NetStream::IsSupported(const QUrl &url) +{ + return url.isValid() && + (url.scheme() == "http" || url.scheme() == "https") && + !url.authority().isEmpty() && + !url.path().isEmpty(); +} + +bool NetStream::IsOpen() const +{ + QMutexLocker locker(&m_mutex); + return m_state > kClosed; +} + +void NetStream::Abort() +{ + QMutexLocker locker(&m_mutex); + + if (m_pending) + { + LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) Cancelled").arg(m_id) ); + m_pending->m_bCancelled = true; + m_pending = 0; + } + + if (m_reply && m_reply->isRunning()) + { + LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) Abort").arg(m_id) ); + NAMThread::PostEvent(new NetStreamAbort(m_id, m_reply)); + // NAMthread will delete the reply + m_reply = 0; + } + + m_state = kFinished; +} + +int NetStream::safe_read(void *data, unsigned sz, unsigned millisecs /* = 0 */) +{ + QTime t; t.start(); + QMutexLocker locker(&m_mutex); + + if (m_size >= 0 && m_pos >= m_size) + return 0; // EOF + + while (m_state < kFinished && (!m_reply || m_reply->bytesAvailable() < sz)) + { + unsigned elapsed = t.elapsed(); + if (elapsed >= millisecs) + break; + m_ready.wait(&m_mutex, millisecs - elapsed); + } + + if (!m_reply) + return -1; + + qint64 avail = m_reply->read(reinterpret_cast< char* >(data), sz); + if (avail <= 0) + return m_state >= kFinished ? 0 : -1; // 0= EOF + + LOG(VB_FILE, LOG_DEBUG, LOC + QString("(%1) safe_read @ %4 => %2/%3, %5 mS") + .arg(m_id).arg(avail).arg(sz).arg(m_pos).arg(t.elapsed()) ); + m_pos += avail; + return (int)avail; +} + +qlonglong NetStream::Seek(qlonglong pos) +{ + QMutexLocker locker(&m_mutex); + + if (pos == m_pos) + return pos; + + if (pos < 0 || (m_size >= 0 && pos > m_size)) + { + LOG(VB_GENERAL, LOG_ERR, LOC + + QString("(%1) Seek(%2) out of range [0..%3]") + .arg(m_id).arg(pos).arg(m_size) ); + return -1; + } + + LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) Seek(%2) curr %3 end %4") + .arg(m_id).arg(pos).arg(m_pos).arg(m_size) ); + m_pos = pos; + return Request(m_request.url()) ? m_pos : -1; +} + +qlonglong NetStream::GetReadPosition() const +{ + QMutexLocker locker(&m_mutex); + + return m_pos; +} + +qlonglong NetStream::GetSize() const +{ + QMutexLocker locker(&m_mutex); + + return m_size; +} + + +/** + * Synchronous interface + */ +bool NetStream::WaitTillReady(unsigned long time) +{ + QMutexLocker locker(&m_mutex); + + QTime t; t.start(); + while (m_state < kReady) + { + unsigned elapsed = t.elapsed(); + if (elapsed > time) + return false; + + m_ready.wait(&m_mutex, time - elapsed); + } + + return true; +} + +bool NetStream::WaitTillFinished(unsigned long time) +{ + QMutexLocker locker(&m_mutex); + + QTime t; t.start(); + while (m_state < kFinished) + { + unsigned elapsed = t.elapsed(); + if (elapsed > time) + return false; + + m_finished.wait(&m_mutex, time - elapsed); + } + + return true; +} + +QNetworkReply::NetworkError NetStream::GetError() const +{ + QMutexLocker locker(&m_mutex); + return !m_reply ? QNetworkReply::OperationCanceledError : m_reply->error(); +} + +QString NetStream::GetErrorString() const +{ + QMutexLocker locker(&m_mutex); + return !m_reply ? "Operation cancelled" : m_reply->errorString(); +} + +qlonglong NetStream::BytesAvailable() const +{ + QMutexLocker locker(&m_mutex); + return m_reply ? m_reply->bytesAvailable() : 0; +} + +QByteArray NetStream::ReadAll() +{ + QMutexLocker locker(&m_mutex); + + if (!m_reply) + return 0; + + QByteArray data = m_reply->readAll(); + m_pos += data.size(); + return data; +} + +/** + * Asynchronous interface + */ +bool NetStream::isStarted() const +{ + QMutexLocker locker(&m_mutex); + return m_state >= kStarted; +} + +bool NetStream::isReady() const +{ + QMutexLocker locker(&m_mutex); + return m_state >= kReady; +} + +bool NetStream::isFinished() const +{ + QMutexLocker locker(&m_mutex); + return m_state >= kFinished; +} + +/** + * Public helpers + */ +// static +bool NetStream::isAvailable() +{ + return NAMThread::isAvailable(); +} + +// Time when URI was last written to cache or invalid if not cached. +// static +QDateTime NetStream::GetLastModified(const QString &url) +{ + return NAMThread::GetLastModified(url); +} + + +/** + * NetworkAccessManager event loop thread + */ +//static +NAMThread & NAMThread::manager() +{ + QMutexLocker locker(&s_mtx); + + // Singleton + static NAMThread thread; + thread.start(); + return thread; +} + +NAMThread::NAMThread() : m_bQuit(false), m_nam(0) +{ + setObjectName("NAMThread"); + +#ifndef QT_NO_OPENSSL + // This ought to be done by the Qt lib but isn't in 4.7 + //Q_DECLARE_METATYPE(QList) + qRegisterMetaType< QList >(); +#endif +} + +// virtual +NAMThread::~NAMThread() +{ + QMutexLocker locker(&m_mutex); + delete m_nam; +} + +// virtual +void NAMThread::run() +{ + LOG(VB_MHEG, LOG_INFO, LOC "NAMThread starting"); + + m_nam = new QNetworkAccessManager(); + m_nam->setObjectName("NetStream NAM"); + + // Setup cache + QScopedPointer cache(new QNetworkDiskCache()); + cache->setCacheDirectory( + QDesktopServices::storageLocation(QDesktopServices::CacheLocation) ); + m_nam->setCache(cache.take()); + + // Setup a network proxy e.g. for TOR: socks://localhost:9050 + // TODO get this from mythdb + QString proxy(getenv("HTTP_PROXY")); + if (!proxy.isEmpty()) + { + QUrl url(proxy, QUrl::TolerantMode); + QNetworkProxy::ProxyType type = + url.scheme().isEmpty() ? QNetworkProxy::HttpProxy : + url.scheme() == "socks" ? QNetworkProxy::Socks5Proxy : + url.scheme() == "http" ? QNetworkProxy::HttpProxy : + url.scheme() == "https" ? QNetworkProxy::HttpProxy : + url.scheme() == "cache" ? QNetworkProxy::HttpCachingProxy : + url.scheme() == "ftp" ? QNetworkProxy::FtpCachingProxy : + QNetworkProxy::NoProxy; + if (QNetworkProxy::NoProxy != type) + { + LOG(VB_MHEG, LOG_INFO, LOC "Using proxy: " + proxy); + m_nam->setProxy(QNetworkProxy( + type, url.host(), url.port(), url.userName(), url.password() )); + } + else + { + LOG(VB_MHEG, LOG_ERR, LOC + QString("Unknown proxy type %1") + .arg(url.scheme()) ); + } + } + + // Quit when main app quits + connect(QCoreApplication::instance(), SIGNAL(aboutToQuit()), this, SLOT(quit()) ); + + m_running.release(); + + while(!m_bQuit) + { + // Process NAM events + QCoreApplication::processEvents(); + + QMutexLocker locker(&m_mutex); + m_work.wait(&m_mutex, 100); + while (!m_workQ.isEmpty()) + { + QScopedPointer< QEvent > ev(m_workQ.dequeue()); + locker.unlock(); + NewRequest(ev.data()); + } + } + + m_running.acquire(); + + delete m_nam; + m_nam = 0; + + LOG(VB_MHEG, LOG_INFO, LOC "NAMThread stopped"); +} + +// slot +void NAMThread::quit() +{ + m_bQuit = true; + QThread::quit(); +} + +// static +void NAMThread::PostEvent(QEvent *event) +{ + NAMThread &m = manager(); + QMutexLocker locker(&m.m_mutex); + m.m_workQ.enqueue(event); +} + +bool NAMThread::NewRequest(QEvent *event) +{ + switch (event->type()) + { + case NetStreamRequest::kType: + return StartRequest(dynamic_cast< NetStreamRequest* >(event)); + case NetStreamAbort::kType: + return AbortRequest(dynamic_cast< NetStreamAbort* >(event)); + default: + break; + } + return false; +} + +bool NAMThread::StartRequest(NetStreamRequest *p) +{ + if (!p) + { + LOG(VB_GENERAL, LOG_ERR, LOC "Invalid NetStreamRequest"); + return false; + } + + if (!p->m_bCancelled) + { + LOG(VB_FILE, LOG_DEBUG, LOC + QString("(%1) StartRequest").arg(p->m_id) ); + QNetworkReply *reply = m_nam->get(p->m_req); + emit requestStarted(p->m_id, reply); + } + else + LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) NetStreamRequest cancelled").arg(p->m_id) ); + return true; +} + +bool NAMThread::AbortRequest(NetStreamAbort *p) +{ + if (!p) + { + LOG(VB_GENERAL, LOG_ERR, LOC "Invalid NetStreamAbort"); + return false; + } + + LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) AbortRequest").arg(p->m_id) ); + p->m_reply->abort(); + p->m_reply->disconnect(); + delete p->m_reply; + return true; +} + +// static +bool NAMThread::isAvailable() +{ + NAMThread &m = manager(); + + if (!m.m_running.tryAcquire(1, 3000)) + return false; + + m.m_running.release(); + + QMutexLocker locker(&m.m_mutex); + + if (!m.m_nam) + return false; + + switch (m.m_nam->networkAccessible()) + { + case QNetworkAccessManager::Accessible: return true; + case QNetworkAccessManager::NotAccessible: return false; + case QNetworkAccessManager::UnknownAccessibility: return true; + } + return false; +} + +// Time when URI was last written to cache or invalid if not cached. +// static +QDateTime NAMThread::GetLastModified(const QString &url) +{ + NAMThread &m = manager(); + + QMutexLocker locker(&m.m_mutex); + + if (!m.m_nam) + return QDateTime(); // Invalid + + QAbstractNetworkCache *cache = m.m_nam->cache(); + if (!cache) + return QDateTime(); // Invalid + + QNetworkCacheMetaData meta = cache->metaData(QUrl(url)); + if (!meta.isValid()) + { + LOG(VB_FILE, LOG_DEBUG, LOC + QString("GetLastModified('%1') not in cache") + .arg(url)); + return QDateTime(); // Invalid + } + + // Check if expired + QDateTime const now(QDateTime::currentDateTime()); // local time + QDateTime expire = meta.expirationDate(); + if (expire.isValid() && expire.toLocalTime() < now) + { + LOG(VB_FILE, LOG_INFO, LOC + QString("GetLastModified('%1') past expiration %2") + .arg(url).arg(expire.toString())); + return QDateTime(); // Invalid + } + + // Get time URI was modified (Last-Modified header) NB this may be invalid + QDateTime lastMod = meta.lastModified(); + + QNetworkCacheMetaData::RawHeaderList headers = meta.rawHeaders(); + Q_FOREACH(const QNetworkCacheMetaData::RawHeader &h, headers) + { + // RFC 1123 date format: Thu, 01 Dec 1994 16:00:00 GMT + static const char kszFormat[] = "ddd, dd MMM yyyy HH:mm:ss 'GMT'"; + + QString const first(h.first.toLower()); + if (first == "cache-control") + { + QString const second(h.second.toLower()); + if (second == "no-cache" || second == "no-store") + { + LOG(VB_FILE, LOG_INFO, LOC + + QString("GetLastModified('%1') Cache-Control disabled").arg(url)); + cache->remove(QUrl(url)); + return QDateTime(); // Invalid + } + } + else if (first == "date") + { + QDateTime d = QDateTime::fromString(h.second, kszFormat); + if (!d.isValid()) + { + LOG(VB_GENERAL, LOG_WARNING, LOC + + QString("GetLastModified invalid Date header '%1'") + .arg(h.second.constData())); + continue; + } + d.setTimeSpec(Qt::UTC); + lastMod = d; + } + } + + LOG(VB_FILE, LOG_DEBUG, LOC + QString("GetLastModified('%1') last modified %2") + .arg(url).arg(lastMod.toString())); + return lastMod; +} + +/* End of file */ diff --git a/mythtv/libs/libmythtv/netstream.h b/mythtv/libs/libmythtv/netstream.h new file mode 100644 index 00000000000..b505f226f79 --- /dev/null +++ b/mythtv/libs/libmythtv/netstream.h @@ -0,0 +1,148 @@ +/* Network stream + * Copyright 2011 Lawrence Rust + */ +#ifndef NETSTREAM_H +#define NETSTREAM_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +class QUrl; +class QNetworkAccessManager; +class NetStreamRequest; +class NetStreamAbort; + + +/** + * Stream content from a URI + */ +class NetStream : public QObject +{ + Q_OBJECT + Q_DISABLE_COPY(NetStream) + +public: + enum EMode { kNeverCache, kPreferCache, kAlwaysCache }; + NetStream(const QUrl &, EMode mode = kPreferCache); + virtual ~NetStream(); + +public: + // RingBuffer interface + static bool IsSupported(const QUrl &); + bool IsOpen() const; + void Abort(); + int safe_read(void *data, unsigned size, unsigned millisecs = 0); + qlonglong Seek(qlonglong); + qlonglong GetReadPosition() const; + qlonglong GetSize() const; + + // Properties + QUrl Url() const { return m_request.url(); } + + // Synchronous interface + bool WaitTillReady(unsigned long millisecs); + bool WaitTillFinished(unsigned long millisecs); + QNetworkReply::NetworkError GetError() const; + QString GetErrorString() const; + qlonglong BytesAvailable() const; + QByteArray ReadAll(); + + // Async interface + bool isStarted() const; + bool isReady() const; + bool isFinished() const; + +signals: + void ReadyRead(QObject*); + void Finished(QObject*); + +public: + // Time when a URI was last written to cache or invalid if not cached. + static QDateTime GetLastModified(const QString &url); + // Is the network accessible + static bool isAvailable(); + + // Implementation +private slots: + // NAMThread signals + void slotRequestStarted(int, QNetworkReply *); + // QNetworkReply signals + void slotFinished(); +#ifndef QT_NO_OPENSSL + void slotSslErrors(const QList & errors); +#endif + // QIODevice signals + void slotReadyRead(); + +private: + bool Request(const QUrl &); + + const int m_id; // Unique request ID + + mutable QMutex m_mutex; // Protects r/w access to the following data + QNetworkRequest m_request; + enum { kClosed, kPending, kStarted, kReady, kFinished } m_state; + NetStreamRequest* m_pending; + QNetworkReply* m_reply; + int m_nRedirections; + qlonglong m_size; + qlonglong m_pos; + QWaitCondition m_ready; + QWaitCondition m_finished; +}; + + +/** + * Thread to process NetStream requests + */ +class NAMThread : public QThread +{ + Q_OBJECT + Q_DISABLE_COPY(NAMThread) + + // Use manager() to create + NAMThread(); + +public: + static NAMThread & manager(); // Singleton + virtual ~NAMThread(); + + static void PostEvent(QEvent *); + + static bool isAvailable(); // is network usable + static QDateTime GetLastModified(const QString &url); + +signals: + void requestStarted(int, QNetworkReply *); + + // Implementation +protected: + virtual void run(); // QThread override + bool NewRequest(QEvent *); + bool StartRequest(NetStreamRequest *); + bool AbortRequest(NetStreamAbort *); + +private slots: + void quit(); + +private: + volatile bool m_bQuit; + QSemaphore m_running; + mutable QMutex m_mutex; // Protects r/w access to the following data + QNetworkAccessManager *m_nam; + QQueue< QEvent * > m_workQ; + QWaitCondition m_work; +}; + +#endif /* ndef NETSTREAM_H */ diff --git a/mythtv/libs/libmythtv/ringbuffer.h b/mythtv/libs/libmythtv/ringbuffer.h index f208aeda908..7c7d2764c01 100644 --- a/mythtv/libs/libmythtv/ringbuffer.h +++ b/mythtv/libs/libmythtv/ringbuffer.h @@ -39,10 +39,13 @@ enum RingBufferType kRingBuffer_BD, kRingBuffer_HTTP, kRingBuffer_HLS, + kRingBuffer_MHEG }; class MTV_PUBLIC RingBuffer : protected MThread { + friend class ICRingBuffer; + public: static RingBuffer *Create(const QString &lfilename, bool write, bool usereadahead = true, @@ -85,6 +88,7 @@ class MTV_PUBLIC RingBuffer : protected MThread virtual bool IsBookmarkAllowed(void) { return true; } virtual int BestBufferSize(void) { return 32768; } static QString BitrateToString(uint64_t rate, bool hz = false); + RingBufferType GetType() const { return type; } // DVD and bluray methods bool IsDisc(void) const { return IsDVD() || IsBD(); }