Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions docs/contributor-guide/flownode/batching_mode.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
---
keywords: [streaming process, flow management, Flownode components, Flownode limitations, batching mode]
description: Overview of Flownode's batching mode, a component providing continuous data aggregation capabilities to the database, including its architecture and query execution flow.
---

# Flownode Batching Mode Developer Guide

This guide provides a brief overview of the batching mode in `flownode`. It's intended for developers who want to understand the internal workings of this mode.

## Overview

The batching mode in `flownode` is designed for continuous data aggregation. It periodically executes a user-defined SQL query over small, discrete time windows. This is in contrast to a streaming mode where data is processed as it arrives.

The core idea is to:
1. Define a `flow` with a SQL query that aggregates data from a source table into a sink table.
2. The query typically includes a time window function (e.g., `date_bin`) on a timestamp column.
3. When new data is inserted into the source table, the system marks the corresponding time windows as "dirty."
4. A background task periodically wakes up, identifies these dirty windows, and re-runs the aggregation query for those specific time ranges.
5. The results are then inserted into the sink table, effectively updating the aggregated view.

## Architecture

The batching mode consists of several key components that work together to achieve this continuous aggregation. As shown in the diagram below:

![batching mode architecture](/batching_mode_arch.png)

### `BatchingEngine`

The `BatchingEngine` is the heart of the batching mode. It's a central component that manages all active flows. Its primary responsibilities are:

- **Task Management**: It maintains a map of `FlowId` to `BatchingTask`. It handles the creation, deletion, and retrieval of these tasks.
- **Event Dispatching**: When new data arrives (via `handle_inserts_inner`) or when time windows are explicitly marked as dirty (`handle_mark_dirty_time_window`), the `BatchingEngine` identifies which flows are affected and forwards the information to the corresponding `BatchingTask`s.

### `BatchingTask`

A `BatchingTask` represents a single, independent data flow. Each task is associated with one `flow` definition and runs in its own asynchronous loop.

- **Configuration (`TaskConfig`)**: This struct holds the immutable configuration for a flow, such as the SQL query, source and sink table names, and time window expression.
- **State (`TaskState`)**: This contains the dynamic, mutable state of the task, most importantly the `DirtyTimeWindows`.
- **Execution Loop**: The task runs an infinite loop (`start_executing_loop`) that:
1. Checks for a shutdown signal.
2. Waits for a scheduled interval or until it's woken up.
3. Generates a new query plan (`gen_insert_plan`) based on the current set of dirty time windows.
4. Executes the query (`execute_logical_plan`) against the database.
5. Cleans up the processed dirty windows.

### `TaskState` and `DirtyTimeWindows`

- **`TaskState`**: This struct tracks the runtime state of a `BatchingTask`. It includes `dirty_time_windows`, which is crucial for determining what work needs to be done.
- **`DirtyTimeWindows`**: This is a key data structure that keeps track of which time windows have received new data since the last query execution. It stores a set of non-overlapping time ranges. When a task's execution loop runs, it consults this structure to build a `WHERE` clause that filters the source table for only the dirty time windows.

### `TimeWindowExpr`

The `TimeWindowExpr` is a helper utility for dealing with time window functions like `TUMBLE`.

- **Evaluation**: It can take a timestamp and evaluate the time window expression to determine the start and end of the window that the timestamp falls into.
- **Window Size**: It can also determine the size (duration) of the time window from the expression.

This is essential for both marking windows as dirty and for generating the correct filter conditions when querying the source table.

## Query Execution Flow

Here's a simplified step-by-step walkthrough of how a query is executed in batch mode:

1. **Data Ingestion**: New data is written to a source table.
2. **Marking Dirty**: The `BatchingEngine` receives a notification about the new data. It uses the `TimeWindowExpr` associated with each relevant flow to determine which time windows are affected by the new data points. These windows are then added to the `DirtyTimeWindows` set in the corresponding `TaskState`.
3. **Task Wake-up**: The `BatchingTask`'s execution loop wakes up, either due to its periodic schedule or because it was notified of a large backlog of dirty windows.
4. **Plan Generation**: The task calls `gen_insert_plan`. This method:
- Inspects the `DirtyTimeWindows`.
- Generates a series of `OR`'d `WHERE` clauses (e.g., `(ts >= 't1' AND ts < 't2') OR (ts >= 't3' AND ts < 't4') ...`) that cover the dirty windows.
- Rewrites the original SQL query to include this new filter, ensuring that only the necessary data is processed.
5. **Execution**: The modified query plan is sent to the `Frontend` for execution. The database processes the aggregation on the filtered data.
6. **Upsert**: The results are inserted into the sink table. The sink table is typically defined with a primary key that includes the time window column, so new results for an existing window will overwrite (upsert) the old ones.
7. **State Update**: The `DirtyTimeWindows` set is cleared of the windows that were just processed. The task then goes back to sleep until the next interval.
12 changes: 8 additions & 4 deletions docs/contributor-guide/flownode/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ description: Overview of Flownode, a component providing streaming process capab

