Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RocketMQ源码学习-Producer启动流程 #8

Open
aCoder2013 opened this issue Sep 17, 2017 · 0 comments
Open

RocketMQ源码学习-Producer启动流程 #8

aCoder2013 opened this issue Sep 17, 2017 · 0 comments

Comments

@aCoder2013
Copy link
Owner

aCoder2013 commented Sep 17, 2017

Apache RocketMQ is a distributed messaging and streaming platform with low latency, high performance and reliability, trillion-level capacity and flexible scalability.

前言

最近正好在研究RocketMQ,因此打算写一些相关的博文,这是第一篇关于Producer的博客

Producer的启动流程

一个简单的Demo

/*
* 指定一个全局唯一的Group
*/
DefaultMQProducer producer = new DefaultMQProducer("hello-group");
/*
* 指定name server的地址,可以是多个,分号分隔
*/
producer.setNamesrvAddr("localhost:9876");
/*
* 启动
*/
producer.start();

内部如何工作?

接下来我们就尝试搞清楚上面那坨代码具体都干了哪些东西:

    /*
    * 构造函数
    */
    public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) {
        this.producerGroup = producerGroup;
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    }

    public void start() throws MQClientException {
        //将所有的调用都交给这个哥们去做
        this.defaultMQProducerImpl.start();
    }

可以看到DefaultMQProducer只是一个门面类,具体的实现都是由DefaultMQProducerImpl去做的:

    //默认的状态,是否可以用volatile修饰?
    private ServiceState serviceState = ServiceState.CREATE_JUST;

    public void start() throws MQClientException {
        this.start(true);
    }

    public void start(final boolean startFactory) throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                this.serviceState = ServiceState.START_FAILED;
                
                /*
                * 检查group的名字,不能为空也不能是默认的,因为需要全局唯一
                */
                this.checkConfig();

                if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
                    //如果实例名为空的话就改成进程的ID
                    this.defaultMQProducer.changeInstanceNameToPID();
                }

                //创建`MQClientFactory`实例(保存在一个map中,key的形式类似IP@进程ID)
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);

                /*
                *   放到缓存中,组名作为Key
                *   ConcurrentMap<String, MQProducerInner> producerTable
                */
                boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
                if (!registerOK) {
                    /*
                    *  如果组名或者Producer为空的话就会返回false,但这里不会发生;另外内部是用ConcurrentHashMap#putIfAbsent实现的,如果
                    *  返回的值非空,说明已经创建过,那么这里也会返回false,也就避免了并发启动的问题
                    */
                    this.serviceState = ServiceState.CREATE_JUST;
                    throw new MQClientException("The producer group[" + this.defaultMQProducer.getProducerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }
                
                /*
                * 测试用,这里缓存的结构是`ConcurrentMap<String, TopicPublishInfo>`,key是topic,也就是在这里会缓存topic的路由信息,
                * 发送消息的时候也就会根据`TopicPublishInfo`的信息决定实际使用哪个queue发送
                */
                this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());

                if (startFactory) {
                    //启动MQClientFactory
                    mQClientFactory.start();
                }

                log.info("the producer [{}] start OK. sendMessageWithVIPChannel={}", this.defaultMQProducer.getProducerGroup(),
                    this.defaultMQProducer.isSendMessageWithVIPChannel());
                //标记为运行中
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The producer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        }

        //向所有的broker发送心跳(组名)
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
    }

这段代码非常的清晰,其中有一行比较核心的,大部分初始化的工作都是这里完成的mQClientFactory.start();,因此我们看看这里具体都是啥:

public void start() throws MQClientException {

        //用synchronized修饰保证线程安全性与内存可见性
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // 如果未指定的话就会通过制定的接口去获取name server的地址,超时时间是3秒
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    /* 
                    * 启动用于通讯的客户端,内部是用Netty实现的
                    */
                    this.mQClientAPIImpl.start();
                    // 启动所有的定时任务
                    this.startScheduledTask();
                    // TODO:目前还不清楚为啥生产者还需要启动一个线程专门用于拉消息
                    this.pullMessageService.start();
                    // 启动均衡消息的线程
                    this.rebalanceService.start();
                    // 启动它内部的Producer
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
}

private void startScheduledTask() {
        if (null == this.clientConfig.getNamesrvAddr()) {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        /*
                        * 每两分钟抓取一次,也就是说可以通过这个服务定时摘掉挂了的broker,和心跳检测
                        * 双重保障
                        */
                        MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
                    } catch (Exception e) {
                        log.error("ScheduledTask fetchNameServerAddr exception", e);
                    }
                }
            }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
        }

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    //定时更新Topic的路由信息
                    MQClientInstance.this.updateTopicRouteInfoFromNameServer();
                } catch (Exception e) {
                    log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
                }
            }
        }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    //健康检查相关的
                    MQClientInstance.this.cleanOfflineBroker();
                    MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
                } catch (Exception e) {
                    log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
                }
            }
        }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    /*
                    * 持久化消费者当前消费的位移,这里说一下消费者的可能会踩到的坑,
                    * 对于同一个topic,不同group下的消费者offset是独立的,也就是
                    * 同一个消息会消费两次
                    */
                    MQClientInstance.this.persistAllConsumerOffset();
                } catch (Exception e) {
                    log.error("ScheduledTask persistAllConsumerOffset exception", e);
                }
            }
        }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                try {
                    /*
                    *  根据当前的积压调优线程池的核心线程数,不过看了下实现是空的
                    */
                    MQClientInstance.this.adjustThreadPool();
                } catch (Exception e) {
                    log.error("ScheduledTask adjustThreadPool exception", e);
                }
            }
        }, 1, 1, TimeUnit.MINUTES);
}

最后来一张流程图:

_ae6a3eb4-19cc-4a29-8ba3-759e5e0ce037

Flag Counter

@aCoder2013 aCoder2013 changed the title How Apache RocketMQ Start Producer? RocketMQ源码学习-Producer启动流程 Sep 17, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant