-
Notifications
You must be signed in to change notification settings - Fork 334
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(flow): flush_flow
function
#4416
Conversation
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughThis update enhances the project's capabilities by introducing new features and improving existing functionality. Key changes include the addition of flow service handling, modifications to asynchronous functions, and updates to dependency management. These alterations aim to streamline data processing and error handling within the system. Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant FlowService
participant QueryEngine
participant DataStore
User->>FlowService: Initiate flush operation
FlowService->>QueryEngine: Request flush
QueryEngine->>DataStore: Execute data flush
DataStore-->>QueryEngine: Acknowledge completion
QueryEngine-->>FlowService: Flush completed
FlowService-->>User: Respond with status
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? |
@coderabbitai review |
Actions performedReview triggered.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Outside diff range, codebase verification and nitpick comments (8)
src/common/function/src/table.rs (1)
39-39
: Ensure comprehensive testing forFlushFlowFunction
.The search did not find any existing tests for
FlushFlowFunction
. Please add comprehensive tests to validate its functionality.
- File:
src/common/function/src/table.rs
- Line: 39
Analysis chain
Ensure comprehensive testing for
FlushFlowFunction
.The new function
FlushFlowFunction
has been registered. Ensure that comprehensive tests are added to validate its functionality.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify that comprehensive tests exist for `FlushFlowFunction`. # Test: Search for test files. Expect: Comprehensive tests for `FlushFlowFunction`. fd -t f -e rs test | xargs rg 'FlushFlowFunction'Length of output: 49
Script:
#!/bin/bash # Description: List all test files to verify their presence. fd -t f -e rs test # Description: Search for `FlushFlowFunction` in all Rust files to ensure comprehensive testing. fd -t f -e rs | xargs rg 'FlushFlowFunction'Length of output: 3745
Script:
#!/bin/bash # Description: Search for `FlushFlowFunction` in the identified test files to ensure comprehensive testing. rg 'FlushFlowFunction' src/auth/src/tests.rs \ src/cmd/tests/load_config_test.rs \ src/common/base/tests/buffer_tests.rs \ src/common/datasource/src/file_format/tests.rs \ src/common/datasource/src/test_util.rs \ src/common/datasource/src/tests.rs \ src/common/function/src/scalars/test.rs \ src/common/function/src/scalars/timestamp/greatest.rs \ src/common/macro/tests/test_derive.rs \ src/common/meta/src/ddl/test_util.rs \ src/common/meta/src/ddl/tests.rs \ src/common/meta/src/heartbeat/handler/tests.rs \ src/common/meta/src/key/test_utils.rs \ src/common/meta/src/kv_backend/test.rs \ src/common/meta/src/test_util.rs \ src/common/query/src/test_util.rs \ src/common/wal/src/test_util.rs \ src/datanode/src/tests.rs \ src/file-engine/src/test_util.rs \ src/flow/src/adapter/tests.rs \ src/frontend/src/heartbeat/handler/tests.rs \ src/index/src/fulltext_index/tests.rs \ src/log-store/src/test_util.rs \ src/meta-srv/src/procedure/region_migration/test_util.rs \ src/meta-srv/src/procedure/tests.rs \ src/meta-srv/src/pubsub/tests.rs \ src/meta-srv/src/selector/test_utils.rs \ src/meta-srv/src/test_util.rs \ src/metric-engine/src/test_util.rs \ src/mito2/src/cache/test_util.rs \ src/mito2/src/compaction/test_util.rs \ src/mito2/src/engine/alter_test.rs \ src/mito2/src/engine/append_mode_test.rs \ src/mito2/src/engine/basic_test.rs \ src/mito2/src/engine/batch_open_test.rs \ src/mito2/src/engine/catchup_test.rs \ src/mito2/src/engine/close_test.rs \ src/mito2/src/engine/compaction_test.rs \ src/mito2/src/engine/create_test.rs \ src/mito2/src/engine/drop_test.rs \ src/mito2/src/engine/filter_deleted_test.rs \ src/mito2/src/engine/flush_test.rs \ src/mito2/src/engine/merge_mode_test.rs \ src/mito2/src/engine/open_test.rs \ src/mito2/src/engine/parallel_test.rs \ src/mito2/src/engine/projection_test.rs \ src/mito2/src/engine/prune_test.rs \ src/mito2/src/engine/row_selector_test.rs \ src/mito2/src/engine/set_readonly_test.rs \ src/mito2/src/engine/truncate_test.rs \ src/mito2/src/manifest/tests.rs \ src/mito2/src/test_util.rs \ src/object-store/src/test_util.rs \ src/object-store/tests/object_store_test.rs \ src/operator/src/tests.rs \ src/promql/src/extension_plan/test_util.rs \ src/promql/src/functions/test_util.rs \ src/puffin/src/puffin_manager/tests.rs \ src/puffin/src/tests.rs \ src/query/src/optimizer/test_util.rs \ src/query/src/tests/argmax_test.rs \ src/query/src/tests/argmin_test.rs \ src/query/src/tests/mean_test.rs \ src/query/src/tests/percentile_test.rs \ src/query/src/tests/polyval_test.rs \ src/query/src/tests/query_engine_test.rs \ src/query/src/tests/scipy_stats_norm_cdf_test.rs \ src/query/src/tests/time_range_filter_test.rs \ src/query/src/tests.rs \ src/script/src/python/ffi_types/pair_tests/sample_testcases.rs \ src/script/src/python/ffi_types/pair_tests.rs \ src/script/src/python/ffi_types/vector/tests.rs \ src/script/src/python/rspython/builtins/test.rs \ src/script/src/python/rspython/test.rs \ src/script/src/test.rs \ src/servers/src/http/test_helpers.rs \ src/servers/tests/http/http_handler_test.rs \ src/servers/tests/http/http_test.rs \ src/servers/tests/http/influxdb_test.rs \ src/servers/tests/http/opentsdb_test.rs \ src/servers/tests/http/prom_store_test.rs \ src/servers/tests/mysql/mysql_server_test.rs \ src/servers/tests/mysql/mysql_writer_test.rs \ src/table/src/test_util.rs \ tests-fuzz/src/test_utils.rs \ tests-integration/src/test_util.rs \ tests-integration/src/tests/instance_kafka_wal_test.rs \ tests-integration/src/tests/instance_test.rs \ tests-integration/src/tests/promql_test.rs \ tests-integration/src/tests/test_util.rs \ tests-integration/src/tests.rsLength of output: 3470
src/common/function/src/handlers.rs (1)
68-83
: Issues found in theFlowServiceHandler
trait implementation and usage:
Incomplete Implementation:
- The
flush
method inMockFlowServiceHandler
is currently a placeholder and needs a proper implementation.
- File:
src/common/function/src/state.rs
- Line: Implementation of
MockFlowServiceHandler
Potential Runtime Issues:
- Ensure that
FlowServiceHandlerRef
is properly initialized wherever it is used to avoid potential runtime issues.
- Files:
src/query/src/query_engine/state.rs
src/query/src/query_engine.rs
src/common/function/src/state.rs
src/common/function/src/flush_flow.rs
Please address these issues to ensure the
FlowServiceHandler
trait is correctly implemented and used across the codebase.Analysis chain
Verify the implementation and usage of the new
FlowServiceHandler
trait.Ensure that the new
FlowServiceHandler
trait is correctly implemented and used throughout the codebase.
Verify the implementation details and usage of
FlowServiceHandler
.We need to inspect the implementations in
src/common/function/src/state.rs
andsrc/operator/src/flow.rs
, and check for the usage ofFlowServiceHandlerRef
.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the implementation and usage of the new `FlowServiceHandler` trait. # Test: Search for the `FlowServiceHandler` trait implementation. Expect: Correct implementation and usage. rg --type rust 'impl FlowServiceHandler'Length of output: 211
Script:
#!/bin/bash # Description: Inspect the implementation of FlowServiceHandler in src/common/function/src/state.rs. # Extract the implementation details from src/common/function/src/state.rs rg --type rust -A 10 'impl FlowServiceHandler for MockFlowServiceHandler' src/common/function/src/state.rsLength of output: 424
Script:
#!/bin/bash # Description: Inspect the implementation of FlowServiceHandler in src/operator/src/flow.rs. # Extract the implementation details from src/operator/src/flow.rs rg --type rust -A 10 'impl FlowServiceHandler for FlowServiceOperator' src/operator/src/flow.rsLength of output: 362
Script:
#!/bin/bash # Description: Verify the usage of FlowServiceHandlerRef. # Search for the usage of FlowServiceHandlerRef rg --type rust 'FlowServiceHandlerRef'Length of output: 1254
src/common/function/src/state.rs (1)
122-137
: Ensure completeness of the mock implementation forFlowServiceHandler
.The
MockFlowServiceHandler
implementation is incomplete as theflush
method uses thetodo!()
macro. Additionally, there is no evidence of test cases or usage ofMockFlowServiceHandler
to validate its functionality.
src/common/function/src/state.rs
: Theflush
method inMockFlowServiceHandler
is not implemented.- Ensure that the
MockFlowServiceHandler
is used in test cases to validate its functionality.Analysis chain
Ensure completeness of the mock implementation for
FlowServiceHandler
.Verify that the mock implementation for the
FlowServiceHandler
is complete and correctly implemented.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the completeness of the mock implementation for `FlowServiceHandler`. # Test: Search for the usage of the mock implementation. Expect: Correct and complete implementation. rg --type rust 'MockFlowServiceHandler'Length of output: 312
Script:
#!/bin/bash # Description: Verify the completeness of the mock implementation for `FlowServiceHandler`. # Step 1: Search for the `FlowServiceHandler` trait definition to identify all required methods. rg --type rust 'trait FlowServiceHandler' # Step 2: Search for test cases or usage of `MockFlowServiceHandler` in the codebase. rg --type rust 'MockFlowServiceHandler' --context 10Length of output: 3441
Script:
#!/bin/bash # Description: Verify the completeness of the mock implementation for `FlowServiceHandler`. # Search for the methods within the `FlowServiceHandler` trait definition. rg --type rust 'trait FlowServiceHandler' -A 20Length of output: 956
src/common/macro/src/admin_fn.rs (1)
156-156
: Consider dependency injection for handler management.The TODO comment suggests a potential future enhancement to improve handler management by considering dependency injection.
src/flow/src/adapter/worker.rs (3)
154-155
: Clarify theblocking
parameter in the docstring.The docstring mentions the
blocking
parameter but could benefit from a more detailed explanation of its purpose and behavior.- /// `blocking` indicate whether it will wait til all dataflows are finished computing if true or + /// `blocking` indicates whether the method will wait until all dataflows are finished computing (if true) or just start computing and return immediately (if false).
527-527
: Ensure test coverage for both blocking and non-blocking scenarios.The test cases currently only cover the blocking scenario (
run_available(true)
). Please add test cases to cover the non-blocking scenario (run_available(false)
) to ensure comprehensive testing of therun_available
function.
- Add test cases for
run_available(false)
.Analysis chain
Ensure test coverage for both blocking and non-blocking scenarios.
The test case should cover both blocking and non-blocking scenarios to ensure comprehensive testing of the
run_available
function.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify test coverage for both blocking and non-blocking scenarios. # Test: Search for test cases covering `run_available` with both `blocking` and `non-blocking` parameters. rg --type rust 'run_available'Length of output: 2034
Script:
#!/bin/bash # Description: Verify test coverage for both blocking and non-blocking scenarios. # Test: Search for test cases covering `run_available` with both `blocking` and `non-blocking` parameters. rg --type rust 'run_available\(.*true\)' -A 5 rg --type rust 'run_available\(.*false\)' -A 5Length of output: 1492
158-170
: Ensure proper error handling in non-blocking mode.The
call_no_resp
method returns aResult<(), Error>
, but the non-blocking branch in therun_available
method does not handle this result. Ensure that any potential errors are logged or managed appropriately.
src/flow/src/adapter/worker.rs
lines 167-170Analysis chain
Ensure proper error handling in non-blocking mode.
The
call_no_resp
method does not handle errors explicitly. Ensure that any potential errors are logged or managed appropriately.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify error handling for `call_no_resp` method. # Test: Search for error handling in `call_no_resp` method. Expect: Proper error handling or logging. rg --type rust 'call_no_resp'Length of output: 313
Script:
#!/bin/bash # Description: Extract the implementation of the `call_no_resp` method to verify error handling. # Search for the implementation of the `call_no_resp` method in Rust files. ast-grep --lang rust --pattern 'fn call_no_resp(&self, req: Request) -> Result<(), Error> { $$$ }'Length of output: 348
Script:
#!/bin/bash # Description: Extract the implementation of the `run_available` method to verify error handling. # Search for the implementation of the `run_available` method in Rust files. ast-grep --lang rust --pattern 'pub async fn run_available(&self, now: repr::Timestamp, blocking: bool) -> Result<(), Error> { $$$ }'Length of output: 1237
src/flow/src/adapter.rs (1)
567-567
: Ensure all calls torun_available
include theblocking
parameter.The following files contain calls to
run_available
that need to be updated to include the newblocking
parameter:
src/flow/src/compute/state.rs
src/flow/src/adapter/worker.rs
src/flow/src/compute/render.rs
Please update these calls to include the
blocking
parameter to maintain consistency in the codebase.Analysis chain
LGTM! But verify the usage of the
run_available
method.The code change is approved.
However, ensure that all calls to the
run_available
method are updated to include the newblocking
parameter.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify all calls to the `run_available` method include the new `blocking` parameter. # Test: Search for the calls to the `run_available` method. Expect: Only occurrences with the new parameter. rg --type rust -A 5 $'run_available'Length of output: 7661
Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Files ignored due to path filters (1)
Cargo.lock
is excluded by!**/*.lock
Files selected for processing (50)
- Cargo.toml (1 hunks)
- src/cmd/src/cli/repl.rs (1 hunks)
- src/common/function/Cargo.toml (1 hunks)
- src/common/function/src/flush_flow.rs (1 hunks)
- src/common/function/src/handlers.rs (1 hunks)
- src/common/function/src/lib.rs (1 hunks)
- src/common/function/src/state.rs (4 hunks)
- src/common/function/src/system/procedure_state.rs (2 hunks)
- src/common/function/src/table.rs (2 hunks)
- src/common/function/src/table/flush_compact_region.rs (2 hunks)
- src/common/function/src/table/flush_compact_table.rs (2 hunks)
- src/common/function/src/table/migrate_region.rs (2 hunks)
- src/common/macro/src/admin_fn.rs (5 hunks)
- src/common/macro/src/lib.rs (2 hunks)
- src/common/query/src/error.rs (4 hunks)
- src/datanode/src/datanode.rs (1 hunks)
- src/flow/src/adapter.rs (1 hunks)
- src/flow/src/adapter/flownode_impl.rs (2 hunks)
- src/flow/src/adapter/worker.rs (5 hunks)
- src/flow/src/compute/render/src_sink.rs (1 hunks)
- src/flow/src/error.rs (1 hunks)
- src/flow/src/expr/relation/func.rs (2 hunks)
- src/flow/src/expr/scalar.rs (1 hunks)
- src/flow/src/repr/relation.rs (1 hunks)
- src/flow/src/server.rs (1 hunks)
- src/flow/src/transform.rs (4 hunks)
- src/flow/src/transform/expr.rs (1 hunks)
- src/frontend/Cargo.toml (1 hunks)
- src/frontend/src/instance/builder.rs (2 hunks)
- src/operator/src/flow.rs (1 hunks)
- src/operator/src/lib.rs (1 hunks)
- src/query/src/datafusion.rs (1 hunks)
- src/query/src/query_engine.rs (5 hunks)
- src/query/src/query_engine/context.rs (1 hunks)
- src/query/src/query_engine/default_serializer.rs (1 hunks)
- src/query/src/query_engine/state.rs (3 hunks)
- src/query/src/range_select/plan_rewrite.rs (1 hunks)
- src/query/src/tests.rs (1 hunks)
- src/query/src/tests/query_engine_test.rs (3 hunks)
- src/query/src/tests/time_range_filter_test.rs (1 hunks)
- src/script/benches/py_benchmark.rs (1 hunks)
- src/script/src/python/engine.rs (1 hunks)
- src/script/src/test.rs (1 hunks)
- src/servers/tests/mod.rs (1 hunks)
- tests/cases/standalone/common/flow/flow_basic.result (1 hunks)
- tests/cases/standalone/common/flow/flow_basic.sql (1 hunks)
- tests/cases/standalone/common/flow/flow_call_df_func.result (1 hunks)
- tests/cases/standalone/common/flow/flow_call_df_func.sql (1 hunks)
- tests/cases/standalone/common/flow/show_create_flow.result (2 hunks)
- tests/cases/standalone/common/flow/show_create_flow.sql (1 hunks)
Files skipped from review due to trivial changes (5)
- src/common/function/src/system/procedure_state.rs
- src/common/function/src/table/flush_compact_table.rs
- src/common/function/src/table/migrate_region.rs
- src/operator/src/lib.rs
- tests/cases/standalone/common/flow/show_create_flow.sql
Additional comments not posted (105)
src/common/function/src/lib.rs (1)
18-18
: New moduleflush_flow
added.The addition of the
flush_flow
module expands the library's functionality. Ensure that the new module is thoroughly tested and documented.src/common/function/Cargo.toml (1)
26-27
: New dependenciesfutures
andfutures-util
added.The addition of these dependencies enhances the project's asynchronous programming capabilities. Ensure that these dependencies are compatible with the existing codebase and do not introduce any conflicts.
src/common/function/src/table.rs (1)
25-25
: New functionFlushFlowFunction
registered.The registration of the
FlushFlowFunction
enhances the capabilities of theTableFunction
. Ensure that the new function is thoroughly tested and documented.src/query/src/tests.rs (1)
55-55
: Verify the correctness of the additional parameter.Ensure that the additional
None
parameter in theQueryEngineFactory::new
call is correct and does not introduce any issues.Verification successful
The additional parameter is correct.
The
QueryEngineFactory::new
function definition matches the parameters used in the call, including the additionalNone
parameter forflow_service_handler
.
catalog_manager
region_query_handler: None
table_mutation_handler: None
procedure_service_handler: None
flow_service_handler: None
with_dist_planner: false
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the correctness of the additional parameter in the `QueryEngineFactory::new` call. # Test: Search for the `QueryEngineFactory::new` function definition to understand the parameters. Expect: The function definition should match the parameters used in the call. rg --type rust -A 5 $'fn new'Length of output: 354015
Script:
#!/bin/bash # Description: Search for the `QueryEngineFactory` struct or class and locate the `new` function definition within it. # Search for the `QueryEngineFactory` struct or class definition rg --type rust 'struct QueryEngineFactory|impl QueryEngineFactory' -A 20Length of output: 1605
tests/cases/standalone/common/flow/flow_basic.sql (6)
1-6
: LGTM!The table creation commands are correct and follow standard SQL syntax.
8-12
: LGTM!The flow creation commands are correct and follow standard SQL syntax.
13-13
: Reminder: Confirm the necessity of the flush flow command.The TODO comment indicates that further confirmation is needed to determine if it is necessary to flush the flow at this point.
14-14
: LGTM!The use of the
flush_flow
function is correct and ensures that the flow is flushed and the latest results are written to the sink table.
16-38
: LGTM!The commands for inserting data, flushing the flow, selecting data, and dropping the flow and tables are correct and follow standard SQL syntax. The use of the
flush_flow
function is consistent.
40-60
: LGTM!The commands for creating tables, creating a flow with interval selections, showing the flow creation, and dropping the flow and tables are correct and follow standard SQL syntax.
src/frontend/Cargo.toml (1)
29-29
: LGTM!The new entry
common-function.workspace = true
correctly includes thecommon-function
module in the workspace configuration.tests/cases/standalone/common/flow/show_create_flow.result (7)
Line range hint
1-8
: LGTM!The table
numbers_input_show
is created correctly with columnsnumber
andts
.
10-14
: LGTM!The table
out_num_cnt_show
is created correctly with columnsnumber
andts
.
16-17
: LGTM!The SQL statement correctly retrieves flow information for
filter_numbers_show
.
21-22
: LGTM!The SQL statement correctly shows flows like
filter_numbers_show
.
26-28
: LGTM!The flow
filter_numbers_show
is created correctly to sink data intoout_num_cnt_show
fromnumbers_input_show
.
30-38
: LGTM!The SQL statement correctly shows the create flow statement for
filter_numbers_show
.
56-74
: LGTM!The SQL statements correctly drop the flow
filter_numbers_show
and the tablesout_num_cnt_show
andnumbers_input_show
.src/common/function/src/flush_flow.rs (6)
1-27
: LGTM!The license header and imports are correct.
28-34
: LGTM!The
flush_signature
function correctly returns the signature for theflush_flow
function.
36-41
: LGTM!The
flush_flow
function definition and macro annotation are correct.
42-63
: LGTM!The parameter validation in the
flush_flow
function is correct.
64-67
: LGTM!The
flush_flow
function correctly calls theflush
method on theflow_service_handler
.
68-71
: LGTM!The
flush_flow
function correctly returns the number of affected rows.src/script/src/test.rs (1)
59-59
: LGTM!The
QueryEngineFactory
instantiation is correctly modified with an additionalNone
argument.src/query/src/query_engine/context.rs (1)
85-85
: Verify the handling of the newNone
parameter.Ensure that the new
None
parameter added to theQueryEngineState
instantiation is correctly handled and does not introduce any issues.Verification successful
Verify the handling of the new
None
parameter.Ensure that the new
None
parameter added to theQueryEngineState
instantiation is correctly handled and does not introduce any issues.
Verify the handling of the new
None
parameter.Ensure that the new
None
parameter added to theQueryEngineState
instantiation is correctly handled and does not introduce any issues.
Verify the handling of the new
None
parameter.Ensure that the new
None
parameter added to theQueryEngineState
instantiation is correctly handled and does not introduce any issues.
Verify the handling of the new
None
parameter.Ensure that the new
None
parameter added to theQueryEngineState
instantiation is correctly handled and does not introduce any issues.
Verified: The
None
parameter is handled correctly.The
QueryEngineState::new
function manages theNone
parameter appropriately by passing it along to theSessionState
andFunctionState
, ensuring it does not introduce any issues.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the usage of the new `None` parameter in `QueryEngineState`. # Test: Search for the `QueryEngineState` instantiation. Expect: Correct handling of the new parameter. rg --type rust 'QueryEngineState::new'Length of output: 282
Script:
#!/bin/bash # Extract the implementation of QueryEngineState::new function from the relevant files. # Search for the implementation of QueryEngineState::new function in the repository. ast-grep --lang rust --pattern 'impl QueryEngineState { pub fn new($$$) -> $_ { $$$ } }'Length of output: 98
Script:
#!/bin/bash # Search for the implementation of the QueryEngineState::new function and include context lines for better understanding. rg --type rust 'impl QueryEngineState' -A 20Length of output: 1729
Script:
#!/bin/bash # Extract more lines from the implementation of the QueryEngineState::new function to verify the handling of None values. rg --type rust 'impl QueryEngineState' -A 40Length of output: 3157
Script:
#!/bin/bash # Extract more lines from the implementation of the QueryEngineState::new function to verify the handling of None values. rg --type rust 'impl QueryEngineState' -A 60Length of output: 4768
src/common/function/src/state.rs (1)
Line range hint
15-26
:
Verify the handling of the newflow_service_handler
field.Ensure that the new
flow_service_handler
field added to theFunctionState
struct is correctly handled and does not introduce any issues.Verification successful
Verified the handling of the new
flow_service_handler
field.The new
flow_service_handler
field added to theFunctionState
struct is correctly handled in the relevant files. The initialization, usage, and mock implementation are consistent with the expected behavior.
query_engine.rs
: Properly initialized and passed in constructors.state.rs
: Similar initialization and state management.admin_fn.rs
: Appears in macro handling.flush_flow.rs
: Used in a function to call theflush
method.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the handling of the new `flow_service_handler` field in `FunctionState`. # Test: Search for the usage of the `flow_service_handler` field. Expect: Correct handling of the new field. rg --type rust 'flow_service_handler'Length of output: 982
Script:
#!/bin/bash # Description: Gather detailed information about the handling of `flow_service_handler` field. # Search for the initialization and usage of `flow_service_handler` in `query_engine.rs` rg -A 5 'flow_service_handler' src/query/src/query_engine.rs # Search for the initialization and usage of `flow_service_handler` in `state.rs` rg -A 5 'flow_service_handler' src/query/src/query_engine/state.rs # Search for the initialization and usage of `flow_service_handler` in `admin_fn.rs` rg -A 5 'flow_service_handler' src/common/macro/src/admin_fn.rs # Search for the initialization and usage of `flow_service_handler` in `flush_flow.rs` rg -A 5 'flow_service_handler' src/common/function/src/flush_flow.rsLength of output: 2216
tests/cases/standalone/common/flow/flow_call_df_func.sql (10)
1-6
: LGTM!The table
numbers_input_df_func
is created correctly with appropriate columns and constraints.
9-12
: LGTM!The flow
test_numbers_df_func
is created correctly with appropriate SQL commands.
16-19
: LGTM!The data is inserted correctly into the
numbers_input_df_func
table.
21-22
: LGTM!The
flush_flow
function is called correctly to ensure the table is created and data is inserted.
24-24
: LGTM!The data is selected correctly from the
out_num_cnt_df_func
flow.
26-26
: LGTM!The
flush_flow
function is called correctly to ensure the flow is flushed.
28-31
: LGTM!The data is inserted correctly into the
numbers_input_df_func
table.
33-33
: LGTM!The
flush_flow
function is called correctly to ensure the flow is flushed.
37-39
: LGTM!The flow
test_numbers_df_func
and tablesnumbers_input_df_func
andout_num_cnt_df_func
are dropped correctly.
14-14
: Verify the correctness offlush_flow
function.Ensure that the
flush_flow
function is implemented correctly and works as expected.src/operator/src/flow.rs (4)
1-32
: LGTM!The import statements and the
FlowServiceOperator
struct are defined correctly.
34-44
: LGTM!The implementation of the
FlowServiceOperator
struct is correct and follows best practices.
46-55
: LGTM!The implementation of the
FlowServiceHandler
trait forFlowServiceOperator
is correct and follows best practices.
58-128
: LGTM! But verify the correctness of theflush_inner
method.The implementation of the
flush_inner
method forFlowServiceOperator
is correct and follows best practices.However, ensure that the method works as expected and handles all edge cases.
src/common/function/src/table/flush_compact_region.rs (7)
22-22
: LGTM!The import statement for
snafu::ensure
is correct and necessary for the macro definition.
80-80
: LGTM!The import statement for
Function
andFunctionContext
is correct and necessary for the tests.
Line range hint
48-52
:
LGTM!The macro is used correctly to define the
flush_region
function.
Line range hint
54-54
:
LGTM!The macro is used correctly to define the
compact_region
function.
Line range hint
56-56
:
LGTM!The function signature is defined correctly and follows best practices.
82-82
: LGTM!The test for the
flush_region
function is correct and covers the necessary cases.
Line range hint
84-84
:
LGTM!The test for the
compact_region
function is correct and covers the necessary cases.tests/cases/standalone/common/flow/flow_basic.result (11)
1-6
: Table creation looks good.The table
numbers_input_basic
is created with appropriate columns and constraints.
10-13
: Flow creation looks good.The flow
test_numbers_basic
is created to sink results toout_num_cnt_basic
.
17-18
: Clarify the necessity of flushing the flow here.The TODO comment suggests confirming the necessity of flushing the flow at this point.
Is it necessary to flush the flow here, or can it be removed?
26-29
: Data insertion looks good.The data is inserted into the
numbers_input_basic
table with appropriate values.
33-39
: Flushing the flow looks good.The
flush_flow
function is used to flush the flowtest_numbers_basic
.
41-47
: Querying the flow results looks good.The query retrieves the expected results from the
out_num_cnt_basic
table.
49-55
: Flushing the flow looks good.The
flush_flow
function is used again to flush the flowtest_numbers_basic
.
57-60
: Additional data insertion looks good.The additional data is inserted into the
numbers_input_basic
table with appropriate values.
64-70
: Flushing the flow looks good.The
flush_flow
function is used again to flush the flowtest_numbers_basic
.
72-79
: Final query looks good.The final query retrieves the expected results from the
out_num_cnt_basic
table.
81-91
: Cleanup commands look good.The flow and tables are dropped as expected.
src/common/macro/src/lib.rs (1)
Line range hint
76-89
:
Documentation update looks good.The documentation for the
admin_fn
attribute macro is updated to includeFlowServiceHandlerRef
as an accepted type for the first argument. The note about usage within thecommon-function
crate is also clear.src/query/src/query_engine.rs (3)
108-116
: Constructor update looks good.The
flow_service_handler
parameter is added to theQueryEngineFactory
constructor. The changes are consistent and correctly integrated.
Line range hint
127-136
:
Constructor with plugins update looks good.The
flow_service_handler
parameter is also added to theQueryEngineFactory
constructor with plugins. The changes are consistent and correctly integrated.
170-170
: Test case update looks good.The test case for the
QueryEngineFactory
is updated to reflect the new parameterflow_service_handler
. The changes are consistent and correctly integrated.src/query/src/query_engine/default_serializer.rs (1)
143-143
: Ensure the new parameter is correctly handled.The addition of the new parameter
None
to theQueryEngineFactory
constructor should be verified to ensure it is correctly handled within the factory. Ensure that this change does not introduce any side effects or require additional handling in the factory implementation.src/query/src/tests/query_engine_test.rs (3)
50-50
: Ensure the new parameter is correctly handled.The addition of the new parameter
None
to theQueryEngineFactory
constructor should be verified to ensure it is correctly handled within the factory. Ensure that this change does not introduce any side effects or require additional handling in the factory implementation.
132-132
: Ensure the new parameter is correctly handled.The addition of the new parameter
None
to theQueryEngineFactory::new_with_plugins
constructor should be verified to ensure it is correctly handled within the factory. Ensure that this change does not introduce any side effects or require additional handling in the factory implementation.
162-162
: Ensure the new parameter is correctly handled.The addition of the new parameter
None
to theQueryEngineFactory
constructor should be verified to ensure it is correctly handled within the factory. Ensure that this change does not introduce any side effects or require additional handling in the factory implementation.src/flow/src/error.rs (1)
153-153
: Enhance error reporting with#[snafu(source)]
attribute.The addition of the
#[snafu(source)]
attribute to theraw
field in theDatafusion
variant improves error reporting by providing more context about the underlying cause of the error. This change is beneficial for debugging and logging purposes.src/script/benches/py_benchmark.rs (1)
56-56
: Verify the correctness of additionalNone
arguments.The
QueryEngineFactory::new
call now includes additionalNone
arguments. Ensure that these changes align with the updated constructor and do not introduce any issues.src/query/src/tests/time_range_filter_test.rs (1)
105-106
: Verify the correctness of additionalNone
arguments.The
QueryEngineFactory::new
call now includes additionalNone
arguments. Ensure that these changes align with the updated constructor and do not introduce any issues.src/frontend/src/instance/builder.rs (3)
23-23
: Import statement is correct.The import of
FlowMetadataManager
is necessary for the integration in theFrontendBuilder
.
28-28
: Import statement is correct.The import of
FlowServiceOperator
is necessary for the integration in theFrontendBuilder
.
158-166
: Instantiation and integration ofFlowMetadataManager
andFlowServiceOperator
are correct.The changes are necessary for the new functionality related to flow management.
src/flow/src/compute/render/src_sink.rs (2)
132-135
: Approved: Added debug log for visibility.The debug log provides better visibility into the number of rows being sent, which is useful for monitoring and debugging.
139-139
: Approved: Added error log for better error handling.The error log explicitly indicates when the
UnboundedSink
is closed, improving the error handling mechanism.src/servers/tests/mod.rs (1)
218-218
: Verify the impact of the addedNone
argument.The additional
None
argument in theQueryEngineFactory::new
method may affect the configuration of thequery_engine
. Ensure that this change does not introduce any issues and verify its impact on the function's behavior and initialization of underlying components.Verification successful
Verify the implementation of
QueryEngineFactory::new
to handle the additionalNone
argument.The additional
None
argument in theQueryEngineFactory::new
method is consistently used across various files. Ensure that the implementation of this method correctly handles the added argument and does not introduce any issues.
The additional
None
argument in theQueryEngineFactory::new
method is correctly handled.The implementation of the
QueryEngineFactory::new
method insrc/query/src/query_engine.rs
correctly accepts the additionalNone
argument and delegates it tonew_with_plugins
. The consistent use of this new signature across the codebase indicates that the change is intentional and does not introduce any issues.
src/query/src/query_engine.rs
: Implementation ofQueryEngineFactory::new
method.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the impact of the added `None` argument in the `QueryEngineFactory::new` method. # Test: Search for the `QueryEngineFactory::new` method usage. Expect: Only occurances of the new signature. rg --type rust -A 5 $'QueryEngineFactory::new'Length of output: 7326
Script:
#!/bin/bash # Description: Inspect the implementation of the `QueryEngineFactory::new` method to verify handling of the additional `None` argument. # Search for the implementation of `QueryEngineFactory::new` method. ast-grep --lang rust --pattern 'impl QueryEngineFactory { pub fn new($_, $_, $_, $_, $_, $_, $_) -> $_ { $$$ } }'Length of output: 123
Script:
#!/bin/bash # Description: Locate the file containing the `QueryEngineFactory` implementation and inspect the `new` method. # Locate the file containing the `QueryEngineFactory` implementation. rg --type rust 'impl QueryEngineFactory' -l | xargs -I {} sh -c 'echo "File: {}"; rg --type rust "pub fn new" {} -A 10'Length of output: 1111
src/flow/src/adapter/flownode_impl.rs (3)
102-102
: Approved: Added debug log for visibility.The debug log provides better visibility into the start of the flush operation, which is useful for monitoring and debugging.
107-125
: Approved: Implemented flush operation.The flush operation is correctly implemented, ensuring that all sender data is flushed, available tasks are run, and writeback requests are sent. The debug log at the end provides useful information about the flush operation's outcome.
103-106
: Verify the use of sleep.The 100 ms sleep is used to ensure that input data is properly sent before the flush occurs. Verify if this is the best approach and if there are any potential issues.
src/common/macro/src/admin_fn.rs (3)
168-171
: Addition ofFlowServiceHandlerRef
is appropriate.The new match arm for
FlowServiceHandlerRef
follows the existing pattern and is necessary for the newflush_flow
function.
190-196
: Type path updates improve clarity.The fully qualified paths for
ConcreteDataType
andFunctionContext
enhance type clarity and ensure correct type references.
254-256
: Error handling improvements are well-implemented.Using
common_telemetry::error!
andsnafu::Location::default()
aligns the logging mechanism with the broader telemetry framework and provides better context in error reporting.Cargo.toml (1)
122-122
: Dependency update togreptime-proto
is appropriate.The revision identifier for the
greptime-proto
Git repository has been updated, indicating an upgrade to a newer commit. This may include bug fixes, new features, or other modifications.src/flow/src/expr/relation/func.rs (2)
24-24
: Import update forIntoError
is necessary.Including
IntoError
from thesnafu
crate is necessary for the new error handling approach.
204-207
: Error handling modifications improve clarity and propagation.Using the
into_error
method enhances clarity and potentially improves error propagation by ensuring that the original error is included in the resulting error object.src/cmd/src/cli/repl.rs (1)
292-292
: Verify the necessity and future use of the new parameter.The new parameter
new_param
is currently set toNone
and is not used within the function. Ensure that this parameter is necessary and will be utilized in future updates.src/common/query/src/error.rs (2)
242-247
: LGTM! The new error variants are well-defined.The new error variants
MissingFlowServiceHandler
andFlownodeNotFound
are well-defined and include thelocation
field for better context in error reporting.Also applies to: 262-266
284-285
: LGTM! The new error variants are appropriately mapped to status codes.The new error variants
FlownodeNotFound
andMissingFlowServiceHandler
have been appropriately mapped toStatusCode::EngineExecuteQuery
andStatusCode::Unexpected
respectively.Also applies to: 299-299
src/query/src/query_engine/state.rs (1)
23-25
: LGTM! The new handler is well-integrated.The new handler
FlowServiceHandlerRef
is well-integrated into theQueryEngineState
struct and its initialization logic.Also applies to: 88-88, 144-144
src/flow/src/transform.rs (2)
303-303
: LGTM!The additional
None
argument in theQueryEngineFactory
constructor call expands configuration options without altering existing behavior.
140-145
: LGTM! Ensure correctness of theSimplifyExpressions
optimization.The addition of the
SimplifyExpressions
optimization should enhance performance. Ensure that the optimization does not introduce any unexpected behavior.tests/cases/standalone/common/flow/flow_call_df_func.result (3)
1-49
: LGTM!The test case for
sum(abs(number))
is comprehensive, covering table creation, data insertion, and result verification using theflush_flow
function.
94-174
: LGTM!The test case for
abs(sum(number))
is comprehensive, covering table creation, data insertion, and result verification using theflush_flow
function.
187-370
: LGTM!The test case for
date_bin
anddate_trunc
is comprehensive, covering table creation, data insertion, and result verification using theflush_flow
function.src/flow/src/server.rs (1)
278-278
: LGTM!The additional
None
argument in theQueryEngineFactory::new_with_plugins
function call expands configuration options without altering existing behavior.src/flow/src/adapter/worker.rs (1)
349-355
: Verify response handling forRequest::RunAvail
.Ensure that the response handling for the
Request::RunAvail
variant is correct, especially whenblocking
is true.Verification successful
Verify implementations of
call_with_resp
andcall_no_resp
forRequest::RunAvail
.Ensure that the response handling for the
Request::RunAvail
variant is correct in bothcall_with_resp
andcall_no_resp
.
Gather full implementations of
call_with_resp
andcall_no_resp
forRequest::RunAvail
.Examine the implementations to ensure proper response handling for the
Request::RunAvail
variant.
Verified response handling for
Request::RunAvail
.The response handling for the
Request::RunAvail
variant is correctly implemented in bothcall_with_resp
andcall_no_resp
.
call_with_resp
correctly sends the request and awaits a response.call_no_resp
correctly sends the request without expecting a response.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify response handling for `Request::RunAvail` variant. # Test: Search for `Request::RunAvail` handling. Expect: Proper response handling. rg --type rust 'Request::RunAvail'Length of output: 316
Script:
#!/bin/bash # Description: Verify implementations of `call_with_resp` and `call_no_resp` for `Request::RunAvail`. # Search for `call_with_resp` and `call_no_resp` implementations. rg --type rust 'fn call_with_resp' rg --type rust 'fn call_no_resp'Length of output: 275
Script:
#!/bin/bash # Description: Gather full implementations of `call_with_resp` and `call_no_resp` for `Request::RunAvail`. # Extract full implementations of `call_with_resp` and `call_no_resp`. rg --type rust -A 20 'async fn call_with_resp' src/flow/src/adapter/worker.rs rg --type rust -A 20 'fn call_no_resp' src/flow/src/adapter/worker.rsLength of output: 1444
src/script/src/python/engine.rs (1)
401-401
: Verify handling of additional parameter inQueryEngineFactory
.Ensure that the additional
None
parameter in theQueryEngineFactory
instantiation is handled correctly.Verification successful
The additional
None
parameter inQueryEngineFactory
is handled correctly.The
new
method ofQueryEngineFactory
correctly forwards all parameters, including the additionalNone
parameter, to thenew_with_plugins
method, which manages them appropriately.
src/query/src/query_engine.rs
:
- Line 103-120: Implementation of
QueryEngineFactory::new
method.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify handling of additional `None` parameter in `QueryEngineFactory`. # Test: Search for `QueryEngineFactory` instantiation. Expect: Proper handling of additional parameter. rg --type rust 'QueryEngineFactory'Length of output: 3340
Script:
#!/bin/bash # Extract the implementation of the `QueryEngineFactory::new` method in `src/query/src/query_engine.rs`. ast-grep --lang rust --pattern $'impl QueryEngineFactory { pub fn new($_, $_, $_, $_, $_, $_, $_) { $$$ } }'Length of output: 117
Script:
#!/bin/bash # Extract all function implementations within the `impl QueryEngineFactory` block in `src/query/src/query_engine.rs`. ast-grep --lang rust --pattern $'impl QueryEngineFactory { $$$ }'Length of output: 3393
src/flow/src/repr/relation.rs (1)
379-381
: LGTM! Verify the usage of thecontext
method.The change improves error reporting by adding contextual information. Ensure that the
context
method is used correctly.Verification successful
LGTM! Verify the usage of the
context
method.The change improves error reporting by adding contextual information. Ensure that the
context
method is used correctly.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the usage of the `context` method from the `snafu` crate. # Test: Search for the `context` method usage. Expect: Proper usage of the `context` method. rg --type rust 'context'Length of output: 391165
src/datanode/src/datanode.rs (1)
324-324
: LGTM! But verify the instantiation ofQueryEngineFactory
.The code change is approved.
However, ensure that all instantiations of
QueryEngineFactory
are updated to include the new parameter.Verification successful
Verification Complete: All instances of
QueryEngineFactory
include the new parameter.The code change is verified, and all instantiations of
QueryEngineFactory
have been updated to include the new parameter.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify all instantiations of `QueryEngineFactory` include the new parameter. # Test: Search for the instantiation of `QueryEngineFactory`. Expect: Only occurrences with the new parameter. rg --type rust -A 5 $'QueryEngineFactory::new'Length of output: 7326
src/query/src/datafusion.rs (1)
548-548
: LGTM! But verify the instantiation ofQueryEngineFactory
.The code change is approved.
However, ensure that all instantiations of
QueryEngineFactory
are updated to include the new parameter.Verification successful
All instantiations of
QueryEngineFactory
include the new parameter.The code change is verified, and all instances of
QueryEngineFactory::new
have been updated to include the additionalNone
parameter.
- src/script/src/test.rs
- src/servers/tests/mod.rs
- src/script/benches/py_benchmark.rs
- src/script/src/python/engine.rs
- src/query/src/tests.rs
- src/query/src/query_engine.rs
- src/query/src/datafusion.rs
- src/query/src/query_engine/default_serializer.rs
- src/query/src/tests/time_range_filter_test.rs
- src/query/src/tests/query_engine_test.rs
- src/query/src/range_select/plan_rewrite.rs
- src/flow/src/transform.rs
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify all instantiations of `QueryEngineFactory` include the new parameter. # Test: Search for the instantiation of `QueryEngineFactory`. Expect: Only occurrences with the new parameter. rg --type rust -A 5 $'QueryEngineFactory::new'Length of output: 7326
src/flow/src/adapter.rs (1)
565-565
: LGTM! But verify the usage of therun_available
method.The code change is approved.
However, ensure that all calls to the
run_available
method are updated to include the newblocking
parameter.Verification successful
Verified: All calls to the
run_available
method have been updated to include the newblocking
parameter.The code change is approved, and no further issues were found.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify all calls to the `run_available` method include the new `blocking` parameter. # Test: Search for the calls to the `run_available` method. Expect: Only occurrences with the new parameter. rg --type rust -A 5 $'run_available'Length of output: 7661
src/flow/src/expr/scalar.rs (1)
367-367
: Improved error handling withcontext
method.The use of the
context
method from theSnafu
library provides more descriptive error messages, improving the clarity and maintainability of the code.src/flow/src/transform/expr.rs (2)
99-99
: Improved error handling withcontext
method.The use of the
context
method from theSnafu
library provides more descriptive error messages, improving the clarity and maintainability of the code.
106-107
: Improved error handling withcontext
method.The use of the
context
method from theSnafu
library provides more descriptive error messages, improving the clarity and maintainability of the code.src/query/src/range_select/plan_rewrite.rs (1)
611-611
: Verify the handling of the additional parameter inQueryEngineFactory
.The constructor call for
QueryEngineFactory
now includes an additionalNone
parameter, expanding the parameters from four to five. Ensure that theQueryEngineFactory
constructor and related methods correctly handle this new parameter.Verification successful
The additional parameter in
QueryEngineFactory
is correctly handled.The
QueryEngineFactory
constructor has been updated to include an additionalNone
parameter (flow_service_handler
), which is correctly passed and utilized within thenew_with_plugins
method. This ensures that the new parameter is properly integrated into theQueryEngineFactory
instantiation process.
src/query/src/query_engine.rs
: Verified handling of the additional parameter in theQueryEngineFactory
constructor.Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash # Description: Verify the handling of the additional parameter in `QueryEngineFactory`. # Test: Search for the `QueryEngineFactory` constructor definition and its usage. Expect: Correct handling of the new parameter. rg --type rust -A 5 $'QueryEngineFactory::new'Length of output: 7326
Script:
#!/bin/bash # Description: Locate the definition of the `QueryEngineFactory` constructor to verify the handling of the additional parameter. # Search for the definition of `QueryEngineFactory` constructor. rg --type rust 'impl QueryEngineFactory' -A 50Length of output: 3380
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #4416 +/- ##
==========================================
- Coverage 85.03% 84.70% -0.34%
==========================================
Files 1075 1076 +1
Lines 192544 192585 +41
==========================================
- Hits 163739 163132 -607
- Misses 28805 29453 +648 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 👍
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
What's changed and what's your intention?
added a admin function
flush_flow
, which will signal flow worker to flush flow immediately and write newest results to sink table.flush_flow
admin_fn, also tweakadmin_fn
macro to support itadmin_fn
macro and use as many full path as possibleFlowServiceOperator
to support flush flow in contextflush_flow
For ease of reviewing, the major changes are listed in the following files, others are just mostly adding new args when creating
QueryEngine
:src/common/function/src/flush_flow.rs
for implflush_flow
function in sqlsrc/flow/src/adapter/flownode_impl.rs
to impl flush flow in flow worker, note it will first sleep for 100 ms now before doing anything to prevent running flush before inputs are send to flowsrc/common/function/src/handlers.rs
andsrc/operator/src/flow.rs
whichimpl FlowServiceHandler for FlowServiceOperator
SQLNESS SLEEP <TIME>
toselect flush_flow('test_numbers_basic')*0;
Checklist
Summary by CodeRabbit
New Features
FlowServiceHandler
andFlowServiceOperator
for improved flow management.Bug Fixes
Documentation
admin_fn
macro to include support forFlowServiceHandlerRef
.Chores