`Flownode` support both `standalone` and `distributed` mode. In `standalone` mode, `Flownode` runs in the same process as the database. In `distributed` mode, `Flownode` runs in a separate process and communicates with the database through the network.

There are two execution modes for a flow:
- **Streaming Mode**: The original mode where data is processed as it arrives.
- **Batching Mode**: A newer mode designed for continuous data aggregation. It periodically executes a user-defined SQL query over small, discrete time windows. All aggregation queries now use this mode. For more details, see the [Batching Mode Developer Guide](./batching_mode.md).

## Components

A `Flownode` contains all the components needed for the streaming process of a flow. Here we list the vital parts:
A `Flownode` contains all the components needed to execute a flow. The specific components involved depend on the execution mode (Streaming vs. Batching). At a high level, the key parts are:

- A `FlownodeManager` for receiving inserts forwarded from the `Frontend` and sending back results for the flow's sink table.
- A certain number of `FlowWorker` instances, each running in a separate thread. Currently for standalone mode, there is only one flow worker, but this may change in the future.
- A `Flow` is a task that actively receives data from the `source` and sends data to the `sink`. It is managed by the `FlownodeManager` and run by a `FlowWorker`.
- **Flow Manager**: A central component responsible for managing the lifecycle of all flows.
- **Task Executor**: The runtime environment where the flow logic is executed. In streaming mode, this is typically a `FlowWorker`; in batching mode, it's a `BatchingTask`.
- **Flow Task**: Represents a single, independent data flow, containing the logic for transforming data from a source to a sink.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
---
keywords: [流处理, flow 管理, Flownode 组件, Flownode 限制, 批处理模式]
description: Flownode 批处理模式概述,一个为数据库提供持续数据聚合能力的组件,包括其架构和查询执行流程。
---

# Flownode 批处理模式开发者指南

本指南简要概述了 `flownode` 中的批处理模式。它旨在帮助希望了解此模式内部工作原理的开发人员。

## 概述

`flownode` 中的批处理模式专为持续数据聚合而设计。它在离散的、微小的时间窗口上周期性地执行用户定义的 SQL 查询。这与数据在到达时即被处理的流处理模式形成对比。

其核心思想是:
1. 定义一个带有 SQL 查询的 `flow`,该查询将数据从源表聚合到目标表。
2. 查询通常在时间戳列上包含一个时间窗口函数(例如 `date_bin`)。
3. 当新数据插入源表时,系统会将相应的时间窗口标记为“脏”(dirty)。
4. 一个后台任务会周期性地唤醒,识别这些脏窗口,并为那些特定的时间范围重新运行聚合查询。
5. 然后将结果插入到目标表中,从而有效地更新聚合视图。

## 架构

批处理模式由几个协同工作的关键组件组成,以实现这种持续聚合。如下图所示:

![batching mode architecture](/batching_mode_arch.png)

### `BatchingEngine`

`BatchingEngine` 是批处理模式的核心。它是一个管理所有活动 flow 的中心组件。其主要职责是:

- **任务管理**: 维护一个从 `FlowId` 到 `BatchingTask` 的映射。它处理这些任务的创建、删除和检索。
- **事件分发**: 当新数据到达(通过 `handle_inserts_inner`)或当时间窗口被显式标记为脏(`handle_mark_dirty_time_window`)时,`BatchingEngine` 会识别受影响的 flow,并将信息转发给相应的 `BatchingTask`。

### `BatchingTask`

`BatchingTask` 代表一个独立的、单个的数据流。每个任务都与一个 `flow` 定义相关联,并在其自己的异步循环中运行。

- **配置 (`TaskConfig`)**: 此结构体持有 flow 的不可变配置,例如 SQL 查询、源表和目标表名以及时间窗口表达式。
- **状态 (`TaskState`)**: 包含任务的动态、可变状态,最重要的是 `DirtyTimeWindows`。
- **执行循环**: 任务运行一个无限循环 (`start_executing_loop`),该循环:
1. 检查关闭信号。
2. 等待一个预定的时间间隔或直到被唤醒。
3. 基于当前的脏时间窗口集合生成一个新的查询计划 (`gen_insert_plan`)。
4. 对数据库执行查询 (`execute_logical_plan`)。
5. 清理已处理的脏窗口。

### `TaskState` 和 `DirtyTimeWindows`

- **`TaskState`**: 此结构体跟踪 `BatchingTask` 的运行时状态。它包括 `dirty_time_windows`,这对于确定需要完成哪些操作至关重要。
- **`DirtyTimeWindows`**: 这是一个关键的数据结构,用于跟踪自上次查询执行以来哪些时间窗口接收到了新数据。它存储一组不重叠的时间范围。当任务的执行循环运行时,它会参考此结构来构建一个 `WHERE` 子句,该子句仅过滤源表中的脏时间窗口。

### `TimeWindowExpr`

