Skip to content

Commit

Permalink
Issue 148: "Stuck client" may not be detected in case of non-open soc…
Browse files Browse the repository at this point in the history
…ket (#164)

* Issue 148: 'Stuck client' may not be detected in case of non-open socket

Co-authored-by: Enrico Olivelli <enrico.olivelli@diennea.com>
  • Loading branch information
eolivelli and Enrico Olivelli committed Jul 1, 2020
1 parent 68f8a17 commit 8c72f31
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 50 deletions.
12 changes: 6 additions & 6 deletions blazingcache-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
<artifactId>blazingcache</artifactId>
<version>2.3.0-SNAPSHOT</version>
</parent>
<artifactId>blazingcache-core</artifactId>
<artifactId>blazingcache-core</artifactId>
<packaging>jar</packaging>
<name>BlazingCache :: Core</name>
<licenses>
<license>
<name>Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
<distribution>repo</distribution>
</license>
</licenses>
<properties>
Expand Down Expand Up @@ -42,15 +42,15 @@
<groupId>io.netty</groupId>
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${libs.netty4.tcnative}</version>
<scope>compile</scope>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>${libs.servletapi}</version>
<scope>provided</scope>
</dependency>
<dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>${libs.jackson.mapper}</version>
Expand Down Expand Up @@ -102,7 +102,7 @@
<version>${libs.snappy}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
Expand Down Expand Up @@ -130,7 +130,7 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${libs.slf4j}</version>
<version>${libs.slf4j}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
public class NettyChannel extends Channel {

private static final boolean DISCONNECT_ON_PENDING_REPLY_TIMEOUT = Boolean.parseBoolean(System.getProperty("blazingcache.nettychannel.disconnectonpendingreplytimeout", "true"));
volatile SocketChannel socket;
private volatile SocketChannel socket;
private static final Logger LOGGER = Logger.getLogger(NettyChannel.class.getName());
private static final AtomicLong idGenerator = new AtomicLong();

Expand All @@ -55,7 +55,7 @@ public class NettyChannel extends Channel {
private final Map<String, Long> pendingReplyMessagesDeadline = new ConcurrentHashMap<>();
private final ExecutorService callbackexecutor;
private final NettyConnector connector;
private boolean ioErrors = false;
private volatile boolean ioErrors = false;
private final long id = idGenerator.incrementAndGet();
private final boolean disconnectOnReplyTimeout;

Expand Down Expand Up @@ -250,7 +250,7 @@ public void close() {
}
}

void exceptionCaught(Throwable cause) {
public void exceptionCaught(Throwable cause) {
LOGGER.log(Level.SEVERE, this + " io-error " + cause, cause);
ioErrors = true;
}
Expand All @@ -277,10 +277,12 @@ public void channelIdle() {
processPendingReplyMessagesDeadline();
}

@Override
public String getName() {
return name;
}

@Override
public void setName(String name) {
this.name = name;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ void connectionClosed(CacheServerSideConnection con) {
}

void processIdleConnections() {
try {
try {
List<CacheServerSideConnection> connections = new ArrayList<>(clientConnections.values());
for (CacheServerSideConnection cs : connections) {
cs.processIdleConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ public String toString() {
void sendKeyInvalidationMessage(String sourceClientId, RawString key, BroadcastRequestStatus invalidation) {
Channel _channel = channel;
if (_channel == null || !_channel.isValid()) {
// not connected, quindi cache vuota
// not connected, quindi cache vuota
LOGGER.log(Level.SEVERE, "client " + clientId + " without channel, considering key " + key + " invalidated");
invalidation.clientDone(clientId);
return;
Expand Down Expand Up @@ -601,7 +601,7 @@ public void replyReceived(Message originalMessage, Message message, Throwable er

void processIdleConnection() {
Channel _channel = channel;
if (_channel != null && _channel.isValid()) {
if (_channel != null) {
_channel.channelIdle();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to Diennea S.r.l. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Diennea S.r.l. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package blazingcache.client;

import blazingcache.network.ServerHostData;
import blazingcache.network.netty.NettyCacheServerLocator;
import blazingcache.server.CacheServer;
import java.nio.charset.StandardCharsets;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import blazingcache.network.Message;
import blazingcache.network.netty.NettyChannel;
import blazingcache.server.CacheServerSideConnection;
import java.util.concurrent.atomic.AtomicReference;

public class ApparentlyStuckClientDueToServerSideErrorTest {

@Test
public void test() throws Exception {
byte[] data = "testdata".getBytes(StandardCharsets.UTF_8);

ServerHostData serverHostData = new ServerHostData("localhost", 1234, "test", false, null);
try ( CacheServer cacheServer = new CacheServer("ciao", serverHostData)) {

cacheServer.setSlowClientTimeout(10000);
cacheServer.start();
AtomicReference<CacheClient> _client2 = new AtomicReference<>();
try ( CacheClient client1 = new CacheClient("theClient1", "ciao", new NettyCacheServerLocator(serverHostData)); CacheClient client2 = new CacheClient("theClient2", "ciao",
new NettyCacheServerLocator(serverHostData)) {
@Override
public void messageReceived(Message message) {
// swallow every message
// client2 will not answer to the "invaliate" message
// client1 has to wait until the server declares client2 dead

// simulate a network error, only on the server side part
CacheServerSideConnection serverSideConnectionPeer2 = cacheServer.getAcceptor().getClientConnections().get(_client2.get().getClientId());
// we are sure that we are using the NettyChannel, not JVMChannel
NettyChannel channel = (NettyChannel) serverSideConnectionPeer2.getChannel();
channel.exceptionCaught(new Exception("dummy unpredictable error"));
}

};) {
_client2.set(client2);
client1.start();
client2.start();

assertTrue(client1.waitForConnection(10000));
assertTrue(client2.waitForConnection(10000));

client1.load("foo", data, 0);
assertNotNull(client2.fetch("foo"));

client1.invalidate("foo");

assertNull(client1.get("foo"));

// client2 does not know that the server had problems and it still holds a copy of the value
assertNotNull(client2.get("foo"));

}

}

}

}
76 changes: 38 additions & 38 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@
<license>
<name>Apache License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
<distribution>repo</distribution>
</license>
</licenses>
<scm>
<url>https://github.com/diennea/blazingcache.git</url>
<connection>scm:git:https://github.com/diennea/blazingcache.git</connection>
<developerConnection>scm:git:https://github.com/diennea/blazingcache.git</developerConnection>
<url>https://github.com/diennea/blazingcache.git</url>
<connection>scm:git:https://github.com/diennea/blazingcache.git</connection>
<developerConnection>scm:git:https://github.com/diennea/blazingcache.git</developerConnection>
<tag>HEAD</tag>
</scm>
<developers>
Expand Down Expand Up @@ -59,12 +59,12 @@
</developer>
</developers>
<issueManagement>
<url>https://github.com/diennea/blazingcache</url>
</issueManagement>
<url>https://github.com/diennea/blazingcache</url>
</issueManagement>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.target>1.8</maven.compiler.target>
<libs.netty4>4.1.50.Final</libs.netty4>
<libs.netty4.tcnative>2.0.31.Final</libs.netty4.tcnative>
<libs.servletapi>3.1.0</libs.servletapi>
Expand All @@ -76,11 +76,11 @@
<libs.commonsio>2.7</libs.commonsio>
<libs.minikdc>3.2.1</libs.minikdc>
<libs.curator>5.0.0</libs.curator>
<libs.jcipi-annotations>1.0</libs.jcipi-annotations>
<libs.jcipi-annotations>1.0</libs.jcipi-annotations>
<libs.spotbugsannotations>3.1.0</libs.spotbugsannotations>
<libs.spotbugsmaven>4.0.4</libs.spotbugsmaven>
<libs.spotbugsmaven>4.0.4</libs.spotbugsmaven>
<libs.jacoco>0.8.5</libs.jacoco>
<libs.slf4j>1.7.30</libs.slf4j>
<libs.slf4j>1.7.30</libs.slf4j>
</properties>
<dependencies>
<dependency>
Expand Down Expand Up @@ -115,55 +115,55 @@
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>${libs.jacoco}</version>
</plugin>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>default</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<activeByDefault>true</activeByDefault>
</activation>
<modules>
<module>./blazingcache-core</module>
<module>./blazingcache-jcache</module>
<module>./blazingcache-services</module>
<module>./blazingcache-website</module>
<module>./blazingcache-site-skin</module>
</modules>
<distributionManagement>
<repository>
</modules>
<distributionManagement>
<repository>
<id>dev.majordodo.org</id>
<name>BlazingCache Public Repository</name>
<url>https://dev.majordodo.org/nexus/content/repositories/releases/</url>
</repository>
<snapshotRepository>
</repository>
<snapshotRepository>
<id>dev.majordodo.org</id>
<name>BlazingCache Public Repository</name>
<url>https://dev.majordodo.org/nexus/content/repositories/snapshots/</url>
</snapshotRepository>
</distributionManagement>
<build>
</snapshotRepository>
</distributionManagement>
<build>
<plugins>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>${libs.jacoco}</version>
<executions>
<execution>
<id>default-prepare-agent</id>
<id>default-prepare-agent</id>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
</executions>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
<configuration>
<argLine>@{argLine} -Xmx1024m -Djava.net.preferIpv4Stack=true</argLine>
<version>3.0.0-M3</version>
<configuration>
<argLine>@{argLine} -Xmx1024m -Djava.net.preferIpv4Stack=true</argLine>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
</configuration>
Expand Down Expand Up @@ -197,18 +197,18 @@
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</plugins>
</build>
</profile>
<profile>
<id>ossrh</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<activeByDefault>false</activeByDefault>
</activation>
<modules>
<module>./blazingcache-core</module>
<module>./blazingcache-jcache</module>
</modules>
<module>./blazingcache-jcache</module>
</modules>
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
Expand All @@ -220,8 +220,8 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
<configuration>
<version>3.0.0-M3</version>
<configuration>
<argLine>-Xmx1024m -Djava.net.preferIpv4Stack=true</argLine>
<forkCount>1</forkCount>
<reuseForks>false</reuseForks>
Expand Down Expand Up @@ -254,7 +254,7 @@
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
<autoReleaseAfterClose>false</autoReleaseAfterClose>
</configuration>
</plugin>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
Expand Down Expand Up @@ -297,14 +297,14 @@
</execution>
</executions>
</plugin>
</plugins>
</plugins>
</build>
</profile>
<profile>
<id>website</id>
<id>website</id>
<modules>
<module>./blazingcache-site-skin</module>
<module>./blazingcache-website</module>
<module>./blazingcache-website</module>
</modules>
</profile>
</profiles>
Expand Down

0 comments on commit 8c72f31

Please sign in to comment.