Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

从 Function Worker 到 Pulsar IO #7

Open
tuteng opened this issue Feb 2, 2020 · 0 comments
Open

从 Function Worker 到 Pulsar IO #7

tuteng opened this issue Feb 2, 2020 · 0 comments

Comments

@tuteng
Copy link
Contributor

tuteng commented Feb 2, 2020

前言

本文主要分享关于 Function Worker 的实现原理,以及如何一步一步演化出 Pulsar IO。

Function Worker

image

这张图是对 Function Worker 非常直观的描述,可以看到 Function Worker 主要由 Metadata Manager,Scheduler Manager,Runtime Manager 和 Membership Manager 组成。

  • Metadata Manager: 用来存放元数据。
  • Membership Manager: 主要用来维护各个 Worker 之间的关系,选举就是在这里完成的。
  • Scheduler Manager: 用来完成调度。
  • Runtime Manager: 就是我们的 function 和 Pulsar IO 真正运行的地方。

准备知识

订阅

image

在 Pulsar 中有四种订阅模式,分别是 Exclusive,Failover,Shared 和 Key_Shared

  • Exclusive: 在同一个 Topic 上不能出现相同名字的订阅。
  • Failover:如果当前的订阅挂了,会有另外一个相同名字的订阅顶替它,这是一种容灾和高可用的策略,Function Worker 的高可用就是基于此来实现的。
  • Shared: 同一个 Topic 上可以出现多个相同的订阅,数据会被轮流发给不同的订阅者。
  • key_Shared: Shared 订阅模式的升级版。

生产、消费

Consumer consumer = client.newConsumer()
  .topic("my-topic")
  .subscriptionName("my-subscription")
  .subscribe();

Reader reader = pulsarClient.newReader()
        .topic(topic)
        .startMessageId(id)
        .create();

Producer<byte[]> producer = client.newProducer()
        .topic(topic)
        .create();

这是生产和消费的代码,逻辑很简单,但是很重要,在 Function Worker 的实现中大量复用了这部分逻辑。上面包括了初始化 Consumer、Reader、Producer 三部分的内容。Reader 可以认为是对 Consumer 的封装,基于 Exclusive 的定于模式,使用起来更加方便。

Membership Manager

image

Membership Manager 维护了各个 Worker 之间的关系,当我们启动了多个 Function Worker 的时候,会在这里基于 Pulsar 的 Failover 订阅模式选举出一个 leader,后面的调度都由该 leader 完成。

在这个图上有 Worker1 Worker2 和 Worker3 三个 Function Worker,它们基于 Pulsar 的 Failover 订阅模式使用 participant 的订阅名称订订阅到 coordinate 这个 Topic 上,当前 Worker2 成为了它们中的 leader。基于 Failover 订阅模式,如果一个 Worker 挂了,另一个会自动变成 leader。关于选举的测试可以参考这篇文章。

当各个 Function Worker 启动之后,首先会使用这样的代码来实现初始化一个 leader,它们都会使用相同的的订阅名称订阅到相同的 Topic 上,但是只有一个处于活跃状态,该 Worker 就是 leader,后面的调度任务都是由它来完成。

consumer = (ConsumerImpl<byte[]>) client.newConsumer()
                .topic(workerConfig.getClusterCoordinationTopic())
                .subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION)
                .subscriptionType(SubscriptionType.Failover)
                .consumerEventListener(this)
                .property(WORKER_IDENTIFIER, consumerName)
                .subscribe();

function worker 作为 Pulsar IO 和 Pulsar Function 的运行时环境,基于 Pulsar 的 pub/sub 实现了高可用,选举,调度等, function worker 可以完全独立于 Pulsar 部署,对于我们使用 Pulsar 来构建大型系统有很多可以借鉴的地方。

Metadata Manager

image

我们对 Source/Sink/Function 做的一些操作,例如 listupdatecreatedelete 等的元数据管理都是在这里实现的。

Reader<byte[]> reader = pulsarClient.newReader()
                    .topic(this.workerConfig.getFunctionMetadataTopic())
                    .startMessageId(MessageId.earliest)
                    .create();

初始化了一个 reader,从 Pulsar 的 Topic 中接收数据,Metadata Manager 的实现,主要做了几件事情:

  • 初始化 Reader。
  • 设置订阅 metadata 这个 Topic。
  • 设置从 earliest 开始消费。

用户的请求数据会通过一个 Producer 发送过来。

pulsarClient.newProducer().topic(functionMetadataTopic).create()

Scheduler Manager

image

Producer<byte[]> producer = client.newProducer().topic(config.getFunctionAssignmentTopic())
                                .enableBatching(false)
                                ......
                                .createAsync().get(10, TimeUnit.SECONDS);

调度服务是基于 producer 来实现,调度算法实现了 IScheduler 接口,当前实支持 RoundRobin 的调度算法。获取到消息并执行,开始分配任务。

Reader<byte[]> reader = this.getWorkerService().getClient().newReader()
                    .topic(this.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true)
                    .startMessageId(MessageId.earliest).create();

Runtime Manager

image

用来执行我们提交的实例,包括 create, stop, delete, start, restart 等。
当前 Runtime 支持基于进程,线程和 k8s 模式来运行,instance 支持使用 Python、Java 和 Golang 来开发。

从 Instance 到 Function

现在 Function Worker 的各个组件都介绍完了,接着我们来看一下运行在 Function Worker 上的 Instance。

def process(input):
    return "{}!".format(input)

这是使用 Python 开发的一个 Instance,它会接收一个输入 input,并在最后面增加一个 !,然后输出。这样 Function 就出现了,如下图,它接收输入,进行计算,然后输出。

image

在 Function 中输入正好对应 Pulsar 的 pub/sub 模型中的 Consumer,输出则对应 Producer,计算层是用户自己开发的逻辑。Function 与 Function 通过 Topic 进行连接。

从 Function 到 Pulsar IO

image

上面在 Pulsar 中的计算模型 Function 已经出现了,可以看到对于 Function 来说,它处理的是 Pulsar 内部的数据,它总是能从一个 Topic 中接收数据,然后输出数据到另外的一个 Topic 中,这很好的处理了 Pulsar 中一些简单的计算任务,但是很多时候我们都需要同外部系统打交道,这时我们就迫切的需要 Pulsar 能够方便的连接外部系统,例如 db,log 等。

Source

image

如图我们把 Function 模型中左边的 Consumer 去掉,保留右边的 Producer,自然就可以从外部系统接收数据,这样就实现了 Pulsar IO 中的 Source。

Sink

image

如图我们把 Function 模型中右边的 Producer 去掉,保留左边的 Consumer,这样就实现了将 Pulsar Topic 中的数据输出到外部系统,在 Pulsar 中我们称之为 Sink。

总结

从 Instance 到 Function 再到 Source 和 Sink,基于 Pulsar 的 Consumer 和 Producer 似乎是自然而然的事情,Function Worker 作为 Pulsar IO 和 Pulsar Function 的运行时环境,基于 Pulsar 的 pub/sub 实现了高可用,选举,调度等,Function Worker 可以完全独立于 Pulsar 部署,对于我们使用 Pulsar 来构建大型系统有很多可以借鉴的地方。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant