@@ -24,28 +24,49 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.config.DynamicPropertyFactory;

public class FixedThreadExecutor implements Executor {
private static final Logger LOGGER = LoggerFactory.getLogger(FixedThreadExecutor.class);

public static final String KEY_GROUP = "servicecomb.executor.default.group";

public static final String KEY_THREAD = "servicecomb.executor.default.thread-per-group";

// avoid contention when multiple network threads put tasks into the same executor queue
private List<Executor> executorList = new ArrayList<>();

// binds each network thread to one executor
// the number of network threads is small, so the index will never grow large enough to overflow below 0
private AtomicInteger index = new AtomicInteger();

private Map<Long, Executor> threadExectorMap = new ConcurrentHashMap<>();

public FixedThreadExecutor() {
executorList.add(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
executorList.add(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
int groupCount = DynamicPropertyFactory.getInstance().getIntProperty(KEY_GROUP, 2).get();
int threadPerGroup = DynamicPropertyFactory.getInstance()
.getIntProperty(KEY_THREAD, Runtime.getRuntime().availableProcessors())
.get();
LOGGER.info("executor group {}, thread per group {}.", groupCount, threadPerGroup);

for (int groupIdx = 0; groupIdx < groupCount; groupIdx++) {
executorList.add(Executors.newFixedThreadPool(threadPerGroup));
}
}

@Override
public void execute(Runnable command) {
long threadId = Thread.currentThread().getId();
Executor executor = threadExectorMap.get(threadId);
if (executor == null) {
int idx = index.getAndIncrement() % executorList.size();
executor = executorList.get(idx);
threadExectorMap.put(threadId, executor);
}
Executor executor = threadExectorMap.computeIfAbsent(threadId, this::chooseExecutor);

executor.execute(command);
}

private Executor chooseExecutor(long threadId) {
int idx = index.getAndIncrement() % executorList.size();
return executorList.get(idx);
}
}
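For reference, the thread-to-executor binding above can be read as the following standalone sketch; the class and constructor names are hypothetical, and the Archaius lookup of servicecomb.executor.default.group / servicecomb.executor.default.thread-per-group is replaced by plain constructor arguments.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadBoundExecutorSketch implements Executor {
  // one executor group per slot; a network thread is bound to exactly one group
  private final List<Executor> groups = new ArrayList<>();

  // round-robin counter, used only on a thread's first call
  private final AtomicInteger index = new AtomicInteger();

  private final Map<Long, Executor> threadExecutorMap = new ConcurrentHashMap<>();

  public ThreadBoundExecutorSketch(int groupCount, int threadPerGroup) {
    for (int groupIdx = 0; groupIdx < groupCount; groupIdx++) {
      groups.add(Executors.newFixedThreadPool(threadPerGroup));
    }
  }

  @Override
  public void execute(Runnable command) {
    long threadId = Thread.currentThread().getId();
    // computeIfAbsent binds the calling thread to one group exactly once, so later
    // submits from that thread never contend with other network threads
    Executor executor = threadExecutorMap.computeIfAbsent(threadId,
        id -> groups.get(index.getAndIncrement() % groups.size()));
    executor.execute(command);
  }
}

With the defaults shown in the diff, this would be constructed as new ThreadBoundExecutorSketch(2, Runtime.getRuntime().availableProcessors()).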
@@ -39,6 +39,11 @@ public SyncResponseExecutor() {
@Override
public void execute(Runnable cmd) {
this.cmd = cmd;

// with one network thread and many connections, this notify becomes a performance bottleneck
// saving to a queue and letting other thread(s) invoke countDown would give better performance
// but with multiple network threads, that "optimization" would reduce performance
// so leave it unchanged for now.
latch.countDown();
}
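The bottleneck note above only makes sense together with the waiting side of SyncResponseExecutor, which is not part of this diff; a minimal sketch of that latch pattern, with an assumed waitAndRun() method and field layout, looks like this:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;

public class SyncResponseExecutorSketch implements Executor {
  private final CountDownLatch latch = new CountDownLatch(1);

  private volatile Runnable cmd;

  @Override
  public void execute(Runnable cmd) {
    // the network thread hands over the response handling...
    this.cmd = cmd;
    // ...and wakes the blocked caller; this countDown is the hot spot the comment
    // above describes when one network thread serves many connections
    latch.countDown();
  }

  // assumed caller side: block until the response arrives, then run the
  // command on the caller's own thread instead of the network thread
  public void waitAndRun() throws InterruptedException {
    latch.await();
    cmd.run();
  }
}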

@@ -16,7 +16,6 @@

package io.servicecomb.demo.client.perf;

import io.servicecomb.core.CseContext;
import io.servicecomb.demo.pojo.client.PojoClient;
import io.servicecomb.demo.server.Test;
import io.servicecomb.demo.server.TestRequest;
@@ -27,9 +26,6 @@ public class ClientThread extends CommonThread {
@Override
public void run() {
Test test = PojoClient.test;
CseContext.getInstance().getConsumerProviderManager().setTransport("pojo", Config.getTransport());

System.out.printf("test %s performance\n", Config.getTransport());

while (isRunning()) {
int idx = 0;
@@ -16,6 +16,7 @@

package io.servicecomb.demo.client.perf;

import io.servicecomb.core.CseContext;
import io.servicecomb.foundation.common.utils.BeanUtils;
import io.servicecomb.foundation.common.utils.Log4jUtils;
import io.servicecomb.foundation.vertx.VertxUtils;
@@ -28,6 +29,9 @@ public static void main(String[] args) throws Exception {

System.out.println("mode:" + Config.getMode());

CseContext.getInstance().getConsumerProviderManager().setTransport("pojo", Config.getTransport());
System.out.printf("test %s performance\n", Config.getTransport());

if ("reactive".equals(Config.getMode())) {
Vertx vertx = VertxUtils.getOrCreateVertxByName("perfClient", null);
VertxUtils.deployVerticle(vertx, ClientVerticle.class, Config.getClientThread());
@@ -18,11 +18,11 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
@@ -38,6 +38,7 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.NetSocketImpl;

public class TcpClientConnection extends TcpConnection {
private static final Logger LOGGER = LoggerFactory.getLogger(TcpClientConnection.class);
@@ -49,9 +50,6 @@ enum Status {
WORKING
}

// the context this connection was created in
private Context context;

private NetClient netClient;

private TcpClientConfig clientConfig;
@@ -64,27 +62,23 @@ enum Status {

private volatile Status status = Status.DISCONNECTED;

private NetSocket netSocket;

// temporary queue for messages sent before the connection is established
private List<AbstractTcpClientPackage> tmpPackageList = new LinkedList<>();
// save messages until login succeeds.
// before login we do not know parameters such as zip/codec compatibility, and so on,
// so we can only save the package, not the encoded byteBuf
private Queue<AbstractTcpClientPackage> packageQueue = new ConcurrentLinkedQueue<>();

// all access is protected by the lock, so it is thread safe
private volatile Map<Long, TcpRequest> requestMap = new ConcurrentHashMap<>();

public TcpClientConnection(Context context, NetClient netClient, String endpoint, TcpClientConfig clientConfig) {
this.context = context;
this.setContext(context);

this.netClient = netClient;
URIEndpointObject ipPort = new URIEndpointObject(endpoint);
this.socketAddress = ipPort.getSocketAddress();
this.remoteSupportLogin = Boolean.parseBoolean(ipPort.getFirst(TcpConst.LOGIN));
this.clientConfig = clientConfig;
}

public Context getContext() {
return context;
}

public boolean isLocalSupportLogin() {
return localSupportLogin;
}
@@ -101,23 +95,61 @@ protected boolean onLoginResponse(Buffer bodyBuffer) {
return true;
}

/**
* the callback runs in the tcp client verticle thread
* there is no point in lock optimization for send, because netSocket.write itself takes a lock internally
*/
public synchronized void send(AbstractTcpClientPackage tcpClientPackage, long msTimeout,
public void send(AbstractTcpClientPackage tcpClientPackage, long msTimeout,
TcpResonseCallback callback) {
requestMap.put(tcpClientPackage.getMsgId(), new TcpRequest(msTimeout, callback));

if (Status.WORKING.equals(status)) {
TcpOutputStream os = tcpClientPackage.createStream();
netSocket.write(os.getBuffer());
if (writeToBufferQueue(tcpClientPackage)) {
return;
}

tmpPackageList.add(tcpClientPackage);
if (Status.DISCONNECTED.equals(status)) {
connect();
// before login succeeds, do not optimize; just make sure no data is lost
context.runOnContext(v -> {
if (!writeToBufferQueue(tcpClientPackage)) {
packageQueue.add(tcpClientPackage);
}

// connect must be called in the eventloop thread,
// otherwise vertx will create a new eventloop thread for it if the number
// of eventloop threads has not yet reached the limit.
if (Status.DISCONNECTED.equals(status)) {
connect();
}
});
}

private boolean writeToBufferQueue(AbstractTcpClientPackage tcpClientPackage) {
// status may be read outside the eventloop thread, so the check is not exact;
// it only optimizes the common case
if (Status.WORKING.equals(status)) {
// encode in sender thread
try (TcpOutputStream os = tcpClientPackage.createStream()) {
write(os.getByteBuf());
}
return true;
}

return false;
}

@Override
protected void writeInContext() {
writePackageInContext();

super.writeInContext();
}

private void writePackageInContext() {
for (;;) {
AbstractTcpClientPackage pkg = packageQueue.poll();
if (pkg == null) {
break;
}

try (TcpOutputStream os = pkg.createStream()) {
Buffer buf = os.getBuffer();
netSocket.write(buf);
}
}
}
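The send/drain split above relies on TcpConnection internals (write(), writeInContext(), initNetSocket()) that are outside this diff; a self-contained sketch of the same pattern, writing Buffers straight to the NetSocket under those assumptions, could look like this:

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import io.vertx.core.Context;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;

class QueueingSenderSketch {
  enum Status { DISCONNECTED, CONNECTING, WORKING }

  private final Context context;

  // messages produced before login succeeds are parked here
  private final Queue<Buffer> pending = new ConcurrentLinkedQueue<>();

  private volatile Status status = Status.DISCONNECTED;

  private volatile NetSocket socket;

  QueueingSenderSketch(Vertx vertx) {
    this.context = vertx.getOrCreateContext();
  }

  void send(Buffer buf) {
    // fast path: the status read may race with the eventloop; a stale value
    // only means this message takes the slower queueing path
    if (status == Status.WORKING) {
      socket.write(buf);
      return;
    }

    // slow path: hop onto the eventloop so connect() is never issued from a
    // foreign thread (which could make vertx create an extra eventloop thread)
    context.runOnContext(v -> {
      if (status == Status.WORKING) {
        socket.write(buf);
        return;
      }
      pending.add(buf);
      if (status == Status.DISCONNECTED) {
        status = Status.CONNECTING;
        // the real class issues connect() here
      }
    });
  }

  // called on the eventloop once login succeeds, like writePackageInContext()
  void drainPending() {
    for (Buffer buf; (buf = pending.poll()) != null; ) {
      socket.write(buf);
    }
  }
}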

@@ -135,9 +167,12 @@ private void connect() {
});
}

private synchronized void onConnectSuccess(NetSocket socket) {
LOGGER.info("connect to address {} success", socketAddress.toString());
this.netSocket = socket;
private void onConnectSuccess(NetSocket socket) {
LOGGER.info("connectd to address {} success in thread {}.",
socketAddress.toString(),
Thread.currentThread().getName());
// currently, the socket is always a NetSocketImpl
this.initNetSocket((NetSocketImpl) socket);
socket.handler(new TcpParser(this::onReply));

socket.exceptionHandler(this::onException);
@@ -161,7 +196,7 @@ private void onException(Throwable e) {
e.getMessage());
}

private synchronized void onDisconnected(Throwable e) {
private void onDisconnected(Throwable e) {
this.status = Status.DISCONNECTED;
LOGGER.error("{} disconnected from {}, in thread {}, cause {}",
netSocket.localAddress().toString(),
@@ -172,7 +207,7 @@ private synchronized void onDisconnected(Throwable e) {
clearCachedRequest(e);
}

protected synchronized void tryLogin() {
protected void tryLogin() {
if (!localSupportLogin || !remoteSupportLogin) {
LOGGER.error(
"local or remote not support login, address={}, localSupportLogin={}, remoteSupportLogin={}.",
@@ -193,7 +228,7 @@ protected synchronized void tryLogin() {
}
}

private synchronized void onLoginResponse(AsyncResult<TcpData> asyncResult) {
private void onLoginResponse(AsyncResult<TcpData> asyncResult) {
if (asyncResult.failed()) {
LOGGER.error("login failed, address {}", socketAddress.toString(), asyncResult.cause());
// the status is set in the corresponding callback
@@ -212,22 +247,12 @@ private synchronized void onLoginResponse(AsyncResult<TcpData> asyncResult) {
onLoginSuccess();
}

private synchronized void onLoginSuccess() {
if (!tmpPackageList.isEmpty()) {
LOGGER.info("writting cached buffer to address {}", socketAddress.toString());
for (AbstractTcpClientPackage tcpClientPackage : tmpPackageList) {
TcpOutputStream os = tcpClientPackage.createStream();
if (os != null) {
netSocket.write(os.getBuffer());
}
}
tmpPackageList.clear();
}

private void onLoginSuccess() {
this.status = Status.WORKING;
writeInContext();
}

private synchronized void onConnectFailed(Throwable cause) {
private void onConnectFailed(Throwable cause) {
// connect failed
this.status = Status.DISCONNECTED;
String msg = String.format("connect to address %s failed.",
@@ -237,7 +262,7 @@ private synchronized void onConnectFailed(Throwable cause) {
clearCachedRequest(cause);
}

protected synchronized void clearCachedRequest(Throwable cause) {
protected void clearCachedRequest(Throwable cause) {
// in onSendError the user may start a new invocation, so avoid doing redundant cleanup
Map<Long, TcpRequest> oldMap = requestMap;
requestMap = new ConcurrentHashMap<>();
@@ -19,18 +19,17 @@
import org.slf4j.LoggerFactory;

import io.servicecomb.foundation.vertx.tcp.TcpConnection;

import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.NetSocketImpl;

public class TcpServerConnection extends TcpConnection {
private static final Logger LOGGER = LoggerFactory.getLogger(TcpServerConnection.class);

protected TcpParser splitter;

protected NetSocket netSocket;

public void init(NetSocket netSocket) {
this.netSocket = netSocket;
// currently, the socket is always a NetSocketImpl
this.initNetSocket((NetSocketImpl) netSocket);

String remoteAddress = netSocket.remoteAddress().toString();
LOGGER.info("connect from {}, in thread {}",