Skip to content

Commit

Permalink
Merge pull request #18 from SecretsOTheP/takp-fork
Browse files Browse the repository at this point in the history
Fix crash observed in eqstreamfactory / clean up memleaks in ident/fa…
  • Loading branch information
regneq committed Oct 2, 2023
2 parents 199f0b2 + 30963e4 commit 5d3d867
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 64 deletions.
135 changes: 81 additions & 54 deletions common/eq_stream_factory.cpp
Expand Up @@ -192,7 +192,8 @@ timeval sleep_time;
if ((num=select(sock+1,&readset,nullptr,nullptr,&sleep_time))<0) {
// What do we wanna do?
continue;
} else if (num==0)
}
else if (num == 0)
continue;

if(sock == -1)
Expand All @@ -206,22 +207,24 @@ timeval sleep_time;
#endif
{
// What do we wanna do?
} else {
}
else {

MStreams.lock();
stream_itr = Streams.find(std::make_pair(from.sin_addr.s_addr, from.sin_port));
oldstream_itr = OldStreams.find(std::make_pair(from.sin_addr.s_addr, from.sin_port));
if (stream_itr == Streams.end() && oldstream_itr == OldStreams.end()) {
bool bIsNewStream = false;

if (buffer[1]==OP_SessionRequest) {
EQStream *s = new EQStream(from);
s->SetStreamType(StreamType);
Streams[std::make_pair(from.sin_addr.s_addr, from.sin_port)]=s;
WriterWork.Signal();
Push(s);
s->AddBytesRecv(length);
s->Process(buffer,length);
s->SetLastPacketTime(Timer::GetCurrentTime());
bIsNewStream = true;
}
else {

auto streamPair = std::make_pair(from.sin_addr.s_addr, from.sin_port);

if (!bIsNewStream)
{
//stream_itr = Streams.find(std::make_pair(from.sin_addr.s_addr, from.sin_port));
oldstream_itr = OldStreams.find(streamPair);
if (oldstream_itr == OldStreams.end())
{
EQOldStream *s = new EQOldStream(from, sock);
s->SetStreamType(OldStream);
OldStreams[std::make_pair(from.sin_addr.s_addr, from.sin_port)]=s;
Expand All @@ -231,26 +234,58 @@ timeval sleep_time;
s->SetLastPacketTime(Timer::GetCurrentTime());
s->ReceiveData(buffer,length);
}
else
{
EQOldStream *oldcurstream = oldstream_itr->second;
if (oldcurstream != nullptr)
{
if (oldcurstream->CheckClosed())
{
OldStreams.erase(oldstream_itr);
oldcurstream = nullptr;
}
else
oldcurstream->PutInUse();
if (oldcurstream) {
//oldcurstream->AddBytesRecv(length);
oldcurstream->ParceEQPacket(length, buffer);
oldcurstream->SetLastPacketTime(Timer::GetCurrentTime());
oldcurstream->ReleaseFromUse();
}
}
else
{
OldStreams.erase(oldstream_itr);
}

}
MStreams.unlock();
} else {

//newstr
}
else // newstr
{
stream_itr = Streams.find(std::make_pair(from.sin_addr.s_addr, from.sin_port));
EQStream *curstream = nullptr;
if(stream_itr != Streams.end())
curstream = stream_itr->second;
//oldstr
oldstream_itr = OldStreams.find(std::make_pair(from.sin_addr.s_addr, from.sin_port));
EQOldStream *oldcurstream = nullptr;
if(oldstream_itr != OldStreams.end())
oldcurstream = oldstream_itr->second;
if (stream_itr == Streams.end()) {
EQStream *s = new EQStream(from);
s->SetStreamType(StreamType);
Streams[std::make_pair(from.sin_addr.s_addr, from.sin_port)] = s;
WriterWork.Signal();
Push(s);
s->AddBytesRecv(length);
s->Process(buffer, length);
s->SetLastPacketTime(Timer::GetCurrentTime());
MStreams.unlock();
}
else {
EQStream *curstream = stream_itr->second;

if(curstream != nullptr)
{
//dont bother processing incoming packets for closed connections
if(curstream->CheckClosed())
{
Streams.erase(stream_itr);
curstream = nullptr;
}
else
curstream->PutInUse();
MStreams.unlock(); //the in use flag prevents the stream from being deleted while we are using it.
Expand All @@ -262,31 +297,17 @@ timeval sleep_time;
curstream->ReleaseFromUse();
}
}
else if(oldcurstream != nullptr)
{
if(oldcurstream->CheckClosed())
oldcurstream = nullptr;
else
oldcurstream->PutInUse();

MStreams.unlock(); //the in use flag prevents the stream from being deleted while we are using it.

if(oldcurstream) {
//oldcurstream->AddBytesRecv(length);
oldcurstream->ParceEQPacket(length, buffer);
oldcurstream->SetLastPacketTime(Timer::GetCurrentTime());
oldcurstream->ReleaseFromUse();
}
}
else
{
Streams.erase(stream_itr);
MStreams.unlock();
}
}
}
}
}
}
}

