Skip to content

Commit a7851a7

Browse files
committed
Fix for shutting down the client properly if the cluster connection attempts all fail. The Thread join now will detect if it is being called from running thread and avoid deadlock.
Removed the unneded test testTcpSocketConnectionTimeout_withIntMax. Added some additional tests for testing thread functionality.
1 parent 360b8d0 commit a7851a7

File tree

6 files changed

+140
-29
lines changed

6 files changed

+140
-29
lines changed

hazelcast/src/hazelcast/client/connection/ClusterListenerThread.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ namespace hazelcast {
8585
std::string("Error while connecting to cluster! =>") + e.what());
8686
isStartedSuccessfully = false;
8787
startLatch.countDown();
88+
clientContext.getLifecycleService().shutdown();
8889
return;
8990
}
9091
}

hazelcast/src/hazelcast/client/spi/ClusterService.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ namespace hazelcast {
6969
}
7070

7171
void ClusterService::shutdown() {
72-
active = false;
72+
if (!active.compareAndSet(true, false)) {
73+
return;
74+
}
7375
if (NULL != clusterThread.getThread()) {
7476
// avoid anyone waiting on the start latch to get stuck
7577
clusterThread.startLatch.countDown();

hazelcast/src/hazelcast/util/Thread.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ namespace hazelcast {
5050
, isJoined(false)
5151
, isInterrupted(false){
5252
init(func, arg0, arg1, arg2, arg3);
53+
5354
}
5455

5556
long Thread::getThreadID() {
@@ -92,6 +93,11 @@ namespace hazelcast {
9293
if (!isJoined.compareAndSet(false, true)) {
9394
return true;
9495
}
96+
if (id == getThreadID()) {
97+
// called from inside the thread, deadlock possibility
98+
return false;
99+
}
100+
95101
DWORD err = WaitForSingleObject(thread, INFINITE);
96102
if (err != WAIT_OBJECT_0) {
97103
return false;
@@ -197,6 +203,12 @@ namespace hazelcast {
197203
if (!isJoined.compareAndSet(false, true)) {
198204
return true;
199205
}
206+
207+
if (pthread_equal(thread, pthread_self())) {
208+
// called from inside the thread, deadlock possibility
209+
return false;
210+
}
211+
200212
int err = pthread_join(thread, NULL);
201213
if (EINVAL == err || ESRCH == err || EDEADLK == err) {
202214
isJoined = false;

hazelcast/test/src/cluster/ClientConnectionTest.cpp

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,13 +48,6 @@ namespace hazelcast {
4848
ASSERT_THROW(HazelcastClient client(config), exception::IllegalStateException);
4949
}
5050

51-
TEST_F(ClientConnectionTest, testTcpSocketConnectionTimeout_withIntMax) {
52-
HazelcastServer instance(*g_srvFactory, true);
53-
ClientConfig config;
54-
config.addAddress(Address("8.8.8.8", 5701));
55-
ASSERT_THROW(HazelcastClient client(config), exception::IllegalStateException);
56-
}
57-
5851
#ifdef HZ_BUILD_WITH_SSL
5952
TEST_F(ClientConnectionTest, testSslSocketTimeoutToOutsideNetwork) {
6053
HazelcastServer instance(*g_srvFactory, true);

hazelcast/test/src/cluster/ClusterTest.cpp

Lines changed: 90 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,64 @@ namespace hazelcast {
3434
class ClusterTest : public ClientTestSupport {
3535
};
3636

37+
class ClientAllStatesListener : public LifecycleListener {
38+
public:
39+
40+
ClientAllStatesListener(util::CountDownLatch *startingLatch, util::CountDownLatch *startedLatch = NULL,
41+
util::CountDownLatch *connectedLatch = NULL,
42+
util::CountDownLatch *disconnectedLatch = NULL,
43+
util::CountDownLatch *shuttingDownLatch = NULL,
44+
util::CountDownLatch *shutdownLatch = NULL)
45+
: startingLatch(startingLatch), startedLatch(startedLatch), connectedLatch(connectedLatch),
46+
disconnectedLatch(disconnectedLatch), shuttingDownLatch(shuttingDownLatch),
47+
shutdownLatch(shutdownLatch) { }
48+
49+
virtual void stateChanged(const LifecycleEvent &lifecycleEvent) {
50+
switch (lifecycleEvent.getState()) {
51+
case LifecycleEvent::STARTING:
52+
if (startingLatch) {
53+
startingLatch->countDown();
54+
}
55+
break;
56+
case LifecycleEvent::STARTED:
57+
if (startedLatch) {
58+
startedLatch->countDown();
59+
}
60+
break;
61+
case LifecycleEvent::CLIENT_CONNECTED:
62+
if (connectedLatch) {
63+
connectedLatch->countDown();
64+
}
65+
break;
66+
case LifecycleEvent::CLIENT_DISCONNECTED:
67+
if (disconnectedLatch) {
68+
disconnectedLatch->countDown();
69+
}
70+
break;
71+
case LifecycleEvent::SHUTTING_DOWN:
72+
if (shuttingDownLatch) {
73+
shuttingDownLatch->countDown();
74+
}
75+
break;
76+
case LifecycleEvent::SHUTDOWN:
77+
if (shutdownLatch) {
78+
shutdownLatch->countDown();
79+
}
80+
break;
81+
default:
82+
FAIL() << "No such state expected:" << lifecycleEvent.getState();
83+
}
84+
}
85+
86+
private:
87+
util::CountDownLatch *startingLatch;
88+
util::CountDownLatch *startedLatch;
89+
util::CountDownLatch *connectedLatch;
90+
util::CountDownLatch *disconnectedLatch;
91+
util::CountDownLatch *shuttingDownLatch;
92+
util::CountDownLatch *shutdownLatch;
93+
};
94+
3795
class SampleInitialListener : public InitialMembershipListener {
3896
public:
3997
SampleInitialListener(util::CountDownLatch &_memberAdded, util::CountDownLatch &_attributeLatch,
@@ -177,23 +235,6 @@ namespace hazelcast {
177235
util::CountDownLatch &addLatch;
178236
};
179237

180-
class LclForClusterTest : public LifecycleListener {
181-
public:
182-
LclForClusterTest(util::CountDownLatch &latch)
183-
: latch(latch) {
184-
185-
}
186-
187-
void stateChanged(const LifecycleEvent &event) {
188-
if (event.getState() == LifecycleEvent::CLIENT_CONNECTED) {
189-
latch.countDown();
190-
}
191-
}
192-
193-
private:
194-
util::CountDownLatch &latch;
195-
};
196-
197238
TEST_F(ClusterTest, testListenersWhenClusterDown) {
198239
HazelcastServer instance(*g_srvFactory);
199240

@@ -211,12 +252,12 @@ namespace hazelcast {
211252
m.addEntryListener(listener, true);
212253
instance.shutdown();
213254

214-
util::CountDownLatch lifecycleLatch(1);
215-
LclForClusterTest lifecycleListener(lifecycleLatch);
255+
util::CountDownLatch connectedLatch(1);
256+
ClientAllStatesListener lifecycleListener(NULL, NULL, &connectedLatch);
216257
hazelcastClient.addLifecycleListener(&lifecycleListener);
217258

218259
HazelcastServer instance2(*g_srvFactory);
219-
ASSERT_TRUE(lifecycleLatch.await(120));
260+
ASSERT_TRUE(connectedLatch.await(120));
220261
// Let enough time for the client to re-register the failed listeners
221262
util::sleep(5);
222263
m.put("sample", "entry");
@@ -228,6 +269,35 @@ namespace hazelcast {
228269
ClientConfig clientConfig;
229270
ASSERT_THROW(HazelcastClient client(clientConfig), exception::IllegalStateException);
230271
}
272+
273+
TEST_F(ClusterTest, testAllClientStates) {
274+
HazelcastServer instance(*g_srvFactory);
275+
276+
ClientConfig clientConfig;
277+
clientConfig.setAttemptPeriod(1000);
278+
clientConfig.setConnectionAttemptLimit(1);
279+
util::CountDownLatch startingLatch(1);
280+
util::CountDownLatch startedLatch(1);
281+
util::CountDownLatch connectedLatch(1);
282+
util::CountDownLatch disconnectedLatch(1);
283+
util::CountDownLatch shuttingDownLatch(1);
284+
util::CountDownLatch shutdownLatch(1);
285+
ClientAllStatesListener listener(&startingLatch, &startedLatch, &connectedLatch, &disconnectedLatch,
286+
&shuttingDownLatch, &shutdownLatch);
287+
clientConfig.addListener(&listener);
288+
289+
HazelcastClient client(clientConfig);
290+
291+
ASSERT_TRUE(startingLatch.await(0));
292+
ASSERT_TRUE(startedLatch.await(0));
293+
ASSERT_TRUE(connectedLatch.await(0));
294+
295+
instance.shutdown();
296+
297+
ASSERT_TRUE(disconnectedLatch.await(3));
298+
ASSERT_TRUE(shuttingDownLatch.await(5));
299+
ASSERT_TRUE(shutdownLatch.await(500));
300+
}
231301
}
232302
}
233303
}

hazelcast/test/src/util/ClientUtilTest.cpp

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "hazelcast/util/Util.h"
2121
#include "hazelcast/util/Future.h"
2222
#include "hazelcast/util/Thread.h"
23+
#include "hazelcast/util/CountDownLatch.h"
2324

2425
#include <ctime>
2526
#include <errno.h>
@@ -29,7 +30,7 @@ namespace hazelcast {
2930
namespace client {
3031
namespace test {
3132
class ClientUtilTest : public ::testing::Test {
32-
public:
33+
protected:
3334
static void wakeTheConditionUp(util::ThreadArgs& args) {
3435
util::Mutex *mutex = (util::Mutex *)args.arg0;
3536
util::ConditionVariable *cv = (util::ConditionVariable *)args.arg1;
@@ -55,6 +56,19 @@ namespace hazelcast {
5556
std::auto_ptr<client::exception::IException> exception(new exception::IException("exceptionName", "details"));
5657
future->set_exception(exception);
5758
}
59+
60+
static void cancelJoinFromRunningThread(util::ThreadArgs& args) {
61+
util::Thread *currentThread = args.currentThread;
62+
util::CountDownLatch *latch = (util::CountDownLatch *) args.arg0;
63+
currentThread->cancel();
64+
ASSERT_FALSE(currentThread->join());
65+
latch->countDown();
66+
}
67+
68+
static void notifyExitingThread(util::ThreadArgs& args) {
69+
util::CountDownLatch *latch = (util::CountDownLatch *) args.arg0;
70+
latch->countDown();
71+
}
5872
};
5973

6074
TEST_F(ClientUtilTest, testConditionWaitTimeout) {
@@ -152,6 +166,25 @@ namespace hazelcast {
152166
ASSERT_EQ(threadName, thread.getThreadName());
153167
}
154168

169+
TEST_F (ClientUtilTest, testThreadJoinAfterThreadExited) {
170+
std::string threadName = "myThreadName";
171+
util::CountDownLatch latch(1);
172+
util::Thread thread(threadName, notifyExitingThread, &latch);
173+
ASSERT_TRUE(latch.await(2));
174+
// guarantee that the thread exited
175+
util::sleep(1);
176+
177+
// call join after thread exit
178+
thread.join();
179+
}
180+
181+
TEST_F (ClientUtilTest, testCancelJoinItselfFromTheRunningThread) {
182+
std::string threadName = "myThreadName";
183+
util::CountDownLatch latch(1);
184+
util::Thread thread(threadName, cancelJoinFromRunningThread, &latch);
185+
ASSERT_TRUE(latch.await(1000));
186+
}
187+
155188
void sleepyThread(util::ThreadArgs& args) {
156189
int sleepTime = *(int *)args.arg0;
157190
args.currentThread->interruptibleSleep(sleepTime);

0 commit comments

Comments
 (0)