Conversation

@Cpaulyz (Contributor) commented Mar 19, 2025

Description

  1. Add GroupNode and parallelize TableFunctionProcessorNode. Design doc: https://apache-iotdb-project.feishu.cn/wiki/Us6uwGvtNiIhrCkG2DVcRb8Knsf
  2. Add built-in functions: HOP, SESSION, VARIATION, CAPACITY.
  3. Add some execution optimizations.

codecov bot commented Mar 19, 2025

Codecov Report

Attention: Patch coverage is 43.91026% with 350 lines in your changes missing coverage. Please review.

Project coverage is 39.41%. Comparing base (d34f1c5) to head (9b392bc).
Report is 50 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...f/builtin/relational/tvf/SessionTableFunction.java | 0.00% | 51 Missing ⚠️ |
| ...builtin/relational/tvf/VariationTableFunction.java | 0.00% | 51 Missing ⚠️ |
| ...erator/process/function/TableFunctionOperator.java | 0.00% | 39 Missing ⚠️ |
| .../builtin/relational/tvf/CapacityTableFunction.java | 0.00% | 37 Missing ⚠️ |
| ...nal/planner/optimizations/ParallelizeGrouping.java | 68.42% | 30 Missing ⚠️ |
| ...regation/grouped/StreamingAggregationOperator.java | 0.00% | 19 Missing ⚠️ |
| ...tion/grouped/StreamingHashAggregationOperator.java | 21.05% | 15 Missing ⚠️ |
| ...ngine/plan/planner/plan/node/PlanGraphPrinter.java | 0.00% | 13 Missing ⚠️ |
| ...engine/plan/relational/planner/node/GroupNode.java | 38.09% | 13 Missing ⚠️ |
| ...nner/distribute/TableDistributedPlanGenerator.java | 89.42% | 11 Missing ⚠️ |

... and 16 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #15136      +/-   ##
============================================
+ Coverage     39.40%   39.41%   +0.01%     
  Complexity      193      193              
============================================
  Files          4612     4630      +18     
  Lines        296784   298303    +1519     
  Branches      37045    37220     +175     
============================================
+ Hits         116938   117568     +630     
- Misses       179846   180735     +889     


@Cpaulyz changed the title from "Add GroupNode and parallelize TableFunctionProcessorNode 2" to "Add GroupNode and parallelize TableFunctionProcessorNode" on Mar 24, 2025
@JackieTien97 (Contributor) left a comment

Add a new subclass of ProjectOffPushDownRule for GroupNode.

compareKey.rowIndex = low + (high - low) / 2;
int cmp = partitionComparator.compare(currentPartitionKey, compareKey);
if (cmp == 0) {
low = compareKey.rowIndex + 1;
Contributor

Does low need to be updated here? Otherwise this becomes an infinite loop.
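
To illustrate why the lower bound has to move, here is a minimal sketch of a binary search for the end of a partition. It is not the PR's PartitionRecognizer code: the class name, the method name, and the use of a plain sorted `long[]` in place of TsBlock rows and a partition comparator are all assumptions made for illustration.

```java
// Hypothetical helper for illustration only; the real code compares partition keys in TsBlocks.
final class PartitionBoundarySketch {

  /**
   * Returns the index one past the last row whose key equals sortedKeys[from].
   * Assumes sortedKeys is sorted ascending, so equal keys are contiguous.
   */
  static int findPartitionEnd(long[] sortedKeys, int from) {
    long currentKey = sortedKeys[from];
    int low = from;
    int high = sortedKeys.length - 1;
    while (low <= high) {
      int mid = low + (high - low) / 2; // overflow-safe midpoint
      if (sortedKeys[mid] == currentKey) {
        // Still inside the partition: the lower bound must advance past mid,
        // otherwise mid is recomputed to the same value and the loop never ends.
        low = mid + 1;
      } else {
        // Past the partition: shrink the upper bound.
        high = mid - 1;
      }
    }
    return low;
  }
}
```

Because every iteration either raises `low` or lowers `high`, the search range shrinks on each step and the loop terminates.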

private ListenableFuture<?> isBlocked;
private boolean finished = false;

private Queue<TsBlock> resultTsBlocks;
Contributor

Suggested change
- private Queue<TsBlock> resultTsBlocks;
+ private final Queue<TsBlock> resultTsBlocks;

Contributor Author

done

@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
PlanNodeType.TABLE_GROUP_NODE.serialize(byteBuffer);
orderingScheme.serialize(byteBuffer);
Contributor

Better to add a new protected method in SortNode that extracts this common code. Then, if SortNode gains new attributes, we won't need to change its subclasses.

Contributor Author

StreamSortNode has the same issue. Actually, some fields in SortNode are not used by StreamSortNode or GroupNode, so if SortNode gains new attributes, maybe that won't affect the logic of its subclasses?
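
The refactoring proposed in this thread could look roughly like the sketch below. The classes are stand-ins, not the real SortNode or GroupNode: the integer type ids, the `orderingKey` field (standing in for the ordering scheme), and the helper name `serializeSortAttributes` are assumptions made for illustration.

```java
import java.nio.ByteBuffer;

// Stand-in for SortNode; names and fields are hypothetical.
class SortNodeSketch {
  protected final int orderingKey; // placeholder for the real ordering scheme

  SortNodeSketch(int orderingKey) {
    this.orderingKey = orderingKey;
  }

  // Shared by all subclasses: a new SortNode attribute is written here, in one place.
  protected final void serializeSortAttributes(ByteBuffer byteBuffer) {
    byteBuffer.putInt(orderingKey);
  }

  protected void serializeAttributes(ByteBuffer byteBuffer) {
    byteBuffer.putInt(1); // hypothetical type id for the sort node
    serializeSortAttributes(byteBuffer);
  }
}

// Stand-in for GroupNode: it only contributes its own type id.
class GroupNodeSketch extends SortNodeSketch {
  GroupNodeSketch(int orderingKey) {
    super(orderingKey);
  }

  @Override
  protected void serializeAttributes(ByteBuffer byteBuffer) {
    byteBuffer.putInt(2); // hypothetical type id for the group node
    serializeSortAttributes(byteBuffer);
  }
}
```

Whether this is worth doing depends on the point raised in the reply: if subclasses deliberately skip some SortNode fields, a single shared helper would serialize more than they need.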

@Cpaulyz (Contributor Author) commented Mar 26, 2025

...org/apache/iotdb/db/queryengine/execution/operator/process/function/PartitionRecognizer.java

GroupNode is a subclass of SortNode, so I think its projection push-down will be handled by ProjectOffPushDownRule<SortNode>.

private static class CountDataProcessor implements TableFunctionDataProcessor {

private final long size;
private final List<Long> currentRowIndexes = new ArrayList<>();
Contributor

Is there any need to save this list? We can simply record a curStartIndex, and then [curStartIndex, curIndex] is the range for passThroughIndexBuilder.

Contributor Author

Good idea! Changed List<Long> currentRowIndexes to long currentStartIndex.
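
A minimal sketch of the idea agreed on here: a fixed-size (CAPACITY-style) window that tracks only where the open window started instead of buffering every row index. The class and its string output are hypothetical; the real processor writes to column builders and a pass-through index builder.

```java
// Hypothetical illustration; not the PR's CountDataProcessor.
final class CapacityWindowSketch {
  private final long size;
  private long currentStartIndex = 0; // first row of the window that is still open
  private long curIndex = 0;          // index of the next row to be consumed
  private final StringBuilder emitted = new StringBuilder();

  CapacityWindowSketch(long size) {
    this.size = size;
  }

  /** Consumes one row and closes the window once it holds exactly `size` rows. */
  void process() {
    curIndex++;
    if (curIndex - currentStartIndex == size) {
      flush();
    }
  }

  /** Closes a trailing partial window, if any rows are still pending. */
  void finish() {
    if (curIndex > currentStartIndex) {
      flush();
    }
  }

  // Records the half-open row range [currentStartIndex, curIndex) and opens a new window.
  private void flush() {
    emitted.append('[').append(currentStartIndex).append(',').append(curIndex).append(')');
    currentStartIndex = curIndex;
  }

  String emitted() {
    return emitted.toString();
  }
}
```

With two long counters the memory cost per window is constant, whereas the list grows with the window size.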

Comment on lines 103 to 104
private long windowStart = -1;
private long windowEnd = -1;
Contributor

What if the time is negative? Better to use Long.MIN_VALUE.

Contributor Author

Fixed
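
To show why the sentinel matters, here is a small session-window sketch that uses Long.MIN_VALUE as the "no window open" marker. It is an assumption-laden illustration rather than the PR's processor: the class, the `lastTime` field, and the string output are hypothetical, and it assumes that Long.MIN_VALUE itself never appears as a timestamp and that time differences do not overflow.

```java
// Hypothetical illustration; not the PR's session or hop processor.
final class SessionWindowSketch {
  private static final long UNSET = Long.MIN_VALUE; // assumed never to be a real timestamp

  private final long gap;
  private long windowStart = UNSET;
  private long lastTime = UNSET;
  private final StringBuilder emitted = new StringBuilder();

  SessionWindowSketch(long gap) {
    this.gap = gap;
  }

  /** Consumes one timestamp; timestamps are assumed to arrive in ascending order. */
  void process(long time) {
    if (windowStart == UNSET) {
      windowStart = time;          // first row opens a window
    } else if (time - lastTime > gap) {
      emit();                      // gap exceeded: close the current session
      windowStart = time;
    }
    lastTime = time;
  }

  /** Closes the last open session, if there is one. */
  void finish() {
    if (windowStart != UNSET) {
      emit();
      windowStart = UNSET;
    }
  }

  private void emit() {
    emitted.append('[').append(windowStart).append(',').append(lastTime).append(']');
  }

  String emitted() {
    return emitted.toString();
  }
}
```

If the marker were -1 instead, a session starting at timestamp -1 would look like "no window open" on the next row and would be silently restarted.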

private static class SessionDataProcessor implements TableFunctionDataProcessor {

private final long gap;
private final List<Long> currentRowIndexes = new ArrayList<>();
Contributor

No need to use a list to save all the row indexes.

Contributor Author

Fixed

Comment on lines 136 to 140
for (Long currentRowIndex : currentRowIndexes) {
properColumnBuilders.get(0).writeLong(windowStart);
properColumnBuilders.get(1).writeLong(windowEnd - gap);
passThroughIndexBuilder.writeLong(currentRowIndex);
}
Contributor

Suggested change
- for (Long currentRowIndex : currentRowIndexes) {
-   properColumnBuilders.get(0).writeLong(windowStart);
-   properColumnBuilders.get(1).writeLong(windowEnd - gap);
-   passThroughIndexBuilder.writeLong(currentRowIndex);
- }
+ long currentWindowEnd = windowEnd - gap;
+ for (Long currentRowIndex : currentRowIndexes) {
+   properColumnBuilders.get(0).writeLong(windowStart);
+   properColumnBuilders.get(1).writeLong(currentWindowEnd);
+   passThroughIndexBuilder.writeLong(currentRowIndex);
+ }

Contributor Author

Fixed

Comment on lines +45 to +46
throw new UDFException(
String.format("The type of the column [%s] is not as expected.", expectedFieldName));
Contributor

Suggested change
- throw new UDFException(
-     String.format("The type of the column [%s] is not as expected.", expectedFieldName));
+ throw new IoTDBRuntimeException(
+     String.format("The type of the column [%s] is not as expected.", expectedFieldName),
+     TSStatusCode.SEMANTIC_ERROR.getStatusCode());

Comment on lines +51 to +53
throw new UDFException(
String.format(
"Required column [%s] not found in the source table argument.", expectedFieldName));
Contributor

Suggested change
- throw new UDFException(
-     String.format(
-         "Required column [%s] not found in the source table argument.", expectedFieldName));
+ throw new IoTDBRuntimeException(
+     String.format(
+         "Required column [%s] not found in the source table argument.", expectedFieldName),
+     TSStatusCode.SEMANTIC_ERROR.getStatusCode());

private static class VariationDataProcessor implements TableFunctionDataProcessor {

private final double gap;
private final List<Long> currentRowIndexes = new ArrayList<>();
Contributor

No need for this to be a list.

public class ParallelizeGrouping implements PlanOptimizer {
@Override
public PlanNode optimize(PlanNode plan, PlanOptimizer.Context context) {
if (!(context.getAnalysis().isQuery())) {
Contributor

Only query plans that contain a GroupNode should continue.

Contributor Author

How can I tell whether a query plan contains a GroupNode?
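
The thread does not record an answer to this question. One generic possibility, sketched below, is a recursive walk over the plan tree that stops at the first matching node. Everything in it is hypothetical: `PlanNodeStub`, `GroupNodeStub`, and `PlanInspectorSketch` are stand-ins and not IoTDB classes, and the PR may have solved this differently (for example, by recording a flag during planning).

```java
// Stand-in plan node types; hypothetical, not the IoTDB PlanNode hierarchy.
class PlanNodeStub {
  final PlanNodeStub[] children;

  PlanNodeStub(PlanNodeStub... children) {
    this.children = children;
  }
}

class GroupNodeStub extends PlanNodeStub {
  GroupNodeStub(PlanNodeStub... children) {
    super(children);
  }
}

final class PlanInspectorSketch {
  /** Depth-first search that returns as soon as any GroupNodeStub is found. */
  static boolean containsGroupNode(PlanNodeStub node) {
    if (node instanceof GroupNodeStub) {
      return true;
    }
    for (PlanNodeStub child : node.children) {
      if (containsGroupNode(child)) {
        return true;
      }
    }
    return false;
  }
}
```

An optimizer could use such a check as an early exit, leaving plans without a group node untouched.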

@JackieTien97 (Contributor) left a comment

Add a catch (UDFException e) in StatementAnalyzer.visitTableFunctionInvocation, extract the error message, and rethrow it as a SemanticException.
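
The catch-and-rethrow pattern requested here can be sketched as follows. The exception and analyzer types are stand-ins written for this example, not the real UDFException, SemanticException, or StatementAnalyzer; whether the real UDF exception is checked or unchecked is not assumed.

```java
// Stand-in exception types; hypothetical, not the IoTDB classes.
class UdfExceptionStub extends RuntimeException {
  UdfExceptionStub(String message) {
    super(message);
  }
}

class SemanticExceptionStub extends RuntimeException {
  SemanticExceptionStub(String message, Throwable cause) {
    super(message, cause);
  }
}

final class AnalyzerSketch {
  /** Stand-in for a table function's analysis step, which may reject its arguments. */
  interface TableFunctionStub {
    void analyze(String columnName);
  }

  /** Translates a function-level failure into an analysis-level (semantic) error. */
  static void visitTableFunctionInvocation(TableFunctionStub function, String columnName) {
    try {
      function.analyze(columnName);
    } catch (UdfExceptionStub e) {
      // Keep the original message for the user and the original exception as the cause.
      throw new SemanticExceptionStub(e.getMessage(), e);
    }
  }
}
```

Translating at the analyzer boundary lets the table functions keep throwing their own exception type while callers see a single semantic-error category.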

List<ColumnBuilder> properColumnBuilders,
ColumnBuilder passThroughIndexBuilder) {
if (currentRowIndexes.size() >= size) {
if (windowIndex - curIndex == size) {
Contributor

Suggested change
- if (windowIndex - curIndex == size) {
+ if (curIndex - currentStartIndex == size) {

sonarqubecloud bot commented Apr 1, 2025

Quality Gate failed

Failed conditions
7.3% Duplication on New Code (required ≤ 5%)
B Reliability Rating on New Code (required ≥ A)

See analysis details on SonarQube Cloud

@JackieTien97 JackieTien97 merged commit 7c30113 into master Apr 1, 2025
52 of 53 checks passed
@JackieTien97 JackieTien97 deleted the builtin-udtf branch April 1, 2025 11:46