`TimeWindowExpr` 是一个用于处理像 `TUMBLE` 这样的时间窗口函数的辅助工具。

- **求值**: 它可以接受一个时间戳并对时间窗口表达式求值,以确定该时间戳所属窗口的开始和结束。
- **窗口大小**: 它还可以从表达式中确定时间窗口的大小(持续时间)。

这对于标记窗口为脏以及在查询源表时生成正确的过滤条件都至关重要。

## 查询执行流程

以下是批处理模式下查询执行的简化分步演练:

1. **数据摄取**: 新数据被写入源表。
2. **标记为脏**: `BatchingEngine` 收到有关新数据的通知。它使用与每个相关 flow 关联的 `TimeWindowExpr` 来确定哪些时间窗口受到新数据点的影响。然后将这些窗口添加到相应 `TaskState` 中的 `DirtyTimeWindows` 集合中。
3. **任务唤醒**: `BatchingTask` 的执行循环被唤醒,原因可能是其周期性调度,也可能是因为它被通知有大量积压的脏窗口。
4. **计划生成**: 任务调用 `gen_insert_plan`。此方法:
- 检查 `DirtyTimeWindows`。
- 生成一系列 `OR` 连接的 `WHERE` 子句(例如 `(ts >= 't1' AND ts < 't2') OR (ts >= 't3' AND ts < 't4') ...`),覆盖所有脏窗口。
- 重写原始 SQL 查询以包含此新过滤器,确保只处理必要的数据。
5. **执行**: 修改后的查询计划被发送到 `Frontend` 执行。数据库处理已过滤数据的聚合。
6. **Upsert**: 结果被插入到目标表中。目标表通常定义了一个包含时间窗口列的主键,因此现有窗口的新结果将覆盖(upsert)旧结果。
7. **状态更新**: `DirtyTimeWindows` 集合中刚刚处理过的窗口被清除。然后任务返回睡眠状态,直到下一个时间间隔。
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
keywords: [Flownode, 流处理, FlownodeManager, FlowWorker]
description: 介绍了 Flownode 的基本概念、组件和当前版本的支持情况,包括 FlownodeManager、FlowWorker 和 Flow 的功能
keywords: [流处理, flow 管理, 单机模式, Flownode 组件, Flownode 限制]
description: Flownode 概览,一个为数据库提供流处理能力的组件,包括其组件和当前的限制
---

# Flownode
Expand All @@ -10,12 +10,16 @@ description: 介绍了 Flownode 的基本概念、组件和当前版本的支持
`Flownode` 为数据库提供了一种简单的流处理(称为 `flow`)能力。
`Flownode` 管理 `flow`,这些 `flow` 是从 `source` 接收数据并将数据发送到 `sink` 的任务。

在当前版本中,`Flownode` 仅在单机模式中支持,未来将支持分布式模式。
`Flownode` 支持 `standalone`(单机)和 `distributed`(分布式)两种模式。在 `standalone` 模式下,`Flownode` 与数据库运行在同一进程中。在 `distributed` 模式下,`Flownode` 运行在单独的进程中,并通过网络与数据库通信。

一个 flow 有两种执行模式:
- **流处理模式 (Streaming Mode)**: 原始的模式,数据在到达时即被处理。
- **批处理模式 (Batching Mode)**: 一种为持续数据聚合设计的较新模式。它在离散的、微小的时间窗口上周期性地执行用户定义的 SQL 查询。目前所有的聚合查询都使用此模式。更多详情,请参阅[批处理模式开发者指南](./batching_mode.md)。

## 组件

`Flownode` 包含了 flow 流式处理的所有组件,以下是关键部分
`Flownode` 包含了执行一个 flow 所需的所有组件。所涉及的具体组件取决于执行模式(流处理 vs. 批处理)。在较高的层面上,关键部分包括

- `FlownodeManager`:用于接收从 `Frontend` 转发的插入数据并将结果发送回 flow 的 sink 表
- 一定数量的 `FlowWorker` 实例,每个实例在单独的线程中运行。当前在单机模式中只有一个 flow worker,但这可能会在未来发生变化
- `Flow` 是一个主动从 `source` 接收数据并将数据发送到 `sink` 的任务。由 `FlownodeManager` 管理并由 `FlowWorker` 运行
- **Flow Manager**: 一个负责管理所有 flow生命周期的中心组件
- **Task Executor**: flow 逻辑执行的运行时环境。在流处理模式下,这通常是一个 `FlowWorker`;在批处理模式下,它是一个 `BatchingTask`
- **Flow Task**: 代表一个独立的、单个的数据流,包含将数据从 source 转换为 sink 的逻辑
1 change: 1 addition & 0 deletions sidebars.ts
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ const sidebars: SidebarsConfig = {
id: 'contributor-guide/flownode/overview',
label: 'Overview',
},
'contributor-guide/flownode/batching_mode',
'contributor-guide/flownode/dataflow',
'contributor-guide/flownode/arrangement'
],
Expand Down
Binary file added static/batching_mode_arch.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.