Skip to content

Commit

Permalink
handling of conflicting subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
Glenn-1990 committed Jul 15, 2015
1 parent f5c5afb commit 5d78935
Show file tree
Hide file tree
Showing 8 changed files with 284 additions and 27 deletions.
56 changes: 56 additions & 0 deletions pvr.hts/resources/language/resource.language.en_gb/strings.po
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,59 @@ msgctxt "#30361"
msgid "Record max once per day"
msgstr ""

#empty strings from id 30362 to 30399

#. Subscription states representation

msgctxt "#30400"
msgid "No free adapter available!"
msgstr ""

msgctxt "#30401"
msgid "Livestream aborted, adapter stolen by other subscription!"
msgstr ""

msgctxt "#30402"
msgid "Scrambled channel!"
msgstr ""

msgctxt "#30403"
msgid "No signal!"
msgstr ""

msgctxt "#30404"
msgid "Subscription error!"
msgstr ""

msgctxt "#30405"
msgid "Failed to hijack an adapter!"
msgstr ""

msgctxt "#30406"
msgid "User limit reached!"
msgstr ""

#. Subscription conflict dialog

msgctxt "#30407"
msgid "Do you want to increase the priority to hijack an adapter?"
msgstr ""

msgctxt "#30408"
msgid "WARNING: this can abort an active recording or livestream!"
msgstr ""

msgctxt "#30409"
msgid "Ignore"
msgstr ""

msgctxt "#30410"
msgid "Increase priority"
msgstr ""

msgctxt "#30411"
msgid "Subscription conflict!"
msgstr ""



8 changes: 7 additions & 1 deletion src/HTSPConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,14 @@ htsmsg_t *CHTSPConnection::SendAndWait0 ( const char *method, htsmsg_t *msg, int
}

/* Wait for response */
msg = resp.Get(m_mutex, iResponseTimeout);
msg = resp.Get(m_mutex, iResponseTimeout > 0 ? iResponseTimeout : 1);

This comment has been minimized.

Copy link
@Jalle19

Jalle19 Jul 23, 2015

iResponseTimeout can never be 0, or at least it shouldn't be. That makes these checks unnecessary.

m_messages.erase(seq);

/* No response needed */
if (!iResponseTimeout && !msg)
return NULL;