void EQStreamFactory::CheckTimeout()
{
Expand All @@ -307,13 +328,16 @@ void EQStreamFactory::CheckTimeout()
if (state==CLOSED) {
if (s->IsInUse()) {
//give it a little time for everybody to finish with it
} else {
}
else {
//everybody is done, we can delete it now
auto temp = stream_itr;
++stream_itr;
//let whoever has the stream outside delete it
delete temp->second;
Streams.erase(temp);
if (s)
{
delete s;
s = nullptr;
}
stream_itr = Streams.erase(stream_itr);
continue;
}
}
Expand All @@ -331,14 +355,17 @@ void EQStreamFactory::CheckTimeout()
if (state==CLOSED) {
if (s->IsInUse()) {
//give it a little time for everybody to finish with it
} else {
}
else {
//everybody is done, we can delete it now
//cout << "Removing connection" << endl;
auto temp = oldstream_itr;
++oldstream_itr;
//let whoever has the stream outside delete it
delete temp->second;
OldStreams.erase(temp);
if (s)
{
delete s;
s = nullptr;
}
oldstream_itr = OldStreams.erase(oldstream_itr);
continue;
}
}
Expand Down Expand Up @@ -370,14 +397,14 @@ void EQStreamFactory::WriterLoop()
break;
MWriterRunning.unlock();

wants_write.clear();
old_wants_write.clear();

decay=DecayTimer.Check();

//copy streams into a seperate list so we dont have to keep
//MStreams locked while we are writting
MStreams.lock();

wants_write.clear();
old_wants_write.clear();
for(auto stream_itr = Streams.begin();stream_itr != Streams.end();stream_itr++) {
// If it's time to decay the bytes sent, then let's do it before we try to write
if (decay)
Expand Down
28 changes: 18 additions & 10 deletions common/eq_stream_ident.cpp
Expand Up @@ -19,28 +19,30 @@ EQStreamIdentifier::~EQStreamIdentifier() {
Record *r = *cur;
if (r != nullptr)
r->stream->ReleaseFromUse();
delete r;
safe_delete(r);
}
oldcur = m_oldstreams.begin();
oldend = m_oldstreams.end();
for(; oldcur != oldend; ++oldcur) {
OldRecord *r = *oldcur;
if (r != nullptr)
r->stream->ReleaseFromUse();
delete r;
safe_delete(r);
}

std::vector<Patch *>::iterator curp, endp;
std::vector<OldPatch *>::iterator oldcurp, oldendp;
curp = m_patches.begin();
endp = m_patches.end();
for(; curp != endp; ++curp) {
delete *curp;
Patch* patch = *curp;
safe_delete(patch);
}
oldcurp = m_oldpatches.begin();
oldendp = m_oldpatches.end();
for(; oldcurp != oldendp; ++oldcurp) {
delete *oldcurp;
OldPatch* patch = *oldcurp;
safe_delete(patch);
}
}

Expand Down Expand Up @@ -80,7 +82,7 @@ void EQStreamIdentifier::Process() {
//this stream has failed to match any pattern in our timeframe.
Log(Logs::General, Logs::Netcode, "[IDENTIFY] Unable to identify stream from %s:%d before timeout.", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort()));
r->stream->ReleaseFromUse();
delete r;
safe_delete(r);
cur = m_streams.erase(cur);
continue;
}
Expand Down Expand Up @@ -114,7 +116,7 @@ void EQStreamIdentifier::Process() {
break;
}
r->stream->ReleaseFromUse();
delete r;
safe_delete(r);
cur = m_streams.erase(cur);
continue;
}
Expand Down Expand Up @@ -170,7 +172,7 @@ void EQStreamIdentifier::Process() {
//if we found a match, or were not able to identify it
if(found_one || all_ready) {
//cannot print ip/port here. r->stream is invalid.
delete r;
safe_delete(r);
cur = m_streams.erase(cur);
} else {
++cur;
Expand All @@ -188,7 +190,7 @@ void EQStreamIdentifier::Process() {
//this stream has failed to match any pattern in our timeframe.
Log(Logs::Detail, Logs::Netcode, "Unable to identify stream from %s:%d before timeout.", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort()));
r->stream->ReleaseFromUse();
delete r;
safe_delete(r);
oldcur = m_oldstreams.erase(oldcur);
continue;
}
Expand Down Expand Up @@ -221,7 +223,7 @@ void EQStreamIdentifier::Process() {
break;
}
r->stream->ReleaseFromUse();
delete r;
safe_delete(r);
oldcur = m_oldstreams.erase(oldcur);
continue;
}
Expand All @@ -238,6 +240,8 @@ void EQStreamIdentifier::Process() {
OldPatch *p = *oldcurp;

//ask the stream to see if it matches the supplied signature
if (r)
{
EQStream::MatchState res = r->stream->CheckSignature(&p->signature);
switch(res) {
case EQStream::MatchNotReady:
Expand All @@ -263,18 +267,22 @@ void EQStreamIdentifier::Process() {
break;
}
}
}

//if we checked all patches and did not find a match.
if(all_ready && !found_one) {
//the stream cannot be identified.
if (r)
{
Log(Logs::Detail, Logs::Netcode, "Unable to identify stream from %s:%d, no match found.", long2ip(r->stream->GetRemoteIP()).c_str(), ntohs(r->stream->GetRemotePort()));
r->stream->ReleaseFromUse();
}
}

//if we found a match, or were not able to identify it
if(found_one || all_ready) {
//cannot print ip/port here. r->stream is invalid.
delete r;
safe_delete(r);
oldcur = m_oldstreams.erase(oldcur);
} else {
++oldcur;
Expand Down
4 changes: 4 additions & 0 deletions world/net.cpp
Expand Up @@ -365,6 +365,8 @@ int main(int argc, char** argv) {
//which will figure out what patch they are running, and set up the dynamic
//structures and opcodes for that patch.
struct in_addr in{};
if (eqs == nullptr)
continue;
in.s_addr = eqs->GetRemoteIP();
LogInfo("New connection from {0}:{1}", inet_ntoa(in),ntohs(eqs->GetRemotePort()));
stream_identifier.AddStream(eqs); //takes the stream
Expand All @@ -378,6 +380,8 @@ int main(int argc, char** argv) {
//pull the stream out of the factory and give it to the stream identifier
//which will figure out what patch they are running, and set up the dynamic
//structures and opcodes for that patch.
if (eqos == nullptr)
continue;
struct in_addr in{};
in.s_addr = eqos->GetRemoteIP();
Log(Logs::Detail, Logs::WorldServer, "New connection from %s:%d", inet_ntoa(in), ntohs(eqos->GetRemotePort()));
Expand Down
4 changes: 4 additions & 0 deletions zone/main.cpp
Expand Up @@ -394,6 +394,8 @@ int main(int argc, char** argv) {
//which will figure out what patch they are running, and set up the dynamic
//structures and opcodes for that patch.
struct in_addr in;
if (eqss == nullptr)
continue;
in.s_addr = eqss->GetRemoteIP();
Log(Logs::Detail, Logs::WorldServer, "New connection from %s:%d", inet_ntoa(in), ntohs(eqss->GetRemotePort()));
stream_identifier.AddStream(eqss); //takes the stream
Expand All @@ -405,6 +407,8 @@ int main(int argc, char** argv) {
//which will figure out what patch they are running, and set up the dynamic
//structures and opcodes for that patch.
struct in_addr in;
if (eqoss == nullptr)
continue;
in.s_addr = eqoss->GetRemoteIP();
Log(Logs::Detail, Logs::WorldServer, "New connection from %s:%d", inet_ntoa(in), ntohs(eqoss->GetRemotePort()));
stream_identifier.AddOldStream(eqoss); //takes the stream
Expand Down

0 comments on commit 5d3d867

Please sign in to comment.