Skip to content

Commit

Permalink
[core] Fix m_GroupOf->updateReadState() in message mode (#2204)
Browse files Browse the repository at this point in the history
Fix ackDataUpTo(..) with the old RCV buffer.


Co-authored-by: hondaxiao <hondaxiao@tencent.com>
Co-authored-by: Maxim Sharabayko <maxsharabayko@haivision.com>
  • Loading branch information
3 people committed Dec 8, 2021
1 parent 545700f commit c8cb38f
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 13 deletions.
6 changes: 4 additions & 2 deletions srtcore/buffer_rcv.h
Expand Up @@ -163,8 +163,10 @@ class CRcvBufferNew
/// IF skipseqno == -1, no missing packet but 1st not ready to play.
PacketInfo getFirstValidPacketInfo() const;

/// Get information on the packets available to be read
/// @returns a pair of sequence numbers
/// Get information on packets available to be read.
/// @returns a pair of sequence numbers (first available; first unavailable).
///
/// @note CSeqNo::seqoff(first, second) is 0 if nothing to read.
std::pair<int, int> getAvailablePacketsRange() const;

size_t countReadable() const;
Expand Down
16 changes: 5 additions & 11 deletions srtcore/core.cpp
Expand Up @@ -7734,7 +7734,7 @@ void srt::CUDT::releaseSynch()
// [[using locked(m_RcvBufferLock)]];
int32_t srt::CUDT::ackDataUpTo(int32_t ack)
{
const int acksize = CSeqNo::seqoff(m_iRcvLastSkipAck, ack);
const int acksize SRT_ATR_UNUSED = CSeqNo::seqoff(m_iRcvLastSkipAck, ack);

HLOGC(xtlog.Debug, log << "ackDataUpTo: %" << m_iRcvLastSkipAck << " -> %" << ack
<< " (" << acksize << " packets)");
Expand All @@ -7750,21 +7750,15 @@ int32_t srt::CUDT::ackDataUpTo(int32_t ack)
LOGC(xtlog.Error, log << "IPE: Acknowledged seqno %" << ack << " outruns the RCV buffer state %" << range.first
<< " - %" << range.second);
}
return acksize;
return CSeqNo::decseq(range.second);
#else
// NOTE: This is new towards UDT and prevents spurious
// wakeup of select/epoll functions when no new packets
// were signed off for extraction.
if (acksize > 0)
{
const int distance = m_pRcvBuffer->ackData(acksize);
return CSeqNo::decseq(ack, distance);
m_pRcvBuffer->ackData(acksize);
}

// If nothing was confirmed, then use the current buffer span
const int distance = m_pRcvBuffer->getRcvDataSize();
if (distance > 0)
return CSeqNo::decseq(ack, distance);
return ack;
#endif
}
Expand Down Expand Up @@ -8006,7 +8000,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
// IF ack %> m_iRcvLastAck
if (CSeqNo::seqcmp(ack, m_iRcvLastAck) > 0)
{
const int32_t first_seq SRT_ATR_UNUSED = ackDataUpTo(ack);
const int32_t group_read_seq SRT_ATR_UNUSED = ackDataUpTo(ack);
InvertedLock un_bufflock (m_RcvBufferLock);

#if ENABLE_EXPERIMENTAL_BONDING
Expand Down Expand Up @@ -8101,7 +8095,7 @@ int srt::CUDT::sendCtrlAck(CPacket& ctrlpkt, int size)
// The current "APP reader" needs to simply decide as to whether
// the next CUDTGroup::recv() call should return with no blocking or not.
// When the group is read-ready, it should update its pollers as it sees fit.
m_parent->m_GroupOf->updateReadState(m_SocketID, first_seq);
m_parent->m_GroupOf->updateReadState(m_SocketID, group_read_seq);
}
}
#endif
Expand Down
6 changes: 6 additions & 0 deletions srtcore/core.h
Expand Up @@ -1043,7 +1043,13 @@ class CUDT
int processConnectRequest(const sockaddr_any& addr, CPacket& packet);
static void addLossRecord(std::vector<int32_t>& lossrecord, int32_t lo, int32_t hi);
int32_t bake(const sockaddr_any& addr, int32_t previous_cookie = 0, int correction = 0);

/// @brief Acknowledge reading position up to the @p seq.
/// Updates m_iRcvLastAck and m_iRcvLastSkipAck to @p seq.
/// @param seq first unacknowledged packet sequence number.
/// @return
int32_t ackDataUpTo(int32_t seq);

void handleKeepalive(const char* data, size_t lenghth);

/// Locks m_RcvBufferLock and retrieves the available size of the receiver buffer.
Expand Down
5 changes: 5 additions & 0 deletions srtcore/group.h
Expand Up @@ -341,7 +341,12 @@ class CUDTGroup
void addEPoll(int eid);
void removeEPollEvents(const int eid);
void removeEPollID(const int eid);

/// @brief Update read-ready state.
/// @param sock member socket ID (unused)
/// @param sequence the latest packet sequence number available for reading.
void updateReadState(SRTSOCKET sock, int32_t sequence);

void updateWriteState();
void updateFailedLink();
void activateUpdateEvent(bool still_have_items);
Expand Down

0 comments on commit c8cb38f

Please sign in to comment.