diff --git a/blazingcache-core/pom.xml b/blazingcache-core/pom.xml index f72eaa5..aeb668b 100644 --- a/blazingcache-core/pom.xml +++ b/blazingcache-core/pom.xml @@ -6,14 +6,14 @@ blazingcache 2.3.0-SNAPSHOT - blazingcache-core + blazingcache-core jar BlazingCache :: Core Apache License, Version 2.0 http://www.apache.org/licenses/LICENSE-2.0.txt - repo + repo @@ -42,7 +42,7 @@ io.netty netty-tcnative-boringssl-static ${libs.netty4.tcnative} - compile + compile javax.servlet @@ -50,7 +50,7 @@ ${libs.servletapi} provided - + org.codehaus.jackson jackson-mapper-asl ${libs.jackson.mapper} @@ -102,7 +102,7 @@ ${libs.snappy} test - + org.apache.hadoop hadoop-minikdc @@ -130,7 +130,7 @@ org.slf4j slf4j-api - ${libs.slf4j} + ${libs.slf4j} org.slf4j diff --git a/blazingcache-core/src/main/java/blazingcache/network/netty/NettyChannel.java b/blazingcache-core/src/main/java/blazingcache/network/netty/NettyChannel.java index 3c36d91..4cb6cc7 100644 --- a/blazingcache-core/src/main/java/blazingcache/network/netty/NettyChannel.java +++ b/blazingcache-core/src/main/java/blazingcache/network/netty/NettyChannel.java @@ -46,7 +46,7 @@ public class NettyChannel extends Channel { private static final boolean DISCONNECT_ON_PENDING_REPLY_TIMEOUT = Boolean.parseBoolean(System.getProperty("blazingcache.nettychannel.disconnectonpendingreplytimeout", "true")); - volatile SocketChannel socket; + private volatile SocketChannel socket; private static final Logger LOGGER = Logger.getLogger(NettyChannel.class.getName()); private static final AtomicLong idGenerator = new AtomicLong(); @@ -55,7 +55,7 @@ public class NettyChannel extends Channel { private final Map pendingReplyMessagesDeadline = new ConcurrentHashMap<>(); private final ExecutorService callbackexecutor; private final NettyConnector connector; - private boolean ioErrors = false; + private volatile boolean ioErrors = false; private final long id = idGenerator.incrementAndGet(); private final boolean disconnectOnReplyTimeout; @@ -250,7 +250,7 @@ public void close() { } } - void exceptionCaught(Throwable cause) { + public void exceptionCaught(Throwable cause) { LOGGER.log(Level.SEVERE, this + " io-error " + cause, cause); ioErrors = true; } @@ -277,10 +277,12 @@ public void channelIdle() { processPendingReplyMessagesDeadline(); } + @Override public String getName() { return name; } + @Override public void setName(String name) { this.name = name; } diff --git a/blazingcache-core/src/main/java/blazingcache/server/CacheServerEndpoint.java b/blazingcache-core/src/main/java/blazingcache/server/CacheServerEndpoint.java index 32eabc8..304e209 100644 --- a/blazingcache-core/src/main/java/blazingcache/server/CacheServerEndpoint.java +++ b/blazingcache-core/src/main/java/blazingcache/server/CacheServerEndpoint.java @@ -90,7 +90,7 @@ void connectionClosed(CacheServerSideConnection con) { } void processIdleConnections() { - try { + try { List connections = new ArrayList<>(clientConnections.values()); for (CacheServerSideConnection cs : connections) { cs.processIdleConnection(); diff --git a/blazingcache-core/src/main/java/blazingcache/server/CacheServerSideConnection.java b/blazingcache-core/src/main/java/blazingcache/server/CacheServerSideConnection.java index dd9dd5a..65596d1 100644 --- a/blazingcache-core/src/main/java/blazingcache/server/CacheServerSideConnection.java +++ b/blazingcache-core/src/main/java/blazingcache/server/CacheServerSideConnection.java @@ -502,7 +502,7 @@ public String toString() { void sendKeyInvalidationMessage(String sourceClientId, RawString key, BroadcastRequestStatus invalidation) { Channel _channel = channel; if (_channel == null || !_channel.isValid()) { - // not connected, quindi cache vuota + // not connected, quindi cache vuota LOGGER.log(Level.SEVERE, "client " + clientId + " without channel, considering key " + key + " invalidated"); invalidation.clientDone(clientId); return; @@ -601,7 +601,7 @@ public void replyReceived(Message originalMessage, Message message, Throwable er void processIdleConnection() { Channel _channel = channel; - if (_channel != null && _channel.isValid()) { + if (_channel != null) { _channel.channelIdle(); } } diff --git a/blazingcache-core/src/test/java/blazingcache/client/ApparentlyStuckClientDueToServerSideErrorTest.java b/blazingcache-core/src/test/java/blazingcache/client/ApparentlyStuckClientDueToServerSideErrorTest.java new file mode 100644 index 0000000..88a4263 --- /dev/null +++ b/blazingcache-core/src/test/java/blazingcache/client/ApparentlyStuckClientDueToServerSideErrorTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to Diennea S.r.l. under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Diennea S.r.l. licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package blazingcache.client; + +import blazingcache.network.ServerHostData; +import blazingcache.network.netty.NettyCacheServerLocator; +import blazingcache.server.CacheServer; +import java.nio.charset.StandardCharsets; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import org.junit.Test; +import static org.junit.Assert.assertTrue; +import blazingcache.network.Message; +import blazingcache.network.netty.NettyChannel; +import blazingcache.server.CacheServerSideConnection; +import java.util.concurrent.atomic.AtomicReference; + +public class ApparentlyStuckClientDueToServerSideErrorTest { + + @Test + public void test() throws Exception { + byte[] data = "testdata".getBytes(StandardCharsets.UTF_8); + + ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", false, null); + try ( CacheServer cacheServer = new CacheServer("ciao", serverHostData)) { + + cacheServer.setSlowClientTimeout(10000); + cacheServer.start(); + AtomicReference _client2 = new AtomicReference<>(); + try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData)); CacheClient client2 = new CacheClient("theClient2", "ciao", + new NettyCacheServerLocator(serverHostData)) { + @Override + public void messageReceived(Message message) { + // swallow every message + // client2 will not answer to the "invaliate" message + // client1 has to wait until the server declares client2 dead + + // simulate a network error, only on the server side part + CacheServerSideConnection serverSideConnectionPeer2 = cacheServer.getAcceptor().getClientConnections().get(_client2.get().getClientId()); + // we are sure that we are using the NettyChannel, not JVMChannel + NettyChannel channel = (NettyChannel) serverSideConnectionPeer2.getChannel(); + channel.exceptionCaught(new Exception("dummy unpredictable error")); + } + + };) { + _client2.set(client2); + client1.start(); + client2.start(); + + assertTrue(client1.waitForConnection(10000)); + assertTrue(client2.waitForConnection(10000)); + + client1.load("foo", data, 0); + assertNotNull(client2.fetch("foo")); + + client1.invalidate("foo"); + + assertNull(client1.get("foo")); + + // client2 does not know that the server had problems and it still holds a copy of the value + assertNotNull(client2.get("foo")); + + } + + } + + } + +} diff --git a/pom.xml b/pom.xml index 52ddf63..b1f7865 100644 --- a/pom.xml +++ b/pom.xml @@ -12,13 +12,13 @@ Apache License, Version 2.0 http://www.apache.org/licenses/LICENSE-2.0.txt - repo + repo - https://github.com/diennea/blazingcache.git - scm:git:https://github.com/diennea/blazingcache.git - scm:git:https://github.com/diennea/blazingcache.git + https://github.com/diennea/blazingcache.git + scm:git:https://github.com/diennea/blazingcache.git + scm:git:https://github.com/diennea/blazingcache.git HEAD @@ -59,12 +59,12 @@ - https://github.com/diennea/blazingcache - + https://github.com/diennea/blazingcache + UTF-8 1.8 - 1.8 + 1.8 4.1.50.Final 2.0.31.Final 3.1.0 @@ -76,11 +76,11 @@ 2.7 3.2.1 5.0.0 - 1.0 + 1.0 3.1.0 - 4.0.4 + 4.0.4 0.8.5 - 1.7.30 + 1.7.30 @@ -115,35 +115,35 @@ org.jacoco jacoco-maven-plugin ${libs.jacoco} - + default - true - + true + ./blazingcache-core ./blazingcache-jcache ./blazingcache-services ./blazingcache-website ./blazingcache-site-skin - - - + + + dev.majordodo.org BlazingCache Public Repository https://dev.majordodo.org/nexus/content/repositories/releases/ - - + + dev.majordodo.org BlazingCache Public Repository https://dev.majordodo.org/nexus/content/repositories/snapshots/ - - - + + + org.jacoco @@ -151,19 +151,19 @@ ${libs.jacoco} - default-prepare-agent + default-prepare-agent prepare-agent - + org.apache.maven.plugins maven-surefire-plugin - 3.0.0-M5 - - @{argLine} -Xmx1024m -Djava.net.preferIpv4Stack=true + 3.0.0-M3 + + @{argLine} -Xmx1024m -Djava.net.preferIpv4Stack=true 1 false @@ -197,18 +197,18 @@ - - + + ossrh - false - + false + ./blazingcache-core - ./blazingcache-jcache - + ./blazingcache-jcache + ossrh @@ -220,8 +220,8 @@ org.apache.maven.plugins maven-surefire-plugin - 3.0.0-M5 - + 3.0.0-M3 + -Xmx1024m -Djava.net.preferIpv4Stack=true 1 false @@ -254,7 +254,7 @@ https://oss.sonatype.org/ false - + org.apache.maven.plugins maven-source-plugin @@ -297,14 +297,14 @@ - + - website + website ./blazingcache-site-skin - ./blazingcache-website + ./blazingcache-website