Permalink
Browse files

Almost complete rewrite of RAOP server

Let me commend Mark Kendall for his previous implementation. As far as I could tell, his RAOP server was the only one implementing Airtunes v2 functionalities with a/v sync timestamps.
The rewrite serves several purposes. Mainly, I wanted to take ownership of this area of the code, and there were things I couldn't figure out. Most likely because whatever Mark used for AirPort's technical documentation isn't what I got.
Technical description of RAOP came mostly from:
-http://git.zx2c4.com/Airtunes2/about
-http://blog.technologeek.org/airtunes-v2

I found the later to be the most correct.

Main area of focus:
-Audio Quality: Over slow or poor network connectivity (e.g. slow wireless) and with lots of packet drops: audio would have been corrupted (the system played audio packets in the order they were received, and not in the order they were supposed to be). For testing purposes, I simulated a 30% packet drop, and playback remained perfect.
-A/V Sync: Achieving perfect A/V sync across all platforms and with all the different type of audio architecture is almost impossible. However, I believe the results achieved are very good. Playback will automatically adjust itself according to the network latency and the audio hardware latency.

New features:
We now retrieve the media's metadata: Album name, artist name, song title and coverart. This only works when using iTunes. iOS device do not send metadata unless using FairPlay encryption. We only support RSA encryption.
Currently, this information is only shown in the logs, but in the future we'll be able to nicely show them in mythfrontend.

Additional credits:
-http://nto.github.com/AirPlay.html, gave information about how to configure Bonjour in order to receive MetaData from iTunes
-http://code.google.com/p/ytrack/wiki/DMAP : iTunes DMAP metadata structure
  • Loading branch information...
