Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

深入理解之 Apache Pulsar Connector 与 Function 关系篇 #5

Open
tuteng opened this issue May 25, 2019 · 0 comments
Open

深入理解之 Apache Pulsar Connector 与 Function 关系篇 #5

tuteng opened this issue May 25, 2019 · 0 comments

Comments

@tuteng
Copy link
Contributor

tuteng commented May 25, 2019

背景知识

  • Connector:Apache Pulsar 的连接器,包括 Source 和 Sink 两个组件。
  • Functions:Apache Pulsar 的轻量级计算组件。

Instance 架构

使用 pulsar-admin source、pulsar-admin sink 或 pulsar-admin function 命令操作 Source、Sink 或 Function 时,在 Worker 上会启动一个 Instance,Instance 架构如下图所示。

image

  • Worker Service:在此处运行 Instance。
  • Source:将外部系统的数据输入至 Pulsar。在命令行中,可以使用 pulsar-admin 操作 Source。
  • Sink:将 Pulsar 的数据输出至外部系统。 在命令行中,可以使用 pulsar-admin 操作 Sink。
  • Function:执行轻量级计算。在命令行中,可以使用 pulsar-admin 操作。 Function。
  • IdentityFunction、PulsarSource 和 PulsarSink:Instance 的组件。
  • Consumer:Pulsar 的消费者。
  • Producer:Pulsar 的生产者。
  • queue: 进行数据传递的数据结构。
  • 红色箭头:从外部系统流入至 Pulsar 的数据和从 Pulsar 流出至外部系统的数据。
  • 黄色箭头:流入至 Pulsar Topic 的数据和从 Pulsar Topic 流出的数据。

前文提到,使用 pulsar-admin 启动一个 Source、Sink 或 Function 时,实际是启动了一个 Instance,该 Instance 跑在 Worker(图中标注 Worker Service 的深蓝色方块)上。

上图有两个 Worker Service:

  • Worker Service1 上有两个 Instance,分别是 Source 和 Function。
  • Worker Service2 上有一个 Instance,即 Sink。

另外,图中的 PulsarSource、IdentityFunction 和 PulsarSink 组件都是 Instance 的一部分。当外部系统向 Pulsar 输入数据时,会使用 pulsar-admin source 启动 Source 的 Instance,该 Instance 会初始化三个组件,代码如下:

// sink 组件,负责数据输出
 setupOutput(contextImpl);
// source 组件,负责数据输入
setupInput(contextImpl);
// function 组件,负责简单计算和过滤等
return new JavaInstance(contextImpl, object);

首先初始化数据的出口,如果数据目的地有问题,就不会继续进行。

每个组件内部有以下逻辑判断:

if (sinkSpec.getClassName().isEmpty()) {
  // 未指定 className,则使用系统默认的组件初始化,即前文提到的 PulsarSink
} else {
  // 如果可以找到 className,则使用它进行初始化
}
// If source classname is not set, we default pulsar source
if (sourceSpec.getClassName().isEmpty()) {
  // 未指定 className,则使用系统默认的组件初始化,即前文提到的 PulsarSource
} else {
  // 如果可以找到 className,则使用它进行初始化
}
// create the functions
if (userClassObject instanceof Function) {
  // 使用默认的的 IdentityFunction 进行初始化
    this.function = (Function) userClassObject;
} else {
    // 使用用户定义的 Function
    this.javaUtilFunction = (java.util.function.Function) userClassObject;
}

Source 命令启动后,Instance 发现提供了 className,则会使用 className 替换系统默认的 Source 组件接收外部系统的数据。收到数据后,会将这些数据放在 queue 中。

无论是用户自定义的 source,还是系统默认的 PulsarSource,收到数据之后都会执行以下代码:

consume(record);
public void consume(Record<T> record) {
      try {
          queue.put(record);
      } catch (InterruptedException e) {
          throw new RuntimeException(e);
      }
  }

Pulsar 会把数据放至队列中,等待 Function 处理。这里已指定了 Source ,因此使用系统默认的 IdentityFunction。PulsarSource 和用户自定义 source 有以下区别:

  • 用户自定义 Source:用于与外部系统集成(图中的红色箭头)。例如,数据库和日志等。
  • 系统默认的 PulsarSource:会启动 Consumer ,用于消费来自 Pulsar topic 的数据(图中指向 Consumer 的黄色箭头)。

数据流入至 Function 时,会执行以下逻辑:

// process the message
result = javaInstance.handleMessage(currentRecord, currentRecord.getValue());
public JavaExecutionResult handleMessage(Record<?> record, Object input) {
  ...
  if (function != null) {
    // 用户自定义 function 的处理逻辑
      output = function.process(input, context);
  } else {
    // 系统默认 IdentityFunction 的处理逻辑
      output = javaUtilFunction.apply(input);
  }
  ...
}

数据在 Function 处理完成之后,会进入到下一个组件,即 Sink,Sink 同样会执行类似的判断。如果是用户自定义 Sink,则会调用用户的类执行初始化;如果没有用户自定义 Sink,则使用系统默认的 PulsarSink 执行初始化,从而完成数据的输出,这里使用了 PulsarSink。

PulsarSink 与用户自定义 Sink 有以下区别:

  • 用户自定义 Sink:将数据输出至外部系统(图中第二个红色箭头)。
  • 系统默认的 PulsarSink:会初始化 Pulsar 的 Producer,将数据输出至 Pulsar Topic(图中指向外部的黄色箭头)。

以上流程是在命令行中执行 pulsar-admin source 时进行的逻辑,当执行 Function 时,会将 IdentityFunction 替换为用户的 Function 对象,但是 Source 和 Sink 会使用系统默认的 PulsarSource 和 PulsarSink 进行初始化(图中标注 Function 的蓝色方块)。pulsar-admin sink 同样如此,此处不再赘述。

总结

本文分享了 Source、Sink 和 Function 之间的关系以及数据流通的过程。 Instance 实际包含了三个组件:Source、Sink 和 Function,这三个组件是不同的,因此,“Source 和 Sink 是一种特殊的 Function” 的说法是不准确的。同时,本文还介绍了每个 Instance 对 pub/sub 模型的封装。Source 和 Sink 作为 Instance 中单独的组件,在构建与外部系统的生态时有重要意义。

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant