Skip to content

Commit

Permalink
MHEG: Fixes for InteractionChannel streaming from network URI's
Browse files Browse the repository at this point in the history
This patch includes fixes for:
- Prevent memory leak on stream close
- Improve logging of streams with no content length header
- Add a mutex to prevent concurrent access to QNetworkAccessManager

Signed-off-by: Lawrence Rust <lvr@softsystem.co.uk>
Signed-off-by: Stuart Morgan <smorgan@mythtv.org>
(cherry picked from commit 73c810b)
  • Loading branch information
Lawrence Rust authored and stuartm committed Sep 7, 2013
1 parent fd308a7 commit ea77f42
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 35 deletions.
20 changes: 12 additions & 8 deletions mythtv/libs/libmythtv/mhi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,13 @@ bool MHIContext::GetCarouselData(QString objectPath, QByteArray &result)
// same thread this is safe. Otherwise we need to make a deep copy of
// the result.

QMutexLocker locker(&m_runLock);
bool bReported = false;
QTime t; t.start();
while (!m_stop)
{
locker.unlock();

if (isIC)
{
switch (m_ic.GetFile(objectPath, result, cert))
Expand Down Expand Up @@ -481,7 +484,9 @@ bool MHIContext::GetCarouselData(QString objectPath, QByteArray &result)
// some more packets. We should eventually find out if this item is
// present.
ProcessDSMCCQueue();
m_engine_wait.wait(&m_runLock, 300);

locker.relock();
m_engine_wait.wait(locker.mutex(), 300);
}
return false; // Stop has been set. Say the object isn't present.
}
Expand Down Expand Up @@ -588,10 +593,9 @@ bool MHIContext::OfferKey(QString key)
.arg(key).arg(action).arg(m_keyQueue.size()) );
{ QMutexLocker locker(&m_keyLock);
m_keyQueue.enqueue(action);}
QMutexLocker locker2(&m_runLock);
m_engine_wait.wakeAll();
// Accept the key except 'exit' (16) in 'always available' (3) state.
// This allows re-use of Esc as TEXTEXIT for RC's with a single backup button
return action != 16 || m_keyProfile != 3;
return true;
}

