Skip to content

Commit

Permalink
[core] Refactored CSndQueue::worker(...)
Browse files Browse the repository at this point in the history
Deleted unused m_ExitCond from CSndQueue and CRcvQueue
  • Loading branch information
maxsharabayko authored and rndi committed Oct 16, 2019
1 parent d389ff6 commit d7da7f5
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 64 deletions.
124 changes: 63 additions & 61 deletions srtcore/queue.cpp
Expand Up @@ -484,8 +484,7 @@ m_pChannel(NULL),
m_pTimer(NULL),
m_WindowLock(),
m_WindowCond(),
m_bClosing(false),
m_ExitCond()
m_bClosing(false)
{
pthread_cond_init(&m_WindowCond, NULL);
pthread_mutex_init(&m_WindowLock, NULL);
Expand Down Expand Up @@ -554,69 +553,13 @@ void* CSndQueue::worker(void* param)

while (!self->m_bClosing)
{
uint64_t ts = self->m_pSndUList->getNextProcTime();
uint64_t next_time = self->m_pSndUList->getNextProcTime();

#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lIteration++;
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */

if (ts > 0)
{
// wait until next processing time of the first socket on the list
uint64_t currtime;
CTimer::rdtsc(currtime);

#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
if (self->m_ullDbgTime <= currtime) {
fprintf(stdout, "SndQueue %lu slt:%lu nrp:%lu snt:%lu nrt:%lu ctw:%lu\n",
self->m_WorkerStats.lIteration,
self->m_WorkerStats.lSleepTo,
self->m_WorkerStats.lNotReadyPop,
self->m_WorkerStats.lSendTo,
self->m_WorkerStats.lNotReadyTs,
self->m_WorkerStats.lCondWait);
memset(&self->m_WorkerStats, 0, sizeof(self->m_WorkerStats));
self->m_ullDbgTime = currtime + self->m_ullDbgPeriod;
}
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */

THREAD_PAUSED();
if (currtime < ts)
{
self->m_pTimer->sleepto(ts);

#if defined(HAI_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lSleepTo++;
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */
}
THREAD_RESUMED();

// it is time to send the next pkt
sockaddr* addr;
CPacket pkt;
if (self->m_pSndUList->pop(addr, pkt) < 0)
{
continue;

#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lNotReadyPop++;
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */
}
if ( pkt.isControl() )
{
HLOGC(mglog.Debug, log << self->CONID() << "chn:SENDING: " << MessageTypeStr(pkt.getType(), pkt.getExtendedType()));
}
else
{
HLOGC(dlog.Debug, log << self->CONID() << "chn:SENDING SIZE " << pkt.getLength() << " SEQ: " << pkt.getSeqNo());
}
self->m_pChannel->sendto(addr, pkt);

#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lSendTo++;
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */
}
else
if (next_time <= 0)
{
#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lNotReadyTs++;
Expand All @@ -634,7 +577,67 @@ void* CSndQueue::worker(void* param)
}
THREAD_RESUMED();
pthread_mutex_unlock(&self->m_WindowLock);

continue;
}

// wait until next processing time of the first socket on the list
uint64_t currtime;
CTimer::rdtsc(currtime);

#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
if (self->m_ullDbgTime <= currtime)
{
fprintf(stdout,
"SndQueue %lu slt:%lu nrp:%lu snt:%lu nrt:%lu ctw:%lu\n",
self->m_WorkerStats.lIteration,
self->m_WorkerStats.lSleepTo,
self->m_WorkerStats.lNotReadyPop,
self->m_WorkerStats.lSendTo,
self->m_WorkerStats.lNotReadyTs,
self->m_WorkerStats.lCondWait);
memset(&self->m_WorkerStats, 0, sizeof(self->m_WorkerStats));
self->m_ullDbgTime = currtime + self->m_ullDbgPeriod;
}
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */

THREAD_PAUSED();
if (currtime < next_time)
{
self->m_pTimer->sleepto(next_time);

#if defined(HAI_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lSleepTo++;
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */
}
THREAD_RESUMED();

// it is time to send the next pkt
sockaddr* addr;
CPacket pkt;
if (self->m_pSndUList->pop(addr, pkt) < 0)
{
continue;

#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lNotReadyPop++;
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */
}
if (pkt.isControl())
{
HLOGC(mglog.Debug,
log << self->CONID() << "chn:SENDING: " << MessageTypeStr(pkt.getType(), pkt.getExtendedType()));
}
else
{
HLOGC(dlog.Debug,
log << self->CONID() << "chn:SENDING SIZE " << pkt.getLength() << " SEQ: " << pkt.getSeqNo());
}
self->m_pChannel->sendto(addr, pkt);

#if defined(SRT_DEBUG_SNDQ_HIGHRATE)
self->m_WorkerStats.lSendTo++;
#endif /* SRT_DEBUG_SNDQ_HIGHRATE */
}

THREAD_EXIT();
Expand Down Expand Up @@ -1054,7 +1057,6 @@ CRcvQueue::CRcvQueue():
m_pTimer(NULL),
m_iPayloadSize(),
m_bClosing(false),
m_ExitCond(),
m_LSLock(),
m_pListener(NULL),
m_pRendezvousQueue(NULL),
Expand Down
4 changes: 1 addition & 3 deletions srtcore/queue.h
Expand Up @@ -168,7 +168,7 @@ friend class CSndQueue;

/// Update the timestamp of the UDT instance on the list.
/// @param [in] u pointer to the UDT instance
/// @param [in] resechedule if the timestampe shoudl be rescheduled
/// @param [in] reschedule if the timestamp should be rescheduled

void update(const CUDT* u, EReschedule reschedule);

Expand Down Expand Up @@ -415,7 +415,6 @@ friend class CUDTUnited;
pthread_cond_t m_WindowCond;

volatile bool m_bClosing; // closing the worker
pthread_cond_t m_ExitCond;

#if defined(SRT_DEBUG_SNDQ_HIGHRATE)//>>debug high freq worker
uint64_t m_ullDbgPeriod;
Expand Down Expand Up @@ -494,7 +493,6 @@ friend class CUDTUnited;
int m_iPayloadSize; // packet payload size

volatile bool m_bClosing; // closing the worker
pthread_cond_t m_ExitCond;

private:
int setListener(CUDT* u);
Expand Down

0 comments on commit d7da7f5

Please sign in to comment.