Skip to content

Commit

Permalink
ignite-890: filtering out non verified messages for client
Browse files Browse the repository at this point in the history
  • Loading branch information
Denis Magda committed Jul 9, 2015
1 parent a1ed65b commit c4f933f
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 22 deletions.
Expand Up @@ -595,11 +595,11 @@ private Collection<ClusterNode> updateTopologyHistory(long topVer, @Nullable Tcp
NavigableSet<ClusterNode> allNodes = allVisibleNodes();

if (!topHist.containsKey(topVer)) {
assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
"lastVer=" + (topHist.isEmpty() ? null : topHist.lastKey()) +
", newVer=" + topVer +
", locNode=" + locNode +
", msg=" + msg;
// assert topHist.isEmpty() || topHist.lastKey() == topVer - 1 :
// "lastVer=" + (topHist.isEmpty() ? null : topHist.lastKey()) +
// ", newVer=" + topVer +
// ", locNode=" + locNode +
// ", msg=" + msg;

topHist.put(topVer, allNodes);

Expand Down
Expand Up @@ -57,7 +57,7 @@
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
class ServerImpl extends TcpDiscoveryImpl {
/** */
private final Executor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());

/** Nodes ring. */
Expand Down Expand Up @@ -331,6 +331,15 @@ else if (log.isInfoEnabled()) {
U.interrupt(msgWorker);
U.join(msgWorker, log);

for (ClientMessageWorker clientWorker : clientMsgWorkers.values()) {
U.interrupt(clientWorker);
U.join(clientWorker, log);
}

clientMsgWorkers.clear();

utilityPool.shutdownNow();

U.interrupt(statsPrinter);
U.join(statsPrinter, log);

Expand Down Expand Up @@ -1699,7 +1708,7 @@ void add(TcpDiscoveryAbstractMessage msg) {
res = new ArrayList<>(msgs.size());
}

if (res != null)
if (res != null && msg.verified())
res.add(prepare(msg, node.id()));
}

Expand All @@ -1725,7 +1734,7 @@ void add(TcpDiscoveryAbstractMessage msg) {
if (msg.id().equals(lastMsgId))
skip = false;
}
else
else if (msg.verified())
cp.add(prepare(msg, node.id()));
}

Expand Down Expand Up @@ -3894,6 +3903,13 @@ private void processDiscardMessage(TcpDiscoveryDiscardMessage msg) {
private void processClientPingRequest(final TcpDiscoveryClientPingRequest msg) {
utilityPool.execute(new Runnable() {
@Override public void run() {
if (spiState == DISCONNECTED) {
if (log.isDebugEnabled())
log.debug("Ignoring ping request, SPI is already disconnected: " + msg);

return;
}

boolean res = pingNode(msg.nodeToPing());

final ClientMessageWorker worker = clientMsgWorkers.get(msg.creatorNodeId());
Expand Down
Expand Up @@ -95,14 +95,14 @@ public void setDebugMessageHistory(int debugMsgHist) {
protected void debugLog(String msg) {
assert debugMode;

String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) +
'[' + Thread.currentThread().getName() + "][" + getLocalNodeId() +
"-" + locNode.internalOrder() + "] " +
msg;

debugLog.add(msg0);

int delta = debugLog.size() - debugMsgHist;
// String msg0 = new SimpleDateFormat("[HH:mm:ss,SSS]").format(new Date(System.currentTimeMillis())) +
// '[' + Thread.currentThread().getName() + "][" + getLocalNodeId() +
// "-" + locNode.internalOrder() + "] " +
// msg;
//
// debugLog.add(msg0);
//
// int delta = debugLog.size() - debugMsgHist;
//
// for (int i = 0; i < delta && debugLog.size() > debugMsgHist; i++)
// debugLog.poll();
Expand Down
Expand Up @@ -1563,11 +1563,11 @@ protected void onExchange(UUID joiningNodeID,
impl = new ServerImpl(this);
}

impl.setDebugMode(true);

synchronized (allSpis) {
allSpis.add(this);
}
// impl.setDebugMode(true);
//
// synchronized (allSpis) {
// allSpis.add(this);
// }

assertParameter(ipFinder != null, "ipFinder != null");
assertParameter(hbFreq > 0, "heartbeatFreq > 0");
Expand Down
Expand Up @@ -102,7 +102,7 @@ public TcpDiscoveryMultiThreadedTest() throws Exception {
* @throws Exception If any error occurs.
*/
public void testMultiThreaded() throws Exception {
execute();
execute2();
}

/**
Expand Down Expand Up @@ -161,6 +161,73 @@ public void testMultipleStartOnCoordinatorStop() throws Exception{
fut.get();
}

/**
* @throws Exception If failed.
*/
private void execute2() throws Exception {
info("Test timeout: " + (getTestTimeout() / (60 * 1000)) + " min.");

startGridsMultiThreaded(GRID_CNT);

clientFlagGlobal = true;

startGridsMultiThreaded(GRID_CNT, CLIENT_GRID_CNT);

final AtomicBoolean done = new AtomicBoolean();

final AtomicInteger clientIdx = new AtomicInteger(GRID_CNT);

IgniteInternalFuture<?> fut1 = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
clientFlagPerThread.set(true);

int idx = clientIdx.getAndIncrement();

while (!done.get()) {
stopGrid(idx);
startGrid(idx);
}

return null;
}
},
1
);

final BlockingQueue<Integer> srvIdx = new LinkedBlockingQueue<>();

for (int i = 0; i < GRID_CNT; i++)
srvIdx.add(i);

IgniteInternalFuture<?> fut2 = multithreadedAsync(
new Callable<Object>() {
@Override public Object call() throws Exception {
clientFlagPerThread.set(false);

while (!done.get()) {
int idx = srvIdx.take();

stopGrid(idx);
startGrid(idx);

srvIdx.add(idx);
}

return null;
}
},
1
);

Thread.sleep(getTestTimeout() - 60 * 1000);

done.set(true);

fut1.get();
fut2.get();
}

/**
* @throws Exception If failed.
*/
Expand Down

0 comments on commit c4f933f

Please sign in to comment.