From 30963e49fd2751e588e3088ff996de82692890cf Mon Sep 17 00:00:00 2001 From: Lead Generalist MTF Engineer Date: Sun, 1 Oct 2023 21:39:23 -0400 Subject: [PATCH] Fix crash observed in eqstreamfactory / clean up memleaks in ident/factory classes. Validate object returned from pop --- common/eq_stream_factory.cpp | 135 +++++++++++++++++++++-------------- common/eq_stream_ident.cpp | 28 +++++--- world/net.cpp | 4 ++ zone/main.cpp | 4 ++ 4 files changed, 107 insertions(+), 64 deletions(-) diff --git a/common/eq_stream_factory.cpp b/common/eq_stream_factory.cpp index d47389ff..80c1351a 100644 --- a/common/eq_stream_factory.cpp +++ b/common/eq_stream_factory.cpp @@ -192,7 +192,8 @@ timeval sleep_time; if ((num=select(sock+1,&readset,nullptr,nullptr,&sleep_time))<0) { // What do we wanna do? continue; - } else if (num==0) + } + else if (num == 0) continue; if(sock == -1) @@ -206,22 +207,24 @@ timeval sleep_time; #endif { // What do we wanna do? - } else { + } + else { + MStreams.lock(); - stream_itr = Streams.find(std::make_pair(from.sin_addr.s_addr, from.sin_port)); - oldstream_itr = OldStreams.find(std::make_pair(from.sin_addr.s_addr, from.sin_port)); - if (stream_itr == Streams.end() && oldstream_itr == OldStreams.end()) { + bool bIsNewStream = false; + if (buffer[1]==OP_SessionRequest) { - EQStream *s = new EQStream(from); - s->SetStreamType(StreamType); - Streams[std::make_pair(from.sin_addr.s_addr, from.sin_port)]=s; - WriterWork.Signal(); - Push(s); - s->AddBytesRecv(length); - s->Process(buffer,length); - s->SetLastPacketTime(Timer::GetCurrentTime()); + bIsNewStream = true; } - else { + + auto streamPair = std::make_pair(from.sin_addr.s_addr, from.sin_port); + + if (!bIsNewStream) + { + //stream_itr = Streams.find(std::make_pair(from.sin_addr.s_addr, from.sin_port)); + oldstream_itr = OldStreams.find(streamPair); + if (oldstream_itr == OldStreams.end()) + { EQOldStream *s = new EQOldStream(from, sock); s->SetStreamType(OldStream); OldStreams[std::make_pair(from.sin_addr.s_addr, from.sin_port)]=s; @@ -231,26 +234,58 @@ timeval sleep_time; s->SetLastPacketTime(Timer::GetCurrentTime()); s->ReceiveData(buffer,length); } + else + { + EQOldStream *oldcurstream = oldstream_itr->second; + if (oldcurstream != nullptr) + { + if (oldcurstream->CheckClosed()) + { + OldStreams.erase(oldstream_itr); + oldcurstream = nullptr; + } + else + oldcurstream->PutInUse(); + if (oldcurstream) { + //oldcurstream->AddBytesRecv(length); + oldcurstream->ParceEQPacket(length, buffer); + oldcurstream->SetLastPacketTime(Timer::GetCurrentTime()); + oldcurstream->ReleaseFromUse(); + } + } + else + { + OldStreams.erase(oldstream_itr); + } + } MStreams.unlock(); - } else { - - //newstr + } + else // newstr + { stream_itr = Streams.find(std::make_pair(from.sin_addr.s_addr, from.sin_port)); - EQStream *curstream = nullptr; - if(stream_itr != Streams.end()) - curstream = stream_itr->second; - //oldstr - oldstream_itr = OldStreams.find(std::make_pair(from.sin_addr.s_addr, from.sin_port)); - EQOldStream *oldcurstream = nullptr; - if(oldstream_itr != OldStreams.end()) - oldcurstream = oldstream_itr->second; + if (stream_itr == Streams.end()) { + EQStream *s = new EQStream(from); + s->SetStreamType(StreamType); + Streams[std::make_pair(from.sin_addr.s_addr, from.sin_port)] = s; + WriterWork.Signal(); + Push(s); + s->AddBytesRecv(length); + s->Process(buffer, length); + s->SetLastPacketTime(Timer::GetCurrentTime()); + MStreams.unlock(); + } + else { + EQStream *curstream = stream_itr->second; if(curstream != nullptr) { //dont bother processing incoming packets for closed connections if(curstream->CheckClosed()) + { + Streams.erase(stream_itr); curstream = nullptr; + } else curstream->PutInUse(); MStreams.unlock(); //the in use flag prevents the stream from being deleted while we are using it. @@ -262,24 +297,9 @@ timeval sleep_time; curstream->ReleaseFromUse(); } } - else if(oldcurstream != nullptr) - { - if(oldcurstream->CheckClosed()) - oldcurstream = nullptr; - else - oldcurstream->PutInUse(); - - MStreams.unlock(); //the in use flag prevents the stream from being deleted while we are using it. - - if(oldcurstream) { - //oldcurstream->AddBytesRecv(length); - oldcurstream->ParceEQPacket(length, buffer); - oldcurstream->SetLastPacketTime(Timer::GetCurrentTime()); - oldcurstream->ReleaseFromUse(); - } - } else { + Streams.erase(stream_itr); MStreams.unlock(); } } @@ -287,6 +307,7 @@ timeval sleep_time; } } } +} void EQStreamFactory::CheckTimeout() { @@ -307,13 +328,16 @@ void EQStreamFactory::CheckTimeout() if (state==CLOSED) { if (s->IsInUse()) { //give it a little time for everybody to finish with it - } else { + } + else { //everybody is done, we can delete it now - auto temp = stream_itr; - ++stream_itr; //let whoever has the stream outside delete it - delete temp->second; - Streams.erase(temp); + if (s) + { + delete s; + s = nullptr; + } + stream_itr = Streams.erase(stream_itr); continue; } } @@ -331,14 +355,17 @@ void EQStreamFactory::CheckTimeout() if (state==CLOSED) { if (s->IsInUse()) { //give it a little time for everybody to finish with it - } else { + } + else { //everybody is done, we can delete it now //cout << "Removing connection" << endl; - auto temp = oldstream_itr; - ++oldstream_itr; //let whoever has the stream outside delete it - delete temp->second; - OldStreams.erase(temp); + if (s) + { + delete s; + s = nullptr; + } + oldstream_itr = OldStreams.erase(oldstream_itr); continue; } } @@ -370,14 +397,14 @@ void EQStreamFactory::WriterLoop() break; MWriterRunning.unlock(); - wants_write.clear(); - old_wants_write.clear(); - decay=DecayTimer.Check(); //copy streams into a seperate list so we dont have to keep //MStreams locked while we are writting MStreams.lock(); + + wants_write.clear(); + old_wants_write.clear(); for(auto stream_itr = Streams.begin();stream_itr != Streams.end();stream_itr++) { // If it's time to decay the bytes sent, then let's do it before we try to write if (decay) diff --git a/common/eq_stream_ident.cpp b/common/eq_stream_ident.cpp index 1f0a75f8..abc75a46 100644 --- a/common/eq_stream_ident.cpp +++ b/common/eq_stream_ident.cpp @@ -19,7 +19,7 @@ EQStreamIdentifier::~EQStreamIdentifier() { Record *r = *cur; if (r != nullptr) r->stream->ReleaseFromUse(); - delete r; + safe_delete(r); } oldcur = m_oldstreams.begin(); oldend = m_oldstreams.end(); @@ -27,7 +27,7 @@ EQStreamIdentifier::~EQStreamIdentifier() { OldRecord *r = *oldcur; if (r != nullptr) r->stream->ReleaseFromUse(); - delete r; + safe_delete(r); } std::vector::iterator curp, endp; @@ -35,12 +35,14 @@ EQStreamIdentifier::~EQStreamIdentifier() { curp = m_patches.begin(); endp = m_patches.end(); for(; curp != endp; ++curp) { - delete *curp; + Patch* patch = *curp; + safe_delete(patch); } oldcurp = m_oldpatches.begin(); oldendp = m_oldpatches.end(); for(; oldcurp != oldendp; ++oldcurp) { - delete *oldcurp; + OldPatch* patch = *oldcurp; + safe_delete(patch); } } @@ -80,7 +82,7 @@ void EQStreamIdentifier::Process() { //this stream has failed to match any pattern in our timeframe. Log(Logs::General, Logs::Netcode, "[IDENTIFY] Unable to identify stream from %s:%d before timeout.", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort())); r->stream->ReleaseFromUse(); - delete r; + safe_delete(r); cur = m_streams.erase(cur); continue; } @@ -114,7 +116,7 @@ void EQStreamIdentifier::Process() { break; } r->stream->ReleaseFromUse(); - delete r; + safe_delete(r); cur = m_streams.erase(cur); continue; } @@ -170,7 +172,7 @@ void EQStreamIdentifier::Process() { //if we found a match, or were not able to identify it if(found_one || all_ready) { //cannot print ip/port here. r->stream is invalid. - delete r; + safe_delete(r); cur = m_streams.erase(cur); } else { ++cur; @@ -188,7 +190,7 @@ void EQStreamIdentifier::Process() { //this stream has failed to match any pattern in our timeframe. Log(Logs::Detail, Logs::Netcode, "Unable to identify stream from %s:%d before timeout.", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort())); r->stream->ReleaseFromUse(); - delete r; + safe_delete(r); oldcur = m_oldstreams.erase(oldcur); continue; } @@ -221,7 +223,7 @@ void EQStreamIdentifier::Process() { break; } r->stream->ReleaseFromUse(); - delete r; + safe_delete(r); oldcur = m_oldstreams.erase(oldcur); continue; } @@ -238,6 +240,8 @@ void EQStreamIdentifier::Process() { OldPatch *p = *oldcurp; //ask the stream to see if it matches the supplied signature + if (r) + { EQStream::MatchState res = r->stream->CheckSignature(&p->signature); switch(res) { case EQStream::MatchNotReady: @@ -263,18 +267,22 @@ void EQStreamIdentifier::Process() { break; } } + } //if we checked all patches and did not find a match. if(all_ready && !found_one) { //the stream cannot be identified. + if (r) + { Log(Logs::Detail, Logs::Netcode, "Unable to identify stream from %s:%d, no match found.", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort())); r->stream->ReleaseFromUse(); } + } //if we found a match, or were not able to identify it if(found_one || all_ready) { //cannot print ip/port here. r->stream is invalid. - delete r; + safe_delete(r); oldcur = m_oldstreams.erase(oldcur); } else { ++oldcur; diff --git a/world/net.cpp b/world/net.cpp index 23468cdd..9a1471d4 100644 --- a/world/net.cpp +++ b/world/net.cpp @@ -365,6 +365,8 @@ int main(int argc, char** argv) { //which will figure out what patch they are running, and set up the dynamic //structures and opcodes for that patch. struct in_addr in{}; + if (eqs == nullptr) + continue; in.s_addr = eqs->GetRemoteIP(); LogInfo("New connection from {0}:{1}", inet_ntoa(in),ntohs(eqs->GetRemotePort())); stream_identifier.AddStream(eqs); //takes the stream @@ -378,6 +380,8 @@ int main(int argc, char** argv) { //pull the stream out of the factory and give it to the stream identifier //which will figure out what patch they are running, and set up the dynamic //structures and opcodes for that patch. + if (eqos == nullptr) + continue; struct in_addr in{}; in.s_addr = eqos->GetRemoteIP(); Log(Logs::Detail, Logs::WorldServer, "New connection from %s:%d", inet_ntoa(in), ntohs(eqos->GetRemotePort())); diff --git a/zone/main.cpp b/zone/main.cpp index 2c593fac..8916b260 100644 --- a/zone/main.cpp +++ b/zone/main.cpp @@ -394,6 +394,8 @@ int main(int argc, char** argv) { //which will figure out what patch they are running, and set up the dynamic //structures and opcodes for that patch. struct in_addr in; + if (eqss == nullptr) + continue; in.s_addr = eqss->GetRemoteIP(); Log(Logs::Detail, Logs::WorldServer, "New connection from %s:%d", inet_ntoa(in), ntohs(eqss->GetRemotePort())); stream_identifier.AddStream(eqss); //takes the stream @@ -405,6 +407,8 @@ int main(int argc, char** argv) { //which will figure out what patch they are running, and set up the dynamic //structures and opcodes for that patch. struct in_addr in; + if (eqoss == nullptr) + continue; in.s_addr = eqoss->GetRemoteIP(); Log(Logs::Detail, Logs::WorldServer, "New connection from %s:%d", inet_ntoa(in), ntohs(eqoss->GetRemotePort())); stream_identifier.AddOldStream(eqoss); //takes the stream