/* No response received */
if (!msg)
{
//XBMC->QueueNotification(QUEUE_ERROR, "Command %s failed: No response received", method);
Expand Down
179 changes: 166 additions & 13 deletions src/HTSPDemuxer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "platform/threads/atomics.h"
#include "platform/util/timeutils.h"
#include "platform/sockets/tcp.h"
#include <thread>

extern "C" {
#include "libhts/htsmsg_binary.h"
Expand All @@ -40,6 +41,26 @@ using namespace std;
using namespace ADDON;
using namespace PLATFORM;

bool bDialogForceStart = false;

void DialogLivestreamAborted( void )
{
bDialogForceStart = false;
bDialogForceStart = GUI->Dialog_YesNo_ShowAndGetInput(
XBMC->GetLocalizedString(30411),XBMC->GetLocalizedString(30401),
XBMC->GetLocalizedString(30407),XBMC->GetLocalizedString(30408),
XBMC->GetLocalizedString(30409),XBMC->GetLocalizedString(30410));
}

void DialogLivestreamNostart( void )
{
bDialogForceStart = false;
bDialogForceStart = GUI->Dialog_YesNo_ShowAndGetInput(
XBMC->GetLocalizedString(30411),XBMC->GetLocalizedString(30400),
XBMC->GetLocalizedString(30407),XBMC->GetLocalizedString(30408),
XBMC->GetLocalizedString(30409),XBMC->GetLocalizedString(30410));
}

CHTSPDemuxer::CHTSPDemuxer ( CHTSPConnection &conn )
: m_conn(conn), m_pktBuffer((size_t)-1),
m_seekTime(INVALID_SEEKTIME)
Expand All @@ -53,7 +74,7 @@ CHTSPDemuxer::~CHTSPDemuxer ( void )
void CHTSPDemuxer::Connected ( void )
{
/* Re-subscribe */
if (m_subscription.active)
if (!m_subscription.state == SUBSCRIPTION_STOPPED)

This comment has been minimized.

Copy link
@Jalle19

Jalle19 Jul 23, 2015

Exclamation sign is in the wrong place, same a few lines down.

{
tvhdebug("demux re-starting stream");
SendSubscribe(true);
Expand All @@ -68,7 +89,7 @@ void CHTSPDemuxer::Connected ( void )
void CHTSPDemuxer::Close0 ( void )
{
/* Send unsubscribe */
if (m_subscription.active)
if (!m_subscription.state == SUBSCRIPTION_STOPPED)
SendUnsubscribe();

/* Clear */
Expand Down Expand Up @@ -100,10 +121,10 @@ bool CHTSPDemuxer::Open ( const PVR_CHANNEL &chn )
SendSubscribe();

/* Send unsubscribe if subscribing failed */
if (!m_subscription.active)
if (m_subscription.state == SUBSCRIPTION_STOPPED)
SendUnsubscribe();

return m_subscription.active;
return (!m_subscription.state == SUBSCRIPTION_STOPPED);
}

void CHTSPDemuxer::Close ( void )
Expand Down Expand Up @@ -147,7 +168,7 @@ bool CHTSPDemuxer::Seek
htsmsg_t *m;

CLockObject lock(m_conn.Mutex());
if (!m_subscription.active)
if (m_subscription.state == SUBSCRIPTION_STOPPED)
return false;

tvhdebug("demux seek %d", time);
Expand Down Expand Up @@ -185,7 +206,7 @@ bool CHTSPDemuxer::Seek
void CHTSPDemuxer::Speed ( int speed )
{
CLockObject lock(m_conn.Mutex());
if (!m_subscription.active)
if (m_subscription.state == SUBSCRIPTION_STOPPED)
return;
m_subscription.speed = speed;
SendSpeed();
Expand Down Expand Up @@ -254,7 +275,10 @@ void CHTSPDemuxer::SendSubscribe ( bool force )

htsmsg_destroy(m);

m_subscription.active = true;
m_subscription.state = SUBSCRIPTION_STARTING;
m_subscription.weight = 0; // use the default weight

This comment has been minimized.

Copy link
@Jalle19

Jalle19 Jul 23, 2015

I believe there are constants for subscription weights now that the predictive tuning PR was merged.

time(&m_subscription.start); // set start time

tvhdebug("demux successfully subscribed to %d", m_subscription.channelId);
}

Expand All @@ -267,7 +291,7 @@ void CHTSPDemuxer::SendUnsubscribe ( void )
htsmsg_add_u32(m, "subscriptionId", m_subscription.subscriptionId);

/* Mark subscription as inactive immediately in case this command fails */
m_subscription.active = false;
m_subscription.state = SUBSCRIPTION_STOPPED;

/* Send and Wait */
tvhdebug("demux unsubscribe from %d", m_subscription.channelId);
Expand Down Expand Up @@ -298,6 +322,30 @@ void CHTSPDemuxer::SendSpeed ( bool force )
htsmsg_destroy(m);
}

void CHTSPDemuxer::SendWeight ( int weight )
{
if (m_subscription.state == SUBSCRIPTION_STOPPED)
return;

htsmsg_t *m;

/* Save current weight */
m_subscription.weight = weight;

/* Build message */
m = htsmsg_create_map();
htsmsg_add_u32(m, "subscriptionId", m_subscription.subscriptionId);
htsmsg_add_u32(m, "weight", m_subscription.weight);

tvhdebug("demux send weight %u", m_subscription.weight);

/* Send and Wait */
if ((m = m_conn.SendAndWait0("subscriptionChangeWeight", m,0)) == NULL)
return;

htsmsg_destroy(m);
}

/* **************************************************************************
* Parse incoming data
* *************************************************************************/
Expand Down Expand Up @@ -353,7 +401,7 @@ void CHTSPDemuxer::ParseMuxPacket ( htsmsg_t *m )
int iStreamId;

/* Ignore packets while switching channels */
if (!m_subscription.active)
if (m_subscription.state == SUBSCRIPTION_STOPPED)
{
tvhdebug("Ignored mux packet due to channel switch");
return;
Expand Down Expand Up @@ -586,15 +634,120 @@ void CHTSPDemuxer::ParseSubscriptionSpeed ( htsmsg_t *m )

void CHTSPDemuxer::ParseSubscriptionStatus ( htsmsg_t *m )
{
const char *status;
if (m_subscription.state == SUBSCRIPTION_STOPPED)
return;

const char *status, *error;
status = htsmsg_get_str(m, "status");
error = htsmsg_get_str(m, "subscriptionError");

// this field is absent when everything is fine
if (status != NULL)
{
tvhinfo("Bad subscription status: %s", status);
XBMC->QueueNotification(QUEUE_INFO, status);

/* 'subscriptionError' was added in htsp v20 */
/* Use 'status' for older tvheadend backends */
if (m_conn.GetProtocol() >= 20)
{
if (error != NULL)
{
if (!strcmp("badSignal", error))
m_subscription.state = SUBSCRIPTION_NOSIGNAL;
else if (!strcmp("scrambled", error))
m_subscription.state = SUBSCRIPTION_SCRAMBLED;
else if (!strcmp("userLimit", error))
m_subscription.state = SUBSCRIPTION_USERLIMIT;
else if (!strcmp("noFreeAdapter", error))
{
/* No free adapter, AKA subscription conflict */
SubscriptionConflict();
}
else
m_subscription.state = SUBSCRIPTION_UNKNOWN;

/* Show an OSD message */
ShowSubscriptionError();
}
else
m_subscription.state = SUBSCRIPTION_RUNNING;
}
else
{
// this field is absent when everything is fine
if (status != NULL)
{
XBMC->QueueNotification(QUEUE_INFO, status);
m_subscription.state = SUBSCRIPTION_UNKNOWN;
}
else
m_subscription.state = SUBSCRIPTION_RUNNING;
}
}

void CHTSPDemuxer::SubscriptionConflict ( void )
{
if (m_subscription.state == SUBSCRIPTION_RUNNING)
{
/* Subscription was running before, but the adapter got stolen by another subscription */
/* Ask user if he wants to hijack an adapter back by increasing ist's weight */
std::thread job(DialogLivestreamAborted);
job.detach();
m_subscription.state = SUBSCRIPTION_NOFREEADAPTER_ABORT;
}
else if (m_subscription.state == SUBSCRIPTION_STARTING ||
m_subscription.state == SUBSCRIPTION_NOFREEADAPTER_NOSTART_WAITING)
{
time_t now;
time(&now);

/* No free adapter found to start this channel */
/* Ask user if he wants to hijack an adapter after 15s */
if ((now-m_subscription.start) > 15)
{
std::thread job(DialogLivestreamNostart);
job.detach();
m_subscription.state = SUBSCRIPTION_NOFREEADAPTER_NOSTART;
}
else
m_subscription.state = SUBSCRIPTION_NOFREEADAPTER_NOSTART_WAITING;
}
else if (m_subscription.state == SUBSCRIPTION_NOFREEADAPTER_NOSTART ||
m_subscription.state == SUBSCRIPTION_NOFREEADAPTER_ABORT )
{
if (bDialogForceStart)
{
bDialogForceStart = false;
SendWeight(100);
m_subscription.state = SUBSCRIPTION_FORCERUNNING;
}
}
else if (m_subscription.state == SUBSCRIPTION_FORCERUNNING)
{
/* Increase weight in steps of 25 till we are running */
/* Limit weight to +600 (Highest priority in tvh) */
if (m_subscription.weight <= 600)
SendWeight(m_subscription.weight + 25);
else
m_subscription.state = SUBSCRIPTION_FORCERUNNING_FAILED;
}
}

void CHTSPDemuxer::ShowSubscriptionError ( void )
{
if (m_subscription.state == SUBSCRIPTION_NOFREEADAPTER_NOSTART ||
m_subscription.state == SUBSCRIPTION_NOFREEADAPTER_NOSTART_WAITING)
XBMC->QueueNotification(QUEUE_WARNING, XBMC->GetLocalizedString(30400));
else if (m_subscription.state == SUBSCRIPTION_NOFREEADAPTER_ABORT)
XBMC->QueueNotification(QUEUE_WARNING, XBMC->GetLocalizedString(30401));
else if (m_subscription.state == SUBSCRIPTION_SCRAMBLED)
XBMC->QueueNotification(QUEUE_WARNING, XBMC->GetLocalizedString(30402));
else if (m_subscription.state == SUBSCRIPTION_NOSIGNAL)
XBMC->QueueNotification(QUEUE_WARNING, XBMC->GetLocalizedString(30403));
else if (m_subscription.state == SUBSCRIPTION_UNKNOWN)
XBMC->QueueNotification(QUEUE_ERROR, XBMC->GetLocalizedString(30404));
else if (m_subscription.state == SUBSCRIPTION_FORCERUNNING_FAILED)
XBMC->QueueNotification(QUEUE_WARNING, XBMC->GetLocalizedString(30405));
else if (m_subscription.state == SUBSCRIPTION_USERLIMIT)
XBMC->QueueNotification(QUEUE_WARNING, XBMC->GetLocalizedString(30406));
}

void CHTSPDemuxer::ParseQueueStatus ( htsmsg_t *_unused(m) )
Expand Down

0 comments on commit 5d78935

Please sign in to comment.