Skip to content
Permalink
Browse files
Add sessionActive for client (#59)
  • Loading branch information
coderzc committed Jun 10, 2021
1 parent f524ec2 commit e39e7be34772f5592c27e68c58a4f512161cfb2c
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 0 deletions.
@@ -81,6 +81,12 @@ boolean send(MessageType messageType, int partition, ByteBuffer buffer)
*/
boolean active();

/**
* To check whether the session is active to use
* @return true if session is active
*/
boolean sessionActive();

/**
* Close the client.
* NOTE: If the client is created with {@link ConnectionManager}, need to
@@ -31,6 +31,7 @@
import com.baidu.hugegraph.computer.core.network.ConnectionId;
import com.baidu.hugegraph.computer.core.network.TransportClient;
import com.baidu.hugegraph.computer.core.network.TransportConf;
import com.baidu.hugegraph.computer.core.network.TransportState;
import com.baidu.hugegraph.computer.core.network.message.Message;
import com.baidu.hugegraph.computer.core.network.message.MessageType;
import com.baidu.hugegraph.computer.core.network.session.ClientSession;
@@ -87,6 +88,16 @@ public boolean active() {
return this.channel.isActive();
}

@Override
public boolean sessionActive() {
if (!this.active()) {
return false;
}
TransportState state = this.session.state();
return state == TransportState.ESTABLISHED ||
state == TransportState.FINISH_SENT;
}

protected ClientSession clientSession() {
return this.session;
}
@@ -22,8 +22,11 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

@@ -369,4 +372,28 @@ public void testCheckMinPendingRequests() {
Assert.assertThrows(IllegalArgumentException.class,
conf::minPendingRequests);
}

@Test
public void testSessionActive() throws IOException, InterruptedException,
ExecutionException,
TimeoutException {
NettyTransportClient client = (NettyTransportClient) this.oneClient();

Assert.assertFalse(client.sessionActive());

CompletableFuture<Void> future = client.startSessionAsync();
Assert.assertFalse(client.sessionActive());

future.get(5, TimeUnit.SECONDS);
Assert.assertTrue(client.sessionActive());

CompletableFuture<Void> finishFuture = client.finishSessionAsync();
Assert.assertTrue(client.sessionActive());

finishFuture.get(5, TimeUnit.SECONDS);
Assert.assertFalse(client.sessionActive());

client.close();
Assert.assertFalse(client.sessionActive());
}
}

0 comments on commit e39e7be

Please sign in to comment.