Skip to content

Commit

Permalink
Hacky attempt to fix U routers. NOT FOR MERGE
Browse files Browse the repository at this point in the history
  • Loading branch information
Vort committed Jul 24, 2023
1 parent ae5239d commit 8c24c31
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 37 deletions.
58 changes: 43 additions & 15 deletions libi2pd/SSU2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ namespace transport
RunnableServiceWithWork ("SSU2"), m_ReceiveService ("SSU2r"),
m_SocketV4 (m_ReceiveService.GetService ()), m_SocketV6 (m_ReceiveService.GetService ()),
m_AddressV4 (boost::asio::ip::address_v4()), m_AddressV6 (boost::asio::ip::address_v6()),
m_TerminationTimer (GetService ()), m_CleanupTimer (GetService ()), m_ResendTimer (GetService ()),
m_TerminationTimer (GetService ()), m_CleanupTimer (GetService ()),
m_KeepAliveTimer(GetService()), m_ResendTimer (GetService ()),
m_IntroducersUpdateTimer (GetService ()), m_IntroducersUpdateTimerV6 (GetService ()),
m_IsPublished (true), m_IsSyncClockFromPeers (true), m_IsThroughProxy (false)
{
Expand Down Expand Up @@ -112,6 +113,7 @@ namespace transport
}
ScheduleTermination ();
ScheduleCleanup ();
ScheduleKeepAlive ();
ScheduleResend (false);
}
}
Expand All @@ -122,6 +124,7 @@ namespace transport
{
m_TerminationTimer.cancel ();
m_CleanupTimer.cancel ();
m_KeepAliveTimer.cancel();
m_ResendTimer.cancel ();
m_IntroducersUpdateTimer.cancel ();
m_IntroducersUpdateTimerV6.cancel ();
Expand Down Expand Up @@ -880,6 +883,29 @@ namespace transport
}
}

void SSU2Server::ScheduleKeepAlive()
{
m_KeepAliveTimer.expires_from_now(boost::posix_time::seconds(1));
m_KeepAliveTimer.async_wait(std::bind(&SSU2Server::HandleKeepAliveTimer,
this, std::placeholders::_1));
}

void SSU2Server::HandleKeepAliveTimer(const boost::system::error_code& ecode)
{
if (ecode != boost::asio::error::operation_aborted)
{
// Hack. In fact, introducers also needs to be checked for different v4/v6 interfaces
if (i2p::context.GetStatus() == eRouterStatusFirewalled)
{
auto ts = i2p::util::GetSecondsSinceEpoch();
for (auto it : m_Sessions)
it.second->OnKeepAliveTimer(ts);
}

ScheduleKeepAlive();
}
}

