Skip to content

Commit

Permalink
[core] Fixed setting the peer rexmit flag on the RCV buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsharabayko committed Feb 7, 2022
1 parent 5b7ac45 commit ac854f2
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 15 deletions.
4 changes: 2 additions & 2 deletions srtcore/buffer_rcv.cpp
Expand Up @@ -70,7 +70,7 @@ namespace {
* m_iMaxPosInc: none? (modified on add and ack
*/

CRcvBufferNew::CRcvBufferNew(int initSeqNo, size_t size, CUnitQueue* unitqueue, bool peerRexmit, bool bMessageAPI)
CRcvBufferNew::CRcvBufferNew(int initSeqNo, size_t size, CUnitQueue* unitqueue, bool bMessageAPI)
: m_entries(size)
, m_szSize(size) // TODO: maybe just use m_entries.size()
, m_pUnitQueue(unitqueue)
Expand All @@ -81,7 +81,7 @@ CRcvBufferNew::CRcvBufferNew(int initSeqNo, size_t size, CUnitQueue* unitqueue,
, m_iNotch(0)
, m_numOutOfOrderPackets(0)
, m_iFirstReadableOutOfOrder(-1)
, m_bPeerRexmitFlag(peerRexmit)
, m_bPeerRexmitFlag(true)
, m_bMessageAPI(bMessageAPI)
, m_iBytesCount(0)
, m_iPktsCount(0)
Expand Down
6 changes: 4 additions & 2 deletions srtcore/buffer_rcv.h
Expand Up @@ -51,7 +51,7 @@ class CRcvBufferNew
typedef sync::steady_clock::duration duration;

public:
CRcvBufferNew(int initSeqNo, size_t size, CUnitQueue* unitqueue, bool peerRexmit, bool bMessageAPI);
CRcvBufferNew(int initSeqNo, size_t size, CUnitQueue* unitqueue, bool bMessageAPI);

~CRcvBufferNew();

Expand Down Expand Up @@ -308,7 +308,7 @@ class CRcvBufferNew
size_t m_numOutOfOrderPackets; // The number of stored packets with "inorder" flag set to false
int m_iFirstReadableOutOfOrder; // In case of out ouf order packet, points to a position of the first such packet to
// read
const bool m_bPeerRexmitFlag; // Needed to read message number correctly
bool m_bPeerRexmitFlag; // Needed to read message number correctly
const bool m_bMessageAPI; // Operation mode flag: message or stream.

public: // TSBPD public functions
Expand All @@ -320,6 +320,8 @@ class CRcvBufferNew
/// @return 0
void setTsbPdMode(const time_point& timebase, bool wrap, duration delay);

void setPeerRexmitFlag(bool flag) { m_bPeerRexmitFlag = flag; }

void applyGroupTime(const time_point& timebase, bool wrp, uint32_t delay, const duration& udrift);

void applyGroupDrift(const time_point& timebase, bool wrp, const duration& udrift);
Expand Down
20 changes: 10 additions & 10 deletions srtcore/core.cpp
Expand Up @@ -2414,7 +2414,7 @@ bool srt::CUDT::interpretSrtHandshake(const CHandShake& hs,
}

// We still believe it should work, let's check the flags.
int ext_flags = SrtHSRequest::SRT_HSTYPE_HSFLAGS::unwrap(hs.m_iType);
const int ext_flags = SrtHSRequest::SRT_HSTYPE_HSFLAGS::unwrap(hs.m_iType);
if (ext_flags == 0)
{
m_RejectReason = SRT_REJ_ROGUE;
Expand Down Expand Up @@ -4019,15 +4019,16 @@ EConnectStatus srt::CUDT::processRendezvous(
m_ConnReq.m_iReqType = rsp_type;
m_ConnReq.m_extension = needs_extension;

// This must be done before prepareConnectionObjects().
// This must be done before prepareConnectionObjects(), because it sets ISN and m_iMaxSRTPayloadSize needed to create buffers.
if (!applyResponseSettings())
{
LOGC(cnlog.Error, log << "processRendezvous: rogue peer");
return CONN_REJECT;
}

// This must be done before interpreting and creating HSv5 extensions.
if (!prepareConnectionObjects(m_ConnRes, m_SrtHsSide, 0))
// The CryptoControl must be created by the prepareConnectionObjects() before interpreting and creating HSv5 extensions
// because the it will be used there.
if (!prepareConnectionObjects(m_ConnRes, m_SrtHsSide, NULL))
{
// m_RejectReason already handled
HLOGC(cnlog.Debug, log << "processRendezvous: rejecting due to problems in prepareConnectionObjects.");
Expand Down Expand Up @@ -4536,6 +4537,7 @@ EConnectStatus srt::CUDT::postConnect(const CPacket* pResponse, bool rendezvous,
// however in this case the HSREQ extension will not be attached,
// so it will simply go the "old way".
// (&&: skip if failed already)
// Must be called before interpretSrtHandshake() to create the CryptoControl.
ok = ok && prepareConnectionObjects(m_ConnRes, m_SrtHsSide, eout);

// May happen that 'response' contains a data packet that was sent in rendezvous mode.
Expand Down Expand Up @@ -5568,11 +5570,8 @@ bool srt::CUDT::prepareConnectionObjects(const CHandShake &hs, HandshakeSide hsd
return true;
}

bool bidirectional = false;
if (hs.m_iVersion > HS_VERSION_UDT4)
{
bidirectional = true; // HSv5 is always bidirectional
}
// HSv5 is always bidirectional
const bool bidirectional = (hs.m_iVersion > HS_VERSION_UDT4);

// HSD_DRAW is received only if this side is listener.
// If this side is caller with HSv5, HSD_INITIATOR should be passed.
Expand All @@ -5595,7 +5594,7 @@ bool srt::CUDT::prepareConnectionObjects(const CHandShake &hs, HandshakeSide hsd
m_pSndBuffer = new CSndBuffer(32, m_iMaxSRTPayloadSize);
#if ENABLE_NEW_RCVBUFFER
SRT_ASSERT(m_iISN != -1);
m_pRcvBuffer = new srt::CRcvBufferNew(m_iISN, m_config.iRcvBufSize, &(m_pRcvQueue->m_UnitQueue), m_bPeerRexmitFlag, m_config.bMessageAPI);
m_pRcvBuffer = new srt::CRcvBufferNew(m_iISN, m_config.iRcvBufSize, &(m_pRcvQueue->m_UnitQueue), m_config.bMessageAPI);
#else
m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_config.iRcvBufSize);
#endif
Expand Down Expand Up @@ -8982,6 +8981,7 @@ void srt::CUDT::updateSrtRcvSettings()
enterCS(m_RecvLock);
#if ENABLE_NEW_RCVBUFFER
m_pRcvBuffer->setTsbPdMode(m_tsRcvPeerStartTime, false, milliseconds_from(m_iTsbPdDelay_ms));
m_pRcvBuffer->setPeerRexmitFlag(m_bPeerRexmitFlag);
#else
m_pRcvBuffer->setRcvTsbPdMode(m_tsRcvPeerStartTime, milliseconds_from(m_iTsbPdDelay_ms));
#endif
Expand Down
1 change: 1 addition & 0 deletions srtcore/core.h
Expand Up @@ -489,6 +489,7 @@ class CUDT
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
EConnectStatus processRendezvous(const CPacket* response, const sockaddr_any& serv_addr, EReadStatus, CPacket& reqpkt);

/// Create the CryptoControl object based on the HS packet. Allocates sender and receiver buffers and loss lists.
SRT_ATR_NODISCARD SRT_ATTR_REQUIRES(m_ConnectionLock)
bool prepareConnectionObjects(const CHandShake &hs, HandshakeSide hsd, CUDTException *eout);

Expand Down
3 changes: 2 additions & 1 deletion test/test_buffer.cpp
Expand Up @@ -34,7 +34,8 @@ class CRcvBufferReadMsg
#if ENABLE_NEW_RCVBUFFER
const bool enable_msg_api = m_use_message_api;
const bool enable_peer_rexmit = true;
m_rcv_buffer = unique_ptr<CRcvBufferNew>(new CRcvBufferNew(m_init_seqno, m_buff_size_pkts, m_unit_queue.get(), enable_peer_rexmit, enable_msg_api));
m_rcv_buffer = unique_ptr<CRcvBufferNew>(new CRcvBufferNew(m_init_seqno, m_buff_size_pkts, m_unit_queue.get(), enable_msg_api));
m_rcv_buffer->setPeerRexmitFlag(enable_peer_rexmit);
#else
m_rcv_buffer = unique_ptr<CRcvBuffer>(new CRcvBuffer(m_unit_queue.get(), m_buff_size_pkts));
#endif
Expand Down

0 comments on commit ac854f2

Please sign in to comment.