// Called from MythPlayer::VideoStart and MythPlayer::ReinitOSD
Expand Down Expand Up @@ -993,7 +997,7 @@ void MHIContext::EndStream()
// Callback from MythPlayer when a stream starts or stops
bool MHIContext::StreamStarted(bool bStarted)
{
if (!m_notify)
if (!m_engine || !m_notify)
return false;

LOG(VB_MHEG, LOG_INFO, QString("[mhi] Stream 0x%1 %2")
Expand All @@ -1019,7 +1023,7 @@ bool MHIContext::BeginAudio(int tag)
return m_parent->GetNVP()->SetAudioByComponentTag(tag);
return false;
}

// Stop playing audio
void MHIContext::StopAudio()
{
Expand All @@ -1033,13 +1037,13 @@ bool MHIContext::BeginVideo(int tag)

if (tag < 0)
return true; // Leave it at the default.

m_videoTag = tag;
if (m_parent->GetNVP())
return m_parent->GetNVP()->SetVideoByComponentTag(tag);
return false;
}

// Stop displaying video
void MHIContext::StopVideo()
{
Expand Down
3 changes: 2 additions & 1 deletion mythtv/libs/libmythtv/mythplayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,8 @@ int MythPlayer::OpenFile(uint retries)
MythTimer peekTimer; peekTimer.start();
while (player_ctx->buffer->Peek(testbuf, testreadsize) != testreadsize)
{
if (peekTimer.elapsed() > 1000 || bigTimer.elapsed() > timeout)
// NB need to allow for streams encountering network congestion
if (peekTimer.elapsed() > 5000 || bigTimer.elapsed() > timeout)
{
LOG(VB_GENERAL, LOG_ERR, LOC +
QString("OpenFile(): Could not read first %1 bytes of '%2'")
Expand Down
72 changes: 48 additions & 24 deletions mythtv/libs/libmythtv/netstream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ using std::getenv;
*/
static QAtomicInt s_nRequest(1); // Unique NetStream request ID
static QMutex s_mtx; // Guard local static data e.g. NAMThread singleton
const qint64 kMaxBuffer = 4 * 1024 * 1024L; // 0= unlimited, 1MB => 4secs @ 1.5Mbps


/*
Expand Down Expand Up @@ -175,7 +176,7 @@ bool NetStream::Request(const QUrl& url)

if (m_reply)
{
// Abort the current request
// Abort the current reply
// NB the abort method appears to only work if called from NAMThread
m_reply->disconnect(this);
NAMThread::PostEvent(new NetStreamAbort(m_id, m_reply));
Expand Down Expand Up @@ -259,8 +260,9 @@ bool NetStream::Request(const QUrl& url)
}
#endif

LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) Requesting %2 from %3")
.arg(m_id).arg(m_request.url().toString()).arg(Source(m_request)) );
LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) Request %2 bytes=%3- from %4")
.arg(m_id).arg(m_request.url().toString())
.arg(m_pos).arg(Source(m_request)) );
m_pending = new NetStreamRequest(m_id, m_request);
NAMThread::PostEvent(m_pending);
return true;
Expand All @@ -278,12 +280,13 @@ void NetStream::slotRequestStarted(int id, QNetworkReply *reply)

if (!m_reply)
{
LOG(VB_FILE, LOG_DEBUG, LOC + QString("(%1) Started %2-").arg(m_id).arg(m_pos) );
LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) Started 0x%2")
.arg(m_id).arg(quintptr(reply),0,16) );

m_reply = reply;
m_state = kStarted;

reply->setReadBufferSize(4*1024*1024L); // 0= unlimited, 1MB => 4secs @ 1.5Mbps
reply->setReadBufferSize(kMaxBuffer);

// NB The following signals must be Qt::DirectConnection 'cos this slot
// was connected Qt::DirectConnection so the current thread is NAMThread
Expand Down Expand Up @@ -357,23 +360,28 @@ void NetStream::slotReadyRead()

if (m_reply)
{
LOG(VB_FILE, LOG_DEBUG, LOC + QString("(%1) Ready %2 bytes")
.arg(m_id).arg(m_reply->bytesAvailable()) );
qint64 avail = m_reply->bytesAvailable();
LOG(VB_FILE, (avail <= 2 * kMaxBuffer) ? LOG_DEBUG :
(avail <= 4 * kMaxBuffer) ? LOG_INFO : LOG_WARNING,
LOC + QString("(%1) Ready 0x%2, %3 bytes available").arg(m_id)
.arg(quintptr(m_reply),0,16).arg(avail) );

if (m_size < 0)
if (m_size < 0 || m_state < kReady)
{
qlonglong first, last, len = ContentRange(m_reply, first, last);
if (len >= 0)
{
m_size = len;
LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) range %2-%3/%4")
.arg(m_id).arg(first).arg(last).arg(len) );
LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) Ready 0x%2, range %3-%4/%5")
.arg(m_id).arg(quintptr(m_reply),0,16).arg(first).arg(last).arg(len) );
}
else
{
m_size = ContentLength(m_reply);
LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) content length %2")
.arg(m_id).arg(m_size) );
if (m_state < kReady || m_size >= 0)
LOG(VB_FILE, LOG_INFO, LOC +
QString("(%1) Ready 0x%2, content length %3")
.arg(m_id).arg(quintptr(m_reply),0,16).arg(m_size) );
}
}

Expand Down Expand Up @@ -435,8 +443,11 @@ void NetStream::slotFinished()

if (m_state == kFinished)
{
LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) Finished %2/%3 bytes from %4")
.arg(m_id).arg(m_pos).arg(m_size).arg(Source(m_reply)) );
if (m_size < 0)
m_size = m_pos + m_reply->size();

LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) Finished 0x%2 %3/%4 bytes from %5")
.arg(m_id).arg(quintptr(m_reply),0,16).arg(m_pos).arg(m_size).arg(Source(m_reply)) );

locker.unlock();
emit Finished(this);
Expand Down Expand Up @@ -517,9 +528,12 @@ void NetStream::Abort()
m_pending = 0;
}

if (m_reply && m_reply->isRunning())
if (m_reply)
{
LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) Abort").arg(m_id) );
if (m_state >= kStarted && m_state < kFinished)
LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) Abort 0x%2")
.arg(m_id).arg(quintptr(m_reply),0,16) );

NAMThread::PostEvent(new NetStreamAbort(m_id, m_reply));
// NAMthread will delete the reply
m_reply = 0;
Expand All @@ -544,6 +558,9 @@ int NetStream::safe_read(void *data, unsigned sz, unsigned millisecs /* = 0 */)
m_ready.wait(&m_mutex, millisecs - elapsed);
}

locker.unlock();
QMutexLocker lockNAM(NAMThread::GetMutex());
locker.relock();
if (!m_reply)
return -1;

Expand Down Expand Up @@ -712,7 +729,7 @@ NAMThread & NAMThread::manager()
return thread;
}

NAMThread::NAMThread() : m_bQuit(false), m_nam(0)
NAMThread::NAMThread() : m_bQuit(false), m_mutexNAM(QMutex::Recursive), m_nam(0)
{
setObjectName("NAMThread");

Expand Down Expand Up @@ -775,18 +792,25 @@ void NAMThread::run()

m_running.release();

QMutexLocker lockNAM(&m_mutexNAM);
while(!m_bQuit)
{
// Process NAM events
QCoreApplication::processEvents();

lockNAM.unlock();

QMutexLocker locker(&m_mutex);
m_work.wait(&m_mutex, 100);

lockNAM.relock();

while (!m_workQ.isEmpty())
{
QScopedPointer< QEvent > ev(m_workQ.dequeue());
locker.unlock();
NewRequest(ev.data());
locker.relock();
}
}

Expand All @@ -805,12 +829,10 @@ void NAMThread::quit()
QThread::quit();
}

// static
void NAMThread::PostEvent(QEvent *event)
void NAMThread::Post(QEvent *event)
{
NAMThread &m = manager();
QMutexLocker locker(&m.m_mutex);
m.m_workQ.enqueue(event);
QMutexLocker locker(&m_mutex);
m_workQ.enqueue(event);
}

bool NAMThread::NewRequest(QEvent *event)
Expand All @@ -837,8 +859,9 @@ bool NAMThread::StartRequest(NetStreamRequest *p)

if (!p->m_bCancelled)
{
LOG(VB_FILE, LOG_DEBUG, LOC + QString("(%1) StartRequest").arg(p->m_id) );
QNetworkReply *reply = m_nam->get(p->m_req);
LOG(VB_FILE, LOG_DEBUG, LOC + QString("(%1) StartRequest 0x%2")
.arg(p->m_id).arg(quintptr(reply),0,16) );
emit requestStarted(p->m_id, reply);
}
else
Expand All @@ -854,7 +877,8 @@ bool NAMThread::AbortRequest(NetStreamAbort *p)
return false;
}

LOG(VB_FILE, LOG_INFO, LOC + QString("(%1) AbortRequest").arg(p->m_id) );
LOG(VB_FILE, LOG_DEBUG, LOC + QString("(%1) AbortRequest 0x%2").arg(p->m_id)
.arg(quintptr(p->m_reply),0,16) );
p->m_reply->abort();
p->m_reply->disconnect();
delete p->m_reply;
Expand Down
8 changes: 6 additions & 2 deletions mythtv/libs/libmythtv/netstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ class NAMThread : public QThread
static NAMThread & manager(); // Singleton
virtual ~NAMThread();

static void PostEvent(QEvent *);
static inline void PostEvent(QEvent *e) { manager().Post(e); }
void Post(QEvent *event);

static inline QMutex* GetMutex() { return &manager().m_mutexNAM; }

static bool isAvailable(); // is network usable
static QDateTime GetLastModified(const QString &url);
Expand All @@ -143,8 +146,9 @@ private slots:

volatile bool m_bQuit;
QSemaphore m_running;
mutable QMutex m_mutex; // Protects r/w access to the following data
mutable QMutex m_mutexNAM; // Provides recursive access to m_nam
QNetworkAccessManager *m_nam;
mutable QMutex m_mutex; // Protects r/w access to the following data
QQueue< QEvent * > m_workQ;
QWaitCondition m_work;
};
Expand Down

0 comments on commit ea77f42

Please sign in to comment.