Flink
- wordcount
- streamwordcount
- 官网下载软件包
- bin/start-cluster.sh 启动
- TaskManagerRunner
- Jps
- StandaloneSessionClusterEntrypoint
- 页面: http://localhost:8081/#/overview
- 在页面提交jar, 设置参数,执行任务,观察流程图
- bin/stop-cluster.sh 关闭
- JobManger
- 控制程序运行的主进程,每个程序都有一个不同的jobManger
- 会先接受到要执行的程序:作业图(jobgraph)、逻辑数据流图(logical dataflow graph),打包了的所有类库以及资源jar包
- 将jobgraph转换为一个物理层面的执行图(executiongraph) 包含了所有可以并发执行的任务
- 向ResourceManger申请资源(TaskManger的slot), 将执行图分发给TaskManger, 并负责中央协调
- TaskManger
- Flink中会有多个TaskManger,每个TakManger会有一定数量的slots, slots 限制了任务的并行度
- 启动后向ResouceManger注册资源slots,然后将slots提供给JobManger调用执行task任务
- TaskManger 可以同用一个应用中的其他TaskManger 交换数据
- ResourceManger
- 负责管理TaskManger中的slots,TaskManger定义了flink中的基本资源单元
- flink可以根据不同的环境设置不同的资源管理器:YARN Mesos等
- JobManger申请资源时 需要的资源会将空闲的TaskManger分配给JobManger,不满足时还会向资源管理平台发起会话以提供启动TaskManger进程的容器
- Dispacher
- 可以跨作业进行,提供Rest接口
- 当一个应用被执行时,分发器将应用提交给JobManger
- 启动一个webui 方便监控和展示作业流程(yarn中没有Dispacher,故没有webui)
- 每一个TEaskManger都是一个jvm进程
- 一个TaskManger有一个或多个slot
- 默认情况下Flink 允许子任务共享slot,即使是不同任务的子任务
- Task slot 是静态概念,是指TaskManger具有的并发能力
example:
- 共有3个TaskManger 每个TaskManger3个slot, 共有9个slots
- 第一种情况,一个并行度,导致很多slots空闲
- 第二种情况,2个并行度,会分配到两个slots上面
- 第三种情况,9个并行度,会平均分到各个slot上面,但是由于sink也是9个输出文件式,可能乱序等问题
- 第四种情况,完美
- 所有flink程序都由三部分:Souce trasformation sink
- source 负责读取数据源 trasformation 负责各种算子的处理 sink负责输出
- 运行时,flink程序会映射为“逻辑数据流”(dataflow),包含三部分
- 每个dataflow 以一个或多个source开始,以一个或多个sink结束,dataflow 类似于任意的有向无环图DAG
- 在大部分情况下程序的转换运算(trasformation)和dataflow的operator一一对应
- streamGraph -》 JobGraph -》 ExecutionGraph -》 物理执行图
- streamGraph: 用户通过StreamApi编写的代码生成的最初的图,用来代表程序的拓扑结构
- JobGraph:streamGraph经过优化后生成的JobGraph,提交给jobManger的数据结构,将多个符合条件的节点chain合并成一个节点
- ExecutionGraph:JobManger根据JobGraph生成的ExecutionGraph,ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构
- 物理执行图:JobManger根据ExecutionGraph对job进行调度后, 在各个TaskManger上部署Task后形成的图,并不是一个具体的数据结构
- 特定算子的子任务(subTask)的个数称之为并行度
- 一个stream job 的并行度是指其所有算子中最大的并行度
- 在一个应用程序中,不同的算子可以有不同的并行度
- 算子之间传输数据的模式
- one-to-one: stream维护者分区以及元素的顺序(比如source和map之间), 这以为着map算子子任务看到到元素的个数以及顺序和source算子的子任务看的的元素个数以及顺序是相同的 map filter flatmap 等算子都是one-to-one的
- Redistributing: stream的分区会发生改变,每一个算子的子任务依据所选择的trasformation发送数据到不同的目标任务。 例如: keyby 根据hashcode重分区,而broadcast和rebalance会随机重新分区,这些算子都会引起Redistributie过程 而redistribute类似于spark的shuffle过程
- flink采用一种任务链的优化技术,可以在特定条件下减少本地通信开销,为了满足任务链的要求,必须将两个算子或多个算子,设置为相同的并行度 并通过本地转发(local forward)的方式链接
- 相同并行度的one-to-one操作,flink将这样相连的算子链接在一个task,原来的算子成为其中的subtask
- 并行度相同 one-to-one操作,二者缺一不可
- 但是针对比较大的task ,如果还要合并在一起会更加影响性能,故可以不选择合并
- env.disableOperatorChaining()
- 每个算子后面可以跟加 .disableChaining()
- 强行断开,形成一个新的链 代码:
object StreamWordCount {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// env.disableOperatorChaining() // 针对比较的的task ,可以选择不进行操作链合并
val paramTool: ParameterTool = ParameterTool.fromArgs(args)
// val host = "localhost"
// val port = 7777
val host = paramTool.get("host")
val port = paramTool.getInt("port")
// 接受一个socket文本流
val streamData = env.socketTextStream(host, port)
// 数据流处理
val wordCount = streamData.flatMap(_.split(" "))
.filter(_.nonEmpty)
// .disableChaining() 针对算子不使用chain合并操作
// .startNewChain() 强行断开形成一个新链
.map((_, 1))
.keyBy(0) // 流式没有group by 只有keyby 代替 nc -lk 7777
.sum(1)
wordCount.print()
.setParallelism(1) //设置并行度 及 slot 的个数 默认线是核数 cpu的core 数
// 任务执行
env.execute("stream wordcount job")
}
}
- 在并行度为1 的机器上,当设置任务提交参数为1时,则只有两个 chains
- source、flatMap、filter、map -》 agg、sink
- 在并行度为1 的机器上,当设置任务提交参数为2时,则只有3个 chains
- source -》 flatMap、filter、map -》 agg =》 sink
- 当设置env.disableOperatorChaining()
- source -》 flatMap -》 filter -》 map -》 agg -》 sink
- 当设置filter算子后设置.disableChaining() 1. source、flatMap -》 filter -》 map -》 agg、sink
- 当设置filter算子后设置.startNewChain() 1. source、flatMap -》 filter、 map -》 agg、sink
map flat filter keyby .keyBy("id") .keyBy(0) 返回的是JAVATUPLE .keyBy(_._1) 返回的是对应的类型 agg 绝大多多数聚合操作是针对keyStream
split 将一个stream 分成两部分 select: 根据名称选择类型 connect 优点: 可以对不同的流做不同的操作 缺点: 每次只能操作两个流 union 优点: 可以对多个流进行合并 缺点: 多条流类型必须相同
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_2.11</artifactId>
<version>1.9.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>1.9.0</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.9.0</version>
<scope>test</scope>
<classifier>tests</classifier>
</dependency>
- Stateless Operators
- FlatMap:
- Mock the Collector object using Mockito
- Use the ListCollector provided by Flink
- FlatMap:
- Stateful Operators
- OneInputStreamOperatorTestHarness (for operators on DataStreamss)
- KeyedOneInputStreamOperatorTestHarness (for operators on KeyedStreams)
- TwoInputStreamOperatorTestHarness (for operators of ConnectedStreams of two DataStreams)
- KeyedTwoInputStreamOperatorTestHarness (for operators on ConnectedStreams of two KeyedStreams)
- Timed Process Operators
- Many more examples for the usage of these test harnesses can be found in the Flink code base, e.g.:
- org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest is a good example for testing operators and user-defined functions, which depend on processing or event time.
- org.apache.flink.streaming.api.functions.sink.filesystem.LocalStreamingFileSinkTest shows how to test a custom sink with the AbstractStreamOperatorTestHarness. Specifically, it uses AbstractStreamOperatorTestHarness.snapshot and AbstractStreamOperatorTestHarness.initializeState to tests its interaction with Flink’s checkpointing mechanism.
- Many more examples for the usage of these test harnesses can be found in the Flink code base, e.g.:
- Job
- A few remarks on integration testing with MiniClusterWithClientResource:
- In order not to copy your whole pipeline code from production to test, make sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests.
- The static variable in CollectSink is used here because Flink serializes all operators before distributing them across a cluster. Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue. Alternatively, you could write the data to files in a temporary directory with your test sink.
- You can implement a custom parallel source function for emitting watermarks if your job uses event timer timers.
- It is recommended to always test your pipelines locally with a parallelism > 1 to identify bugs which only surface for the pipelines executed in parallel.
- Prefer @ClassRule over @Rule so that multiple tests can share the same Flink cluster. Doing so saves a significant amount of time since the startup and shutdown of Flink clusters usually dominate the execution time of the actual tests. 6.If your pipeline contains custom state handling, you can test its correctness by enabling checkpointing and restarting the job within the mini cluster. For this, you need to trigger a failure by throwing an exception from (a test-only) user-defined function in your pipeline.
- A few remarks on integration testing with MiniClusterWithClientResource:
- 参考: