Skip to content
Permalink
Browse files
add flowControl, heartbeat (#40)
* add flow control
* add heartBeat
* add task to check ack
* add task to check ack and remove lastAckTimestamp
* rest BarrierEvent
* remove AckType
* add checkMinPendingRequests
* use accumulate flush
* add timeout parameter method to startSession and finishSession
* use real ip start
* scheduleAtFixedRate -> scheduleWithFixedDelay
* add maxHeartbeatTimes
* add dataMessage release()
* max_heartbeat_timeouts -> max_timeout_heartbeat_count
* set backlog default value 511
  • Loading branch information
coderzc committed May 6, 2021
1 parent a554a5a commit c9e5ef4e953dde80b641a1014cf4eac48c1d47f3
Showing 34 changed files with 1,731 additions and 508 deletions.
@@ -35,6 +35,7 @@
import com.baidu.hugegraph.computer.core.network.TransportConf;
import com.baidu.hugegraph.config.ConfigOption;
import com.baidu.hugegraph.config.OptionHolder;
import com.baidu.hugegraph.util.Bytes;
import com.google.common.collect.ImmutableSet;

public class ComputerOptions extends OptionHolder {
@@ -380,30 +381,39 @@ public static synchronized ComputerOptions instance() {
false
);

public static final ConfigOption<Integer> TRANSPORT_SEND_BUFFER_SIZE =
public static final ConfigOption<Boolean> TRANSPORT_TCP_KEEP_ALIVE =
new ConfigOption<>(
"transport.send_buffer_size",
"The network send buffer size, 0 means using system " +
"defaults.",
"transport.transport_tcp_keep_alive",
"Whether enable TCP keep-alive.",
allowValues(true, false),
true
);

public static final ConfigOption<Integer> TRANSPORT_MAX_SYN_BACKLOG =
new ConfigOption<>(
"transport.max_syn_backlog",
"The capacity of SYN queue on server side, 0 means using " +
"system default value.",
nonNegativeInt(),
0
511
);

public static final ConfigOption<Integer> TRANSPORT_RECEIVE_BUFFER_SIZE =
new ConfigOption<>(
"transport.receive_buffer_size",
"The network receive buffer size, 0 means using system " +
"defaults.",
"The size of socket receive-buffer in bytes, 0 means " +
"using system default value.",
nonNegativeInt(),
0
);

public static final ConfigOption<Integer> TRANSPORT_BACKLOG =
public static final ConfigOption<Integer> TRANSPORT_SEND_BUFFER_SIZE =
new ConfigOption<>(
"transport.backlog",
"The server connection backlog, 0 means using system " +
"defaults.",
"transport.send_buffer_size",
"The size of socket send-buffer in bytes, 0 means using " +
"system default value.",
nonNegativeInt(),
// TODO: Test to get an best value
0
);

@@ -432,6 +442,24 @@ public static synchronized ComputerOptions instance() {
5_000L
);

public static final ConfigOption<Long> TRANSPORT_FINISH_SESSION_TIMEOUT =
new ConfigOption<>(
"transport.finish_session_timeout",
"The timeout(in ms) to finish session, " +
"0 means using (transport.sync_request_timeout * " +
"transport.max_pending_requests).",
nonNegativeInt(),
0L
);

public static final ConfigOption<Long> TRANSPORT_WRITE_SOCKET_TIMEOUT =
new ConfigOption<>(
"transport.write_socket_timeout",
"The timeout(in ms) to write data to socket buffer.",
positiveInt(),
3000L
);

public static final ConfigOption<Integer> TRANSPORT_NETWORK_RETRIES =
new ConfigOption<>(
"transport.network_retries",
@@ -441,24 +469,44 @@ public static synchronized ComputerOptions instance() {
3
);

public static final ConfigOption<Integer> TRANSPORT_WRITE_BUFFER_HIGH_MARK =
new ConfigOption<>(
"transport.write_buffer_high_mark",
"The high water mark for write buffer in bytes, " +
"it will trigger the sending unavailable if the number " +
"of queued bytes > write_buffer_high_mark.",
nonNegativeInt(),
64 * (int) Bytes.MB
);

public static final ConfigOption<Integer> TRANSPORT_WRITE_BUFFER_LOW_MARK =
new ConfigOption<>(
"transport.write_buffer_low_mark",
"The low water mark for write buffer in bytes, it will " +
"trigger the sending available if the number of queued " +
"bytes < write_buffer_low_mark." +
nonNegativeInt(),
32 * (int) Bytes.MB
);

public static final ConfigOption<Integer> TRANSPORT_MAX_PENDING_REQUESTS =
new ConfigOption<>(
"transport.max_pending_requests",
"The max number of client unreceived ack, " +
"if the number of unreceived ack greater than it, " +
"it will block the client from calling send.",
"it will trigger the sending unavailable if the number " +
"of unreceived ack >= max_pending_requests.",
positiveInt(),
50
8000
);

public static final ConfigOption<Integer> TRANSPORT_MIN_PENDING_REQUESTS =
new ConfigOption<>(
"transport.min_pending_requests",
"The minimum number of client unreceived ack, " +
"if the number of unreceived ack less than it, " +
"it will wake the client from calling send.",
"it will trigger the sending available if the number of " +
"unreceived ack < min_pending_requests.",
positiveInt(),
5
6000
);

public static final ConfigOption<Long> TRANSPORT_MIN_ACK_INTERVAL =
@@ -469,27 +517,32 @@ public static synchronized ComputerOptions instance() {
200L
);

public static final ConfigOption<Integer> TRANSPORT_HEARTBEAT_INTERVAL =
public static final ConfigOption<Long> TRANSPORT_SERVER_IDLE_TIMEOUT =
new ConfigOption<>(
"transport.heartbeat_interval_seconds",
"Time minimum interval(in seconds) of send heartbeat.",
"transport.server_idle_timeout",
"The max timeout(in ms) of server idle.",
positiveInt(),
60
120_000L
);

public static final ConfigOption<Integer> TRANSPORT_HEARTBEAT_TIMEOUT =
public static final ConfigOption<Long> TRANSPORT_HEARTBEAT_INTERVAL =
new ConfigOption<>(
"transport.heartbeat_timeout_seconds",
"The max timeout(in seconds) of heartbeat.",
"transport.heartbeat_interval",
"The minimum interval(in ms) between heartbeats on " +
"client side.",
positiveInt(),
120
20_000L
);

public static final ConfigOption<Boolean> TRANSPORT_TCP_KEEP_ALIVE =
public static final ConfigOption<Integer>
TRANSPORT_MAX_TIMEOUT_HEARTBEAT_COUNT =
new ConfigOption<>(
"transport.transport_tcp_keep_alive",
"Whether enable TCP keep-alive.",
allowValues(true, false),
true
"transport.max_timeout_heartbeat_count",
"The maximum times of timeout heartbeat on client side, " +
"if the number of timeouts waiting for heartbeat " +
"response continuously > max_heartbeat_timeouts the " +
"channel will be closed from client side.",
positiveInt(),
90
);
}
@@ -25,6 +25,7 @@
import com.baidu.hugegraph.computer.core.common.exception.IllegalArgException;
import com.baidu.hugegraph.computer.core.config.ComputerOptions;
import com.baidu.hugegraph.computer.core.config.Config;
import com.baidu.hugegraph.util.E;

import io.netty.channel.epoll.Epoll;

@@ -59,6 +60,19 @@ public int serverPort() {
return this.config.get(ComputerOptions.TRANSPORT_SERVER_PORT);
}

public int serverThreads() {
return this.config.get(ComputerOptions.TRANSPORT_SERVER_THREADS);
}

public int clientThreads() {
return this.config.get(ComputerOptions.TRANSPORT_CLIENT_THREADS);
}

public TransportProvider transportProvider() {
return this.config
.createObject(ComputerOptions.TRANSPORT_PROVIDER_CLASS);
}

/**
* IO mode: nio or epoll
*/
@@ -78,44 +92,24 @@ public IOMode ioMode() {
}

/**
* The transport provider
*/
public TransportProvider transportProvider() {
return this.config
.createObject(ComputerOptions.TRANSPORT_PROVIDER_CLASS);
}

/**
* Enabled EPOLL level trigger
* Whether enabled EPOLL level trigger
*/
public boolean epollLevelTriggered() {
return this.config.get(ComputerOptions.TRANSPORT_EPOLL_LT);
}

public boolean tcpKeepAlive() {
return this.config.get(ComputerOptions.TRANSPORT_TCP_KEEP_ALIVE);
}

/**
* Requested maximum length of the queue of incoming connections. If
* &lt; 1,
* the default Netty value of {@link io.netty.util.NetUtil#SOMAXCONN} will
* be used.
*/
public int backLog() {
return this.config.get(ComputerOptions.TRANSPORT_BACKLOG);
}

/**
* Number of threads used in the server EventLoop thread pool. Default to
* 0, which is CPUs.
*/
public int serverThreads() {
return this.config.get(ComputerOptions.TRANSPORT_SERVER_THREADS);
}

public int clientThreads() {
return this.config.get(ComputerOptions.TRANSPORT_CLIENT_THREADS);
}

public int sendBuffer() {
return this.config.get(ComputerOptions.TRANSPORT_SEND_BUFFER_SIZE);
public int maxSynBacklog() {
return this.config.get(ComputerOptions.TRANSPORT_MAX_SYN_BACKLOG);
}

/**
@@ -125,10 +119,14 @@ public int sendBuffer() {
* Assuming latency = 1ms, network_bandwidth = 10Gbps
* buffer size should be ~ 1.25MB
*/
public int receiveBuffer() {
public int sizeReceiveBuffer() {
return this.config.get(ComputerOptions.TRANSPORT_RECEIVE_BUFFER_SIZE);
}

public int sizeSendBuffer() {
return this.config.get(ComputerOptions.TRANSPORT_SEND_BUFFER_SIZE);
}

public int networkRetries() {
return this.config.get(ComputerOptions.TRANSPORT_NETWORK_RETRIES);
}
@@ -147,38 +145,62 @@ public long syncRequestTimeout() {
}

/**
* The max number of request allowed to unreceived ack.
* Note: If the number of unreceived ack greater than
* TRANSPORT_MAX_PENDING_REQUESTS,
* {@link TransportClient#send} will unavailable
* Timeout of finish session, if less than or equal 0 the default value is
* TRANSPORT_SYNC_REQUEST_TIMEOUT * TRANSPORT_MAX_PENDING_REQUESTS
*/
public long finishSessionTimeout() {
long timeout = this.config.get(
ComputerOptions.TRANSPORT_FINISH_SESSION_TIMEOUT);
return timeout > 0 ? timeout :
this.config.get(ComputerOptions.TRANSPORT_SYNC_REQUEST_TIMEOUT) *
this.config.get(ComputerOptions.TRANSPORT_MAX_PENDING_REQUESTS);
}

public long writeSocketTimeout() {
return this.config.get(ComputerOptions.TRANSPORT_WRITE_SOCKET_TIMEOUT);
}

public int writeBufferHighMark() {
return this.config
.get(ComputerOptions.TRANSPORT_WRITE_BUFFER_HIGH_MARK);
}

public int writeBufferLowMark() {
return this.config
.get(ComputerOptions.TRANSPORT_WRITE_BUFFER_LOW_MARK);
}

public int maxPendingRequests() {
return this.config.get(ComputerOptions.TRANSPORT_MAX_PENDING_REQUESTS);
}

/**
* The minimum number of client unreceived ack.
*/
public int minPendingRequests() {
return this.config.get(ComputerOptions.TRANSPORT_MIN_PENDING_REQUESTS);
int minPendingReqs = this.config.get(
ComputerOptions.TRANSPORT_MIN_PENDING_REQUESTS);

int maxPendingRequests = this.maxPendingRequests();

E.checkArgument(minPendingReqs <= maxPendingRequests,
"The min_pending_requests(%s) must be less than or " +
"equal to the max_pending_requests(%s).",
minPendingReqs, maxPendingRequests);
return minPendingReqs;
}

/**
* The minimum interval(in ms) of server reply ack.
*/
public long minAckInterval() {
return this.config.get(ComputerOptions.TRANSPORT_MIN_ACK_INTERVAL);
}

public int heartbeatInterval() {
return this.config.get(ComputerOptions.TRANSPORT_HEARTBEAT_INTERVAL);
public long serverIdleTimeout() {
return this.config.get(ComputerOptions.TRANSPORT_SERVER_IDLE_TIMEOUT);
}

public int heartbeatTimeout() {
return this.config.get(ComputerOptions.TRANSPORT_HEARTBEAT_TIMEOUT);
public long heartbeatInterval() {
return this.config.get(ComputerOptions.TRANSPORT_HEARTBEAT_INTERVAL);
}

public boolean tcpKeepAlive() {
return this.config.get(ComputerOptions.TRANSPORT_TCP_KEEP_ALIVE);
public int maxTimeoutHeartbeatCount() {
return this.config
.get(ComputerOptions.TRANSPORT_MAX_TIMEOUT_HEARTBEAT_COUNT);
}
}
@@ -19,12 +19,12 @@

package com.baidu.hugegraph.computer.core.network;

public enum TransportStatus {
public enum TransportState {

READY,
START_SEND,
START_SENT,
START_RECV,
ESTABLISH,
FINISH_SEND,
ESTABLISHED,
FINISH_SENT,
FINISH_RECV
}
@@ -51,7 +51,7 @@ public interface ManagedBuffer {
ManagedBuffer release();

/**
* Returns the reference count if applicable, otherwise return -1.
* Returns the reference count.
*/
int referenceCount();

0 comments on commit c9e5ef4

Please sign in to comment.