Permalink
Browse files

Cleans up some code, internalizes some implementation bits and adds a

test.
  • Loading branch information...
1 parent 9508839 commit d25147a5f9d922afd45bfc1743bf545666bbd710 @tabish121 tabish121 committed Oct 15, 2013
@@ -31,28 +31,66 @@ using namespace decaf::lang;
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
-CompositeTaskRunner::CompositeTaskRunner() :
- tasks(), mutex(), thread(), threadTerminated(false), pending(false), shutDown(false) {
+namespace activemq {
+namespace threads {
- this->thread.reset(new Thread(this, "ActiveMQ CompositeTaskRunner Thread"));
+ class CompositeTaskRunnerImpl {
+ private:
+
+ CompositeTaskRunnerImpl(const CompositeTaskRunnerImpl&);
+ CompositeTaskRunnerImpl& operator= (const CompositeTaskRunnerImpl&);
+
+ public:
+
+ decaf::util::LinkedList<CompositeTask*> tasks;
+ mutable decaf::util::concurrent::Mutex mutex;
+
+ decaf::lang::Pointer<decaf::lang::Thread> thread;
+
+ bool threadTerminated;
+ bool pending;
+ bool shutdown;
+
+ public:
+
+ CompositeTaskRunnerImpl() : tasks(),
+ mutex(),
+ thread(),
+ threadTerminated(false),
+ pending(false),
+ shutdown(false) {
+ }
+
+ };
+
+}}
+
+////////////////////////////////////////////////////////////////////////////////
+CompositeTaskRunner::CompositeTaskRunner() : impl(new CompositeTaskRunnerImpl) {
+ this->impl->thread.reset(new Thread(this, "ActiveMQ CompositeTaskRunner Thread"));
}
////////////////////////////////////////////////////////////////////////////////
CompositeTaskRunner::~CompositeTaskRunner() {
try {
- this->shutdown();
- this->thread->join();
- this->thread.reset(NULL);
+ shutdown();
+ impl->thread->join();
+ impl->thread.reset(NULL);
+ }
+ AMQ_CATCHALL_NOTHROW()
+
+ try {
+ delete this->impl;
}
AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
void CompositeTaskRunner::start() {
- synchronized(&mutex) {
- if (!shutDown && !this->thread->isAlive()) {
- this->thread->start();
+ synchronized(&impl->mutex) {
+ if (!impl->shutdown && !this->impl->thread->isAlive()) {
+ this->impl->thread->start();
this->wakeup();
}
}
@@ -63,8 +101,8 @@ bool CompositeTaskRunner::isStarted() const {
bool result = false;
- synchronized(&mutex) {
- if (this->thread != NULL) {
+ synchronized(&impl->mutex) {
+ if (this->impl->thread != NULL && this->impl->thread->isAlive()) {
result = true;
}
}
@@ -75,44 +113,44 @@ bool CompositeTaskRunner::isStarted() const {
////////////////////////////////////////////////////////////////////////////////
void CompositeTaskRunner::shutdown(long long timeout) {
- synchronized(&mutex) {
- shutDown = true;
- pending = true;
- mutex.notifyAll();
+ synchronized(&impl->mutex) {
+ impl->shutdown = true;
+ impl->pending = true;
+ impl->mutex.notifyAll();
}
// Wait till the thread stops ( no need to wait if shutdown
// is called from thread that is shutting down)
- if (Thread::currentThread() != this->thread.get() && !threadTerminated) {
- this->thread->join(timeout);
+ if (Thread::currentThread() != this->impl->thread.get() && !impl->threadTerminated) {
+ this->impl->thread->join(timeout);
}
}
////////////////////////////////////////////////////////////////////////////////
void CompositeTaskRunner::shutdown() {
- synchronized(&mutex) {
- shutDown = true;
- pending = true;
- mutex.notifyAll();
+ synchronized(&impl->mutex) {
+ impl->shutdown = true;
+ impl->pending = true;
+ impl->mutex.notifyAll();
}
// Wait till the thread stops ( no need to wait if shutdown
// is called from thread that is shutting down)
- if (Thread::currentThread() != this->thread.get() && !threadTerminated) {
- this->thread->join();
+ if (Thread::currentThread() != this->impl->thread.get() && !impl->threadTerminated) {
+ impl->thread->join();
}
}
////////////////////////////////////////////////////////////////////////////////
void CompositeTaskRunner::wakeup() {
- synchronized(&mutex) {
- if (shutDown) {
+ synchronized(&impl->mutex) {
+ if (impl->shutdown) {
return;
}
- pending = true;
- mutex.notifyAll();
+ impl->pending = true;
+ impl->mutex.notifyAll();
}
}
@@ -122,46 +160,43 @@ void CompositeTaskRunner::run() {
try {
while (true) {
-
- synchronized(&mutex) {
- pending = false;
- if (shutDown) {
+ synchronized(&impl->mutex) {
+ impl->pending = false;
+ if (impl->shutdown) {
return;
}
}
if (!this->iterate()) {
-
// wait to be notified.
- synchronized(&mutex) {
- if (shutDown) {
+ synchronized(&impl->mutex) {
+ if (impl->shutdown) {
return;
}
- while (!pending && !shutDown) {
- mutex.wait();
+ while (!impl->pending && !impl->shutdown) {
+ impl->mutex.wait();
}
}
}
-
}
}
AMQ_CATCHALL_NOTHROW()
// Make sure we notify any waiting threads that thread
// has terminated.
- synchronized(&mutex) {
- threadTerminated = true;
- mutex.notifyAll();
+ synchronized(&impl->mutex) {
+ impl->threadTerminated = true;
+ impl->mutex.notifyAll();
}
}
////////////////////////////////////////////////////////////////////////////////
void CompositeTaskRunner::addTask(CompositeTask* task) {
if (task != NULL) {
- synchronized(&tasks) {
- this->tasks.add(task);
- this->wakeup();
+ synchronized(&impl->tasks) {
+ impl->tasks.add(task);
+ wakeup();
}
}
}
@@ -170,30 +205,30 @@ void CompositeTaskRunner::addTask(CompositeTask* task) {
void CompositeTaskRunner::removeTask(CompositeTask* task) {
if (task != NULL) {
- synchronized(&tasks) {
- this->tasks.remove(task);
- this->wakeup();
+ synchronized(&impl->tasks) {
+ impl->tasks.remove(task);
+ wakeup();
}
}
}
////////////////////////////////////////////////////////////////////////////////
bool CompositeTaskRunner::iterate() {
- synchronized(&tasks) {
+ synchronized(&impl->tasks) {
- for (int i = 0; i < tasks.size(); ++i) {
- CompositeTask* task = tasks.pop();
+ for (int i = 0; i < impl->tasks.size(); ++i) {
+ CompositeTask* task = impl->tasks.pop();
if (task->isPending()) {
task->iterate();
- tasks.addLast(task);
+ impl->tasks.addLast(task);
// Always return true, so that we check again for any of
// the other tasks that might now be pending.
return true;
} else {
- tasks.addLast(task);
+ impl->tasks.addLast(task);
}
}
}
@@ -31,6 +31,8 @@
namespace activemq {
namespace threads {
+ class CompositeTaskRunnerImpl;
+
/**
* A Task Runner that can contain one or more CompositeTasks that are each checked
* for pending work and run if any is present in the order that the tasks were added.
@@ -42,14 +44,7 @@ namespace threads {
public decaf::lang::Runnable {
private:
- decaf::util::LinkedList<CompositeTask*> tasks;
- mutable decaf::util::concurrent::Mutex mutex;
-
- decaf::lang::Pointer<decaf::lang::Thread> thread;
-
- bool threadTerminated;
- bool pending;
- bool shutDown;
+ CompositeTaskRunnerImpl* impl;
private:
@@ -112,8 +112,8 @@ void Thread::initializeSelf(Runnable* task, const std::string& name, long long s
std::string threadName = name;
- if( threadName.empty() ) {
- threadName = std::string( "Thread-" ) + Integer::toString( ++ThreadProperties::id );
+ if (threadName.empty()) {
+ threadName = std::string("Thread-") + Integer::toString(++ThreadProperties::id);
} else {
threadName = name;
}
@@ -130,7 +130,7 @@ Thread::~Thread() {
Threading::destroyThread(this->properties->handle);
delete this->properties;
}
- DECAF_CATCH_NOTHROW( Exception )
+ DECAF_CATCH_NOTHROW(Exception)
DECAF_CATCHALL_NOTHROW()
}
@@ -30,30 +30,33 @@ using namespace activemq::threads;
using namespace decaf::lang;
////////////////////////////////////////////////////////////////////////////////
-class CountingTask : public CompositeTask {
-private:
+namespace {
- int count;
- int goal;
- std::string name;
+ class CountingTask : public CompositeTask {
+ private:
-public:
+ int count;
+ int goal;
+ std::string name;
- CountingTask( const std::string& name, int goal ) : count(0), goal(goal), name(name) {}
+ public:
- int getCount() const {
- return count;
- }
+ CountingTask(const std::string& name, int goal) : count(0), goal(goal), name(name) {}
- virtual bool isPending() const {
- return count != goal;
- }
+ int getCount() const {
+ return count;
+ }
- virtual bool iterate() {
- return !( ++count == goal );
- }
+ virtual bool isPending() const {
+ return count != goal;
+ }
-};
+ virtual bool iterate() {
+ return !( ++count == goal );
+ }
+
+ };
+}
////////////////////////////////////////////////////////////////////////////////
void CompositeTaskRunnerTest::test() {
@@ -62,29 +65,36 @@ void CompositeTaskRunnerTest::test() {
CompositeTaskRunner runner;
- CountingTask task1( "task1", 100);
- CountingTask task2( "task2", 200);
+ CountingTask task1("task1", 100);
+ CountingTask task2("task2", 200);
- runner.addTask( &task1 );
- runner.addTask( &task2 );
+ runner.addTask(&task1);
+ runner.addTask(&task2);
runner.start();
runner.wakeup();
- while( attempts++ != 10 ) {
+ while (attempts++ != 10) {
- Thread::sleep( 1000 );
+ Thread::sleep(1000);
- if( task1.getCount() == 100 && task2.getCount() == 200 ) {
+ if (task1.getCount() == 100 && task2.getCount() == 200) {
break;
}
}
- CPPUNIT_ASSERT( task1.getCount() == 100 );
- CPPUNIT_ASSERT( task2.getCount() == 200 );
-
- runner.removeTask( &task1 );
- runner.removeTask( &task2 );
+ CPPUNIT_ASSERT(task1.getCount() == 100);
+ CPPUNIT_ASSERT(task2.getCount() == 200);
+ runner.removeTask(&task1);
+ runner.removeTask(&task2);
}
+////////////////////////////////////////////////////////////////////////////////
+void CompositeTaskRunnerTest::testCreateButNotStarted() {
+ Pointer<CompositeTaskRunner> runner(new CompositeTaskRunner);
+ CPPUNIT_ASSERT(!runner->isStarted());
+ runner->start();
+ runner->shutdown();
+ runner.reset(NULL);
+}
Oops, something went wrong.

0 comments on commit d25147a

Please sign in to comment.