Netty client startup process #3

Open
diaosichengxuyuan opened this issue Sep 1, 2018 · 0 comments

Startup code

        // worker handles reading and writing data
        NioEventLoopGroup worker = new NioEventLoopGroup();
        try {
            // bootstrap helper class
            Bootstrap bootstrap = new Bootstrap();
            // set the event loop group (thread pool)
            bootstrap.group(worker);
            // set the channel class used to create the socket channel
            bootstrap.channel(NioSocketChannel.class);
            // set the handler that initializes the pipeline
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    // get the pipeline
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    // string decoder
                    pipeline.addLast(new StringDecoder());
                    // string encoder
                    pipeline.addLast(new StringEncoder());
                    // business handler
                    pipeline.addLast(new ClientHandler4());
                }
            });
            // start the asynchronous connect and wait for it to complete
            ChannelFuture future = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8866)).sync();
            // wait until the client channel is closed
            future.channel().closeFuture().sync();
        } catch(InterruptedException e) {
            e.printStackTrace();
        } finally {
            // shut down gracefully and release the NIO event loop group
            worker.shutdownGracefully();
        }

Code analysis

1. NioEventLoopGroup worker = new NioEventLoopGroup()

Creates the event loop group that handles client events. The details are the same as on the server side, see #2.

2. Bootstrap bootstrap = new Bootstrap()

Creates the bootstrap helper.

3. bootstrap.group(worker)

Sets the event loop group on the bootstrap.

4. bootstrap.channel(NioSocketChannel.class)

Internally, a ChannelFactory is created from the NioSocketChannel class; that ChannelFactory is later used to create NioSocketChannel instances.
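The idea of building a factory from a class token can be sketched with plain JDK reflection. This is only an illustration: `SimpleChannelFactory` and `DemoChannel` are hypothetical names invented for this example, not Netty classes. Netty's own reflective factory works along similar lines but differs in detail.

```java
import java.lang.reflect.Constructor;

final class ChannelFactorySketch {

    /** Hypothetical stand-in for a channel type with a no-arg constructor. */
    static final class DemoChannel {
        DemoChannel() { }
    }

    /** Hypothetical factory: looks up the no-arg constructor once, then creates one instance per call. */
    static final class SimpleChannelFactory<T> {
        private final Constructor<? extends T> constructor;

        SimpleChannelFactory(Class<? extends T> type) {
            try {
                this.constructor = type.getDeclaredConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(type + " has no no-arg constructor", e);
            }
        }

        T newChannel() {
            try {
                return constructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to create channel", e);
            }
        }
    }

    public static void main(String[] args) {
        SimpleChannelFactory<DemoChannel> factory = new SimpleChannelFactory<>(DemoChannel.class);
        DemoChannel first = factory.newChannel();
        DemoChannel second = factory.newChannel();
        // Each call produces a fresh channel object.
        System.out.println("distinct instances: " + (first != second));
    }
}
```

Passing only the class keeps the bootstrap configuration cheap: no channel is created until a connect is actually requested.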

5. bootstrap.handler

Adds the handler that processes the actual events; the business logic lives in the individual handlers.

6. ChannelFuture future = bootstrap.connect(new InetSocketAddress("127.0.0.1", 8866)).sync()

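(1) new InetSocketAddress(String hostname, int port)
Creates an InetSocketAddress from a host name and a port; this is the remote address and port to connect to.
(2) bootstrap.connect calls doResolveAndConnect, which has two important steps: initAndRegister and doResolveAndConnect0. initAndRegister in turn consists of init and register.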

 private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
        // key step, analyzed below
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.isDone()) {
            if (!regFuture.isSuccess()) {
                return regFuture;
            }
            return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
        } else {
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                        promise.setFailure(cause);
                    } else {
                        promise.registered();
                        doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }
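The branching in doResolveAndConnect (connect right away if registration has already finished, otherwise connect from a listener) can be sketched with the JDK's CompletableFuture. This is an analogy only: `resolveAndConnect` and `doConnect` are hypothetical helpers for this example, and CompletableFuture stands in for Netty's ChannelFuture and ChannelPromise.

```java
import java.util.concurrent.CompletableFuture;

final class RegisterThenConnectSketch {

    /** Hypothetical connect step; it completes immediately in this sketch. */
    static CompletableFuture<String> doConnect(String remoteAddress) {
        return CompletableFuture.completedFuture("connected to " + remoteAddress);
    }

    static CompletableFuture<String> resolveAndConnect(CompletableFuture<Void> regFuture,
                                                       String remoteAddress) {
        // Fast path: registration already succeeded, so connect right away.
        if (regFuture.isDone() && !regFuture.isCompletedExceptionally()) {
            return doConnect(remoteAddress);
        }
        // Slow path: hand back a promise now and connect once registration completes.
        CompletableFuture<String> promise = new CompletableFuture<>();
        regFuture.whenComplete((ignored, cause) -> {
            if (cause != null) {
                // Registration failed: fail the promise with the same cause.
                promise.completeExceptionally(cause);
            } else {
                doConnect(remoteAddress).whenComplete((result, connectCause) -> {
                    if (connectCause != null) {
                        promise.completeExceptionally(connectCause);
                    } else {
                        promise.complete(result);
                    }
                });
            }
        });
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> registration = new CompletableFuture<>();
        CompletableFuture<String> connect = resolveAndConnect(registration, "127.0.0.1:8866");
        System.out.println("done before registration: " + connect.isDone());
        registration.complete(null);
        System.out.println(connect.join());
    }
}
```

Either way the caller gets a future back immediately, which is why the startup code has to call sync() to wait for the connection.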

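(3) The init method contains the important call p.addLast(config.handler()), which adds the handler to the ChannelPipeline. At this point the handler is the ChannelInitializer passed to bootstrap.handler; its initChannel method runs once the channel has been registered.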

    void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        p.addLast(config.handler());

        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }
        }
    }

p.addLast(config.handler()) ends up in the following addLast overload:

 public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            newCtx = newContext(group, filterName(name, handler), handler);
            addLast0(newCtx);
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

Here, newCtx = newContext(group, filterName(name, handler), handler) wraps the handler in a ChannelHandlerContext, and addLast0(newCtx) links that context into the pipeline's doubly linked list of contexts, just before the tail.
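The linking step can be sketched as a small doubly linked list with fixed head and tail sentinels. `PipelineSketch` and `Context` are hypothetical names for this example; the real pipeline also tracks executors, names, and pending callbacks, all of which are left out here.

```java
import java.util.ArrayList;
import java.util.List;

final class PipelineSketch {

    /** Hypothetical context node: wraps one handler and links to its neighbours. */
    static final class Context {
        final String name;
        final Object handler;
        Context prev;
        Context next;

        Context(String name, Object handler) {
            this.name = name;
            this.handler = handler;
        }
    }

    private final Context head = new Context("head", null);
    private final Context tail = new Context("tail", null);

    PipelineSketch() {
        head.next = tail;
        tail.prev = head;
    }

    /** Wraps the handler in a context and inserts it just before the tail. */
    synchronized PipelineSketch addLast(String name, Object handler) {
        Context newCtx = new Context(name, handler);
        Context prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
        return this;
    }

    /** Names of all contexts from head to tail, in order. */
    synchronized List<String> names() {
        List<String> names = new ArrayList<>();
        for (Context ctx = head; ctx != null; ctx = ctx.next) {
            names.add(ctx.name);
        }
        return names;
    }

    public static void main(String[] args) {
        PipelineSketch pipeline = new PipelineSketch()
                .addLast("decoder", new Object())
                .addLast("encoder", new Object())
                .addLast("handler", new Object());
        System.out.println(pipeline.names()); // [head, decoder, encoder, handler, tail]
    }
}
```

Because the sentinels never move, appending is a constant-time pointer update, and handlers keep the order in which they were added.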

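(4) register contains the line AbstractChannel.this.eventLoop = eventLoop, which binds the Channel to one EventLoop (a single loop taken from the group). The Channel was already bound to its ChannelPipeline when it was created, so from this point on the Channel knows both the event loop that runs its I/O and its ChannelPipeline. The ChannelPipeline holds the ChannelHandlers, each wrapped in its own ChannelHandlerContext.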

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    // failure handling omitted in this excerpt
                }
            }
        }
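The "run it directly if already on the loop thread, otherwise submit it" pattern in register can be sketched with a single-threaded executor. `DemoEventLoop` and `DemoChannel` are hypothetical names for this example, and the returned future simply reports which thread ran the registration step.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class EventLoopBindingSketch {

    /** Hypothetical single-threaded loop: every submitted task runs on the same thread. */
    static final class DemoEventLoop {
        private volatile Thread loopThread;
        private final ExecutorService executor = Executors.newSingleThreadExecutor(task -> {
            Thread t = new Thread(task, "demo-event-loop");
            t.setDaemon(true);
            loopThread = t;
            return t;
        });

        boolean inEventLoop() {
            return Thread.currentThread() == loopThread;
        }

        void execute(Runnable task) {
            executor.execute(task);
        }

        void shutdown() {
            executor.shutdown();
        }
    }

    /** Hypothetical channel that remembers the loop it was registered with. */
    static final class DemoChannel {
        private volatile DemoEventLoop eventLoop;

        CompletableFuture<String> register(DemoEventLoop loop) {
            CompletableFuture<String> promise = new CompletableFuture<>();
            this.eventLoop = loop; // bind the channel to its loop
            if (loop.inEventLoop()) {
                register0(promise);
            } else {
                loop.execute(() -> register0(promise));
            }
            return promise;
        }

        private void register0(CompletableFuture<String> promise) {
            // Completes with the name of the thread that performed the registration.
            promise.complete(Thread.currentThread().getName());
        }

        DemoEventLoop eventLoop() {
            return eventLoop;
        }
    }

    public static void main(String[] args) {
        DemoEventLoop loop = new DemoEventLoop();
        DemoChannel channel = new DemoChannel();
        System.out.println("registered on: " + channel.register(loop).join());
        loop.shutdown();
    }
}
```

Funnelling all channel work through one thread is what lets the channel's internal state be touched without further locking.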

register0, which register invokes, in turn calls the following doRegister:

protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

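javaChannel().register(eventLoop().unwrappedSelector(), 0, this) registers the underlying JDK channel with the java.nio.channels.Selector, using interest ops 0 and the Netty channel as the attachment. On the client this is a java.nio.channels.SocketChannel (a ServerSocketChannel appears here only on the server side). At this point the debugger has stepped into JDK code.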
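The same JDK call can be tried in isolation. The sketch below registers a non-blocking SocketChannel with a Selector using interest ops 0 and an attachment; `registerWithZeroOps` is a hypothetical helper name for this example, and no network connection is made.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

final class SelectorRegistrationSketch {

    /** Registers a fresh channel with interest ops 0 and checks what the resulting key reports. */
    static boolean registerWithZeroOps(Object attachment) {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            // A channel must be non-blocking before it can be registered with a selector.
            channel.configureBlocking(false);
            SelectionKey key = channel.register(selector, 0, attachment);
            return key.isValid()
                    && key.interestOps() == 0
                    && key.attachment() == attachment;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("registered with ops 0: " + registerWithZeroOps("demo-attachment"));
    }
}
```

Registering with ops 0 only creates the key; the selector reports nothing for this channel until an interest such as OP_CONNECT or OP_READ is set on the key later.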
(5) doResolveAndConnect0 eventually calls the following doConnect:

       protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
        if (localAddress != null) {
            doBind0(localAddress);
        }

        boolean success = false;
        try {
            boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
            if (!connected) {
                selectionKey().interestOps(SelectionKey.OP_CONNECT);
            }
            success = true;
            return connected;
        } finally {
            if (!success) {
                doClose();
            }
        }
    }

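SocketUtils.connect(javaChannel(), remoteAddress) calls java.nio.channels.SocketChannel#connect(java.net.SocketAddress) to connect to the remote address. Because the channel is non-blocking, connect may return false; in that case OP_CONNECT is set on the selection key so that the connection can be completed later.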
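A plain JDK sketch of this non-blocking connect pattern follows. It is not Netty code: `connectToLocalServer` is a hypothetical helper, the local server on an ephemeral loopback port exists only so the example has something to connect to, and the five-second deadline is an arbitrary choice.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class NonBlockingConnectSketch {

    static boolean connectToLocalServer() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            // Port 0 asks the OS for any free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            SelectionKey key = client.register(selector, 0);

            boolean connected = client.connect(server.getLocalAddress());
            if (!connected) {
                // Not finished yet: ask the selector to report when the connect can complete.
                key.interestOps(SelectionKey.OP_CONNECT);
                long deadline = System.nanoTime() + 5_000_000_000L;
                while (!connected && System.nanoTime() < deadline) {
                    selector.select(500);
                    if (selector.selectedKeys().remove(key) && key.isConnectable()) {
                        connected = client.finishConnect();
                    }
                }
                if (connected) {
                    // OP_CONNECT is no longer needed once the connection is established.
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
                }
            }
            return connected && client.isConnected();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectToLocalServer());
    }
}
```

On loopback the connect may well succeed immediately, in which case the OP_CONNECT branch is skipped entirely, just as in doConnect.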

7. future.channel().closeFuture().sync()

Blocks the calling thread until the client channel is closed.
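The blocking behaviour can be sketched with a CompletableFuture playing the role of the close future. `DemoChannel` and `waitForClose` are hypothetical names for this example; a second thread closes the channel after a short delay while the calling thread waits.

```java
import java.util.concurrent.CompletableFuture;

final class CloseFutureSketch {

    /** Hypothetical channel whose close future completes when close() is called. */
    static final class DemoChannel {
        private final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        CompletableFuture<Void> closeFuture() {
            return closeFuture;
        }

        void close() {
            closeFuture.complete(null);
        }
    }

    /** Returns { done before close, done after waiting }. */
    static boolean[] waitForClose(long closeAfterMillis) {
        DemoChannel channel = new DemoChannel();
        boolean doneBefore = channel.closeFuture().isDone();

        Thread closer = new Thread(() -> {
            try {
                Thread.sleep(closeAfterMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            channel.close();
        }, "demo-closer");
        closer.setDaemon(true);
        closer.start();

        // Blocks here until the closer thread completes the future.
        channel.closeFuture().join();
        return new boolean[] { doneBefore, channel.closeFuture().isDone() };
    }

    public static void main(String[] args) {
        boolean[] result = waitForClose(100);
        System.out.println("done before close: " + result[0] + ", done after wait: " + result[1]);
    }
}
```

In the startup code this wait is what keeps the method from reaching the finally block, and therefore shutdownGracefully, while the connection is still in use.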

Original work by the author. Please credit the source when reposting; violations will be pursued.