void SSU2Server::ScheduleResend (bool more)
{
m_ResendTimer.expires_from_now (boost::posix_time::milliseconds (more ? SSU2_RESEND_CHECK_MORE_TIMEOUT :
Expand Down Expand Up @@ -997,29 +1023,34 @@ namespace transport
}
if (session && session->IsEstablished ())
{
if (ts < session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_EXPIRATION)
{
session->SendKeepAlive ();
if (ts < session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_DURATION)
if (ts < session->GetIntroducerSelectionTime () + SSU2_TO_INTRODUCER_SESSION_EXPIRATION)
{
if (ts < session->GetIntroducerSelectionTime() + SSU2_TO_INTRODUCER_SESSION_DURATION)
newList.push_back (it);
else
{
else
{
impliedList.push_back (it); // keep in introducers list, but not publish
session = nullptr;
session = nullptr;
}
}
else
{
session->SetIntroducerSelectionTime(0);
session = nullptr;
}
}
if (!session)
i2p::context.RemoveSSU2Introducer (it, v4);
{
auto abbr = i2p::data::GetIdentHashAbbreviation(it);
LogPrint(eLogDebug, "SSU2: Removing from RI introducer ", abbr);
i2p::context.RemoveSSU2Introducer(it, v4);
}
}
if (newList.size () < SSU2_MAX_NUM_INTRODUCERS)
{
auto sessions = FindIntroducers (SSU2_MAX_NUM_INTRODUCERS - newList.size (), v4, excluded);
if (sessions.empty () && !introducers.empty ())
{
// bump creation time for previous introducers if no new sessions found
LogPrint (eLogDebug, "SSU2: No new introducers found. Trying to reuse existing");
impliedList.clear ();
for (auto& it : introducers)
Expand All @@ -1030,7 +1061,6 @@ namespace transport
auto session = it1->second;
if (session->IsEstablished ())
{
session->SetCreationTime (session->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_DURATION);
if (std::find (newList.begin (), newList.end (), it) == newList.end ())
sessions.push_back (session);
}
Expand All @@ -1040,18 +1070,16 @@ namespace transport

for (const auto& it : sessions)
{
uint32_t exp = it->GetCreationTime () + SSU2_TO_INTRODUCER_SESSION_EXPIRATION;
if (ts + SSU2_TO_INTRODUCER_SESSION_DURATION/2 > exp)
continue; // don't pick too old session for introducer
i2p::data::RouterInfo::Introducer introducer;
introducer.iTag = it->GetRelayTag ();
introducer.iH = it->GetRemoteIdentity ()->GetIdentHash ();
introducer.iExp = exp;
introducer.iExp = ts + SSU2_TO_INTRODUCER_SESSION_EXPIRATION;
excluded.insert (it->GetRemoteIdentity ()->GetIdentHash ());
if (i2p::context.AddSSU2Introducer (introducer, v4))
{
LogPrint (eLogDebug, "SSU2: Introducer added ", it->GetRelayTag (), " at ",
i2p::data::GetIdentHashAbbreviation (it->GetRemoteIdentity ()->GetIdentHash ()));
it->SetIntroducerSelectionTime(ts);
newList.push_back (it->GetRemoteIdentity ()->GetIdentHash ());
if (newList.size () >= SSU2_MAX_NUM_INTRODUCERS) break;
}
Expand Down
9 changes: 5 additions & 4 deletions libi2pd/SSU2.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ namespace transport
const size_t SSU2_MAX_NUM_INTRODUCERS = 3;
const int SSU2_TO_INTRODUCER_SESSION_DURATION = 3600; // 1 hour
const int SSU2_TO_INTRODUCER_SESSION_EXPIRATION = 4800; // 80 minutes
const int SSU2_KEEP_ALIVE_INTERVAL = 15; // in seconds
const int SSU2_KEEP_ALIVE_INTERVAL_VARIANCE = 4; // in seconds
const int SSU2_PROXY_CONNECT_RETRY_TIMEOUT = 30; // in seconds

class SSU2Server: private i2p::util::RunnableServiceWithWork
Expand Down Expand Up @@ -118,6 +116,9 @@ namespace transport
void ScheduleCleanup ();
void HandleCleanupTimer (const boost::system::error_code& ecode);

void ScheduleKeepAlive();
void HandleKeepAliveTimer(const boost::system::error_code& ecode);

void ScheduleResend (bool more);
void HandleResendTimer (const boost::system::error_code& ecode);

Expand Down Expand Up @@ -156,8 +157,8 @@ namespace transport
i2p::util::MemoryPool<SSU2SentPacket> m_SentPacketsPool;
i2p::util::MemoryPool<SSU2IncompleteMessage> m_IncompleteMessagesPool;
i2p::util::MemoryPool<SSU2IncompleteMessage::Fragment> m_FragmentsPool;
boost::asio::deadline_timer m_TerminationTimer, m_CleanupTimer, m_ResendTimer,
m_IntroducersUpdateTimer, m_IntroducersUpdateTimerV6;
boost::asio::deadline_timer m_TerminationTimer, m_CleanupTimer, m_KeepAliveTimer,
m_ResendTimer, m_IntroducersUpdateTimer, m_IntroducersUpdateTimerV6;
std::shared_ptr<SSU2Session> m_LastSession;
bool m_IsPublished; // if we maintain introducers
bool m_IsSyncClockFromPeers;
Expand Down
73 changes: 58 additions & 15 deletions libi2pd/SSU2Session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,12 @@ namespace transport
m_Server (server), m_Address (addr), m_RemoteTransports (0),
m_DestConnID (0), m_SourceConnID (0), m_State (eSSU2SessionStateUnknown),
m_SendPacketNum (0), m_ReceivePacketNum (0), m_LastDatetimeSentPacketNum (0),
m_IsDataReceived (false), m_WindowSize (SSU2_MIN_WINDOW_SIZE),
m_IsDataReceived (false), m_IntroducerSelectionTime(0), m_WindowSize (SSU2_MIN_WINDOW_SIZE),
m_RTT (SSU2_RESEND_INTERVAL), m_RTO (SSU2_RESEND_INTERVAL*SSU2_kAPPA), m_RelayTag (0),
m_ConnectTimer (server.GetService ()), m_TerminationReason (eSSU2TerminationReasonNormalClose),
m_MaxPayloadSize (SSU2_MIN_PACKET_SIZE - IPV6_HEADER_SIZE - UDP_HEADER_SIZE - 32) // min size
m_MaxPayloadSize (SSU2_MIN_PACKET_SIZE - IPV6_HEADER_SIZE - UDP_HEADER_SIZE - 32), // min size
m_LastKeepAliveActivityTimestamp(0),
m_KeepAliveInterval(SSU2_KEEP_ALIVE_INTERVAL + rand() % SSU2_KEEP_ALIVE_INTERVAL_VARIANCE)
{
m_NoiseState.reset (new i2p::crypto::NoiseSymmetricState);
if (in_RemoteRouter && m_Address)
Expand All @@ -114,6 +116,8 @@ namespace transport
{
if (m_State == eSSU2SessionStateUnknown || m_State == eSSU2SessionStateTokenReceived)
{
auto abbr = i2p::data::GetIdentHashAbbreviation(GetRemoteIdentity()->GetIdentHash());
LogPrint(eLogDebug, "SSU2: Session connect with ", m_RemoteEndpoint, " (", abbr, ") initiated");
ScheduleConnectTimer ();
auto token = m_Server.FindOutgoingToken (m_RemoteEndpoint);
if (token)
Expand All @@ -134,16 +138,17 @@ namespace transport
shared_from_this (), std::placeholders::_1));
}

void SSU2Session::HandleConnectTimer (const boost::system::error_code& ecode)
void SSU2Session::HandleConnectTimer(const boost::system::error_code& ecode)
{
if (!ecode)
{
auto abbr = i2p::data::GetIdentHashAbbreviation(GetRemoteIdentity()->GetIdentHash());
// timeout expired
if (m_State == eSSU2SessionStateIntroduced) // WaitForIntroducer
LogPrint (eLogWarning, "SSU2: Session was not introduced after ", SSU2_CONNECT_TIMEOUT, " seconds");
LogPrint(eLogWarning, "SSU2: Session with ", abbr, " was not introduced after ", SSU2_CONNECT_TIMEOUT, " seconds");
else
LogPrint (eLogWarning, "SSU2: Session with ", m_RemoteEndpoint, " was not established after ", SSU2_CONNECT_TIMEOUT, " seconds");
Terminate ();
LogPrint(eLogWarning, "SSU2: Session with ", m_RemoteEndpoint, " (", abbr, ") was not established after ", SSU2_CONNECT_TIMEOUT, " seconds");
Terminate();
}
}

Expand Down Expand Up @@ -238,25 +243,46 @@ namespace transport
}
}

void SSU2Session::SendKeepAlive ()
void SSU2Session::OnKeepAliveTimer(uint64_t ts)
{
if (IsEstablished ())
if (IsEstablished () &&
ts > std::max(m_LastActivityTimestamp, m_LastKeepAliveActivityTimestamp) + m_KeepAliveInterval)
{
bool introducer = m_IntroducerSelectionTime != 0;

auto abbr = i2p::data::GetIdentHashAbbreviation(GetRemoteIdentity()->GetIdentHash());
if (introducer)
LogPrint(eLogDebug, "SSU2: Sending KeepAlive to introducer ", abbr);
else
LogPrint(eLogDebug, "SSU2: Sending KeepAlive to ", abbr);

uint8_t payload[20];
size_t payloadSize = CreatePaddingBlock (payload, 20, 8);
SendData (payload, payloadSize);
SendData (payload, payloadSize, 0, introducer);
m_LastKeepAliveActivityTimestamp = ts;
m_KeepAliveInterval = SSU2_KEEP_ALIVE_INTERVAL + rand() % SSU2_KEEP_ALIVE_INTERVAL_VARIANCE;
}
}

void SSU2Session::Terminate ()
{
if (m_State != eSSU2SessionStateTerminated)
{
if (GetRemoteIdentity())
{
auto abbr = i2p::data::GetIdentHashAbbreviation(GetRemoteIdentity()->GetIdentHash());
LogPrint(eLogDebug, "SSU2: Session with ", abbr, " was terminated");
}
else
LogPrint(eLogDebug, "SSU2: Session was terminated");
m_State = eSSU2SessionStateTerminated;
m_ConnectTimer.cancel ();
m_OnEstablished = nullptr;
if (m_RelayTag)
m_Server.RemoveRelay (m_RelayTag);
{
LogPrint(eLogDebug, "SSU2: Tag removed ", m_RelayTag);
m_Server.RemoveRelay(m_RelayTag);
}
m_SentHandshakePacket.reset (nullptr);
m_SessionConfirmedFragment.reset (nullptr);
m_PathChallenge.reset (nullptr);
Expand All @@ -269,12 +295,18 @@ namespace transport
m_ReceivedI2NPMsgIDs.clear ();
m_Server.RemoveSession (m_SourceConnID);
transports.PeerDisconnected (shared_from_this ());
LogPrint (eLogDebug, "SSU2: Session terminated");
}
}

void SSU2Session::RequestTermination (SSU2TerminationReason reason)
{
std::string abbr;
if (GetRemoteIdentity())
{
abbr = std::string(" (") +
i2p::data::GetIdentHashAbbreviation(GetRemoteIdentity()->GetIdentHash()) + ")";
}
LogPrint(eLogDebug, "SSU2", abbr, ": Termination requested, code = ", (int)reason);
if (m_State == eSSU2SessionStateEstablished || m_State == eSSU2SessionStateClosing)
{
m_TerminationReason = reason;
Expand All @@ -285,6 +317,8 @@ namespace transport

void SSU2Session::Established ()
{
auto abbr = i2p::data::GetIdentHashAbbreviation(GetRemoteIdentity()->GetIdentHash());
LogPrint(eLogDebug, "SSU2: Session with ", abbr, " was established");
m_State = eSSU2SessionStateEstablished;
m_EphemeralKeys = nullptr;
m_NoiseState.reset (nullptr);
Expand Down Expand Up @@ -1418,7 +1452,7 @@ namespace transport
return true;
}

uint32_t SSU2Session::SendData (const uint8_t * buf, size_t len, uint8_t flags)
uint32_t SSU2Session::SendData (const uint8_t * buf, size_t len, uint8_t flags, bool updateActivity)
{
if (len < 8)
{
Expand All @@ -1439,7 +1473,8 @@ namespace transport
header.ll[1] ^= CreateHeaderMask (m_KeyDataSend + 32, payload + (len + 4));
m_Server.Send (header.buf, 16, payload, len + 16, m_RemoteEndpoint);
m_SendPacketNum++;
m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch ();
if (updateActivity)
m_LastActivityTimestamp = i2p::util::GetSecondsSinceEpoch ();
m_NumSentBytes += len + 32;
return m_SendPacketNum - 1;
}
Expand Down Expand Up @@ -1489,14 +1524,21 @@ namespace transport

void SSU2Session::HandlePayload (const uint8_t * buf, size_t len)
{
std::string abbr;
if (GetRemoteIdentity())
{
abbr = std::string(" (") +
i2p::data::GetIdentHashAbbreviation(GetRemoteIdentity()->GetIdentHash()) + ")";
}

size_t offset = 0;
while (offset < len)
{
uint8_t blk = buf[offset];
offset++;
auto size = bufbe16toh (buf + offset);
offset += 2;
LogPrint (eLogDebug, "SSU2: Block type ", (int)blk, " of size ", size);
LogPrint (eLogDebug, "SSU2", abbr, ": Block type ", (int)blk, " of size ", size);
if (offset + size > len)
{
LogPrint (eLogError, "SSU2: Unexpected block length ", size);
Expand Down Expand Up @@ -1595,12 +1637,13 @@ namespace transport
if (!m_RelayTag)
{
RAND_bytes ((uint8_t *)&m_RelayTag, 4);
LogPrint(eLogDebug, "SSU2: Tag created ", m_RelayTag);
m_Server.AddRelay (m_RelayTag, shared_from_this ());
}
break;
case eSSU2BlkRelayTag:
LogPrint (eLogDebug, "SSU2: RelayTag");
m_RelayTag = bufbe32toh (buf + offset);
LogPrint (eLogDebug, "SSU2: RelayTag ", m_RelayTag);
break;
case eSSU2BlkNewToken:
{
Expand Down
12 changes: 10 additions & 2 deletions libi2pd/SSU2Session.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ namespace transport
const int SSU2_MAX_NUM_RECEIVED_I2NP_MSGIDS = 5000; // how many msgID we store for duplicates check
const int SSU2_RECEIVED_I2NP_MSGIDS_CLEANUP_TIMEOUT = 10; // in seconds
const int SSU2_DECAY_INTERVAL = 20; // in seconds
const int SSU2_KEEP_ALIVE_INTERVAL = 14; // in seconds
const int SSU2_KEEP_ALIVE_INTERVAL_VARIANCE = 4; // in seconds
const size_t SSU2_MIN_WINDOW_SIZE = 16; // in packets
const size_t SSU2_MAX_WINDOW_SIZE = 256; // in packets
const size_t SSU2_MIN_RTO = 100; // in milliseconds
Expand Down Expand Up @@ -246,7 +248,7 @@ namespace transport
bool Introduce (std::shared_ptr<SSU2Session> session, uint32_t relayTag);
void WaitForIntroduction ();
void SendPeerTest (); // Alice, Data message
void SendKeepAlive ();
void OnKeepAliveTimer (uint64_t ts);
void RequestTermination (SSU2TerminationReason reason);
void CleanUp (uint64_t ts);
void FlushData ();
Expand All @@ -260,6 +262,9 @@ namespace transport
SSU2SessionState GetState () const { return m_State; };
void SetState (SSU2SessionState state) { m_State = state; };

uint32_t GetIntroducerSelectionTime() const { return m_IntroducerSelectionTime; };
void SetIntroducerSelectionTime(uint32_t ts) { m_IntroducerSelectionTime = ts; };

bool ProcessFirstIncomingMessage (uint64_t connID, uint8_t * buf, size_t len);
bool ProcessSessionCreated (uint8_t * buf, size_t len);
bool ProcessSessionConfirmed (uint8_t * buf, size_t len);
Expand Down Expand Up @@ -289,7 +294,7 @@ namespace transport
void KDFDataPhase (uint8_t * keydata_ab, uint8_t * keydata_ba);
void SendTokenRequest ();
void SendRetry ();
uint32_t SendData (const uint8_t * buf, size_t len, uint8_t flags = 0); // returns packet num
uint32_t SendData (const uint8_t * buf, size_t len, uint8_t flags = 0, bool updateActivity = true); // returns packet num
void SendQuickAck ();
void SendTermination ();
void SendHolePunch (uint32_t nonce, const boost::asio::ip::udp::endpoint& ep, const uint8_t * introKey, uint64_t token);
Expand Down Expand Up @@ -362,6 +367,9 @@ namespace transport
size_t m_MaxPayloadSize;
std::unique_ptr<i2p::data::IdentHash> m_PathChallenge;
std::unordered_map<uint32_t, uint32_t> m_ReceivedI2NPMsgIDs; // msgID -> timestamp in seconds
uint32_t m_IntroducerSelectionTime; // seconds since epoch
uint64_t m_LastKeepAliveActivityTimestamp;
int m_KeepAliveInterval;
};

inline uint64_t CreateHeaderMask (const uint8_t * kh, const uint8_t * nonce)
Expand Down
1 change: 0 additions & 1 deletion libi2pd/TransportSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ namespace transport
};

uint32_t GetCreationTime () const { return m_CreationTime; };
void SetCreationTime (uint32_t ts) { m_CreationTime = ts; }; // for introducers

virtual uint32_t GetRelayTag () const { return 0; };
virtual void SendLocalRouterInfo (bool update = false) { SendI2NPMessages ({ CreateDatabaseStoreMsg () }); };
Expand Down

0 comments on commit 8c24c31

Please sign in to comment.