Skip to content
This repository has been archived by the owner on Oct 12, 2022. It is now read-only.

Commit

Permalink
rewrite rwmutext unittest
Browse files Browse the repository at this point in the history
- use semaphores to control thread progress
- test different reader/writer scenarios and
  policies
  • Loading branch information
MartinNowak committed Aug 24, 2013
1 parent 9a419df commit a349935
Showing 1 changed file with 107 additions and 86 deletions.
193 changes: 107 additions & 86 deletions src/core/sync/rwmutex.d
Expand Up @@ -399,117 +399,138 @@ private:
////////////////////////////////////////////////////////////////////////////////


version( unittest )
unittest
{
static if( !is( typeof( Thread ) ) )
private import core.thread;
import core.atomic, core.thread, core.sync.semaphore;


void testRead( ReadWriteMutex.Policy policy )
static void runTest(ReadWriteMutex.Policy policy)
{
auto mutex = new ReadWriteMutex( policy );
auto synInfo = new Object;
int numThreads = 10;
int numReaders = 0;
int maxReaders = 0;
scope mutex = new ReadWriteMutex(policy);
scope rdSemA = new Semaphore, rdSemB = new Semaphore,
wrSemA = new Semaphore, wrSemB = new Semaphore;
shared size_t numReaders, numWriters;

void readerFn()
{
synchronized( mutex.reader )
synchronized (mutex.reader)
{
synchronized( synInfo )
{
if( ++numReaders > maxReaders )
maxReaders = numReaders;
}
Thread.sleep( dur!"msecs"(1) );
synchronized( synInfo )
{
--numReaders;
}
atomicOp!"+="(numReaders, 1);
rdSemA.notify();
rdSemB.wait();
atomicOp!"-="(numReaders, 1);
}
}

auto group = new ThreadGroup;

for( int i = 0; i < numThreads; ++i )
{
group.create( &readerFn );
}
group.joinAll();
assert( numReaders < 1 && maxReaders > 1 );
}


void testReadWrite( ReadWriteMutex.Policy policy )
{
auto mutex = new ReadWriteMutex( policy );
auto synInfo = new Object;
int numThreads = 10;
int numReaders = 0;
int numWriters = 0;
int maxReaders = 0;
int maxWriters = 0;
int numTries = 5;

void readerFn()
void writerFn()
{
for( int i = 0; i < numTries; ++i )
synchronized (mutex.writer)
{
synchronized( mutex.reader )
{
synchronized( synInfo )
{
if( ++numReaders > maxReaders )
maxReaders = numReaders;
}
Thread.sleep( dur!"usecs"(100) );
synchronized( synInfo )
{
--numReaders;
}
}
atomicOp!"+="(numWriters, 1);
wrSemA.notify();
wrSemB.wait();
atomicOp!"-="(numWriters, 1);
}
}

void writerFn()
void waitQueued(size_t queuedReaders, size_t queuedWriters)
{
for( int i = 0; i < numTries; ++i )
for (;;)
{
synchronized( mutex.writer )
synchronized (mutex.m_commonMutex)
{
synchronized( synInfo )
{
if( ++numWriters > maxWriters )
maxWriters = numWriters;
}
Thread.sleep( dur!"usecs"(100) );
synchronized( synInfo )
{
--numWriters;
}
if (mutex.m_numQueuedReaders == queuedReaders &&
mutex.m_numQueuedWriters == queuedWriters)
break;
}
Thread.yield();
}
}

auto group = new ThreadGroup;
scope group = new ThreadGroup;

for( int i = 0; i < numThreads; ++i )
// 2 simultaneous readers
group.create(&readerFn); group.create(&readerFn);
rdSemA.wait(); rdSemA.wait();
assert(numReaders == 2);
rdSemB.notify(); rdSemB.notify();
group.joinAll();
assert(numReaders == 0);
foreach (t; group) group.remove(t);

// 1 writer at a time
group.create(&writerFn); group.create(&writerFn);
wrSemA.wait();
assert(!wrSemA.tryWait());
assert(numWriters == 1);
wrSemB.notify();
wrSemA.wait();
assert(numWriters == 1);
wrSemB.notify();
group.joinAll();
assert(numWriters == 0);
foreach (t; group) group.remove(t);

// reader and writer are mutually exclusive
group.create(&readerFn);
rdSemA.wait();
group.create(&writerFn);
waitQueued(0, 1);
assert(!wrSemA.tryWait());
assert(numReaders == 1 && numWriters == 0);
rdSemB.notify();
wrSemA.wait();
assert(numReaders == 0 && numWriters == 1);
wrSemB.notify();
group.joinAll();
assert(numReaders == 0 && numWriters == 0);
foreach (t; group) group.remove(t);

// writer and reader are mutually exclusive
group.create(&writerFn);
wrSemA.wait();
group.create(&readerFn);
waitQueued(1, 0);
assert(!rdSemA.tryWait());
assert(numReaders == 0 && numWriters == 1);
wrSemB.notify();
rdSemA.wait();
assert(numReaders == 1 && numWriters == 0);
rdSemB.notify();
group.joinAll();
assert(numReaders == 0 && numWriters == 0);
foreach (t; group) group.remove(t);

// policy determines whether queued reader or writers progress first
group.create(&writerFn);
wrSemA.wait();
group.create(&readerFn);
group.create(&writerFn);
waitQueued(1, 1);
assert(numReaders == 0 && numWriters == 1);
wrSemB.notify();

if (policy == ReadWriteMutex.Policy.PREFER_READERS)
{
rdSemA.wait();
assert(numReaders == 1 && numWriters == 0);
rdSemB.notify();
wrSemA.wait();
assert(numReaders == 0 && numWriters == 1);
wrSemB.notify();
}
else if (policy == ReadWriteMutex.Policy.PREFER_WRITERS)
{
group.create( &readerFn );
group.create( &writerFn );
wrSemA.wait();
assert(numReaders == 0 && numWriters == 1);
wrSemB.notify();
rdSemA.wait();
assert(numReaders == 1 && numWriters == 0);
rdSemB.notify();
}
group.joinAll();
assert( numReaders < 1 && maxReaders > 1 &&
numWriters < 1 && maxWriters < 2 );
}


unittest
{
testRead( ReadWriteMutex.Policy.PREFER_READERS );
testRead( ReadWriteMutex.Policy.PREFER_WRITERS );
testReadWrite( ReadWriteMutex.Policy.PREFER_READERS );
testReadWrite( ReadWriteMutex.Policy.PREFER_WRITERS );
assert(numReaders == 0 && numWriters == 0);
foreach (t; group) group.remove(t);
}
runTest(ReadWriteMutex.Policy.PREFER_READERS);
runTest(ReadWriteMutex.Policy.PREFER_WRITERS);
}

0 comments on commit a349935

Please sign in to comment.