Skip to content

Commit

Permalink
Add IPTVStreamHandler.
Browse files Browse the repository at this point in the history
This is a bit different from stream handlers used for other recorders because the
stream handler is tied to a bind address and port[s] instead of a recording device.

The purpose is the same, to allow overlapping records. But in this case the stream
handler is changed on every channel change.
  • Loading branch information
daniel-kristjansson committed Feb 16, 2012
1 parent 715c18c commit 02768bd
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 6 deletions.
27 changes: 23 additions & 4 deletions mythtv/libs/libmythtv/iptvchannel.cpp
Expand Up @@ -7,6 +7,7 @@
*/

// MythTV headers
#include "iptvstreamhandler.h"
#include "iptvchannel.h"
#include "mythlogging.h"
#include "mythdb.h"
Expand Down Expand Up @@ -40,6 +41,8 @@ bool IPTVChannel::Open(void)
}

m_open = true;
if (!m_last_channel_id.isEmpty())
m_stream_handler = IPTVStreamHandler::Get(m_last_channel_id);

return m_open;
}
Expand All @@ -48,14 +51,13 @@ void IPTVChannel::Close(void)
{
LOG(VB_GENERAL, LOG_INFO, LOC + "Close()");
QMutexLocker locker(&m_lock);
m_open = false;
if (m_stream_handler)
IPTVStreamHandler::Return(m_stream_handler);
}

bool IPTVChannel::IsOpen(void) const
{
QMutexLocker locker(&m_lock);
LOG(VB_GENERAL, LOG_INFO, LOC + QString("IsOpen() -> %1")
.arg(m_open ? "true" : "false"));
return m_open;
}

Expand All @@ -66,7 +68,24 @@ bool IPTVChannel::Tune(const QString &freqid, int finetune)
LOG(VB_GENERAL, LOG_INFO, LOC + QString("Tune(%1) TO BE IMPLEMENTED")
.arg(freqid));

// TODO IMPLEMENT
// TODO IMPLEMENT query from DB

QHostAddress addr(QHostAddress::Any);

int ports[3];
ports[0] = 5555;
ports[1] = -1;
ports[2] = -1;

QString channel_id = QString("%1!%2!%3!%4")
.arg(addr.toString()).arg(ports[0]).arg(ports[1]).arg(ports[2]);

if (m_stream_handler)
IPTVStreamHandler::Return(m_stream_handler);

m_stream_handler = IPTVStreamHandler::Get(channel_id);

m_last_channel_id = channel_id;

return false;
}
Expand Down
4 changes: 4 additions & 0 deletions mythtv/libs/libmythtv/iptvchannel.h
Expand Up @@ -15,6 +15,8 @@
// MythTV headers
#include "dtvchannel.h"

class IPTVStreamHandler;

class IPTVChannel : public DTVChannel
{
public:
Expand All @@ -32,6 +34,8 @@ class IPTVChannel : public DTVChannel
private:
mutable QMutex m_lock;
volatile bool m_open;
QString m_last_channel_id;
IPTVStreamHandler *m_stream_handler;
};

#endif // _IPTV_CHANNEL_H_
Expand Down
150 changes: 150 additions & 0 deletions mythtv/libs/libmythtv/iptvstreamhandler.cpp
@@ -0,0 +1,150 @@
// -*- Mode: c++ -*-

// Qt headers
#include <QUdpSocket>

// MythTV headers
#include "iptvstreamhandler.h"
#include "mythlogging.h"

#define LOC QString("IPTVSH(%1): ").arg(_device)

QMap<QString,IPTVStreamHandler*> IPTVStreamHandler::_handlers;
QMap<QString,uint> IPTVStreamHandler::_handlers_refcnt;
QMutex IPTVStreamHandler::_handlers_lock;

IPTVStreamHandler *IPTVStreamHandler::Get(const QString &devname)
{
QMutexLocker locker(&_handlers_lock);

QString devkey = devname.toUpper();

QMap<QString,IPTVStreamHandler*>::iterator it = _handlers.find(devkey);

if (it == _handlers.end())
{
IPTVStreamHandler *newhandler = new IPTVStreamHandler(devkey);
newhandler->Open();
_handlers[devkey] = newhandler;
_handlers_refcnt[devkey] = 1;

LOG(VB_RECORD, LOG_INFO,
QString("IPTVSH: Creating new stream handler %1 for %2")
.arg(devkey).arg(devname));
}
else
{
_handlers_refcnt[devkey]++;
uint rcount = _handlers_refcnt[devkey];
LOG(VB_RECORD, LOG_INFO,
QString("IPTVSH: Using existing stream handler %1 for %2")
.arg(devkey)
.arg(devname) + QString(" (%1 in use)").arg(rcount));
}

return _handlers[devkey];
}

