
Support elastic connection holder like connection pool of datasource. #56

Closed

ujjboy opened this issue Apr 26, 2018 · 11 comments

Labels: enhancement (New feature or request), help wanted (Extra attention is needed)

@ujjboy
Member

ujjboy commented Apr 26, 2018

The default connection holder of SOFARPC is AllConnectConnectionHolder, which means the service consumer connects to every service provider.

If the service has a lot of provider nodes, establishing all of those connections takes a long time.

We hope to implement a ConnectionHolder that works like the connection pool of a data source. For example, if there are 100 providers, invocation can start once the initial connections (say, to 20 providers) have been established, and the other 80 connections are then built asynchronously and in parallel.
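A minimal, self-contained sketch of this idea (the connect() helper and the plain string addresses are hypothetical placeholders for illustration, not the SOFARPC API):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ElasticConnectSketch {

    // Hypothetical placeholder for establishing one long-lived connection to a provider.
    static void connect(String providerAddress) {
        // open the connection here
    }

    public static void main(String[] args) {
        List<String> providers = Arrays.asList("10.0.0.1:12200", "10.0.0.2:12200" /* ... 100 in total */);
        int initial = Math.min(20, providers.size());

        // 1. Connect the first `initial` providers synchronously; invocation can start after this.
        providers.subList(0, initial).forEach(ElasticConnectSketch::connect);

        // 2. Build the remaining connections asynchronously and in parallel.
        ExecutorService asyncPool = Executors.newFixedThreadPool(4);
        providers.subList(initial, providers.size())
                 .forEach(p -> asyncPool.execute(() -> connect(p)));
        asyncPool.shutdown(); // finishes the queued connection tasks, then exits
    }
}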

@ujjboy ujjboy added the enhancement (New feature or request) and help wanted (Extra attention is needed) labels Apr 26, 2018
@ujjboy ujjboy changed the title from "Support" to "Support elastic connection holder like connection pool of datasource." Apr 26, 2018
@liangyuanpeng
Contributor

liangyuanpeng commented May 6, 2018

Would it be feasible to create a subclass MinConnectConnectionHolder that extends AllConnectConnectionHolder and overrides the addNode method? It would initialize, say, 20 connections (configurable), and start a new thread to establish the other 80 connections in parallel.
com.alipay.sofa.rpc.client.AllConnectConnectionHolder#addNode

before

protected void addNode(List<ProviderInfo> providerInfoList) {
        final String interfaceId = consumerConfig.getInterfaceId();
        int providerSize = providerInfoList.size();
        String appName = consumerConfig.getAppName();
        if (LOGGER.isInfoEnabled(appName)) {
            LOGGER.infoWithApp(appName, "Add provider of {}, size is : {}", interfaceId, providerSize);
        }
        if (providerSize > 0) {
            // Establish connections with multiple threads
            int threads = Math.min(10, providerSize); // at most 10 threads
            final CountDownLatch latch = new CountDownLatch(providerSize);
            ThreadPoolExecutor initPool = new ThreadPoolExecutor(threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(providerInfoList.size()),
                new NamedThreadFactory("CLI-CONN-" + interfaceId, true));
            int connectTimeout = consumerConfig.getConnectTimeout();
            for (final ProviderInfo providerInfo : providerInfoList) {
                final ClientTransportConfig config = providerToClientConfig(providerInfo);
                initPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        ClientTransport transport = ClientTransportFactory.getClientTransport(config);
                        if (consumerConfig.isLazy()) {
                            uninitializedConnections.put(providerInfo, transport);
                            latch.countDown();
                        } else {
                            try {
                                initClientTransport(interfaceId, providerInfo, transport);
                            } finally {
                                latch.countDown(); // counted down whether connected or an exception was thrown
                            }
                        }
                    }
                });
            }

            try {
                int totalTimeout = ((providerSize % threads == 0) ? (providerSize / threads) : ((providerSize /
                    threads) + 1)) * connectTimeout + 500;
                latch.await(totalTimeout, TimeUnit.MILLISECONDS); // wait until all the worker threads finish
            } catch (InterruptedException e) {
                LOGGER.errorWithApp(appName, "Exception when add provider", e);
            } finally {
                initPool.shutdown(); // shut down the thread pool
            }
        }
    }

after

protected void addNode(List<ProviderInfo> providerInfoList) {
        final String interfaceId = consumerConfig.getInterfaceId();
        int providerSize = providerInfoList.size();
        final String appName = consumerConfig.getAppName();
        if (LOGGER.isInfoEnabled(appName)) {
            LOGGER.infoWithApp(appName, "Add provider of {}, size is : {}", interfaceId, providerSize);
        }
        if (providerSize > 0) {
            // Establish connections with multiple threads
            int threads = Math.min(10, providerSize); // at most 10 threads
            final CountDownLatch latch = new CountDownLatch(providerSize);
            ThreadPoolExecutor initPool = new ThreadPoolExecutor(threads, threads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(providerInfoList.size()),
                new NamedThreadFactory("CLI-CONN-" + interfaceId, true));
            int connectTimeout = consumerConfig.getConnectTimeout();
            int firstInitPoolConnectNum = 0;
            NamedThreadFactory namedThreadFactory = new NamedThreadFactory("CLI--ASYN-CONN-",true);
            for (final ProviderInfo providerInfo : providerInfoList) {
                final ClientTransportConfig config = providerToClientConfig(providerInfo);
                if(firstInitPoolConnectNum>=consumerConfig.getMinSynConnectNum()) {
                    break;
                }
                firstInitPoolConnectNum++;
                initPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        ClientTransport transport = ClientTransportFactory.getClientTransport(config);
                        if (consumerConfig.isLazy()) {
                            uninitializedConnections.put(providerInfo, transport);
                            latch.countDown();
                        } else {
                            try {
                                initClientTransport(interfaceId, providerInfo, transport);
                            } finally {
                                latch.countDown(); // counted down whether connected or an exception was thrown
                            }
                        }
                    }
                });
            }

            final List<ProviderInfo> AsynConnectProviderInfoList = providerInfoList.subList(firstInitPoolConnectNum, providerInfoList.size());
            if(!AsynConnectProviderInfoList.isEmpty()){
                namedThreadFactory.newThread(new Runnable() {
                    private FutureTask<String> futureTask;
                    @Override
                    public void run() {

                        ExecutorService executorService = Executors.newFixedThreadPool(1);
                        List<FutureTask> futureTaskList = new ArrayList<FutureTask>();
                        for (final ProviderInfo providerInfo : AsynConnectProviderInfoList) {
                            final ClientTransportConfig config = providerToClientConfig(providerInfo);

                            futureTask = new FutureTask<String>(new Callable<String>() { // the result type is String, hence the generic parameter
                                @Override
                                public String call() throws Exception {
                                    ClientTransport transport = ClientTransportFactory.getClientTransport(config);
                                    if (consumerConfig.isLazy()) {
                                        uninitializedConnections.put(providerInfo, transport);
                                    } else {
                                        initClientTransport(interfaceId, providerInfo, transport);
                                    }
                                    return providerInfo.getHost()+":"+providerInfo.getPort();
                                }
                            });
                            futureTaskList.add(futureTask); // keep the task so it can be awaited below
                            executorService.submit(futureTask);
                        }

                        for(FutureTask futureTask :futureTaskList){
                            try {
                                futureTask.get();
                            }catch (Exception e){
                                LOGGER.errorWithApp(appName, "Exception when connect provider ", e);
                            }
                        }
                        executorService.shutdown();

                    }
                }).start(); // start the background thread that builds the remaining connections
            }


            try {
                int totalTimeout = ((providerSize % threads == 0) ? (providerSize / threads) : ((providerSize /
                    threads) + 1)) * connectTimeout + 500;
                latch.await(totalTimeout, TimeUnit.MILLISECONDS); // wait until all the worker threads finish
            } catch (InterruptedException e) {
                LOGGER.errorWithApp(appName, "Exception when add provider", e);
            } finally {
                initPool.shutdown(); // shut down the thread pool
            }
        }
    }

consumerConfig.getMinSynConnectNum() is the number of connections to initialize; the rest are established asynchronously and in parallel.

@liangyuanpeng
Contributor

Is this approach feasible?

@ujjboy
Member Author

ujjboy commented May 11, 2018

Sorry, I forgot to reply earlier. Your "before" and "after" above are reversed, right?

For the connection-count configuration, add a percentage (e.g., establish 50% of the connections) and a minimum (at least 5).
It does not have to be a new field on ConsumerConfig; the existing parameter map can be reused.
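A rough sketch of what reading these values from the parameter map could look like (the key names and the getParameter accessor below are assumptions for illustration only):

// Sketch: resolve the elastic-connection settings from the consumer's existing parameter map,
// falling back to defaults when they are not set; the key names here are placeholders.
String percentStr = consumerConfig.getParameter("consumer.connect.elastic.percent");
String minStr = consumerConfig.getParameter("consumer.connect.elastic.min");

int percent = percentStr == null ? 50 : Integer.parseInt(percentStr); // e.g. connect 50% up front
int minNum = minStr == null ? 5 : Integer.parseInt(minStr);           // but at least 5 providers

int initialConnections = Math.min(providerInfoList.size(),
    Math.max(minNum, providerInfoList.size() * percent / 100));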

@liangyuanpeng
Contributor

Indeed they are. And a single shared configuration will be used, rather than a separate one for each service.

@liangyuanpeng
Contributor

After revising it, the current logic is as follows:

Create a MinConnectConnectionHolder that extends AllConnectConnectionHolder.

Change some originally private fields of AllConnectConnectionHolder to protected so that the subclass MinConnectConnectionHolder can use them, e.g. uninitializedConnections, aliveConnections, subHealthConnections, retryConnections and providerLock.

Also override the addNode method. Unlike the parent's addNode, it only establishes the configured number of initial connections synchronously; the remaining providers (total providers minus initial connections) are connected to the consumer asynchronously and in parallel.

The two new configuration parameters are:

Elastic connection: initial connection percentage, consumer.minconnect.precent
Default is 0.

Elastic connection: minimum number of initial connections, consumer.minconnect.size
Default is 5. By default the value of this parameter is used as the number of initial connections between the consumer and the providers.

For example, with 100 providers, consumer.minconnect.precent = 20 yields 20 initial connections, while the defaults (precent = 0, size = 5) yield 5.

Both parameters are stored as defaults in the consumer-related section of rpc-config-default.json.

@Extension("min")
public class MinConnectConnectionHolder extends AllConnectConnectionHolder{

    /**
     * slf4j Logger for this class
     */
    private final static Logger LOGGER = LoggerFactory.getLogger(MinConnectConnectionHolder.class);

    /**
     * Elastic connection: percentage of connections to initialize
     */
    protected int minconnectPrecent = getIntValue(CONSUMER_MINCONNECT_PRECENT);

    /**
     * Elastic connection: minimum number of initial connections
     */
    protected int minconnectSize = getIntValue(CONSUMER_MINCONNECT_SIZE);

    /**
     * Constructor
     *
     * @param consumerBootstrap the consumer bootstrap configuration
     */
    protected MinConnectConnectionHolder(ConsumerBootstrap consumerBootstrap) {
        super(consumerBootstrap);
        this.consumerConfig = consumerBootstrap.getConsumerConfig();
    }

    @Override
    protected void addNode(List<ProviderInfo> providerInfoList) {
        final String interfaceId = consumerConfig.getInterfaceId();
        int providerSize = providerInfoList.size();
        final String appName = consumerConfig.getAppName();
        if (LOGGER.isInfoEnabled(appName)) {
            LOGGER.infoWithApp(appName, "Add provider of {}, size is : {}", interfaceId, providerSize);
        }
        if (providerSize > 0) {
            // Establish connections with multiple threads
            int threads = Math.min(10, providerSize); // at most 10 threads
            final CountDownLatch latch = new CountDownLatch(providerSize);
            ThreadPoolExecutor initPool = new ThreadPoolExecutor(threads, threads,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<Runnable>(providerInfoList.size()),
                    new NamedThreadFactory("CLI-CONN-" + interfaceId, true));
            int connectTimeout = consumerConfig.getConnectTimeout();
            int firstInitPoolConnectNum = 0;
            int minSynConnectNum = 0;
            // Both the percentage of initial connections and the fixed minimum are configurable.
            // Compute the number of synchronous initial connections: a percentage greater than 0 takes
            // precedence; since the percentage defaults to 0, the fixed minimum size is used by default.
            if (minconnectPrecent > 0) {
                double percent = minconnectPrecent >= 100 ? 1 : minconnectPrecent * 0.01;
                minSynConnectNum = (int) (providerInfoList.size() * percent);
            } else {
                minSynConnectNum = minconnectSize;
            }

            NamedThreadFactory namedThreadFactory = new NamedThreadFactory("CLI--ASYN-CONN-",true);
            for (final ProviderInfo providerInfo : providerInfoList) {
                final ClientTransportConfig config = providerToClientConfig(providerInfo);
                if(firstInitPoolConnectNum>=minSynConnectNum) {
                    break;
                }
                firstInitPoolConnectNum++;
                initPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        ClientTransport transport = ClientTransportFactory.getClientTransport(config);
                        if (consumerConfig.isLazy()) {
                            uninitializedConnections.put(providerInfo, transport);
                            latch.countDown();
                        } else {
                            try {
                                initClientTransport(interfaceId, providerInfo, transport);
                            } finally {
                                latch.countDown(); // counted down whether connected or an exception was thrown
                            }
                        }
                    }
                });
            }

            final List<ProviderInfo> AsynConnectProviderInfoList = providerInfoList.subList(firstInitPoolConnectNum, providerInfoList.size());
            if(!AsynConnectProviderInfoList.isEmpty()){
                namedThreadFactory.newThread(new Runnable() {
                    private FutureTask<String> futureTask;
                    @Override
                    public void run() {

                        ExecutorService executorService = Executors.newFixedThreadPool(1);
                        List<FutureTask> futureTaskList = new ArrayList<FutureTask>();
                        for (final ProviderInfo providerInfo : AsynConnectProviderInfoList) {
                            final ClientTransportConfig config = providerToClientConfig(providerInfo);

                            futureTask = new FutureTask<String>(new Callable<String>() { // the result type is String, hence the generic parameter
                                @Override
                                public String call() throws Exception {
                                    ClientTransport transport = ClientTransportFactory.getClientTransport(config);
                                    if (consumerConfig.isLazy()) {
                                        uninitializedConnections.put(providerInfo, transport);
                                    } else {
                                        initClientTransport(interfaceId, providerInfo, transport);
                                    }
                                    return providerInfo.getHost()+":"+providerInfo.getPort();
                                }
                            });
                            futureTaskList.add(futureTask); // keep the task so it can be awaited below
                            executorService.submit(futureTask);
                        }

                        for(FutureTask futureTask :futureTaskList){
                            try {
                                futureTask.get();
                            }catch (Exception e){
                                LOGGER.errorWithApp(appName, "Exception when connect provider ", e);
                            }
                        }
                        executorService.shutdown();

                    }
                }).start(); // start the background thread that builds the remaining connections
            }


            try {
                int totalTimeout = ((providerSize % threads == 0) ? (providerSize / threads) : ((providerSize /
                        threads) + 1)) * connectTimeout + 500;
                latch.await(totalTimeout, TimeUnit.MILLISECONDS); // wait until all the worker threads finish
            } catch (InterruptedException e) {
                LOGGER.errorWithApp(appName, "Exception when add provider", e);
            } finally {
                initPool.shutdown(); // shut down the thread pool
            }
        }
    }
}

Is this approach feasible for this feature extension?

@leizhiyuan
Contributor

Really sorry, I missed this issue earlier. This approach is feasible. Could you please submit a PR? @liangyuanpeng

@liangyuanpeng
Contributor

OK, thank you very much.

@SteNicholas
Contributor

@ujjboy @leizhiyuan I have an idea about elastic long-lived connections and would like to know whether you two agree with it. It follows the acquireHealthyFromPoolOrNew() method of SimpleChannelPool, an implementation of the ChannelPool interface in the Netty 4 source code:

 /**
     * Tries to retrieve healthy channel from the pool if any or creates a new channel otherwise.
     * @param promise the promise to provide acquire result.
     * @return future for acquiring a channel.
     */
    private Future<Channel> acquireHealthyFromPoolOrNew(final Promise<Channel> promise) {
        try {
            final Channel ch = pollChannel();
            if (ch == null) {
                // No Channel left in the pool bootstrap a new Channel
                Bootstrap bs = bootstrap.clone();
                bs.attr(POOL_KEY, this);
                ChannelFuture f = connectChannel(bs);
                if (f.isDone()) {
                    notifyConnect(f, promise);
                } else {
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            notifyConnect(future, promise);
                        }
                    });
                }
                return promise;
            }
            EventLoop loop = ch.eventLoop();
            if (loop.inEventLoop()) {
                doHealthCheck(ch, promise);
            } else {
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        doHealthCheck(ch, promise);
                    }
                });
            }
        } catch (Throwable cause) {
            promise.tryFailure(cause);
        }
        return promise;
    }

This way we would not need to initialize so many connections up front; new connections are added lazily by attaching listeners. It both reduces the configuration needed for initial connections and is more elastic. Is this approach feasible?

@ujjboy
Member Author

ujjboy commented Jun 22, 2018

@SteNicholas The Netty pool here keeps multiple long-lived connections to the same provider, whereas the feature you are working on manages the long-lived connections across the multiple providers of one service. They are different dimensions.

@SteNicholas
Contributor

@ujjboy What about integrating Netty's SimpleChannelPool to manage the long-lived connections to multiple providers, as in the code below? Initially the pool holds no connections; on every acquire, a new connection is created if no idle one is available, and it is released after use. Would that work?

public class MultiRemoteChannelPool extends SimpleChannelPool {

    private AtomicInteger currentRemoteAddressIndex;

    private final List<SocketAddress> remoteAddressList;

    public MultiRemoteChannelPool(List<SocketAddress> remoteAddressList, Bootstrap bootstrap, ChannelPoolHandler
            handler) {
        super(bootstrap, handler);
        currentRemoteAddressIndex = new AtomicInteger(0);
        this.remoteAddressList = remoteAddressList;
    }

    @Override
    protected ChannelFuture connectChannel(Bootstrap bs) {

        return super.connectChannel(bs.remoteAddress(nextSocketAddress()));
    }

    private SocketAddress nextSocketAddress() {
        currentRemoteAddressIndex.compareAndSet(remoteAddressList.size(), 0);

        return remoteAddressList.get(currentRemoteAddressIndex.getAndIncrement());
    }


    public static void main(String[] args) throws InterruptedException {
        Bootstrap bootstrap = new Bootstrap();
        NioEventLoopGroup group = new NioEventLoopGroup();

        bootstrap.group(group).channel(NioSocketChannel.class);
        ArrayList<SocketAddress> socketAddresses = new ArrayList<>();

        socketAddresses.add(new InetSocketAddress("115.239.211.112", 80));
        socketAddresses.add(new InetSocketAddress("202.102.94.124", 80));
        socketAddresses.add(new InetSocketAddress("36.110.144.110", 80));


        MultiRemoteChannelPool multiRemoteChannelPool = new MultiRemoteChannelPool(socketAddresses, bootstrap, new
                ChannelPoolHandler() {
                    @Override
                    public void channelReleased(Channel ch) {
                    }

                    @Override
                    public void channelAcquired(Channel ch) {
                    }

                    @Override
                    public void channelCreated(Channel ch) {
                    }
                });


        System.out.println(multiRemoteChannelPool.acquire().sync().getNow().remoteAddress());
        System.out.println(multiRemoteChannelPool.acquire().sync().getNow().remoteAddress());
        System.out.println(multiRemoteChannelPool.acquire().sync().getNow().remoteAddress());
        System.out.println(multiRemoteChannelPool.acquire().sync().getNow().remoteAddress());
    }
}
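For reference, a rough sketch of the acquire/use/release cycle described above, reusing the multiRemoteChannelPool from the snippet (error handling omitted; the payload is only an illustration):

// Sketch: borrow a channel from the pool, use it, then hand it back for reuse.
Channel channel = multiRemoteChannelPool.acquire().sync().getNow();
try {
    channel.writeAndFlush(Unpooled.copiedBuffer("ping", CharsetUtil.UTF_8));
} finally {
    multiRemoteChannelPool.release(channel); // released after use, as described above
}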

@ujjboy
Member Author

ujjboy commented Jun 22, 2018

But that would skip the Router and LoadBalancer stages.

@leizhiyuan leizhiyuan added this to the 5.5.0 milestone Jul 1, 2018
@ujjboy ujjboy added this to To do in v5.5.x via automation Aug 8, 2018
@ujjboy ujjboy closed this as completed Aug 8, 2018
v5.5.x automation moved this from To do to Done Aug 8, 2018