1 parent e778f23 commit bd883d7216b497d8193975e3b19cb7134eaff710 @jyavenard jyavenard committed Apr 30, 2012
View
1,262 mythtv/libs/libmythtv/mythraopconnection.cpp
@@ -2,24 +2,41 @@
#include <QTcpSocket>
#include <QUdpSocket>
#include <QtEndian>
+#include <QTextStream>
#include "mythlogging.h"
#include "mythcorecontext.h"
#include "mythdirs.h"
#include "serverpool.h"
-#include <netinet/in.h> // for ntohs
#include "audiooutput.h"
#include "mythraopdevice.h"
#include "mythraopconnection.h"
#define LOC QString("RAOP Conn: ")
#define MAX_PACKET_SIZE 2048
-#define DEFAULT_SAMPLE_RATE 44100
RSA* MythRAOPConnection::g_rsa = NULL;
+// RAOP RTP packet type
+#define TIMING_REQUEST 0x52
+#define TIMING_RESPONSE 0x53
+#define SYNC 0x54
+#define FIRSTSYNC (0x54 | 0x80)
+#define RANGE_RESEND 0x55
+#define AUDIO_RESEND 0x56
+#define AUDIO_DATA 0x60
+#define FIRSTAUDIO_DATA (0x60 | 0x80)
+
+
+// Size (in ms) of audio buffered in audio card
+#define AUDIOCARD_BUFFER 800
+// How frequently we may call ProcessAudio (via QTimer)
+// ideally 20ms, but according to documentation
+// anything lower than 50ms on windows, isn't reliable
+#define AUDIO_BUFFER 100
+
class NetStream : public QTextStream
{
public:
@@ -38,15 +55,29 @@ class NetStream : public QTextStream
MythRAOPConnection::MythRAOPConnection(QObject *parent, QTcpSocket *socket,
QByteArray id, int port)
- : QObject(parent), m_watchdogTimer(NULL), m_socket(socket),
- m_textStream(NULL), m_hardwareId(id),
- m_dataPort(port), m_dataSocket(NULL),
- m_clientControlSocket(NULL), m_clientControlPort(0),
- m_audio(NULL), m_codec(NULL), m_codeccontext(NULL),
- m_sampleRate(DEFAULT_SAMPLE_RATE), m_queueLength(0), m_allowVolumeControl(true),
- m_seenPacket(false), m_lastPacketSequence(0), m_lastPacketTimestamp(0),
- m_lastSyncTime(0), m_lastSyncTimestamp(0), m_lastLatency(0), m_latencyAudio(0),
- m_latencyQueued(0), m_latencyCounter(0), m_avSync(0), m_audioTimer(NULL)
+ : QObject(parent), m_watchdogTimer(NULL), m_socket(socket),
+ m_textStream(NULL), m_hardwareId(id),
+ m_incomingHeaders(), m_incomingContent(), m_incomingPartial(false),
+ m_incomingSize(0),
+ m_dataSocket(NULL), m_dataPort(port),
+ m_clientControlSocket(NULL), m_clientControlPort(0),
+ m_clientTimingSocket(NULL), m_clientTimingPort(0),
+ m_audio(NULL), m_codec(NULL), m_codeccontext(NULL),
+ m_channels(2), m_sampleSize(16), m_frameRate(44100),
+ m_framesPerPacket(352),m_dequeueAudioTimer(NULL),
+ m_queueLength(0), m_streamingStarted(false),
+ m_allowVolumeControl(true),
+ //audio sync
+ m_seqNum(0),
+ m_lastSequence(0), m_lastTimestamp(0),
+ m_currentTimestamp(0), m_nextSequence(0), m_nextTimestamp(0),
+ m_bufferLength(0), m_timeLastSync(0),
+ m_cardLatency(0), m_adjustedLatency(0), m_audioStarted(false),
+ // clock sync
+ m_masterTimeStamp(0), m_deviceTimeStamp(0), m_networkLatency(0),
+ m_clockSkew(0),
+ m_audioTimer(NULL),
+ m_progressStart(0), m_progressCurrent(0), m_progressEnd(0)
{
}
@@ -57,17 +88,23 @@ MythRAOPConnection::~MythRAOPConnection()
// stop and delete watchdog timer
if (m_watchdogTimer)
+ {
m_watchdogTimer->stop();
- delete m_watchdogTimer;
- m_watchdogTimer = NULL;
+ delete m_watchdogTimer;
+ }
+
+ if (m_dequeueAudioTimer)
+ {
+ m_dequeueAudioTimer->stop();
+ delete m_dequeueAudioTimer;
+ }
// delete main socket
if (m_socket)
{
m_socket->close();
m_socket->deleteLater();
}
- m_socket = NULL;
// delete data socket
if (m_dataSocket)
@@ -76,7 +113,6 @@ MythRAOPConnection::~MythRAOPConnection()
m_dataSocket->close();
m_dataSocket->deleteLater();
}
- m_dataSocket = NULL;
// client control socket
if (m_clientControlSocket)
@@ -85,16 +121,15 @@ MythRAOPConnection::~MythRAOPConnection()
m_clientControlSocket->close();
m_clientControlSocket->deleteLater();
}
- m_clientControlSocket = NULL;
// close audio decoder
DestroyDecoder();
+ // free decoded audio buffer
+ ResetAudio();
+
// close audio device
CloseAudioDevice();
-
- // free decoded audio buffer
- ExpireAudio(UINT64_MAX);
}
bool MythRAOPConnection::Init(void)
@@ -148,9 +183,16 @@ bool MythRAOPConnection::Init(void)
connect(m_watchdogTimer, SIGNAL(timeout()), this, SLOT(timeout()));
m_watchdogTimer->start(10000);
+ m_dequeueAudioTimer = new QTimer();
+ connect(m_dequeueAudioTimer, SIGNAL(timeout()), this, SLOT(ProcessAudio()));
+
return true;
}
+/**
+ * Socket incoming data signal handler
+ * use for audio, control and timing socket
+ */
void MythRAOPConnection::udpDataReady(void)
{
QUdpSocket *socket = dynamic_cast<QUdpSocket*>(sender());
@@ -172,129 +214,192 @@ void MythRAOPConnection::udpDataReady(void)
void MythRAOPConnection::udpDataReady(QByteArray buf, QHostAddress peer,
quint16 port)
{
- // get the time of day
- // NOTE: the previous code would perform this once, and loop internally
- // since the new code loops externally, and this must get performed
- // on each pass, there may be issues
- timeval t;
- gettimeofday(&t, NULL);
- uint64_t timenow = (t.tv_sec * 1000) + (t.tv_usec / 1000);
-
// restart the idle timer
if (m_watchdogTimer)
m_watchdogTimer->start(10000);
if (!m_audio || !m_codec || !m_codeccontext)
return;
- unsigned int type = (unsigned int)((char)(buf[1] & ~0x80));
-
- if (type == 0x54)
- ProcessSyncPacket(buf, timenow);
+ uint8_t type;
+ uint16_t seq;
+ uint64_t timestamp;
- if (!(type == 0x60 || type == 0x56))
+ if (!GetPacketType(buf, type, seq, timestamp))
+ {
+ LOG(VB_GENERAL, LOG_DEBUG, LOC +
+ QString("Packet doesn't start with valid Rtp Header (0x%1)")
+ .arg((uint8_t)buf[0], 0, 16));
return;
+ }
+
+ switch (type)
+ {
+ case SYNC:
+ case FIRSTSYNC:
+ ProcessSync(buf);
+ ProcessAudio();
+ return;
+
+ case FIRSTAUDIO_DATA:
+ m_nextSequence = seq;
+ m_nextTimestamp = timestamp;
+ // With iTunes we know what the first sequence is going to be.
+ // iOS device do not tell us before streaming start what the first
+ // packet is going to be.
+ m_streamingStarted = true;
+ break;
+
+ case AUDIO_DATA:
+ case AUDIO_RESEND:
+ break;
+
+ case TIMING_RESPONSE:
+ ProcessTimeResponse(buf);
+ return;
- int offset = type == 0x60 ? 0 : 4;
- uint16_t this_sequence = ntohs(*(uint16_t *)(buf.data() + offset + 2));
- uint64_t this_timestamp = FramesToMs(ntohl(*(uint64_t*)(buf.data() + offset + 4)));
- uint16_t expected_sequence = m_lastPacketSequence + 1; // should wrap ok
+ default:
+ LOG(VB_GENERAL, LOG_DEBUG, LOC +
+ QString("Packet type (0x%1) not handled")
+ .arg(type, 0, 16));
+ return;
+ }
+ timestamp = framesToMs(timestamp);
+ if (timestamp < m_currentTimestamp)
+ {
+ LOG(VB_GENERAL, LOG_DEBUG, LOC +
+ QString("Received packet %1 too late, ignoring")
+ .arg(seq));
+ return;
+ }
// regular data packet
- if (type == 0x60)
+ if (type == AUDIO_DATA || type == FIRSTAUDIO_DATA)
{
- if (m_seenPacket && (this_sequence != expected_sequence))
- SendResendRequest(timenow, expected_sequence, this_sequence);
+ if (m_streamingStarted && seq != m_nextSequence)
+ SendResendRequest(timestamp, m_nextSequence, seq);
- // don't update the sequence for resends
- m_seenPacket = true;
- m_lastPacketSequence = this_sequence;
- m_lastPacketTimestamp = this_timestamp;
+ m_nextSequence = seq + 1;
+ m_nextTimestamp = timestamp;
+ m_streamingStarted = true;
}
+ if (!m_streamingStarted)
+ return;
+
// resent packet
- if (type == 0x56)
+ if (type == AUDIO_RESEND)
{
- if (m_resends.contains(this_sequence))
+ if (m_resends.contains(seq))
{
- LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("Received required resend %1")
- .arg(this_sequence));
- m_resends.remove(this_sequence);
+ LOG(VB_GENERAL, LOG_DEBUG, LOC +
+ QString("Received required resend %1 (with ts:%2 last:%3)")
+ .arg(seq).arg(timestamp).arg(m_nextSequence));
+ m_resends.remove(seq);
}
else
- LOG(VB_GENERAL, LOG_WARNING, LOC + QString("Received unexpected resent packet %1")
- .arg(this_sequence));
+ LOG(VB_GENERAL, LOG_WARNING, LOC +
+ QString("Received unexpected resent packet %1")
+ .arg(seq));
}
- ExpireResendRequests(timenow);
-
- offset += 12;
- char* data_in = buf.data() + offset;
- int len = buf.size() - offset;
- if (len < 16)
+ // Check that the audio packet is valid, do so by decoding it. If an error
+ // occurs, ask to resend it
+ QList<AudioData> *decoded = new QList<AudioData>();
+ int numframes = decodeAudioPacket(type, &buf, decoded);
+ if (numframes < 0)
+ {
+ // an error occurred, ask for the audio packet once again.
+ LOG(VB_GENERAL, LOG_ERR, LOC + QString("Error decoding audio"));
+ SendResendRequest(timestamp, seq, seq+1);
return;
+ }
+ AudioPacket frames;
+ frames.seq = seq;
+ frames.data = decoded;
+ m_audioQueue.insert(timestamp, frames);
+ ProcessAudio();
+}
- int aeslen = len & ~0xf;
- unsigned char iv[16];
- unsigned char decrypted_data[MAX_PACKET_SIZE];
- memcpy(iv, m_AESIV.data(), sizeof(iv));
- AES_cbc_encrypt((const unsigned char*)data_in,
- decrypted_data, aeslen,
- &m_aesKey, iv, AES_DECRYPT);
- memcpy(decrypted_data + aeslen, data_in + aeslen, len - aeslen);
+void MythRAOPConnection::ProcessSync(const QByteArray &buf)
+{
+ bool first = (uint8_t)buf[0] == 0x90; // First sync is 0x90,0x55
+ const char *req = buf.constData();
+ uint64_t current_ts = qFromBigEndian(*(uint32_t*)(req + 4));
+ uint64_t next_ts = qFromBigEndian(*(uint32_t*)(req + 16));
- AVPacket tmp_pkt;
- AVCodecContext *ctx = m_codeccontext;
+ uint64_t current = framesToMs(current_ts);
+ uint64_t next = framesToMs(next_ts);
- av_init_packet(&tmp_pkt);
- tmp_pkt.data = decrypted_data;
- tmp_pkt.size = len;
+ m_currentTimestamp = current;
+ m_nextTimestamp = next;
+ m_bufferLength = m_nextTimestamp - m_currentTimestamp;
- while (tmp_pkt.size > 0)
+ if (first)
{
- int decoded_data_size = AVCODEC_MAX_AUDIO_FRAME_SIZE;
- int16_t *samples = (int16_t *)av_mallocz(AVCODEC_MAX_AUDIO_FRAME_SIZE);
- int ret = avcodec_decode_audio3(ctx, samples,
- &decoded_data_size, &tmp_pkt);
+ LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("Receiving first SYNC packet"));
+ }
+ else
+ {
+ LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("Receiving SYNC packet"));
+ }
- if (ret < 0)
- {
- LOG(VB_GENERAL, LOG_ERR, LOC + QString("Error decoding audio"));
- break;
- }
+ timeval t; gettimeofday(&t, NULL);
+ m_timeLastSync = t.tv_sec * 1000 + t.tv_usec / 1000;
- if (decoded_data_size > 0)
- {
- int frames = (ctx->channels <= 0 || decoded_data_size < 0) ? -1 :
- decoded_data_size /
- (ctx->channels * av_get_bits_per_sample_fmt(ctx->sample_fmt)>>3);
- AudioFrame aframe;
- aframe.samples = samples;
- aframe.frames = frames;
- aframe.size = decoded_data_size;
+ LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("SYNC: cur:%1 next:%2 time:%3")
+ .arg(m_currentTimestamp).arg(m_nextTimestamp).arg(m_timeLastSync));
- if (m_audioQueue.contains(this_timestamp))
- LOG(VB_GENERAL, LOG_WARNING,
- LOC + "Duplicate packet timestamp.");
+ uint64_t delay = framesToMs(m_audioQueue.size() * m_framesPerPacket);
+ delay += m_networkLatency;
- m_audioQueue.insert(this_timestamp, aframe);
- m_queueLength += aframe.frames;
- ProcessAudio(timenow);
+ // Calculate audio card latency
+ if (first)
+ {
+ m_cardLatency = AudioCardLatency();
+ // if audio isn't started, start playing 200ms worth of silence
+ // and measure timestamp difference
+ LOG(VB_GENERAL, LOG_DEBUG, LOC +
+ QString("Audio hardware latency: %1ms").arg(m_cardLatency));
+ }
- this_timestamp += (frames * 1000) / m_sampleRate;
- }
+ uint64_t audiots = m_audio->GetAudiotime();
+ if (m_audioStarted)
+ {
+ m_adjustedLatency = (int64_t)audiots - (int64_t)m_currentTimestamp;
+ }
+ if (m_adjustedLatency > (int64_t)m_bufferLength)
+ {
+ // Too much delay in playback
+ // will reset audio card in next ProcessAudio
+ m_audioStarted = false;
+ m_adjustedLatency = 0;
+ }
- tmp_pkt.data += ret;
- tmp_pkt.size -= ret;
+ delay += m_audio->GetAudioBufferedTime();
+ delay += m_adjustedLatency;
+
+ // Expire old audio
+ ExpireResendRequests(m_currentTimestamp);
+ int res = ExpireAudio(m_currentTimestamp);
+ if (res > 0)
+ {
+ LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("Drop %1 packets").arg(res));
}
-}
-uint64_t MythRAOPConnection::FramesToMs(uint64_t timestamp)
-{
- return (uint64_t)((double)timestamp * 1000.0 / m_sampleRate);
+ LOG(VB_GENERAL, LOG_DEBUG, LOC +
+ QString("Queue=%1 buffer=%2ms ideal=%3ms diffts:%4ms")
+ .arg(m_audioQueue.size())
+ .arg(delay)
+ .arg(m_bufferLength)
+ .arg(m_adjustedLatency));
}
-void MythRAOPConnection::SendResendRequest(uint64_t timenow,
+/**
+ * SendResendRequest:
+ * Request RAOP client to resend missed RTP packets
+ */
+void MythRAOPConnection::SendResendRequest(uint64_t timestamp,
uint16_t expected, uint16_t got)
{
if (!m_clientControlSocket)
@@ -304,40 +409,47 @@ void MythRAOPConnection::SendResendRequest(uint64_t timenow,
(int16_t)(((int32_t)got + UINT16_MAX + 1) - expected) :
got - expected;
- LOG(VB_GENERAL, LOG_INFO, LOC + QString("Missed %1 packet(s): expected %2 got %3")
- .arg(missed).arg(expected).arg(got));
+ LOG(VB_GENERAL, LOG_INFO, LOC +
+ QString("Missed %1 packet(s): expected %2 got %3 ts:%4")
+ .arg(missed).arg(expected).arg(got).arg(timestamp));
char req[8];
req[0] = 0x80;
- req[1] = 0x55 | 0x80;
- *(uint16_t *)(req + 2) = htons(1);
- *(uint16_t *)(req + 4) = htons(expected); // missed seqnum
- *(uint16_t *)(req + 6) = htons(missed); // count
-
- if (m_clientControlSocket->writeDatagram(req, 8, m_peerAddress, m_clientControlPort) == 8)
+ req[1] = RANGE_RESEND | 0x80;
+ *(uint16_t *)(req + 2) = qToBigEndian(m_seqNum++);
+ *(uint16_t *)(req + 4) = qToBigEndian(expected); // missed seqnum
+ *(uint16_t *)(req + 6) = qToBigEndian(missed); // count
+
+ if (m_clientControlSocket->writeDatagram(req, sizeof(req),
+ m_peerAddress, m_clientControlPort)
+ == sizeof(req))
{
for (uint16_t count = 0; count < missed; count++)
{
LOG(VB_GENERAL, LOG_INFO, LOC + QString("Sent resend for %1")
.arg(expected + count));
- m_resends.insert(expected + count, timenow);
+ m_resends.insert(expected + count, timestamp);
}
}
else
LOG(VB_GENERAL, LOG_ERR, LOC + "Failed to send resend request.");
}
-void MythRAOPConnection::ExpireResendRequests(uint64_t timenow)
+/**
+ * ExpireResendRequests:
+ * Expire resend requests that are older than timestamp. Those requests are
+ * expired when audio with older timestamp has already been played
+ */
+void MythRAOPConnection::ExpireResendRequests(uint64_t timestamp)
{
if (!m_resends.size())
return;
- uint64_t too_old = timenow - 500;
QMutableMapIterator<uint16_t,uint64_t> it(m_resends);
while (it.hasNext())
{
it.next();
- if (it.value() < too_old)
+ if (it.value() < timestamp && m_streamingStarted)
{
LOG(VB_GENERAL, LOG_WARNING, LOC +
QString("Never received resend packet %1").arg(it.key()));
@@ -346,104 +458,291 @@ void MythRAOPConnection::ExpireResendRequests(uint64_t timenow)
}
}
-void MythRAOPConnection::ProcessSyncPacket(const QByteArray &buf, uint64_t timenow)
+/**
+ * SendTimeRequest:
+ * Send a time request to the RAOP client.
+ */
+void MythRAOPConnection::SendTimeRequest(void)
{
- m_lastSyncTimestamp = FramesToMs(ntohl(*(uint64_t*)(buf.data() + 4)));
- uint64_t now = FramesToMs(ntohl(*(uint64_t*)(buf.data() + 16)));
- m_lastLatency = now - m_lastSyncTimestamp;
- m_lastSyncTime = timenow;
- uint64_t averageaudio = 0;
- uint64_t averagequeue = 0;
- double averageav = 0;
- if (m_latencyCounter)
+ if (!m_clientControlSocket) // should never happen
+ return;
+
+ timeval t;
+ gettimeofday(&t, NULL);
+
+ char req[32];
+ req[0] = 0x80;
+ req[1] = TIMING_REQUEST | 0x80;
+ // this is always 0x00 0x07 according to http://blog.technologeek.org/airtunes-v2
+ // no other value works
+ req[2] = 0x00;
+ req[3] = 0x07;
+ *(uint32_t *)(req + 4) = (uint32_t)0;
+ *(uint64_t *)(req + 8) = (uint64_t)0;
+ *(uint64_t *)(req + 16) = (uint64_t)0;
+ *(uint32_t *)(req + 24) = qToBigEndian((uint32_t)t.tv_sec);
+ *(uint32_t *)(req + 28) = qToBigEndian((uint32_t)t.tv_usec);
+
+ if (m_clientControlSocket->writeDatagram(req, sizeof(req), m_peerAddress, m_clientTimingPort) != sizeof(req))
{
- averageaudio = m_latencyAudio / m_latencyCounter;
- averagequeue = m_latencyQueued / m_latencyCounter;
- averageav = m_avSync / (double)m_latencyCounter;
+ LOG(VB_GENERAL, LOG_ERR, LOC + "Failed to send resend time request.");
+ return;
}
+ LOG(VB_GENERAL, LOG_DEBUG, LOC +
+ QString("Requesting master time (Local %1.%2)")
+ .arg(t.tv_sec).arg(t.tv_usec));
+}
- if (m_audio)
+/**
+ * ProcessTimeResponse:
+ * Calculate the network latency, we do not use the reference time send by itunes
+ * instead we measure the time lapsed between the request and the response
+ * the latency is calculated in ms
+ */
+void MythRAOPConnection::ProcessTimeResponse(const QByteArray &buf)
+{
+ timeval t1, t2;
+ const char *req = buf.constData();
+
+ t1.tv_sec = qFromBigEndian(*(uint32_t*)(req + 8));
+ t1.tv_usec = qFromBigEndian(*(uint32_t*)(req + 12));
+
+ gettimeofday(&t2, NULL);
+ uint64_t time1, time2;
+ time1 = t1.tv_sec * 1000 + t1.tv_usec / 1000;
+ time2 = t2.tv_sec * 1000 + t2.tv_usec / 1000;
+ LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("Read back time (Local %1.%2)")
+ .arg(t1.tv_sec).arg(t1.tv_usec));
+ // network latency equal time difference in ms between request and response
+ // divide by two for approximate time of one way trip
+ m_networkLatency = (time2 - time1) / 2;
+
+ // now calculate the time difference between the client and us.
+ // this is NTP time, where sec is in seconds, and ticks is in 1/2^32s
+ uint32_t sec = qFromBigEndian(*(uint32_t*)(req + 24));
+ uint32_t ticks = qFromBigEndian(*(uint32_t*)(req + 28));
+ // convert ticks into ms
+ int64_t master = NTPToLocal(sec, ticks);
+ m_clockSkew = master - time2;
+}
+
+uint64_t MythRAOPConnection::NTPToLocal(uint32_t sec, uint32_t ticks)
+{
+ return (int64_t)sec * 1000LL + (((int64_t)ticks * 1000LL) >> 32);
+}
+
+bool MythRAOPConnection::GetPacketType(const QByteArray &buf, uint8_t &type,
+ uint16_t &seq, uint64_t &timestamp)
+{
+ // All RAOP packets start with | 0x80/0x90 (first sync) | PACKET_TYPE |
+ if ((uint8_t)buf[0] != 0x80 && (uint8_t)buf[0] != 0x90)
{
- uint64_t total = averageaudio + averagequeue;
- LOG(VB_GENERAL, LOG_DEBUG, LOC +
- QString("Sync packet: Timestamp: %1 Current Audio ts: %2 (avsync %3ms) "
- "Latency: audio %4 queue %5 total %6ms <-> target %7ms")
- .arg(m_lastSyncTimestamp).arg(m_audio->GetAudiotime()).arg(averageav, 0)
- .arg(averageaudio).arg(averagequeue)
- .arg(total).arg(m_lastLatency));
+ return false;
+ }
+
+ type = (char)buf[1];
+ // Is it first sync packet?
+ if ((uint8_t)buf[0] == 0x90 && type == FIRSTSYNC)
+ {
+ return true;
+ }
+ if (type != FIRSTAUDIO_DATA)
+ {
+ type &= ~0x80;
}
- m_latencyAudio = m_latencyQueued = m_latencyCounter = m_avSync = 0;
+
+ if (type != AUDIO_DATA && type != FIRSTAUDIO_DATA && type != AUDIO_RESEND)
+ return true;
+
+ const char *ptr = buf.constData();
+ if (type == AUDIO_RESEND)
+ {
+ ptr += 4;
+ }
+ seq = qFromBigEndian(*(uint16_t *)(ptr + 2));
+ timestamp = qFromBigEndian(*(uint32_t*)(ptr + 4));
+ return true;
}
-int MythRAOPConnection::ExpireAudio(uint64_t timestamp)
+// Audio decode / playback related routines
+
+uint32_t MythRAOPConnection::decodeAudioPacket(uint8_t type,
+ const QByteArray *buf,
+ QList<AudioData> *dest)
{
- int res = 0;
- QMutableMapIterator<uint64_t,AudioFrame> it(m_audioQueue);
- while (it.hasNext())
+ const char *data_in = buf->constData();
+ int len = buf->size();
+ if (type == AUDIO_RESEND)
{
- it.next();
- if (it.key() < timestamp)
+ data_in += 4;
+ len -= 4;
+ }
+ data_in += 12;
+ len -= 12;
+ if (len < 16)
+ return -1;
+
+ int aeslen = len & ~0xf;
+ unsigned char iv[16];
+ unsigned char decrypted_data[MAX_PACKET_SIZE];
+ memcpy(iv, m_AESIV.constData(), sizeof(iv));
+ AES_cbc_encrypt((const unsigned char*)data_in,
+ decrypted_data, aeslen,
+ &m_aesKey, iv, AES_DECRYPT);
+ memcpy(decrypted_data + aeslen, data_in + aeslen, len - aeslen);
+
+ AVPacket tmp_pkt;
+ AVCodecContext *ctx = m_codeccontext;
+
+ av_init_packet(&tmp_pkt);
+ tmp_pkt.data = decrypted_data;
+ tmp_pkt.size = len;
+
+ uint32_t frames_added = 0;
+ while (tmp_pkt.size > 0)
+ {
+ int decoded_data_size = AVCODEC_MAX_AUDIO_FRAME_SIZE;
+ int16_t *samples = (int16_t *)av_mallocz(AVCODEC_MAX_AUDIO_FRAME_SIZE);
+ int ret = avcodec_decode_audio3(ctx, samples, &decoded_data_size, &tmp_pkt);
+
+ if (ret < 0)
{
- AudioFrame frame = it.value();
- av_free((void *)frame.samples);
- m_audioQueue.remove(it.key());
- m_queueLength -= frame.frames;
- res++;
+ return -1;
}
+
+ if (decoded_data_size > 0)
+ {
+ int frames = decoded_data_size /
+ (ctx->channels * av_get_bits_per_sample_fmt(ctx->sample_fmt)>>3);
+ frames_added += frames;
+ AudioData block;
+ block.data = samples;
+ block.length = decoded_data_size;
+ block.frames = frames;
+ dest->append(block);
+ }
+ tmp_pkt.data += ret;
+ tmp_pkt.size -= ret;
}
- return res;
+ return frames_added;
}
-void MythRAOPConnection::ProcessAudio(uint64_t timenow)
+void MythRAOPConnection::ProcessAudio()
{
- if (!m_audio)
- {
- ExpireAudio(UINT64_MAX);
+ if (!m_streamingStarted || !m_audio)
return;
+
+ if (m_audio->IsPaused())
+ {
+ // ALSA takes a while to unpause, enough to have SYNC starting to drop
+ // packets, so unpause as early as possible
+ m_audio->Pause(false);
}
+ timeval t; gettimeofday(&t, NULL);
+ uint64_t dtime = (t.tv_sec * 1000 + t.tv_usec / 1000) - m_timeLastSync;
+ uint64_t rtp = dtime + m_currentTimestamp + m_networkLatency;
+ uint64_t buffered = m_audioStarted ? m_audio->GetAudioBufferedTime() : 0;
+
+ // Keep audio framework buffer as short as possible, keeping everything in
+ // m_audioQueue, so we can easily reset the least amount possible
+ if (buffered > AUDIOCARD_BUFFER)
+ return;
+
+ // Also make sure m_audioQueue never goes to less than 1/3 of the RDP stream
+ // total latency, this should gives us enough time to receive missed packets
+ uint64_t queue = framesToMs(m_audioQueue.size() * m_framesPerPacket);
+ if (queue < m_bufferLength / 3)
+ return;
- uint64_t updatedsync = m_lastSyncTimestamp + (timenow - m_lastSyncTime);
+ rtp += buffered;
+ rtp += m_cardLatency;
- // expire anything that is late
- int dumped = ExpireAudio(m_lastSyncTimestamp-m_lastLatency);
+ // How many packets to add to the audio card, to fill AUDIOCARD_BUFFER
+ int max_packets = ((AUDIOCARD_BUFFER - buffered)
+ * m_frameRate / 1000) / m_framesPerPacket;
+ int i = 0;
+ uint64_t timestamp = 0;
- if (dumped > 0)
+ QMapIterator<uint64_t,AudioPacket> packet_it(m_audioQueue);
+ while (packet_it.hasNext() && i <= max_packets)
{
- LOG(VB_GENERAL, LOG_INFO, LOC + QString("Dumped %1 audio packets")
- .arg(dumped));
+ packet_it.next();
+
+ timestamp = packet_it.key();
+ if (timestamp < rtp)
+ {
+ if (!m_audioStarted)
+ {
+ m_audio->Reset(); // clear audio card
+ }
+ AudioPacket frames = packet_it.value();
+
+ if (m_lastSequence != frames.seq)
+ {
+ LOG(VB_GENERAL, LOG_ERR, LOC +
+ QString("Audio discontinuity seen. Played %1 (%3) expected %2")
+ .arg(frames.seq).arg(m_lastSequence).arg(timestamp));
+ m_lastSequence = frames.seq;
+ }
+ m_lastSequence++;
+
+ QList<AudioData>::iterator it = frames.data->begin();
+ for (; it != frames.data->end(); it++)
+ {
+ m_audio->AddData((char *)it->data, it->length,
+ timestamp, it->frames);
+ timestamp += m_audio->LengthLastData();
+ }
+ i++;
+ m_audioStarted = true;
+ }
+ else // QMap is sorted, so no need to continue if not found
+ break;
}
- int64_t avsync = (int64_t)(m_audio->GetAudiotime() - (int64_t)updatedsync);
- uint64_t queue_length = FramesToMs(m_queueLength);
- uint64_t ideal_ts = updatedsync - queue_length - avsync + m_lastLatency;
+ ExpireAudio(timestamp);
+ m_lastTimestamp = timestamp;
- m_avSync += avsync;
- m_latencyAudio += m_audio->GetAudioBufferedTime();;
- m_latencyQueued += queue_length;
- m_latencyCounter++;
+ // restart audio timer should we stop receiving data on regular interval,
+ // we need to continue processing the audio queue
+ m_dequeueAudioTimer->start(AUDIO_BUFFER);
+}
- QMapIterator<uint64_t,AudioFrame> it(m_audioQueue);
- while (it.hasNext())
+int MythRAOPConnection::ExpireAudio(uint64_t timestamp)
+{
+ int res = 0;
+ QMutableMapIterator<uint64_t,AudioPacket> packet_it(m_audioQueue);
+ while (packet_it.hasNext())
{
- it.next();
- if (it.key() < ideal_ts)
+ packet_it.next();
+ if (packet_it.key() < timestamp)
{
- AudioFrame aframe = it.value();
- m_audio->AddData((char *)aframe.samples, aframe.size,
- it.key(), aframe.frames);
+ AudioPacket frames = packet_it.value();
+ if (frames.data)
+ {
+ QList<AudioData>::iterator it = frames.data->begin();
+ for (; it != frames.data->end(); it++)
+ {
+ av_free(it->data);
+ }
+ delete frames.data;
+ }
+ m_audioQueue.remove(packet_it.key());
+ res++;
}
}
-
- ExpireAudio(ideal_ts);
+ return res;
}
void MythRAOPConnection::ResetAudio(void)
{
- ExpireAudio(UINT64_MAX);
- m_latencyCounter = m_latencyAudio = m_latencyQueued = m_avSync = 0;
- m_seenPacket = false;
if (m_audio)
+ {
m_audio->Reset();
+ }
+ ExpireAudio(UINT64_MAX);
+ ExpireResendRequests(UINT64_MAX);
+ m_audioStarted = false;
}
void MythRAOPConnection::timeout(void)
@@ -458,37 +757,95 @@ void MythRAOPConnection::audioRetry(void)
{
MythRAOPDevice* p = (MythRAOPDevice*)parent();
if (p && p->NextInAudioQueue(this) && OpenAudioDevice())
+ {
CreateDecoder();
+ }
}
if (m_audio && m_codec && m_codeccontext)
+ {
StopAudioTimer();
+ }
}
+/**
+ * readClient: signal handler for RAOP client connection
+ * Handle initialisation of session
+ */
void MythRAOPConnection::readClient(void)
{
QTcpSocket *socket = (QTcpSocket *)sender();
if (!socket)
return;
- QList<QByteArray> lines;
- while (socket->canReadLine())
+ QByteArray data = socket->readAll();
+ LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("readClient(%1): ")
+ .arg(data.size()) + data.constData());
+
+ // For big content, we may be called several times for a single packet
+ if (!m_incomingPartial)
+ {
+ m_incomingHeaders.clear();
+ m_incomingContent.clear();
+ m_incomingSize = 0;
+
+ QTextStream stream(data);
+ QString line;
+ do
+ {
+ line = stream.readLine();
+ if (line.size() == 0)
+ break;
+ LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("Header = %1").arg(line));
+ m_incomingHeaders.append(line);
+ if (line.contains("Content-Length:"))
+ {
+ m_incomingSize = line.mid(line.indexOf(" ") + 1).toInt();
+ }
+ }
+ while (!line.isNull());
+
+ if (m_incomingHeaders.size() == 0)
+ return;
+
+ if (!stream.atEnd())
+ {
+ int pos = stream.pos();
+ if (pos > 0)
+ {
+ m_incomingContent.append(data.mid(pos));
+ }
+ }
+ }
+ else
+ {
+ m_incomingContent.append(data);
+ }
+
+ // If we haven't received all the content yet, wait (see when receiving
+ // coverart
+ if (m_incomingContent.size() < m_incomingSize)
+ {
+ m_incomingPartial = true;
+ return;
+ }
+ else
{
- QByteArray line = socket->readLine();
- lines.append(line);
- LOG(VB_GENERAL, LOG_DEBUG, LOC + "readClient: " + line.trimmed().data());
+ m_incomingPartial = false;
}
+ LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("Content(%1) = %2")
+ .arg(m_incomingContent.size()).arg(m_incomingContent.constData()));
- if (lines.size())
- ProcessRequest(lines);
+ ProcessRequest(m_incomingHeaders, m_incomingContent);
}
-void MythRAOPConnection::ProcessRequest(const QList<QByteArray> &lines)
+void MythRAOPConnection::ProcessRequest(const QStringList &header,
+ const QByteArray &content)
{
- if (lines.isEmpty())
+ if (header.isEmpty())
return;
- RawHash tags = FindTags(lines);
+ RawHash tags = FindTags(header);
if (!tags.contains("CSeq"))
{
@@ -498,98 +855,131 @@ void MythRAOPConnection::ProcessRequest(const QList<QByteArray> &lines)
*m_textStream << "RTSP/1.0 200 OK\r\n";
- if (tags.contains("Apple-Challenge"))
- {
- LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("Received Apple-Challenge"));
-
- *m_textStream << "Apple-Response: ";
- if (!LoadKey())
- return;
- int tosize = RSA_size(LoadKey());
- unsigned char to[tosize];
+ QString option = header[0].left(header[0].indexOf(" "));
- QByteArray challenge = QByteArray::fromBase64(tags["Apple-Challenge"].data());
- int challenge_size = challenge.size();
- if (challenge_size != 16)
- {
- LOG(VB_GENERAL, LOG_ERR, LOC +
- QString("Decoded challenge size %1, expected 16").arg(challenge_size));
- if (challenge_size > 16)
- challenge_size = 16;
- }
-
- int i = 0;
- unsigned char from[38];
- memcpy(from, challenge.data(), challenge_size);
- i += challenge_size;
- if (m_socket->localAddress().protocol() == QAbstractSocket::IPv4Protocol)
- {
- uint32_t ip = m_socket->localAddress().toIPv4Address();
- ip = qToBigEndian(ip);
- memcpy(from + i, &ip, 4);
- i += 4;
- }
- else if (m_socket->localAddress().protocol() == QAbstractSocket::IPv6Protocol)
+ // process RTP-info field
+ bool gotRTP = false;
+ uint16_t RTPseq;
+ uint64_t RTPtimestamp;
+ if (tags.contains("RTP-Info"))
+ {
+ gotRTP = true;
+ QString data = tags["RTP-Info"];
+ QStringList items = data.split(";");
+ foreach (QString item, items)
{
- // NB IPv6 untested
- Q_IPV6ADDR ip = m_socket->localAddress().toIPv6Address();
- //ip = qToBigEndian(ip);
- memcpy(from + i, &ip, 16);
- i += 16;
+ if (item.startsWith("seq"))
+ {
+ RTPseq = item.mid(item.indexOf("=") + 1).trimmed().toUShort();
+ }
+ else if (item.startsWith("rtptime"))
+ {
+ RTPtimestamp = item.mid(item.indexOf("=") + 1).trimmed().toUInt();
+ }
}
- memcpy(from + i, m_hardwareId.data(), RAOP_HARDWARE_ID_SIZE);
- i += RAOP_HARDWARE_ID_SIZE;
+ LOG(VB_GENERAL, LOG_INFO, LOC + QString("RTP-Info: seq=%1 rtptime=%2")
+ .arg(RTPseq).arg(RTPtimestamp));
+ }
- int pad = 32 - i;
- if (pad > 0)
+ if (option == "OPTIONS")
+ {
+ if (tags.contains("Apple-Challenge"))
{
- memset(from + i, 0, pad);
- i += pad;
- }
+ LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("Received Apple-Challenge"));
+
+ *m_textStream << "Apple-Response: ";
+ if (!LoadKey())
+ return;
+ int tosize = RSA_size(LoadKey());
+ uint8_t *to = new uint8_t[tosize];
+
+ QByteArray challenge =
+ QByteArray::fromBase64(tags["Apple-Challenge"].toAscii());
+ int challenge_size = challenge.size();
+ if (challenge_size != 16)
+ {
+ LOG(VB_GENERAL, LOG_ERR, LOC +
+ QString("Decoded challenge size %1, expected 16")
+ .arg(challenge_size));
+ if (challenge_size > 16)
+ challenge_size = 16;
+ }
- LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("Full base64 response: '%1' size %2")
- .arg(QByteArray((const char*)from, i).toBase64().data()).arg(i));
+ int i = 0;
+ unsigned char from[38];
+ memcpy(from, challenge.constData(), challenge_size);
+ i += challenge_size;
+ if (m_socket->localAddress().protocol() ==
+ QAbstractSocket::IPv4Protocol)
+ {
+ uint32_t ip = m_socket->localAddress().toIPv4Address();
+ ip = qToBigEndian(ip);
+ memcpy(from + i, &ip, 4);
+ i += 4;
+ }
+ else if (m_socket->localAddress().protocol() ==
+ QAbstractSocket::IPv6Protocol)
+ {
+ // NB IPv6 untested
+ Q_IPV6ADDR ip = m_socket->localAddress().toIPv6Address();
+ //ip = qToBigEndian(ip);
+ memcpy(from + i, &ip, 16);
+ i += 16;
+ }
+ memcpy(from + i, m_hardwareId.constData(), RAOP_HARDWARE_ID_SIZE);
+ i += RAOP_HARDWARE_ID_SIZE;
- RSA_private_encrypt(i, from, to, LoadKey(), RSA_PKCS1_PADDING);
+ int pad = 32 - i;
+ if (pad > 0)
+ {
+ memset(from + i, 0, pad);
+ i += pad;
+ }
- QByteArray base64 = QByteArray((const char*)to, tosize).toBase64();
+ LOG(VB_GENERAL, LOG_DEBUG, LOC +
+ QString("Full base64 response: '%1' size %2")
+ .arg(QByteArray((const char*)from, i).toBase64().constData())
+ .arg(i));
- for (int pos = base64.size() - 1; pos > 0; pos--)
- {
- if (base64[pos] == '=')
- base64[pos] = ' ';
- else
- break;
- }
- LOG(VB_GENERAL, LOG_DEBUG, QString("tSize=%1 tLen=%2 tResponse=%3")
- .arg(tosize).arg(base64.size()).arg(base64.data()));
- *m_textStream << base64.trimmed() << "\r\n";
- }
+ RSA_private_encrypt(i, from, to, LoadKey(), RSA_PKCS1_PADDING);
- QByteArray option = lines[0].left(lines[0].indexOf(" "));
+ QByteArray base64 = QByteArray((const char*)to, tosize).toBase64();
+ delete[] to;
- if (option == "OPTIONS")
- {
+ for (int pos = base64.size() - 1; pos > 0; pos--)
+ {
+ if (base64[pos] == '=')
+ base64[pos] = ' ';
+ else
+ break;
+ }
+ LOG(VB_GENERAL, LOG_DEBUG, QString("tSize=%1 tLen=%2 tResponse=%3")
+ .arg(tosize).arg(base64.size()).arg(base64.constData()));
+ *m_textStream << base64.trimmed() << "\r\n";
+ }
StartResponse(m_textStream, option, tags["CSeq"]);
- *m_textStream << "Public: ANNOUNCE, SETUP, RECORD, PAUSE, FLUSH, TEARDOWN, OPTIONS, GET_PARAMETER, SET_PARAMETER\r\n";
+ *m_textStream << "Public: ANNOUNCE, SETUP, RECORD, PAUSE, FLUSH, "
+ "TEARDOWN, OPTIONS, GET_PARAMETER, SET_PARAMETER. POST, GET\r\n";
}
else if (option == "ANNOUNCE")
{
- foreach (QByteArray line, lines)
+ QStringList lines = splitLines(content);
+ foreach (QString line, lines)
{
if (line.startsWith("a=rsaaeskey:"))
{
- QByteArray key = line.mid(12).trimmed();
- QByteArray decodedkey = QByteArray::fromBase64(key);
- LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("RSAAESKey: %1 (decoded size %2)")
- .arg(key.data()).arg(decodedkey.size()));
+ QString key = line.mid(12).trimmed();
+ QByteArray decodedkey = QByteArray::fromBase64(key.toAscii());
+ LOG(VB_GENERAL, LOG_DEBUG, LOC +
+ QString("RSAAESKey: %1 (decoded size %2)")
+ .arg(key).arg(decodedkey.size()));
if (LoadKey())
{
int size = sizeof(char) * RSA_size(LoadKey());
char *decryptedkey = new char[size];
if (RSA_private_decrypt(decodedkey.size(),
- (const unsigned char*)decodedkey.data(),
+ (const unsigned char*)decodedkey.constData(),
(unsigned char*)decryptedkey,
LoadKey(), RSA_PKCS1_OAEP_PADDING))
{
@@ -609,21 +999,27 @@ void MythRAOPConnection::ProcessRequest(const QList<QByteArray> &lines)
}
else if (line.startsWith("a=aesiv:"))
{
- QByteArray aesiv = line.mid(8).trimmed();
- m_AESIV = QByteArray::fromBase64(aesiv.data());
- LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("AESIV: %1 (decoded size %2)")
- .arg(aesiv.data()).arg(m_AESIV.size()));
+ QString aesiv = line.mid(8).trimmed();
+ m_AESIV = QByteArray::fromBase64(aesiv.toAscii());
+ LOG(VB_GENERAL, LOG_DEBUG, LOC +
+ QString("AESIV: %1 (decoded size %2)")
+ .arg(aesiv).arg(m_AESIV.size()));
}
else if (line.startsWith("a=fmtp:"))
{
m_audioFormat.clear();
- QByteArray format = line.mid(7).trimmed();
- QList<QByteArray> fmts = format.split(' ');
- foreach (QByteArray fmt, fmts)
+ QString format = line.mid(7).trimmed();
+ QList<QString> fmts = format.split(' ');
+ foreach (QString fmt, fmts)
m_audioFormat.append(fmt.toInt());
foreach (int fmt, m_audioFormat)
- LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("Audio parameter: %1").arg(fmt));
+ LOG(VB_GENERAL, LOG_DEBUG, LOC +
+ QString("Audio parameter: %1").arg(fmt));
+ m_framesPerPacket = m_audioFormat[1];
+ m_sampleSize = m_audioFormat[3];
+ m_channels = m_audioFormat[7];
+ m_frameRate = m_audioFormat[11];
}
}
StartResponse(m_textStream, option, tags["CSeq"]);
@@ -636,6 +1032,7 @@ void MythRAOPConnection::ProcessRequest(const QList<QByteArray> &lines)
int timing_port = 0;
QString data = tags["Transport"];
QStringList items = data.split(";");
+
foreach (QString item, items)
{
if (item.startsWith("control_port"))
@@ -652,6 +1049,8 @@ void MythRAOPConnection::ProcessRequest(const QList<QByteArray> &lines)
QString("control port: %1 timing port: %2")
.arg(control_port).arg(timing_port));
+ m_peerAddress = m_socket->peerAddress();
+
if (m_clientControlSocket)
{
m_clientControlSocket->disconnect();
@@ -660,29 +1059,84 @@ void MythRAOPConnection::ProcessRequest(const QList<QByteArray> &lines)
}
m_clientControlSocket = new QUdpSocket(this);
- if (!m_clientControlSocket->bind(control_port))
+ int controlbind_port = findNextBindingPort(m_clientControlSocket,
+ control_port);
+ if (controlbind_port < 0)
{
LOG(VB_GENERAL, LOG_ERR, LOC +
- QString("Failed to bind to client control port %1. "
- "Control of audio stream may fail")
- .arg(control_port));
+ QString("Failed to bind to client control port. "
+ "Control of audio stream may fail"));
}
else
{
LOG(VB_GENERAL, LOG_INFO, LOC +
- QString("Bound to client control port %1").arg(control_port));
+ QString("Bound to client control port %1 on port %2")
+ .arg(control_port).arg(controlbind_port));
+ }
+ m_clientControlPort = control_port;
+ connect(m_clientControlSocket, SIGNAL(readyRead()),
+ this, SLOT(udpDataReady()));
+
+ if (m_clientTimingSocket)
+ {
+ m_clientTimingSocket->disconnect();
+ m_clientTimingSocket->close();
+ delete m_clientTimingSocket;
}
+ m_clientTimingSocket = new QUdpSocket(this);
+ int timingbind_port = findNextBindingPort(m_clientTimingSocket,
+ timing_port);
+ if (timingbind_port < 0)
+ {
+ LOG(VB_GENERAL, LOG_ERR, LOC +
+ QString("Failed to bind to client timing port. "
+ "Timing of audio stream will be incorrect"));
+ }
+ else
+ {
+ LOG(VB_GENERAL, LOG_INFO, LOC +
+ QString("Bound to client timing port %1 on port %2")
+ .arg(timing_port).arg(timingbind_port));
+ }
+ m_clientTimingPort = timing_port;
+ connect(m_clientTimingSocket, SIGNAL(readyRead()),
+ this, SLOT(udpDataReady()));
+
if (OpenAudioDevice())
CreateDecoder();
- m_peerAddress = m_socket->peerAddress();
- m_clientControlPort = control_port;
- connect(m_clientControlSocket, SIGNAL(readyRead()), this, SLOT(udpDataReady()));
+ // Recreate transport line with new ports value
+ QString newdata;
+ bool first = true;
+ foreach (QString item, items)
+ {
+ if (!first)
+ {
+ newdata += ";";
+ }
+ if (item.startsWith("control_port"))
+ {
+ newdata += "control_port=" + QString::number(controlbind_port);
+ }
+ else if (item.startsWith("timing_port"))
+ {
+ newdata += "timing_port=" + QString::number(timingbind_port);
+ }
+ else
+ {
+ newdata += item;
+ }
+ first = false;
+ }
+ if (!first)
+ {
+ newdata += ";";
+ }
+ newdata += "server_port=" + QString::number(m_dataPort);
StartResponse(m_textStream, option, tags["CSeq"]);
- *m_textStream << "Transport: " << tags["Transport"].data();
- *m_textStream << ";server_port=" << QString::number(m_dataPort);
+ *m_textStream << "Transport: " << newdata;
*m_textStream << "\r\nSession: MYTHTV\r\n";
}
else
@@ -693,65 +1147,131 @@ void MythRAOPConnection::ProcessRequest(const QList<QByteArray> &lines)
}
else if (option == "RECORD")
{
- StartResponse(m_textStream, option, tags["CSeq"]);
- }
- else if (option == "TEARDOWN")
- {
- *m_textStream << "Connection: close\r\n";
+ if (gotRTP)
+ {
+ m_nextSequence = RTPseq;
+ m_nextTimestamp = RTPtimestamp;
+ }
+ // Ask for master clock value to determine time skew and average network latency
+ SendTimeRequest();
StartResponse(m_textStream, option, tags["CSeq"]);
}
else if (option == "FLUSH")
{
+ if (gotRTP)
+ {
+ m_nextSequence = RTPseq;
+ m_nextTimestamp = RTPtimestamp;
+ m_currentTimestamp = m_nextTimestamp - m_bufferLength;
+ }
+ // determine RTP timestamp of last sample played
+ uint64_t timestamp = m_audioStarted && m_audio ?
+ m_audio->GetAudiotime() : m_lastTimestamp;
+ *m_textStream << "RTP-Info: rtptime=" << QString::number(timestamp);
+ m_streamingStarted = false;
ResetAudio();
- *m_textStream << "flush\r\n";
StartResponse(m_textStream, option, tags["CSeq"]);
}
else if (option == "SET_PARAMETER")
{
- foreach (QByteArray line, lines)
+ if (tags.contains("Content-Type"))
{
- StartResponse(m_textStream, option, tags["CSeq"]);
- if (line.startsWith("volume:") && m_allowVolumeControl && m_audio)
+ if (tags["Content-Type"] == "text/parameters")
{
- QByteArray rawvol = line.mid(7).trimmed();
- float volume = (rawvol.toFloat() + 30.0) / 0.3;
- if (volume < 0.01)
- volume = 0.0;
- LOG(VB_GENERAL, LOG_INFO, LOC + QString("Setting volume to %1 (raw %3)")
- .arg(volume).arg(rawvol.data()));
- m_audio->SetCurrentVolume((int)volume);
+ QString name = content.left(content.indexOf(":"));
+ QString param = content.mid(content.indexOf(":") + 1).trimmed();
+
+ LOG(VB_GENERAL, LOG_DEBUG, LOC +
+ QString("text/parameters: name=%1 parem=%2")
+ .arg(name).arg(param));
+
+ if (name == "volume" && m_allowVolumeControl && m_audio)
+ {
+ float volume = (param.toFloat() + 30.0f) * 100.0f / 30.0f;
+ if (volume < 0.01f)
+ volume = 0.0f;
+ LOG(VB_GENERAL, LOG_INFO,
+ LOC + QString("Setting volume to %1 (raw %3)")
+ .arg(volume).arg(param));
+ m_audio->SetCurrentVolume((int)volume);
+ }
+ else if (name == "progress")
+ {
+ QStringList items = param.split("/");
+ if (items.size() == 3)
+ {
+ m_progressStart = items[0].toUInt();
+ m_progressCurrent = items[1].toUInt();
+ m_progressEnd = items[2].toUInt();
+ }
+ int length =
+ (m_progressEnd-m_progressStart) / m_frameRate;
+ int current =
+ (m_progressCurrent-m_progressStart) / m_frameRate;
+
+ LOG(VB_GENERAL, LOG_INFO,
+ LOC +QString("Progress: %1/%2")
+ .arg(stringFromSeconds(current))
+ .arg(stringFromSeconds(length)));
+ }
+ }
+ else if(tags["Content-Type"] == "image/jpeg")
+ {
+ // Receiving image coverart
+ m_artwork = content;
+ }
+ else if (tags["Content-Type"] == "application/x-dmap-tagged")
+ {
+ // Receiving DMAP metadata
+ QMap<QString,QString> map = decodeDMAP(content);
+ LOG(VB_GENERAL, LOG_INFO,
+ QString("Receiving Title:%1 Artist:%2 Album:%3 Format:%4")
+ .arg(map["minm"]).arg(map["asar"])
+ .arg(map["asal"]).arg(map["asfm"]));
}
}
+ StartResponse(m_textStream, option, tags["CSeq"]);
+ }
+ else if (option == "TEARDOWN")
+ {
+ StartResponse(m_textStream, option, tags["CSeq"]);
+ *m_textStream << "Connection: close\r\n";
}
else
{
LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("Command not handled: %1")
- .arg(option.data()));
+ .arg(option));
StartResponse(m_textStream, option, tags["CSeq"]);
}
FinishResponse(m_textStream, m_socket, option, tags["CSeq"]);
}
+
void MythRAOPConnection::StartResponse(NetStream *stream,
- QByteArray &option, QByteArray &cseq)
+ QString &option, QString &cseq)
{
if (!stream)
return;
LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("%1 sequence %2")
- .arg(option.data()).arg(cseq.data()));
+ .arg(option).arg(cseq));
*stream << "Audio-Jack-Status: connected; type=analog\r\n";
*stream << "CSeq: " << cseq << "\r\n";
}
void MythRAOPConnection::FinishResponse(NetStream *stream, QTcpSocket *socket,
- QByteArray &option, QByteArray &cseq)
+ QString &option, QString &cseq)
{
*stream << "\r\n";
stream->flush();
LOG(VB_GENERAL, LOG_DEBUG, LOC + QString("Finished %1 %2 , Send: %3")
- .arg(option.data()).arg(cseq.data()).arg(socket->flush()));
+ .arg(option).arg(cseq).arg(socket->flush()));
}
+/**
+ * LoadKey. Load RSA key into static variable for re-using it later
+ * The RSA key is resident in memory for the entire duration of the application
+ * as such RSA_free is never called on it.
+ */
RSA* MythRAOPConnection::LoadKey(void)
{
static QMutex lock;
@@ -786,7 +1306,7 @@ RSA* MythRAOPConnection::LoadKey(void)
return NULL;
}
-RawHash MythRAOPConnection::FindTags(const QList<QByteArray> &lines)
+RawHash MythRAOPConnection::FindTags(const QStringList &lines)
{
RawHash result;
if (lines.isEmpty())
@@ -804,6 +1324,91 @@ RawHash MythRAOPConnection::FindTags(const QList<QByteArray> &lines)
return result;
}
+QStringList MythRAOPConnection::splitLines(const QByteArray &lines)
+{
+ QStringList list;
+ QTextStream stream(lines);
+
+ QString line;
+ do
+ {
+ line = stream.readLine();
+ if (!line.isNull())
+ {
+ list.append(line);
+ }
+ }
+ while (!line.isNull());
+
+ return list;
+}
+
+/**
+ * stringFromSeconds:
+ *
+ * Usage: stringFromSeconds(seconds)
+ * Description: create a string in the format HH:mm:ss from a duration in seconds
+ * HH: will not be displayed if there's less than one hour
+ */
+QString MythRAOPConnection::stringFromSeconds(int time)
+{
+ int hour = time / 3600;
+ int minute = (time - hour * 3600) / 60;
+ int seconds = time - hour * 3600 - minute * 60;
+ QString str;
+
+ if (hour)
+ {
+ str += QString("%1:").arg(hour);
+ }
+ if (minute < 10)
+ {
+ str += "0";
+ }
+ str += QString("%1:").arg(minute);
+ if (seconds < 10)
+ {
+ str += "0";
+ }
+ str += QString::number(seconds);
+ return str;
+}
+
+/**
+ * framesDuration
+ * Description: return the duration in ms of frames
+ *
+ */
+uint64_t MythRAOPConnection::framesToMs(uint64_t frames)
+{
+ return (frames * 1000ULL) / m_frameRate;
+}
+
+/**
+ * decodeDMAP:
+ *
+ * Usage: decodeDMAP(QByteArray &dmap)
+ * Description: decode the DMAP (Digital Media Access Protocol) object.
+ * The object returned is a map of the dmap tags and their associated content
+ */
+QMap<QString,QString> MythRAOPConnection::decodeDMAP(const QByteArray &dmap)
+{
+ QMap<QString,QString> result;
+ int offset = 8;
+ while (offset < dmap.size())
+ {
+ QString tag = dmap.mid(offset, 4);
+ offset += 4;
+ uint32_t length = qFromBigEndian(*(uint32_t *)(dmap.constData() + offset));
+ offset += sizeof(uint32_t);
+ QString content = QString::fromUtf8(dmap.mid(offset,
+ length).constData());
+ offset += length;
+ result.insert(tag, content);
+ }
+ return result;
+}
+
bool MythRAOPConnection::CreateDecoder(void)
{
DestroyDecoder();
@@ -816,7 +1421,8 @@ bool MythRAOPConnection::CreateDecoder(void)
m_codec = avcodec_find_decoder(CODEC_ID_ALAC);
if (!m_codec)
{
- LOG(VB_GENERAL, LOG_ERR, LOC + "Failed to create ALAC decoder- going silent...");
+ LOG(VB_GENERAL, LOG_ERR, LOC
+ + "Failed to create ALAC decoder- going silent...");
return false;
}
@@ -827,7 +1433,8 @@ bool MythRAOPConnection::CreateDecoder(void)
memset(extradata, 0, 36);
if (m_audioFormat.size() < 12)
{
- LOG(VB_GENERAL, LOG_ERR, LOC + "Creating decoder but haven't seen audio format.");
+ LOG(VB_GENERAL, LOG_ERR, LOC +
+ "Creating decoder but haven't seen audio format.");
}
else
{
@@ -847,7 +1454,8 @@ bool MythRAOPConnection::CreateDecoder(void)
m_codeccontext->channels = m_channels;
if (avcodec_open(m_codeccontext, m_codec) < 0)
{
- LOG(VB_GENERAL, LOG_ERR, LOC + "Failed to open ALAC decoder - going silent...");
+ LOG(VB_GENERAL, LOG_ERR, LOC +
+ "Failed to open ALAC decoder - going silent...");
DestroyDecoder();
return false;
}
@@ -872,21 +1480,17 @@ bool MythRAOPConnection::OpenAudioDevice(void)
{
CloseAudioDevice();
- m_sampleRate = m_audioFormat.size() >= 12 ? m_audioFormat[11] : DEFAULT_SAMPLE_RATE;
- m_channels = m_audioFormat[7] > 0 ? m_audioFormat[7] : 2;
- if (m_sampleRate < 1)
- m_sampleRate = DEFAULT_SAMPLE_RATE;
-
QString passthru = gCoreContext->GetNumSetting("PassThruDeviceOverride", false)
? gCoreContext->GetSetting("PassThruOutputDevice") : QString::null;
QString device = gCoreContext->GetSetting("AudioOutputDevice");
m_audio = AudioOutput::OpenAudio(device, passthru, FORMAT_S16, m_channels,
- 0, m_sampleRate, AUDIOOUTPUT_MUSIC,
+ 0, m_frameRate, AUDIOOUTPUT_MUSIC,
m_allowVolumeControl, false);
if (!m_audio)
{
- LOG(VB_GENERAL, LOG_ERR, LOC + "Failed to open audio device. Going silent...");
+ LOG(VB_GENERAL, LOG_ERR, LOC +
+ "Failed to open audio device. Going silent...");
CloseAudioDevice();
StartAudioTimer();
return false;
@@ -895,7 +1499,8 @@ bool MythRAOPConnection::OpenAudioDevice(void)
QString error = m_audio->GetError();
if (!error.isEmpty())
{
- LOG(VB_GENERAL, LOG_ERR, LOC + QString("Audio not initialised. Message was '%1'")
+ LOG(VB_GENERAL, LOG_ERR, LOC +
+ QString("Audio not initialised. Message was '%1'")
.arg(error));
CloseAudioDevice();
StartAudioTimer();
@@ -926,7 +1531,52 @@ void MythRAOPConnection::StartAudioTimer(void)
void MythRAOPConnection::StopAudioTimer(void)
{
if (m_audioTimer)
+ {
m_audioTimer->stop();
+ }
delete m_audioTimer;
m_audioTimer = NULL;
}
+
+/**
+ * AudioCardLatency:
+ * Description: Play silence and calculate audio latency between input / output
+ */
+int64_t MythRAOPConnection::AudioCardLatency(void)
+{
+ if (!m_audio)
+ return 0;
+
+ uint64_t timestamp = 123456;
+
+ int16_t *samples = (int16_t *)av_mallocz(AVCODEC_MAX_AUDIO_FRAME_SIZE);
+ int frames = AUDIOCARD_BUFFER * m_frameRate / 1000;
+ m_audio->AddData((char *)samples,
+ frames * (m_sampleSize>>3) * m_channels,
+ timestamp,
+ frames);
+ av_free(samples);
+ usleep(AUDIOCARD_BUFFER * 1000);
+ uint64_t audiots = m_audio->GetAudiotime();
+ return (int64_t)timestamp - (int64_t)audiots;
+}
+
+int MythRAOPConnection::findNextBindingPort(QUdpSocket *socket, int baseport)
+{
+ // try a few ports in case the first is in use
+ int port = baseport;
+ while (port < baseport + RAOP_PORT_RANGE)
+ {
+ if (socket->bind(port))
+ {
+ break;
+ }
+ port++;
+ }
+
+ if (port >= baseport + RAOP_PORT_RANGE)
+ {
+ return -1;
+ }
+ return port;
+}
View
141 mythtv/libs/libmythtv/mythraopconnection.h
@@ -5,6 +5,7 @@
#include <QMap>
#include <QHash>
#include <QHostAddress>
+#include <QStringList>
#include <openssl/rsa.h>
#include <openssl/pem.h>
@@ -21,8 +22,23 @@ class QTimer;
class AudioOutput;
class ServerPool;
class NetStream;
+class AudioData;
+struct AudioData;
-typedef QHash<QByteArray,QByteArray> RawHash;
+typedef QHash<QString,QString> RawHash;
+
+struct AudioData
+{
+ int16_t *data;
+ int32_t length;
+ int32_t frames;
+};
+
+struct AudioPacket
+{
+ uint16_t seq;
+ QList<AudioData> *data;
+};
class MythRAOPConnection : public QObject
{
@@ -38,6 +54,7 @@ class MythRAOPConnection : public QObject
QTcpSocket* GetSocket() { return m_socket; }
int GetDataPort() { return m_dataPort; }
bool HasAudio() { return m_audio; }
+ static QMap<QString,QString> decodeDMAP(const QByteArray &dmap);
public slots:
void readClient(void);
@@ -50,38 +67,63 @@ class MythRAOPConnection : public QObject
static RSA* LoadKey(void);
private:
- uint64_t FramesToMs(uint64_t timestamp);
- void ProcessSyncPacket(const QByteArray &buf, uint64_t timenow);
- void SendResendRequest(uint64_t timenow, uint16_t expected,
- uint16_t got);
- void ExpireResendRequests(uint64_t timenow);
- int ExpireAudio(uint64_t timestamp);
- void ProcessAudio(uint64_t timenow);
- void ResetAudio(void);
- void ProcessRequest(const QList<QByteArray> &lines);
- void StartResponse(NetStream *stream,
- QByteArray &option, QByteArray &cseq);
- void FinishResponse(NetStream *stream, QTcpSocket *socket,
- QByteArray &option, QByteArray &cseq);
- RawHash FindTags(const QList<QByteArray> &lines);
- bool CreateDecoder(void);
- void DestroyDecoder(void);
- bool OpenAudioDevice(void);
- void CloseAudioDevice(void);
- void StartAudioTimer(void);
- void StopAudioTimer(void);
+ void ProcessSync(const QByteArray &buf);
+ void SendResendRequest(uint64_t timestamp,
+ uint16_t expected, uint16_t got);
+ void ExpireResendRequests(uint64_t timestamp);
+ uint32_t decodeAudioPacket(uint8_t type, const QByteArray *buf,
+ QList<AudioData> *dest);
+ void ProcessAudio();
+ int ExpireAudio(uint64_t timestamp);
+ void ResetAudio(void);
+ void ProcessRequest(const QStringList &header,
+ const QByteArray &content);
+ void StartResponse(NetStream *stream,
+ QString &option, QString &cseq);
+ void FinishResponse(NetStream *stream, QTcpSocket *socket,
+ QString &option, QString &cseq);
+ RawHash FindTags(const QStringList &lines);
+ bool CreateDecoder(void);
+ void DestroyDecoder(void);
+ bool OpenAudioDevice(void);
+ void CloseAudioDevice(void);
+ void StartAudioTimer(void);
+ void StopAudioTimer(void);
+
+ // time sync
+ void SendTimeRequest(void);
+ void ProcessTimeResponse(const QByteArray &buf);
+ uint64_t NTPToLocal(uint32_t sec, uint32_t ticks);
+
+ // incoming data packet
+ bool GetPacketType(const QByteArray &buf, uint8_t &type,
+ uint16_t &seq, uint64_t &timestamp);
+
+ // utility functions
+ int64_t AudioCardLatency(void);
+ int findNextBindingPort(QUdpSocket *socket, int port);
+ QStringList splitLines(const QByteArray &lines);
+ QString stringFromSeconds(int seconds);
+ uint64_t framesToMs(uint64_t frames);
QTimer *m_watchdogTimer;
// comms socket
QTcpSocket *m_socket;
NetStream *m_textStream;
QByteArray m_hardwareId;
- // incoming audio
+ QStringList m_incomingHeaders;
+ QByteArray m_incomingContent;
+ bool m_incomingPartial;
+ int32_t m_incomingSize;
QHostAddress m_peerAddress;
- int m_dataPort;
ServerPool *m_dataSocket;
+ int m_dataPort;
QUdpSocket *m_clientControlSocket;
int m_clientControlPort;
+ QUdpSocket *m_clientTimingSocket;
+ int m_clientTimingPort;
+
+ // incoming audio
QMap<uint16_t,uint64_t> m_resends;
// crypto
QByteArray m_AESIV;
@@ -92,31 +134,46 @@ class MythRAOPConnection : public QObject
AVCodec *m_codec;
AVCodecContext *m_codeccontext;
QList<int> m_audioFormat;
- int m_sampleRate;
int m_channels;
- typedef struct
- {
- int16_t *samples;
- uint32_t frames;
- uint32_t size;
- } AudioFrame;
-
- QMap<uint64_t, AudioFrame> m_audioQueue;
+ int m_sampleSize;
+ int m_frameRate;
+ int m_framesPerPacket;
+ QTimer *m_dequeueAudioTimer;
+
+ QMap<uint64_t, AudioPacket> m_audioQueue;
uint32_t m_queueLength;
+ bool m_streamingStarted;
bool m_allowVolumeControl;
+
+ // packet index, increase after each resend packet request
+ uint16_t m_seqNum;
// audio/packet sync
- bool m_seenPacket;
- int16_t m_lastPacketSequence;
- uint64_t m_lastPacketTimestamp;
- uint64_t m_lastSyncTime;
- uint64_t m_lastSyncTimestamp;
- uint64_t m_lastLatency;
- uint64_t m_latencyAudio;
- uint64_t m_latencyQueued;
- uint64_t m_latencyCounter;
- int64_t m_avSync;
+ uint16_t m_lastSequence;
+ uint64_t m_lastTimestamp;
+ uint64_t m_currentTimestamp;
+ uint16_t m_nextSequence;
+ uint64_t m_nextTimestamp;
+ uint64_t m_bufferLength;
+ uint64_t m_timeLastSync;
+ int64_t m_cardLatency;
+ int64_t m_adjustedLatency;
+ bool m_audioStarted;
+
+ // clock sync
+ uint64_t m_masterTimeStamp;
+ uint64_t m_deviceTimeStamp;
+ uint64_t m_networkLatency;
+ int64_t m_clockSkew; // difference in ms between reference
+
// audio retry timer
QTimer *m_audioTimer;
+
+ //Current Stream Info
+ uint32_t m_progressStart;
+ uint32_t m_progressCurrent;
+ uint32_t m_progressEnd;
+ QByteArray m_artwork;
+ QByteArray m_dmap;
};
#endif // MYTHRAOPCONNECTION_H
View
22 mythtv/libs/libmythtv/mythraopdevice.cpp
@@ -161,15 +161,18 @@ void MythRAOPDevice::Start(void)
txt.append(6); txt.append("tp=UDP");
txt.append(8); txt.append("sm=false");
txt.append(8); txt.append("sv=false");
- txt.append(4); txt.append("ek=1");
- txt.append(6); txt.append("et=0,1");
- txt.append(6); txt.append("cn=0,1");
- txt.append(4); txt.append("ch=2");
- txt.append(5); txt.append("ss=16");
- txt.append(8); txt.append("sr=44100");
- txt.append(8); txt.append("pw=false");
+ txt.append(4); txt.append("ek=1"); //
+ txt.append(6); txt.append("et=0,1"); // encryption type: no, RSA
+ txt.append(6); txt.append("cn=0,1"); // audio codec: pcm, alac
+ txt.append(4); txt.append("ch=2"); // audio channels
+ txt.append(5); txt.append("ss=16"); // sample size
+ txt.append(8); txt.append("sr=44100"); // sample rate
+ txt.append(8); txt.append("pw=false"); // no password
txt.append(4); txt.append("vn=3");
- txt.append(9); txt.append("txtvers=1");
+ txt.append(9); txt.append("txtvers=1"); // TXT record version 1
+ txt.append(8); txt.append("md=0,1,2"); // metadata-type: text, artwork, progress
+ txt.append(9); txt.append("vs=130.14");
+ txt.append(7); txt.append("da=true");
LOG(VB_GENERAL, LOG_INFO, QString("Registering service %1.%2 port %3 TXT %4")
.arg(QString(name)).arg(QString(type)).arg(m_setupPort).arg(QString(txt)));
@@ -224,7 +227,8 @@ void MythRAOPDevice::newConnection(QTcpSocket *client)
return;
}
- LOG(VB_GENERAL, LOG_ERR, LOC + "Failed to initialise client connection - closing.");
+ LOG(VB_GENERAL, LOG_ERR, LOC +
+ "Failed to initialise client connection - closing.");
delete obj;
client->disconnectFromHost();
delete client;

0 comments on commit bd883d7

Please sign in to comment.