Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Codechange] Reworked the NetworkStreamManager mutexes #949

Merged
merged 1 commit into from May 19, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
64 changes: 32 additions & 32 deletions source/main/network/NetworkStreamManager.cpp
Expand Up @@ -40,39 +40,35 @@ NetworkStreamManager::~NetworkStreamManager()
void NetworkStreamManager::addLocalStream(Streamable *stream, stream_register_t *reg, unsigned int size)
{
#ifdef USE_SOCKETW
std::lock_guard<std::mutex> lock(m_stream_mutex);
// for own streams: count stream id up ...
int mysourceid = gEnv->network->getUID();

// use counting streamid
stream->setSourceID(mysourceid);
stream->setStreamID(streamid);

// add IDs to registration
reg->origin_sourceid = mysourceid;
reg->origin_streamid = streamid;
reg->status = 0;

// tell the stream that its a local stream (an origin)
stream->isOrigin = true;

// add new stream map to the streams map
if (streams.find(mysourceid) == streams.end())
streams[mysourceid] = std::map < unsigned int, Streamable *>();
// map the stream
streams[mysourceid][streamid] = stream;
LOG("adding local stream: " + TOSTRING(mysourceid) + ":"+ TOSTRING(streamid) + ", type: " + TOSTRING(reg->type));
// send stream setup notice to server
if (size == 0) size = sizeof(stream_register_t);
stream->addPacket(MSG2_STREAM_REGISTER, size, (char*)reg);

// increase stream counter
{
std::lock_guard<std::mutex> lock(m_stream_mutex);

streams[mysourceid][streamid] = stream;

if (size == 0) size = sizeof(stream_register_t);
stream->addPacket(MSG2_STREAM_REGISTER, size, (char*)reg);
LOG("adding local stream: " + TOSTRING(mysourceid) + ":"+ TOSTRING(streamid) + ", type: " + TOSTRING(reg->type));
}

streamid++;
#endif // USE_SOCKETW
}

void NetworkStreamManager::addRemoteStream(Streamable *stream, int rsource, int rstreamid)
{
std::lock_guard<std::mutex> lock(m_stream_mutex);

streams[rsource][rstreamid] = stream;
LOG("adding remote stream: " + TOSTRING(rsource) + ":"+ TOSTRING(rstreamid));
}
Expand All @@ -87,16 +83,18 @@ void NetworkStreamManager::removeStream(int sourceid, int streamid)
sourceid = mysourceid;
}

std::lock_guard<std::mutex> lock(m_stream_mutex);
{
std::lock_guard<std::mutex> lock(m_stream_mutex);

std::map < int, std::map < unsigned int, Streamable *> >::iterator it_source = streams.find(sourceid);
std::map < unsigned int, Streamable *>::iterator it_stream;
std::map < int, std::map < unsigned int, Streamable *> >::iterator it_source = streams.find(sourceid);
std::map < unsigned int, Streamable *>::iterator it_stream;

if (it_source != streams.end() && !it_source->second.empty())
{
it_stream = it_source->second.find(streamid);
if (it_stream != it_source->second.end())
streams[sourceid].erase(it_stream);
if (it_source != streams.end() && !it_source->second.empty())
{
it_stream = it_source->second.find(streamid);
if (it_stream != it_source->second.end())
streams[sourceid].erase(it_stream);
}
}

if (sourceid != mysourceid)
Expand All @@ -123,14 +121,16 @@ void NetworkStreamManager::resumeStream(Streamable *stream)
#ifdef USE_SOCKETW
void NetworkStreamManager::removeUser(int sourceID)
{
std::lock_guard<std::mutex> lock(m_stream_mutex);
if (streams.find(sourceID) == streams.end())
{
// no such stream?!
return;
std::lock_guard<std::mutex> lock(m_stream_mutex);
if (streams.find(sourceID) == streams.end())
{
// no such stream?!
return;
}
// found and deleted
streams.erase(streams.find(sourceID));
}
// found and deleted
streams.erase(streams.find(sourceID));

// now iterate over all factories and remove their instances (only triggers)
std::vector < StreamableFactoryInterface * >::iterator it;
Expand All @@ -144,6 +144,7 @@ void NetworkStreamManager::removeUser(int sourceID)
void NetworkStreamManager::pushReceivedStreamMessage(header_t header, char *buffer)
{
std::lock_guard<std::mutex> lock(m_stream_mutex);

if (streams.find(header.source) == streams.end())
{
// no such stream?!
Expand Down Expand Up @@ -204,7 +205,6 @@ void NetworkStreamManager::update()

void NetworkStreamManager::syncRemoteStreams()
{
std::lock_guard<std::mutex> lock(m_stream_mutex);
// iterate over all factories
std::vector < StreamableFactoryInterface * >::iterator it;
for (it=factories.begin(); it!=factories.end(); it++)
Expand All @@ -216,6 +216,7 @@ void NetworkStreamManager::syncRemoteStreams()
void NetworkStreamManager::receiveStreams()
{
std::lock_guard<std::mutex> lock(m_stream_mutex);

std::map < int, std::map < unsigned int, Streamable *> >::iterator it;
for (it=streams.begin(); it!=streams.end(); it++)
{
Expand All @@ -230,7 +231,6 @@ void NetworkStreamManager::receiveStreams()

void NetworkStreamManager::addFactory(StreamableFactoryInterface *factory)
{
std::lock_guard<std::mutex> lock(m_stream_mutex);
this->factories.push_back(factory);
}