Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

netty服务端启动过程 #2

Open
diaosichengxuyuan opened this issue Sep 1, 2018 · 0 comments
Open

netty服务端启动过程 #2

diaosichengxuyuan opened this issue Sep 1, 2018 · 0 comments

Comments

@diaosichengxuyuan
Copy link
Owner

diaosichengxuyuan commented Sep 1, 2018

启动代码

 private void start() {
        //boss线程监听端口
        EventLoopGroup boss = new NioEventLoopGroup();
        //worker线程负责数据读写
        EventLoopGroup worker = new NioEventLoopGroup();
        try {
            //辅助启动类
            ServerBootstrap bootstrap = new ServerBootstrap();
            //设置线程池
            bootstrap.group(boss, worker);
            //设置socket工厂
            bootstrap.channel(NioServerSocketChannel.class);
            //设置管道工厂
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    //获取管道
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    //字符串解码器
                    pipeline.addLast(new StringDecoder());
                    //字符串编码器
                    pipeline.addLast(new StringEncoder());
                    //处理类
                    pipeline.addLast(new ServerHandler4());
                }
            });
            //绑定端口
            ChannelFuture future = bootstrap.bind(8866).sync();
            System.out.println("server start ...... ");
            //等待服务端监听端口关闭
            future.channel().closeFuture().sync();
        } catch(InterruptedException e) {
            e.printStackTrace();
        } finally {
            //优雅退出,释放线程池资源
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

分析代码

1. EventLoopGroup boss = new NioEventLoopGroup()

(1) EventLoopGroup可以认为是一个线程池或者线程组,此步创建父线程组,负责监听Channel事件,内部创建一个线程组数组,如果没有传入数组大小,数组大小默认为:

  DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

线程组数组大小取系统变量io.netty.eventLoopThreads和netty可用进程*2的较大值
(2) 创建线程组数组代码如下:

        children = new EventExecutor[nThreads];
        for(int i = 0; i < nThreads; i++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch(Exception e) {
            } finally {
            }
        }

线程组数组为children = new EventExecutor[nThreads]
(3) 线程组数组中的值:children[i] = newChild(executor, args)

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }

可见线程组数组中存的都是NioEventLoop,而NioEventLoop extends SingleThreadEventLoop,是只有一个活动线程的线程池

2. EventLoopGroup worker = new NioEventLoopGroup()

创建子线程组,负责处理Channel事件,实现同上。
其实NioEventLoopGroup中默认共有16个线程池,每个线程池是只有一个活动线程的NioEventLoop,NioEventLoop也是线程池,只是只有一个活动线程罢了。NioEventLoop会绑定到一个具体的Channel,负责整个Channel由生到死所有事件的处理,也就是一个Channel中的所有事件都由一个NioEventLoop的一个线程处理(NioEventLoop也只有一个线程)。NioEventLoop个数是固定的,但是客户端和服务端连接的Channel却有很多,所以一个NioEventLoop可以服务于很多个Channel。

3. ServerBootstrap bootstrap = new ServerBootstrap()

创建启动类

4. bootstrap.group(boss, worker)

设置父线程组和子线程组

5. bootstrap.channel(NioServerSocketChannel.class)

内部基于NioServerSocketChannel创建ChannelFactory,ChannelFactory可以生成NioServerSocketChannel

6. bootstrap.childHandler

添加handler处理具体的事件,netty内部具体什么时间将handler添加到ChannelPipeline中在后面的代码中

7. ChannelFuture future = bootstrap.bind(8866).sync()绑定端口

(1) 由port创建java.net.InetSocketAddress
(2) doBind绑定,其中 initAndRegister和doBind0最重要

private ChannelFuture doBind(final SocketAddress localAddress) {
       //重点分析
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
           //重点分析
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else{省略}
            return promise;
        }
    }

(3) initAndRegister()分为init和register两步

void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }

        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
        }
       
       //将用户bootstrap.childHandler的事件添加到ChannelPipeline中
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

init主要有两个功能:初始化Channel配置、将用户bootstrap.childHandler的事件添加到ChannelPipeline中

protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

doRegister()方法可以由initAndRegister()一步步调试得到,javaChannel().register(eventLoop().unwrappedSelector(), 0, this)此处是将java.nio.channels.ServerSocketChannel注册到java.nio.channels.Selector上,已经调试到jdk的代码了

(4) doBind0()

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE)调用下面

    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

如果jdk大于7,则通过java.nio.channels.ServerSocketChannel#bind(java.net.SocketAddress, int)方法绑定地址;否则通过java.net.ServerSocket#bind(java.net.SocketAddress, int)方法绑定地址。

8. future.channel().closeFuture().sync()

阻塞住当前线程直到服务器端监听端口关闭

作者原创,转载请注明出处,违法必究!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant