Refactor usage of WorkerMetadata and StageMetadata #10756
xiangfu0 wants to merge 2 commits into apache:master
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #10756      +/-   ##
============================================
+ Coverage     68.48%   70.34%   +1.86%
- Complexity     6463     6475      +12
============================================
  Files          2152     2152
  Lines        115789   115780       -9
  Branches      17500    17498       -2
============================================
+ Hits          79295    81445    +2150
+ Misses        30885    28661    -2224
- Partials       5609     5674      +65
aacac59 to a58d3c7
a58d3c7 to 36eee3a
36eee3a to 2a0b6c6
return new DistributedStagePlan(stageId, serverAddress,
    dispatchableSubPlan.getQueryStageList().get(stageId).getPlanFragment().getFragmentRoot(),
    dispatchableSubPlan.getQueryStageList().get(stageId).toStageMetadata());
    StageMetadata.from(dispatchableSubPlan.getQueryStageList().get(stageId)),
The right way to solve this is to not construct DistributedStagePlan on the broker at all. Instead, construct the proto object directly and, when deserializing on the server, split it into multiple DistributedStagePlan objects.
E.g., instead of doing this in Dispatcher:
_executorService.submit(() -> client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
        QueryPlanSerDeUtils.serialize(
            constructDistributedStagePlan(dispatchableSubPlan, finalStageId, virtualServerAddress)))
    .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId))
    .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs))
    .putAllMetadata(queryOptions).build(), finalStageId, queryServerInstance, deadline,
    dispatchCallbacks::offer));
- directly serialize the dispatchableSubPlan to proto format:
  QueryPlanSerDeUtils.serialize(dispatchableSubPlan, finalStageId, virtualServerAddress)
- when deserializing, turn it into multiple DistributedStagePlan objects:
  List<DistributedStagePlan> QueryPlanSerDeUtils.deserialize(Worker.StagePlan stagePlan)
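To make the suggestion above concrete, here is a minimal sketch of the fan-out on the deserialization side. It does not use the real Pinot or protobuf classes: `StagePlanMessage` and `WorkerStagePlan` are hypothetical stand-ins for the proto stage plan and for DistributedStagePlan, and the string-valued `stageRoot` is a placeholder for the plan tree. The only point it illustrates is that one per-stage message carrying a worker list can be expanded into one plan per worker after it arrives.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StagePlanFanOutSketch {
  // Hypothetical stand-in for the proto stage plan: one message per stage,
  // carrying the shared plan root plus the address of every worker.
  static final class StagePlanMessage {
    final int stageId;
    final String stageRoot;
    final List<String> workerAddresses;

    StagePlanMessage(int stageId, String stageRoot, List<String> workerAddresses) {
      this.stageId = stageId;
      this.stageRoot = stageRoot;
      this.workerAddresses = workerAddresses;
    }
  }

  // Hypothetical stand-in for DistributedStagePlan: one instance per worker.
  static final class WorkerStagePlan {
    final int stageId;
    final int workerId;
    final String workerAddress;
    final String stageRoot;

    WorkerStagePlan(int stageId, int workerId, String workerAddress, String stageRoot) {
      this.stageId = stageId;
      this.workerId = workerId;
      this.workerAddress = workerAddress;
      this.stageRoot = stageRoot;
    }
  }

  // Expands a single per-stage message into one plan per worker.
  // The worker id is assumed to be the index in the worker list.
  static List<WorkerStagePlan> deserialize(StagePlanMessage message) {
    List<WorkerStagePlan> plans = new ArrayList<>();
    for (int workerId = 0; workerId < message.workerAddresses.size(); workerId++) {
      plans.add(new WorkerStagePlan(message.stageId, workerId,
          message.workerAddresses.get(workerId), message.stageRoot));
    }
    return plans;
  }

  public static void main(String[] args) {
    StagePlanMessage message =
        new StagePlanMessage(1, "ProjectNode", Arrays.asList("server1:8421", "server2:8421"));
    for (WorkerStagePlan plan : deserialize(message)) {
      System.out.println("stage " + plan.stageId + ", worker " + plan.workerId
          + " @ " + plan.workerAddress);
    }
  }
}
```

With this shape the broker never builds per-worker objects; it only fills in the message, and the per-worker split happens where the plans are consumed.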
string virtualAddress = 2;
StageNode stageRoot = 3;
StageMetadata stageMetadata = 4;
WorkerMetadata workerMetadata = 5;
This might not be the right abstraction for the proto.
We want to send a single proto request from the broker to each server to launch all of its workers, rather than sending one gRPC request per worker, which would be inefficient. Let's still keep the wire protocol as a list.
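As a rough illustration of why the list form matters, the sketch below groups workers by the server that hosts them, so the number of requests equals the number of servers rather than the number of workers. The class and method names are hypothetical and nothing here is taken from the Pinot code base; worker ids are assumed to be positions in the input list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RequestBatchingSketch {
  // Groups worker ids by hosting server. Each map entry corresponds to one
  // request that would carry the whole list of workers for that server.
  static Map<String, List<Integer>> groupWorkersByServer(List<String> workerIdToServer) {
    Map<String, List<Integer>> workersByServer = new LinkedHashMap<>();
    for (int workerId = 0; workerId < workerIdToServer.size(); workerId++) {
      String server = workerIdToServer.get(workerId);
      workersByServer.computeIfAbsent(server, key -> new ArrayList<>()).add(workerId);
    }
    return workersByServer;
  }

  public static void main(String[] args) {
    // Four workers spread over two servers: two requests instead of four.
    List<String> placement = Arrays.asList("server1", "server1", "server2", "server1");
    Map<String, List<Integer>> batches = groupWorkersByServer(placement);
    for (Map.Entry<String, List<Integer>> batch : batches.entrySet()) {
      System.out.println(batch.getKey() + " launches workers " + batch.getValue());
    }
  }
}
```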
This can be closed as covered by #10791.
- pinot-query-planner to pinot-query-runtime package
- WorkerMetadata for OpChainExecutionContext
- WorkerMetadata out from StageMetadata to DistributedStagePlan