void IPTVStreamHandler::Return(IPTVStreamHandler * & ref)
{
QMutexLocker locker(&_handlers_lock);

QString devname = ref->_device;

QMap<QString,uint>::iterator rit = _handlers_refcnt.find(devname);
if (rit == _handlers_refcnt.end())
return;

if (*rit > 1)
{
ref = NULL;
(*rit)--;
return;
}

QMap<QString,IPTVStreamHandler*>::iterator it = _handlers.find(devname);
if ((it != _handlers.end()) && (*it == ref))
{
LOG(VB_RECORD, LOG_INFO, QString("IPTVSH: Closing handler for %1")
.arg(devname));
ref->Close();
delete *it;
_handlers.erase(it);
}
else
{
LOG(VB_GENERAL, LOG_ERR,
QString("IPTVSH Error: Couldn't find handler for %1")
.arg(devname));
}

_handlers_refcnt.erase(rit);
ref = NULL;
}

IPTVStreamHandler::IPTVStreamHandler(const QString &device) :
StreamHandler(device)
{
QStringList parts = device.split("!");
if (parts.size() >= 4)
{
m_addr = QHostAddress(parts[0]);
m_ports[0] = parts[1].toInt();
m_ports[1] = parts[2].toInt();
m_ports[2] = parts[3].toInt();
}
else
{
m_ports[0] = -1;
m_ports[1] = -1;
m_ports[2] = -1;
}
}

void IPTVStreamHandler::run(void)
{
RunProlog();

// Open our ports...
// TODO Error handling..
for (uint i = 0; i < sizeof(m_ports)/sizeof(int); i++)
{
if (m_ports[i] >= 0)
{
m_sockets[i] = new QUdpSocket();
m_helpers[i] = new IPTVStreamHandlerHelper(this, m_sockets[i]);
m_sockets[i]->bind(m_addr, m_ports[i]);
}
}

exec();

// Clean up
for (uint i = 0; i < sizeof(m_ports)/sizeof(int); i++)
{
if (m_sockets[i])
{
delete m_sockets[i];
m_sockets[i] = NULL;
delete m_helpers[i];
m_helpers[i] = NULL;
}
}

RunEpilog();
}

void IPTVStreamHandlerHelper::ReadPending(void)
{
QByteArray datagram;
QHostAddress sender;
quint16 senderPort;

while (m_socket->hasPendingDatagrams())
{
datagram.resize(m_socket->pendingDatagramSize());
m_socket->readDatagram(datagram.data(), datagram.size(),
&sender, &senderPort);
// TODO actually do something with the data..
}
}
78 changes: 78 additions & 0 deletions mythtv/libs/libmythtv/iptvstreamhandler.h
@@ -0,0 +1,78 @@
// -*- Mode: c++ -*-

#ifndef _IPTVSTREAMHANDLER_H_
#define _IPTVSTREAMHANDLER_H_

#include <vector>
using namespace std;

#include <QUdpSocket>
#include <QString>
#include <QMutex>
#include <QMap>

#include "util.h"
#include "DeviceReadBuffer.h"
#include "mpegstreamdata.h"
#include "streamhandler.h"
#include "dtvconfparserhelpers.h"

class IPTVStreamHandler;
class DTVSignalMonitor;
class IPTVChannel;

class IPTVStreamHandlerHelper : QObject
{
public:
IPTVStreamHandlerHelper(IPTVStreamHandler *p, QUdpSocket *s) :
m_parent(p), m_socket(s)
{
connect(m_socket, SIGNAL(readyRead()),
this, SLOT(ReadPending()));
}

public slots:
void ReadPending(void);

private:
IPTVStreamHandler *m_parent;
QUdpSocket *m_socket;
};

class IPTVStreamHandler : public StreamHandler
{
public:
static IPTVStreamHandler *Get(const QString &devicename);
static void Return(IPTVStreamHandler * & ref);

virtual void AddListener(MPEGStreamData *data,
bool allow_section_reader = false,
bool needs_drb = false,
QString output_file = QString())
{
StreamHandler::AddListener(data, false, false, output_file);
} // StreamHandler

private:
IPTVStreamHandler(const QString &);

bool Open(void) { return true; }
void Close(void) { MThread::exit(0); }

virtual void run(void); // MThread

private:
mutable QMutex m_lock;
// TODO should we care about who is broadcasting to us?
QHostAddress m_addr;
int m_ports[3];
QUdpSocket *m_sockets[3];
IPTVStreamHandlerHelper *m_helpers[3];

// for implementing Get & Return
static QMutex _handlers_lock;
static QMap<QString, IPTVStreamHandler*> _handlers;
static QMap<QString, uint> _handlers_refcnt;
};

#endif // _IPTVSTREAMHANDLER_H_
4 changes: 2 additions & 2 deletions mythtv/libs/libmythtv/libmythtv.pro
Expand Up @@ -568,10 +568,10 @@ using_backend {

# Support for RTP/UDP streams
HEADERS += iptvchannel.h iptvrecorder.h
HEADERS += iptvsignalmonitor.h
HEADERS += iptvsignalmonitor.h iptvstreamhandler.h

SOURCES += iptvchannel.cpp iptvrecorder.cpp
SOURCES += iptvsignalmonitor.cpp
SOURCES += iptvsignalmonitor.cpp iptvstreamhandler.cpp

# Support for HDHomeRun box
using_hdhomerun {
Expand Down

0 comments on commit 02768bd

Please sign in to comment.