From eb1052cad15a6fadd75069b49ddc47832c426290 Mon Sep 17 00:00:00 2001 From: Rajat Venkatesh <1638298+vrajat@users.noreply.github.com> Date: Mon, 27 Jan 2025 10:52:25 +0530 Subject: [PATCH 1/9] Add ServerExecutionInfo to replace pairs maintaining the same info. --- .../api/resources/PinotBrokerDebug.java | 12 ++-- .../BaseSingleStageBrokerRequestHandler.java | 17 ++--- .../GrpcBrokerRequestHandler.java | 12 ++-- .../SingleConnectionBrokerRequestHandler.java | 6 +- .../broker/routing/BrokerRoutingManager.java | 17 ++--- .../pinot/core/routing/RoutingTable.java | 7 +- .../core/routing/ServerExecutionInfo.java | 66 +++++++++++++++++++ .../pinot/core/transport/QueryRouter.java | 19 +++--- .../pinot/query/routing/WorkerManager.java | 9 +-- .../planner/physical/TableScanVisitor.java | 2 +- 10 files changed, 117 insertions(+), 50 deletions(-) create mode 100644 pinot-core/src/main/java/org/apache/pinot/core/routing/ServerExecutionInfo.java diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java index a220bc53a55d..b890edeacecd 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java @@ -45,7 +45,6 @@ import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.broker.broker.AccessControlFactory; import org.apache.pinot.broker.queryquota.QueryQuotaManager; import org.apache.pinot.broker.routing.BrokerRoutingManager; @@ -56,6 +55,7 @@ import org.apache.pinot.core.auth.ManualAuthorization; import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.ServerExecutionInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; @@ -157,11 +157,11 @@ public Map>> getRoutingTable( @ApiResponse(code = 404, message = "Routing not found"), @ApiResponse(code = 500, message = "Internal server error") }) - public Map, List>>> getRoutingTableWithOptionalSegments( + public Map> getRoutingTableWithOptionalSegments( @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName, @Context HttpHeaders headers) { tableName = DatabaseUtils.translateTableName(tableName, headers); - Map, List>>> result = new TreeMap<>(); + Map> result = new TreeMap<>(); getRoutingTable(tableName, (tableNameWithType, routingTable) -> result.put(tableNameWithType, routingTable.getServerInstanceToSegmentsMap())); if (!result.isEmpty()) { @@ -192,9 +192,9 @@ private void getRoutingTable(String tableName, BiConsumer } private static Map> removeOptionalSegments( - Map, List>> serverInstanceToSegmentsMap) { + Map serverInstanceToSegmentsMap) { Map> ret = new HashMap<>(); - serverInstanceToSegmentsMap.forEach((k, v) -> ret.put(k, v.getLeft())); + serverInstanceToSegmentsMap.forEach((k, v) -> ret.put(k, v.getSegmentList())); return ret; } @@ -231,7 +231,7 @@ public Map> getRoutingTableForQuery( @ApiResponse(code = 404, message = "Routing not found"), @ApiResponse(code = 500, message = "Internal server error") }) - public Map, List>> getRoutingTableForQueryWithOptionalSegments( + public Map getRoutingTableForQueryWithOptionalSegments( @ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query, @Context HttpHeaders httpHeaders) { BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index b8c04140dc74..13cf368c3b96 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -85,6 +85,7 @@ import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.core.query.optimizer.QueryOptimizer; import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.ServerExecutionInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.util.GapfillUtils; @@ -617,8 +618,8 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO // Calculate routing table for the query // TODO: Modify RoutingManager interface to directly take PinotQuery long routingStartTimeNs = System.nanoTime(); - Map, List>> offlineRoutingTable = null; - Map, List>> realtimeRoutingTable = null; + Map offlineRoutingTable = null; + Map realtimeRoutingTable = null; List unavailableSegments = new ArrayList<>(); int numPrunedSegmentsTotal = 0; boolean offlineTableDisabled = false; @@ -633,7 +634,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } if (routingTable != null) { unavailableSegments.addAll(routingTable.getUnavailableSegments()); - Map, List>> serverInstanceToSegmentsMap = + Map serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap(); if (!serverInstanceToSegmentsMap.isEmpty()) { offlineRoutingTable = serverInstanceToSegmentsMap; @@ -654,7 +655,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } if (routingTable != null) { unavailableSegments.addAll(routingTable.getUnavailableSegments()); - Map, List>> serverInstanceToSegmentsMap = + Map serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap(); if (!serverInstanceToSegmentsMap.isEmpty()) { realtimeRoutingTable = serverInstanceToSegmentsMap; @@ -1872,9 +1873,9 @@ private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo t */ protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map, List>> offlineRoutingTable, + @Nullable Map offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map, List>> realtimeRoutingTable, long timeoutMs, + @Nullable Map realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception; @@ -1904,8 +1905,8 @@ private static class QueryServers { final String _query; final Set _servers = new HashSet<>(); - QueryServers(String query, @Nullable Map, List>> offlineRoutingTable, - @Nullable Map, List>> realtimeRoutingTable) { + QueryServers(String query, @Nullable Map offlineRoutingTable, + @Nullable Map realtimeRoutingTable) { _query = query; if (offlineRoutingTable != null) { _servers.addAll(offlineRoutingTable.keySet()); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java index 0484476a410b..aa7c3393d600 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java @@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.broker.broker.AccessControlFactory; import org.apache.pinot.broker.queryquota.QueryQuotaManager; import org.apache.pinot.broker.routing.BrokerRoutingManager; @@ -38,6 +37,7 @@ import org.apache.pinot.common.utils.grpc.GrpcQueryClient; import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder; import org.apache.pinot.core.query.reduce.StreamingReduceService; +import org.apache.pinot.core.routing.ServerExecutionInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.spi.config.table.TableType; @@ -76,9 +76,9 @@ public void shutDown() { @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map, List>> offlineRoutingTable, + @Nullable Map offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map, List>> realtimeRoutingTable, long timeoutMs, + @Nullable Map realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception { // TODO: Support failure detection @@ -106,12 +106,12 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques * Query pinot server for data table. */ private void sendRequest(long requestId, TableType tableType, BrokerRequest brokerRequest, - Map, List>> routingTable, + Map routingTable, Map> responseMap, boolean trace) { - for (Map.Entry, List>> routingEntry : routingTable.entrySet()) { + for (Map.Entry routingEntry : routingTable.entrySet()) { ServerInstance serverInstance = routingEntry.getKey(); // TODO: support optional segments for GrpcQueryServer. - List segments = routingEntry.getValue().getLeft(); + List segments = routingEntry.getValue().getSegmentList(); String serverHost = serverInstance.getHostname(); int port = serverInstance.getGrpcPort(); // TODO: enable throttling on per host bases. diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java index 7ad9268b0d04..374223674f6f 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java @@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.broker.broker.AccessControlFactory; import org.apache.pinot.broker.failuredetector.FailureDetector; import org.apache.pinot.broker.failuredetector.FailureDetectorFactory; @@ -43,6 +42,7 @@ import org.apache.pinot.common.response.broker.QueryProcessingException; import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.query.reduce.BrokerReduceService; +import org.apache.pinot.core.routing.ServerExecutionInfo; import org.apache.pinot.core.transport.AsyncQueryResponse; import org.apache.pinot.core.transport.QueryResponse; import org.apache.pinot.core.transport.QueryRouter; @@ -99,9 +99,9 @@ public void shutDown() { @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map, List>> offlineRoutingTable, + @Nullable Map offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map, List>> realtimeRoutingTable, long timeoutMs, + @Nullable Map realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception { assert offlineBrokerRequest != null || realtimeBrokerRequest != null; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java index b6f82e070521..40e0a45d8e55 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java @@ -61,6 +61,7 @@ import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.ServerExecutionInfo; import org.apache.pinot.core.routing.TablePartitionInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; @@ -635,15 +636,15 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId) selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments()); } - private Map, List>> getServerInstanceToSegmentsMap(String tableNameWithType, + private Map getServerInstanceToSegmentsMap(String tableNameWithType, InstanceSelector.SelectionResult selectionResult) { - Map, List>> merged = new HashMap<>(); + Map merged = new HashMap<>(); for (Map.Entry entry : selectionResult.getSegmentToInstanceMap().entrySet()) { ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue()); if (serverInstance != null) { - Pair, List> pair = - merged.computeIfAbsent(serverInstance, k -> Pair.of(new ArrayList<>(), new ArrayList<>())); - pair.getLeft().add(entry.getKey()); + ServerExecutionInfo executionInfo = + merged.computeIfAbsent(serverInstance, k -> new ServerExecutionInfo(new ArrayList<>(), new ArrayList<>())); + executionInfo.getSegmentList().add(entry.getKey()); } else { // Should not happen in normal case unless encountered unexpected exception when updating routing entries _brokerMetrics.addMeteredTableValue(tableNameWithType, BrokerMeter.SERVER_MISSING_FOR_ROUTING, 1L); @@ -652,12 +653,12 @@ private Map, List>> getServerInstanceT for (Map.Entry entry : selectionResult.getOptionalSegmentToInstanceMap().entrySet()) { ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue()); if (serverInstance != null) { - Pair, List> pair = merged.get(serverInstance); + ServerExecutionInfo executionInfo = merged.get(serverInstance); // Skip servers that don't have non-optional segments, so that servers always get some non-optional segments // to process, to be backward compatible. // TODO: allow servers only with optional segments - if (pair != null) { - pair.getRight().add(entry.getKey()); + if (executionInfo != null) { + executionInfo.getOptionalSegmentList().add(entry.getKey()); } } // TODO: Report missing server metrics when we allow servers only with optional segments. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java index ccc6aedb81d1..38748fd9b37b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java @@ -20,7 +20,6 @@ import java.util.List; import java.util.Map; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.core.transport.ServerInstance; @@ -29,18 +28,18 @@ public class RoutingTable { // the newly created consuming segments. Such segments were simply skipped by brokers at query routing time, but that // had caused wrong query results, particularly for upsert tables. Instead, we should pass such segments to servers // and let them decide how to handle them, e.g. skip them upon issues or include them for better query results. - private final Map, List/*optional segments*/>> _serverInstanceToSegmentsMap; + private final Map _serverInstanceToSegmentsMap; private final List _unavailableSegments; private final int _numPrunedSegments; - public RoutingTable(Map, List>> serverInstanceToSegmentsMap, + public RoutingTable(Map serverInstanceToSegmentsMap, List unavailableSegments, int numPrunedSegments) { _serverInstanceToSegmentsMap = serverInstanceToSegmentsMap; _unavailableSegments = unavailableSegments; _numPrunedSegments = numPrunedSegments; } - public Map, List>> getServerInstanceToSegmentsMap() { + public Map getServerInstanceToSegmentsMap() { return _serverInstanceToSegmentsMap; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerExecutionInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerExecutionInfo.java new file mode 100644 index 000000000000..4db7fec749fa --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerExecutionInfo.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.routing; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; + +/** + * Class representing the execution information for a server. + * It contains the list of segments and optional segments assigned to the server. + */ +public class ServerExecutionInfo { + private final List _segmentList; + private final List _optionalSegmentList; + + /** + * Constructor for ServerExecutionInfo. + * + * @param segmentList List of segments assigned to the server. + * @param optionalSegmentList List of optional segments assigned to the server. + */ + @JsonCreator + public ServerExecutionInfo( + @JsonProperty("segmentList") List segmentList, + @JsonProperty("optionalSegmentList") List optionalSegmentList) { + _segmentList = segmentList; + _optionalSegmentList = optionalSegmentList; + } + + /** + * Gets the list of segments assigned to the server. + * + * @return List of segments. + */ + @JsonProperty("segmentList") + public List getSegmentList() { + return _segmentList; + } + + /** + * Gets the list of optional segments assigned to the server. + * + * @return List of optional segments. + */ + @JsonProperty("optionalSegmentList") + public List getOptionalSegmentList() { + return _optionalSegmentList; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java index 2b7561186524..1b7441f5fc40 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java @@ -19,14 +19,12 @@ package org.apache.pinot.core.transport; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections4.CollectionUtils; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.config.NettyConfig; import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.datatable.DataTable; @@ -36,6 +34,7 @@ import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.common.utils.config.QueryOptionsUtils; +import org.apache.pinot.core.routing.ServerExecutionInfo; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.CommonConstants; @@ -89,9 +88,9 @@ public QueryRouter(String brokerId, BrokerMetrics brokerMetrics, @Nullable Netty public AsyncQueryResponse submitQuery(long requestId, String rawTableName, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map, List>> offlineRoutingTable, + @Nullable Map offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map, List>> realtimeRoutingTable, long timeoutMs) { + @Nullable Map realtimeRoutingTable, long timeoutMs) { assert offlineBrokerRequest != null || realtimeBrokerRequest != null; // can prefer but not require TLS until all servers guaranteed to be on TLS @@ -104,7 +103,7 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName, Map requestMap = new HashMap<>(); if (offlineBrokerRequest != null) { assert offlineRoutingTable != null; - for (Map.Entry, List>> entry : offlineRoutingTable.entrySet()) { + for (Map.Entry entry : offlineRoutingTable.entrySet()) { ServerRoutingInstance serverRoutingInstance = entry.getKey().toServerRoutingInstance(TableType.OFFLINE, preferTls); InstanceRequest instanceRequest = getInstanceRequest(requestId, offlineBrokerRequest, entry.getValue()); @@ -113,7 +112,7 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName, } if (realtimeBrokerRequest != null) { assert realtimeRoutingTable != null; - for (Map.Entry, List>> entry : realtimeRoutingTable.entrySet()) { + for (Map.Entry entry : realtimeRoutingTable.entrySet()) { ServerRoutingInstance serverRoutingInstance = entry.getKey().toServerRoutingInstance(TableType.REALTIME, preferTls); InstanceRequest instanceRequest = getInstanceRequest(requestId, realtimeBrokerRequest, entry.getValue()); @@ -213,7 +212,7 @@ void markQueryDone(long requestId) { } private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerRequest, - Pair, List> segments) { + ServerExecutionInfo segments) { InstanceRequest instanceRequest = new InstanceRequest(); instanceRequest.setRequestId(requestId); instanceRequest.setQuery(brokerRequest); @@ -221,13 +220,13 @@ private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerR if (queryOptions != null) { instanceRequest.setEnableTrace(Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE))); } - instanceRequest.setSearchSegments(segments.getLeft()); + instanceRequest.setSearchSegments(segments.getSegmentList()); instanceRequest.setBrokerId(_brokerId); - if (CollectionUtils.isNotEmpty(segments.getRight())) { + if (CollectionUtils.isNotEmpty(segments.getOptionalSegmentList())) { // Don't set this field, i.e. leave it as null, if there is no optional segment at all, to be more backward // compatible, as there are places like in multi-stage query engine where this field is not set today when // creating the InstanceRequest. - instanceRequest.setOptionalSegments(segments.getRight()); + instanceRequest.setOptionalSegments(segments.getOptionalSegmentList()); } return instanceRequest; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java index 7556993876a6..9b2bc8238e89 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java @@ -31,12 +31,12 @@ import java.util.Set; import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.calcite.rel.hint.PinotHintOptions; import org.apache.pinot.calcite.rel.rules.ImmutableTableOptions; import org.apache.pinot.calcite.rel.rules.TableOptions; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.ServerExecutionInfo; import org.apache.pinot.core.routing.TablePartitionInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; @@ -332,12 +332,13 @@ private void assignWorkersToNonPartitionedLeafFragment(DispatchablePlanMetadata String tableType = routingEntry.getKey(); RoutingTable routingTable = routingEntry.getValue(); // for each server instance, attach all table types and their associated segment list. - Map, List>> segmentsMap = routingTable.getServerInstanceToSegmentsMap(); - for (Map.Entry, List>> serverEntry : segmentsMap.entrySet()) { + Map segmentsMap = routingTable.getServerInstanceToSegmentsMap(); + for (Map.Entry serverEntry : segmentsMap.entrySet()) { Map> tableTypeToSegmentListMap = serverInstanceToSegmentsMap.computeIfAbsent(serverEntry.getKey(), k -> new HashMap<>()); // TODO: support optional segments for multi-stage engine. - Preconditions.checkState(tableTypeToSegmentListMap.put(tableType, serverEntry.getValue().getLeft()) == null, + Preconditions.checkState( + tableTypeToSegmentListMap.put(tableType, serverEntry.getValue().getSegmentList()) == null, "Entry for server {} and table type: {} already exist!", serverEntry.getKey(), tableType); } diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java index 3df75ce8ab93..b0d76fa2182a 100644 --- a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java +++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java @@ -58,7 +58,7 @@ public void assignSegmentsToPlan(BaseTimeSeriesPlanNode planNode, TimeBuckets ti Preconditions.checkNotNull(routingTable, "Failed to get routing table for table: " + sfpNode.getTableName()); for (var entry : routingTable.getServerInstanceToSegmentsMap().entrySet()) { ServerInstance serverInstance = entry.getKey(); - List segments = entry.getValue().getLeft(); + List segments = entry.getValue().getSegmentList(); context.getLeafIdToSegmentsByServer().computeIfAbsent(serverInstance, (x) -> new HashMap<>()) .put(sfpNode.getId(), segments); } From 1f2d1fd0bdcf0ebd3be47eedd1a03ac05b2f88aa Mon Sep 17 00:00:00 2001 From: Rajat Venkatesh <1638298+vrajat@users.noreply.github.com> Date: Mon, 27 Jan 2025 12:10:58 +0530 Subject: [PATCH 2/9] Fix compilation in tests --- .../pinot/core/transport/QueryRoutingTest.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java index 1b32149d064d..0055b88e777b 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java @@ -22,9 +22,7 @@ import java.io.IOException; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; -import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.common.datatable.DataTable.MetadataKey; import org.apache.pinot.common.exception.QueryException; @@ -36,6 +34,7 @@ import org.apache.pinot.core.common.datatable.DataTableBuilder; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.core.query.scheduler.QueryScheduler; +import org.apache.pinot.core.routing.ServerExecutionInfo; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; import org.apache.pinot.server.access.AccessControl; import org.apache.pinot.spi.config.table.TableType; @@ -66,8 +65,9 @@ public class QueryRoutingTest { SERVER_INSTANCE.toServerRoutingInstance(TableType.REALTIME, ServerInstance.RoutingType.NETTY); private static final BrokerRequest BROKER_REQUEST = CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM testTable"); - private static final Map, List>> ROUTING_TABLE = - Collections.singletonMap(SERVER_INSTANCE, Pair.of(Collections.emptyList(), Collections.emptyList())); + private static final Map ROUTING_TABLE = + Collections.singletonMap(SERVER_INSTANCE, + new ServerExecutionInfo(Collections.emptyList(), Collections.emptyList())); private QueryRouter _queryRouter; private ServerRoutingStatsManager _serverRoutingStatsManager; @@ -481,9 +481,9 @@ public void testSkipUnavailableServer() serverInstance1.toServerRoutingInstance(TableType.OFFLINE, ServerInstance.RoutingType.NETTY); ServerRoutingInstance serverRoutingInstance2 = serverInstance2.toServerRoutingInstance(TableType.OFFLINE, ServerInstance.RoutingType.NETTY); - Map, List>> routingTable = - Map.of(serverInstance1, Pair.of(Collections.emptyList(), Collections.emptyList()), serverInstance2, - Pair.of(Collections.emptyList(), Collections.emptyList())); + Map routingTable = + Map.of(serverInstance1, new ServerExecutionInfo(Collections.emptyList(), Collections.emptyList()), + serverInstance2, new ServerExecutionInfo(Collections.emptyList(), Collections.emptyList())); long requestId = 123; DataSchema dataSchema = From 9eb1e158650be8d651f6770a9051d65795c986ef Mon Sep 17 00:00:00 2001 From: Rajat Venkatesh <1638298+vrajat@users.noreply.github.com> Date: Mon, 27 Jan 2025 12:19:47 +0530 Subject: [PATCH 3/9] Fix compilation in tests --- .../pinot/query/testutils/MockRoutingManagerFactory.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java index d01185214077..7f0572145163 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java @@ -22,18 +22,17 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; -import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.InstanceConfig; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.config.provider.TableCache; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.ServerExecutionInfo; import org.apache.pinot.core.routing.TablePartitionInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; @@ -57,7 +56,7 @@ public class MockRoutingManagerFactory { private final Map _schemaMap; private final Set _hybridTables; private final Map _serverInstances; - private final Map, List>>> _tableServerSegmentsMap; + private final Map> _tableServerSegmentsMap; public MockRoutingManagerFactory(int... ports) { _tableNameMap = new HashMap<>(); @@ -90,7 +89,8 @@ private void registerTableNameWithType(Schema schema, String tableNameWithType) public void registerSegment(int insertToServerPort, String tableNameWithType, String segmentName) { ServerInstance serverInstance = _serverInstances.get(toHostname(insertToServerPort)); _tableServerSegmentsMap.computeIfAbsent(tableNameWithType, k -> new HashMap<>()) - .computeIfAbsent(serverInstance, k -> Pair.of(new ArrayList<>(), null)).getLeft().add(segmentName); + .computeIfAbsent(serverInstance, k -> new ServerExecutionInfo(new ArrayList<>(), null)).getSegmentList() + .add(segmentName); } public RoutingManager buildRoutingManager(@Nullable Map partitionInfoMap) { From 89d4a205c3d0c18ed51ef7557a308a81f3fd578a Mon Sep 17 00:00:00 2001 From: Rajat Venkatesh <1638298+vrajat@users.noreply.github.com> Date: Mon, 27 Jan 2025 12:32:20 +0530 Subject: [PATCH 4/9] Fix compilation in scala --- .../spark/common/reader/PinotServerDataFetcher.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala index 753b2c379494..e6b60e2892be 100644 --- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala +++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala @@ -18,13 +18,13 @@ */ package org.apache.pinot.connector.spark.common.reader -import org.apache.commons.lang3.tuple.Pair import org.apache.helix.model.InstanceConfig import org.apache.pinot.common.datatable.DataTable import org.apache.pinot.common.metrics.BrokerMetrics import org.apache.pinot.common.request.BrokerRequest import org.apache.pinot.connector.spark.common.partition.PinotSplit import org.apache.pinot.connector.spark.common.{Logging, PinotDataSourceReadOptions, PinotException} +import org.apache.pinot.core.routing.ServerExecutionInfo import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager import org.apache.pinot.core.transport.{AsyncQueryResponse, QueryRouter, ServerInstance} import org.apache.pinot.spi.config.table.TableType @@ -32,7 +32,7 @@ import org.apache.pinot.spi.env.PinotConfiguration import org.apache.pinot.spi.metrics.PinotMetricUtils import org.apache.pinot.sql.parsers.CalciteSqlCompiler -import java.util.{Collections, List => JList, Map => JMap} +import java.util.{Map => JMap} import scala.collection.JavaConverters._ /** @@ -93,7 +93,7 @@ private[reader] class PinotServerDataFetcher( dataTables.filter(_.getNumberOfRows > 0) } - private def createRoutingTableForRequest(): JMap[ServerInstance, Pair[JList[String], JList[String]]] = { + private def createRoutingTableForRequest(): JMap[ServerInstance, ServerExecutionInfo] = { val nullZkId: String = null val instanceConfig = new InstanceConfig(nullZkId) instanceConfig.setHostName(pinotSplit.serverAndSegments.serverHost) @@ -101,15 +101,15 @@ private[reader] class PinotServerDataFetcher( // TODO: support netty-sec val serverInstance = new ServerInstance(instanceConfig) Map( - serverInstance -> Pair.of(pinotSplit.serverAndSegments.segments.asJava, List[String]().asJava) + serverInstance -> new ServerExecutionInfo(pinotSplit.serverAndSegments.segments.asJava, List[String]().asJava) ).asJava } private def submitRequestToPinotServer( offlineBrokerRequest: BrokerRequest, - offlineRoutingTable: JMap[ServerInstance, Pair[JList[String], JList[String]]], + offlineRoutingTable: JMap[ServerInstance, ServerExecutionInfo], realtimeBrokerRequest: BrokerRequest, - realtimeRoutingTable: JMap[ServerInstance, Pair[JList[String], JList[String]]]): AsyncQueryResponse = { + realtimeRoutingTable: JMap[ServerInstance, ServerExecutionInfo]): AsyncQueryResponse = { logInfo(s"Sending request to ${pinotSplit.serverAndSegments.toString}") queryRouter.submitQuery( partitionId, From 69d5b3884a24ce1441ff4d434aa04d25688ffb08 Mon Sep 17 00:00:00 2001 From: Rajat Venkatesh <1638298+vrajat@users.noreply.github.com> Date: Mon, 27 Jan 2025 12:42:18 +0530 Subject: [PATCH 5/9] Fix compilation in tests --- .../pinot/broker/broker/HelixBrokerStarterTest.java | 4 ++-- .../BaseSingleStageBrokerRequestHandlerTest.java | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java index a24b8ac79e9c..51a532d6e3d8 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java @@ -172,7 +172,7 @@ public void testResourceAndTagAssignment() RoutingTable routingTable = routingManager.getRoutingTable(brokerRequest, 0); assertNotNull(routingTable); assertEquals(routingTable.getServerInstanceToSegmentsMap().size(), NUM_SERVERS); - assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().getLeft().size(), + assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().getSegmentList().size(), NUM_OFFLINE_SEGMENTS); assertTrue(routingTable.getUnavailableSegments().isEmpty()); @@ -182,7 +182,7 @@ public void testResourceAndTagAssignment() TestUtils.waitForCondition(aVoid -> routingManager.getRoutingTable(brokerRequest, 0).getServerInstanceToSegmentsMap().values().iterator().next() - .getLeft().size() == NUM_OFFLINE_SEGMENTS + 1, 30_000L, + .getSegmentList().size() == NUM_OFFLINE_SEGMENTS + 1, 30_000L, "Failed to add the new segment " + "into the routing table"); // Add a new table with different broker tenant diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java index df4b1b6bf8ca..53fe11673501 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java @@ -25,7 +25,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import javax.annotation.Nullable; -import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.broker.broker.AllowAllAccessControlFactory; import org.apache.pinot.broker.queryquota.QueryQuotaManager; @@ -37,6 +36,7 @@ import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.core.routing.RoutingTable; +import org.apache.pinot.core.routing.ServerExecutionInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TenantConfig; @@ -167,8 +167,8 @@ public void testCancelQuery() { when(routingManager.routingExists(tableName)).thenReturn(true); when(routingManager.getQueryTimeoutMs(tableName)).thenReturn(10000L); RoutingTable rt = mock(RoutingTable.class); - when(rt.getServerInstanceToSegmentsMap()).thenReturn( - Map.of(new ServerInstance(new InstanceConfig("server01_9000")), Pair.of(List.of("segment01"), List.of()))); + when(rt.getServerInstanceToSegmentsMap()).thenReturn(Map.of(new ServerInstance(new InstanceConfig("server01_9000")), + new ServerExecutionInfo(List.of("segment01"), List.of()))); when(routingManager.getRoutingTable(any(), Mockito.anyLong())).thenReturn(rt); QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class); when(queryQuotaManager.acquire(anyString())).thenReturn(true); @@ -194,9 +194,9 @@ public void shutDown() { @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map, List>> offlineRoutingTable, + @Nullable Map offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map, List>> realtimeRoutingTable, long timeoutMs, + @Nullable Map realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception { testRequestId[0] = requestId; From 7aa50772750d3c2f60cf0c2408bf05e664614040 Mon Sep 17 00:00:00 2001 From: Rajat Venkatesh <1638298+vrajat@users.noreply.github.com> Date: Mon, 27 Jan 2025 14:29:13 +0530 Subject: [PATCH 6/9] Rename to ServerRouteInfo --- .../broker/api/resources/PinotBrokerDebug.java | 10 +++++----- .../BaseSingleStageBrokerRequestHandler.java | 18 +++++++++--------- .../GrpcBrokerRequestHandler.java | 10 +++++----- .../SingleConnectionBrokerRequestHandler.java | 6 +++--- .../broker/routing/BrokerRoutingManager.java | 12 ++++++------ ...aseSingleStageBrokerRequestHandlerTest.java | 8 ++++---- .../common/reader/PinotServerDataFetcher.scala | 14 +++++++------- .../pinot/core/routing/RoutingTable.java | 6 +++--- ...ExecutionInfo.java => ServerRouteInfo.java} | 8 ++++---- .../pinot/core/transport/QueryRouter.java | 12 ++++++------ .../pinot/core/transport/QueryRoutingTest.java | 12 ++++++------ .../pinot/query/routing/WorkerManager.java | 6 +++--- .../testutils/MockRoutingManagerFactory.java | 6 +++--- 13 files changed, 64 insertions(+), 64 deletions(-) rename pinot-core/src/main/java/org/apache/pinot/core/routing/{ServerExecutionInfo.java => ServerRouteInfo.java} (92%) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java index b890edeacecd..b482b41d93b6 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java @@ -55,7 +55,7 @@ import org.apache.pinot.core.auth.ManualAuthorization; import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.core.routing.RoutingTable; -import org.apache.pinot.core.routing.ServerExecutionInfo; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; @@ -157,11 +157,11 @@ public Map>> getRoutingTable( @ApiResponse(code = 404, message = "Routing not found"), @ApiResponse(code = 500, message = "Internal server error") }) - public Map> getRoutingTableWithOptionalSegments( + public Map> getRoutingTableWithOptionalSegments( @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName, @Context HttpHeaders headers) { tableName = DatabaseUtils.translateTableName(tableName, headers); - Map> result = new TreeMap<>(); + Map> result = new TreeMap<>(); getRoutingTable(tableName, (tableNameWithType, routingTable) -> result.put(tableNameWithType, routingTable.getServerInstanceToSegmentsMap())); if (!result.isEmpty()) { @@ -192,7 +192,7 @@ private void getRoutingTable(String tableName, BiConsumer } private static Map> removeOptionalSegments( - Map serverInstanceToSegmentsMap) { + Map serverInstanceToSegmentsMap) { Map> ret = new HashMap<>(); serverInstanceToSegmentsMap.forEach((k, v) -> ret.put(k, v.getSegmentList())); return ret; @@ -231,7 +231,7 @@ public Map> getRoutingTableForQuery( @ApiResponse(code = 404, message = "Routing not found"), @ApiResponse(code = 500, message = "Internal server error") }) - public Map getRoutingTableForQueryWithOptionalSegments( + public Map getRoutingTableForQueryWithOptionalSegments( @ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query, @Context HttpHeaders httpHeaders) { BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index 13cf368c3b96..8b78e918b889 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -85,7 +85,7 @@ import org.apache.pinot.core.auth.TargetType; import org.apache.pinot.core.query.optimizer.QueryOptimizer; import org.apache.pinot.core.routing.RoutingTable; -import org.apache.pinot.core.routing.ServerExecutionInfo; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.util.GapfillUtils; @@ -618,8 +618,8 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO // Calculate routing table for the query // TODO: Modify RoutingManager interface to directly take PinotQuery long routingStartTimeNs = System.nanoTime(); - Map offlineRoutingTable = null; - Map realtimeRoutingTable = null; + Map offlineRoutingTable = null; + Map realtimeRoutingTable = null; List unavailableSegments = new ArrayList<>(); int numPrunedSegmentsTotal = 0; boolean offlineTableDisabled = false; @@ -634,7 +634,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } if (routingTable != null) { unavailableSegments.addAll(routingTable.getUnavailableSegments()); - Map serverInstanceToSegmentsMap = + Map serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap(); if (!serverInstanceToSegmentsMap.isEmpty()) { offlineRoutingTable = serverInstanceToSegmentsMap; @@ -655,7 +655,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO } if (routingTable != null) { unavailableSegments.addAll(routingTable.getUnavailableSegments()); - Map serverInstanceToSegmentsMap = + Map serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap(); if (!serverInstanceToSegmentsMap.isEmpty()) { realtimeRoutingTable = serverInstanceToSegmentsMap; @@ -1873,9 +1873,9 @@ private static void attachTimeBoundary(PinotQuery pinotQuery, TimeBoundaryInfo t */ protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map offlineRoutingTable, + @Nullable Map offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map realtimeRoutingTable, long timeoutMs, + @Nullable Map realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception; @@ -1905,8 +1905,8 @@ private static class QueryServers { final String _query; final Set _servers = new HashSet<>(); - QueryServers(String query, @Nullable Map offlineRoutingTable, - @Nullable Map realtimeRoutingTable) { + QueryServers(String query, @Nullable Map offlineRoutingTable, + @Nullable Map realtimeRoutingTable) { _query = query; if (offlineRoutingTable != null) { _servers.addAll(offlineRoutingTable.keySet()); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java index aa7c3393d600..4c16145672e0 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java @@ -37,7 +37,7 @@ import org.apache.pinot.common.utils.grpc.GrpcQueryClient; import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder; import org.apache.pinot.core.query.reduce.StreamingReduceService; -import org.apache.pinot.core.routing.ServerExecutionInfo; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.transport.ServerRoutingInstance; import org.apache.pinot.spi.config.table.TableType; @@ -76,9 +76,9 @@ public void shutDown() { @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map offlineRoutingTable, + @Nullable Map offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map realtimeRoutingTable, long timeoutMs, + @Nullable Map realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception { // TODO: Support failure detection @@ -106,9 +106,9 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques * Query pinot server for data table. */ private void sendRequest(long requestId, TableType tableType, BrokerRequest brokerRequest, - Map routingTable, + Map routingTable, Map> responseMap, boolean trace) { - for (Map.Entry routingEntry : routingTable.entrySet()) { + for (Map.Entry routingEntry : routingTable.entrySet()) { ServerInstance serverInstance = routingEntry.getKey(); // TODO: support optional segments for GrpcQueryServer. List segments = routingEntry.getValue().getSegmentList(); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java index 374223674f6f..6e8ecff9a5a5 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java @@ -42,7 +42,7 @@ import org.apache.pinot.common.response.broker.QueryProcessingException; import org.apache.pinot.common.utils.config.QueryOptionsUtils; import org.apache.pinot.core.query.reduce.BrokerReduceService; -import org.apache.pinot.core.routing.ServerExecutionInfo; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.transport.AsyncQueryResponse; import org.apache.pinot.core.transport.QueryResponse; import org.apache.pinot.core.transport.QueryRouter; @@ -99,9 +99,9 @@ public void shutDown() { @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map offlineRoutingTable, + @Nullable Map offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map realtimeRoutingTable, long timeoutMs, + @Nullable Map realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception { assert offlineBrokerRequest != null || realtimeBrokerRequest != null; diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java index 40e0a45d8e55..b6b0fb125285 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java @@ -61,7 +61,7 @@ import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.RoutingTable; -import org.apache.pinot.core.routing.ServerExecutionInfo; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.routing.TablePartitionInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; @@ -636,14 +636,14 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId) selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments()); } - private Map getServerInstanceToSegmentsMap(String tableNameWithType, + private Map getServerInstanceToSegmentsMap(String tableNameWithType, InstanceSelector.SelectionResult selectionResult) { - Map merged = new HashMap<>(); + Map merged = new HashMap<>(); for (Map.Entry entry : selectionResult.getSegmentToInstanceMap().entrySet()) { ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue()); if (serverInstance != null) { - ServerExecutionInfo executionInfo = - merged.computeIfAbsent(serverInstance, k -> new ServerExecutionInfo(new ArrayList<>(), new ArrayList<>())); + ServerRouteInfo executionInfo = + merged.computeIfAbsent(serverInstance, k -> new ServerRouteInfo(new ArrayList<>(), new ArrayList<>())); executionInfo.getSegmentList().add(entry.getKey()); } else { // Should not happen in normal case unless encountered unexpected exception when updating routing entries @@ -653,7 +653,7 @@ private Map getServerInstanceToSegmentsMap( for (Map.Entry entry : selectionResult.getOptionalSegmentToInstanceMap().entrySet()) { ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue()); if (serverInstance != null) { - ServerExecutionInfo executionInfo = merged.get(serverInstance); + ServerRouteInfo executionInfo = merged.get(serverInstance); // Skip servers that don't have non-optional segments, so that servers always get some non-optional segments // to process, to be backward compatible. // TODO: allow servers only with optional segments diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java index 53fe11673501..3952b414f07b 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandlerTest.java @@ -36,7 +36,7 @@ import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.core.routing.RoutingTable; -import org.apache.pinot.core.routing.ServerExecutionInfo; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TenantConfig; @@ -168,7 +168,7 @@ public void testCancelQuery() { when(routingManager.getQueryTimeoutMs(tableName)).thenReturn(10000L); RoutingTable rt = mock(RoutingTable.class); when(rt.getServerInstanceToSegmentsMap()).thenReturn(Map.of(new ServerInstance(new InstanceConfig("server01_9000")), - new ServerExecutionInfo(List.of("segment01"), List.of()))); + new ServerRouteInfo(List.of("segment01"), List.of()))); when(routingManager.getRoutingTable(any(), Mockito.anyLong())).thenReturn(rt); QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class); when(queryQuotaManager.acquire(anyString())).thenReturn(true); @@ -194,9 +194,9 @@ public void shutDown() { @Override protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map offlineRoutingTable, + @Nullable Map offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map realtimeRoutingTable, long timeoutMs, + @Nullable Map realtimeRoutingTable, long timeoutMs, ServerStats serverStats, RequestContext requestContext) throws Exception { testRequestId[0] = requestId; diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala index e6b60e2892be..3d0a8577165a 100644 --- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala +++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala @@ -24,7 +24,7 @@ import org.apache.pinot.common.metrics.BrokerMetrics import org.apache.pinot.common.request.BrokerRequest import org.apache.pinot.connector.spark.common.partition.PinotSplit import org.apache.pinot.connector.spark.common.{Logging, PinotDataSourceReadOptions, PinotException} -import org.apache.pinot.core.routing.ServerExecutionInfo +import org.apache.pinot.core.routing.ServerRouteInfo import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager import org.apache.pinot.core.transport.{AsyncQueryResponse, QueryRouter, ServerInstance} import org.apache.pinot.spi.config.table.TableType @@ -93,7 +93,7 @@ private[reader] class PinotServerDataFetcher( dataTables.filter(_.getNumberOfRows > 0) } - private def createRoutingTableForRequest(): JMap[ServerInstance, ServerExecutionInfo] = { + private def createRoutingTableForRequest(): JMap[ServerInstance, ServerRouteInfo] = { val nullZkId: String = null val instanceConfig = new InstanceConfig(nullZkId) instanceConfig.setHostName(pinotSplit.serverAndSegments.serverHost) @@ -101,15 +101,15 @@ private[reader] class PinotServerDataFetcher( // TODO: support netty-sec val serverInstance = new ServerInstance(instanceConfig) Map( - serverInstance -> new ServerExecutionInfo(pinotSplit.serverAndSegments.segments.asJava, List[String]().asJava) + serverInstance -> new ServerRouteInfo(pinotSplit.serverAndSegments.segments.asJava, List[String]().asJava) ).asJava } private def submitRequestToPinotServer( - offlineBrokerRequest: BrokerRequest, - offlineRoutingTable: JMap[ServerInstance, ServerExecutionInfo], - realtimeBrokerRequest: BrokerRequest, - realtimeRoutingTable: JMap[ServerInstance, ServerExecutionInfo]): AsyncQueryResponse = { + offlineBrokerRequest: BrokerRequest, + offlineRoutingTable: JMap[ServerInstance, ServerRouteInfo], + realtimeBrokerRequest: BrokerRequest, + realtimeRoutingTable: JMap[ServerInstance, ServerRouteInfo]): AsyncQueryResponse = { logInfo(s"Sending request to ${pinotSplit.serverAndSegments.toString}") queryRouter.submitQuery( partitionId, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java index 38748fd9b37b..5a7407805d6d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java @@ -28,18 +28,18 @@ public class RoutingTable { // the newly created consuming segments. Such segments were simply skipped by brokers at query routing time, but that // had caused wrong query results, particularly for upsert tables. Instead, we should pass such segments to servers // and let them decide how to handle them, e.g. skip them upon issues or include them for better query results. - private final Map _serverInstanceToSegmentsMap; + private final Map _serverInstanceToSegmentsMap; private final List _unavailableSegments; private final int _numPrunedSegments; - public RoutingTable(Map serverInstanceToSegmentsMap, + public RoutingTable(Map serverInstanceToSegmentsMap, List unavailableSegments, int numPrunedSegments) { _serverInstanceToSegmentsMap = serverInstanceToSegmentsMap; _unavailableSegments = unavailableSegments; _numPrunedSegments = numPrunedSegments; } - public Map getServerInstanceToSegmentsMap() { + public Map getServerInstanceToSegmentsMap() { return _serverInstanceToSegmentsMap; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerExecutionInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerRouteInfo.java similarity index 92% rename from pinot-core/src/main/java/org/apache/pinot/core/routing/ServerExecutionInfo.java rename to pinot-core/src/main/java/org/apache/pinot/core/routing/ServerRouteInfo.java index 4db7fec749fa..1ebb28cb2ede 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerExecutionInfo.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerRouteInfo.java @@ -23,21 +23,21 @@ import java.util.List; /** - * Class representing the execution information for a server. + * Class representing the route information for a server. * It contains the list of segments and optional segments assigned to the server. */ -public class ServerExecutionInfo { +public class ServerRouteInfo { private final List _segmentList; private final List _optionalSegmentList; /** - * Constructor for ServerExecutionInfo. + * Constructor for ServerRouteInfo. * * @param segmentList List of segments assigned to the server. * @param optionalSegmentList List of optional segments assigned to the server. */ @JsonCreator - public ServerExecutionInfo( + public ServerRouteInfo( @JsonProperty("segmentList") List segmentList, @JsonProperty("optionalSegmentList") List optionalSegmentList) { _segmentList = segmentList; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java index 1b7441f5fc40..8183b6070aa6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java @@ -34,7 +34,7 @@ import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.common.utils.config.QueryOptionsUtils; -import org.apache.pinot.core.routing.ServerExecutionInfo; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.CommonConstants; @@ -88,9 +88,9 @@ public QueryRouter(String brokerId, BrokerMetrics brokerMetrics, @Nullable Netty public AsyncQueryResponse submitQuery(long requestId, String rawTableName, @Nullable BrokerRequest offlineBrokerRequest, - @Nullable Map offlineRoutingTable, + @Nullable Map offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest, - @Nullable Map realtimeRoutingTable, long timeoutMs) { + @Nullable Map realtimeRoutingTable, long timeoutMs) { assert offlineBrokerRequest != null || realtimeBrokerRequest != null; // can prefer but not require TLS until all servers guaranteed to be on TLS @@ -103,7 +103,7 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName, Map requestMap = new HashMap<>(); if (offlineBrokerRequest != null) { assert offlineRoutingTable != null; - for (Map.Entry entry : offlineRoutingTable.entrySet()) { + for (Map.Entry entry : offlineRoutingTable.entrySet()) { ServerRoutingInstance serverRoutingInstance = entry.getKey().toServerRoutingInstance(TableType.OFFLINE, preferTls); InstanceRequest instanceRequest = getInstanceRequest(requestId, offlineBrokerRequest, entry.getValue()); @@ -112,7 +112,7 @@ public AsyncQueryResponse submitQuery(long requestId, String rawTableName, } if (realtimeBrokerRequest != null) { assert realtimeRoutingTable != null; - for (Map.Entry entry : realtimeRoutingTable.entrySet()) { + for (Map.Entry entry : realtimeRoutingTable.entrySet()) { ServerRoutingInstance serverRoutingInstance = entry.getKey().toServerRoutingInstance(TableType.REALTIME, preferTls); InstanceRequest instanceRequest = getInstanceRequest(requestId, realtimeBrokerRequest, entry.getValue()); @@ -212,7 +212,7 @@ void markQueryDone(long requestId) { } private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerRequest, - ServerExecutionInfo segments) { + ServerRouteInfo segments) { InstanceRequest instanceRequest = new InstanceRequest(); instanceRequest.setRequestId(requestId); instanceRequest.setQuery(brokerRequest); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java index 0055b88e777b..da9c98205684 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java @@ -34,7 +34,7 @@ import org.apache.pinot.core.common.datatable.DataTableBuilder; import org.apache.pinot.core.common.datatable.DataTableBuilderFactory; import org.apache.pinot.core.query.scheduler.QueryScheduler; -import org.apache.pinot.core.routing.ServerExecutionInfo; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.transport.server.routing.stats.ServerRoutingStatsManager; import org.apache.pinot.server.access.AccessControl; import org.apache.pinot.spi.config.table.TableType; @@ -65,9 +65,9 @@ public class QueryRoutingTest { SERVER_INSTANCE.toServerRoutingInstance(TableType.REALTIME, ServerInstance.RoutingType.NETTY); private static final BrokerRequest BROKER_REQUEST = CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM testTable"); - private static final Map ROUTING_TABLE = + private static final Map ROUTING_TABLE = Collections.singletonMap(SERVER_INSTANCE, - new ServerExecutionInfo(Collections.emptyList(), Collections.emptyList())); + new ServerRouteInfo(Collections.emptyList(), Collections.emptyList())); private QueryRouter _queryRouter; private ServerRoutingStatsManager _serverRoutingStatsManager; @@ -481,9 +481,9 @@ public void testSkipUnavailableServer() serverInstance1.toServerRoutingInstance(TableType.OFFLINE, ServerInstance.RoutingType.NETTY); ServerRoutingInstance serverRoutingInstance2 = serverInstance2.toServerRoutingInstance(TableType.OFFLINE, ServerInstance.RoutingType.NETTY); - Map routingTable = - Map.of(serverInstance1, new ServerExecutionInfo(Collections.emptyList(), Collections.emptyList()), - serverInstance2, new ServerExecutionInfo(Collections.emptyList(), Collections.emptyList())); + Map routingTable = + Map.of(serverInstance1, new ServerRouteInfo(Collections.emptyList(), Collections.emptyList()), + serverInstance2, new ServerRouteInfo(Collections.emptyList(), Collections.emptyList())); long requestId = 123; DataSchema dataSchema = diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java index 9b2bc8238e89..b24013509162 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java @@ -36,7 +36,7 @@ import org.apache.pinot.calcite.rel.rules.TableOptions; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.RoutingTable; -import org.apache.pinot.core.routing.ServerExecutionInfo; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.routing.TablePartitionInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; @@ -332,8 +332,8 @@ private void assignWorkersToNonPartitionedLeafFragment(DispatchablePlanMetadata String tableType = routingEntry.getKey(); RoutingTable routingTable = routingEntry.getValue(); // for each server instance, attach all table types and their associated segment list. - Map segmentsMap = routingTable.getServerInstanceToSegmentsMap(); - for (Map.Entry serverEntry : segmentsMap.entrySet()) { + Map segmentsMap = routingTable.getServerInstanceToSegmentsMap(); + for (Map.Entry serverEntry : segmentsMap.entrySet()) { Map> tableTypeToSegmentListMap = serverInstanceToSegmentsMap.computeIfAbsent(serverEntry.getKey(), k -> new HashMap<>()); // TODO: support optional segments for multi-stage engine. diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java index 7f0572145163..071820b8b2dd 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java @@ -32,7 +32,7 @@ import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.core.routing.RoutingManager; import org.apache.pinot.core.routing.RoutingTable; -import org.apache.pinot.core.routing.ServerExecutionInfo; +import org.apache.pinot.core.routing.ServerRouteInfo; import org.apache.pinot.core.routing.TablePartitionInfo; import org.apache.pinot.core.routing.TimeBoundaryInfo; import org.apache.pinot.core.transport.ServerInstance; @@ -56,7 +56,7 @@ public class MockRoutingManagerFactory { private final Map _schemaMap; private final Set _hybridTables; private final Map _serverInstances; - private final Map> _tableServerSegmentsMap; + private final Map> _tableServerSegmentsMap; public MockRoutingManagerFactory(int... ports) { _tableNameMap = new HashMap<>(); @@ -89,7 +89,7 @@ private void registerTableNameWithType(Schema schema, String tableNameWithType) public void registerSegment(int insertToServerPort, String tableNameWithType, String segmentName) { ServerInstance serverInstance = _serverInstances.get(toHostname(insertToServerPort)); _tableServerSegmentsMap.computeIfAbsent(tableNameWithType, k -> new HashMap<>()) - .computeIfAbsent(serverInstance, k -> new ServerExecutionInfo(new ArrayList<>(), null)).getSegmentList() + .computeIfAbsent(serverInstance, k -> new ServerRouteInfo(new ArrayList<>(), null)).getSegmentList() .add(segmentName); } From a424a5cbc6c550bdd5c63e8a83362b79e9dc4794 Mon Sep 17 00:00:00 2001 From: Rajat Venkatesh <1638298+vrajat@users.noreply.github.com> Date: Mon, 27 Jan 2025 17:44:18 +0530 Subject: [PATCH 7/9] Fix whitespace --- .../spark/common/reader/PinotServerDataFetcher.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala index 3d0a8577165a..603f25fc0565 100644 --- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala +++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala @@ -106,10 +106,10 @@ private[reader] class PinotServerDataFetcher( } private def submitRequestToPinotServer( - offlineBrokerRequest: BrokerRequest, - offlineRoutingTable: JMap[ServerInstance, ServerRouteInfo], - realtimeBrokerRequest: BrokerRequest, - realtimeRoutingTable: JMap[ServerInstance, ServerRouteInfo]): AsyncQueryResponse = { + offlineBrokerRequest: BrokerRequest, + offlineRoutingTable: JMap[ServerInstance, ServerRouteInfo], + realtimeBrokerRequest: BrokerRequest, + realtimeRoutingTable: JMap[ServerInstance, ServerRouteInfo]): AsyncQueryResponse = { logInfo(s"Sending request to ${pinotSplit.serverAndSegments.toString}") queryRouter.submitQuery( partitionId, From b8e98366a4205f10428d592b4b0614c9ed506732 Mon Sep 17 00:00:00 2001 From: Rajat Venkatesh <1638298+vrajat@users.noreply.github.com> Date: Tue, 28 Jan 2025 10:16:27 +0530 Subject: [PATCH 8/9] Review comments on naming --- .../api/resources/PinotBrokerDebug.java | 2 +- .../GrpcBrokerRequestHandler.java | 2 +- .../broker/routing/BrokerRoutingManager.java | 10 +++---- .../broker/broker/HelixBrokerStarterTest.java | 4 +-- .../pinot/core/routing/ServerRouteInfo.java | 28 ++++++++----------- .../pinot/core/transport/QueryRouter.java | 6 ++-- .../pinot/query/routing/WorkerManager.java | 2 +- .../testutils/MockRoutingManagerFactory.java | 2 +- .../planner/physical/TableScanVisitor.java | 2 +- 9 files changed, 27 insertions(+), 31 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java index b482b41d93b6..477a8f02f0da 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java @@ -194,7 +194,7 @@ private void getRoutingTable(String tableName, BiConsumer private static Map> removeOptionalSegments( Map serverInstanceToSegmentsMap) { Map> ret = new HashMap<>(); - serverInstanceToSegmentsMap.forEach((k, v) -> ret.put(k, v.getSegmentList())); + serverInstanceToSegmentsMap.forEach((k, v) -> ret.put(k, v.getSegments())); return ret; } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java index 4c16145672e0..d6c2f3aaccc4 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java @@ -111,7 +111,7 @@ private void sendRequest(long requestId, TableType tableType, BrokerRequest brok for (Map.Entry routingEntry : routingTable.entrySet()) { ServerInstance serverInstance = routingEntry.getKey(); // TODO: support optional segments for GrpcQueryServer. - List segments = routingEntry.getValue().getSegmentList(); + List segments = routingEntry.getValue().getSegments(); String serverHost = serverInstance.getHostname(); int port = serverInstance.getGrpcPort(); // TODO: enable throttling on per host bases. diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java index b6b0fb125285..e324f0ece3c4 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java @@ -642,9 +642,9 @@ private Map getServerInstanceToSegmentsMap(Stri for (Map.Entry entry : selectionResult.getSegmentToInstanceMap().entrySet()) { ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue()); if (serverInstance != null) { - ServerRouteInfo executionInfo = + ServerRouteInfo serverRouteInfoInfo = merged.computeIfAbsent(serverInstance, k -> new ServerRouteInfo(new ArrayList<>(), new ArrayList<>())); - executionInfo.getSegmentList().add(entry.getKey()); + serverRouteInfoInfo.getSegments().add(entry.getKey()); } else { // Should not happen in normal case unless encountered unexpected exception when updating routing entries _brokerMetrics.addMeteredTableValue(tableNameWithType, BrokerMeter.SERVER_MISSING_FOR_ROUTING, 1L); @@ -653,12 +653,12 @@ private Map getServerInstanceToSegmentsMap(Stri for (Map.Entry entry : selectionResult.getOptionalSegmentToInstanceMap().entrySet()) { ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue()); if (serverInstance != null) { - ServerRouteInfo executionInfo = merged.get(serverInstance); + ServerRouteInfo serverRouteInfo = merged.get(serverInstance); // Skip servers that don't have non-optional segments, so that servers always get some non-optional segments // to process, to be backward compatible. // TODO: allow servers only with optional segments - if (executionInfo != null) { - executionInfo.getOptionalSegmentList().add(entry.getKey()); + if (serverRouteInfo != null) { + serverRouteInfo.getOptionalSegments().add(entry.getKey()); } } // TODO: Report missing server metrics when we allow servers only with optional segments. diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java index 51a532d6e3d8..eb0d77a17b0d 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java @@ -172,7 +172,7 @@ public void testResourceAndTagAssignment() RoutingTable routingTable = routingManager.getRoutingTable(brokerRequest, 0); assertNotNull(routingTable); assertEquals(routingTable.getServerInstanceToSegmentsMap().size(), NUM_SERVERS); - assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().getSegmentList().size(), + assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().getSegments().size(), NUM_OFFLINE_SEGMENTS); assertTrue(routingTable.getUnavailableSegments().isEmpty()); @@ -182,7 +182,7 @@ public void testResourceAndTagAssignment() TestUtils.waitForCondition(aVoid -> routingManager.getRoutingTable(brokerRequest, 0).getServerInstanceToSegmentsMap().values().iterator().next() - .getSegmentList().size() == NUM_OFFLINE_SEGMENTS + 1, 30_000L, + .getSegments().size() == NUM_OFFLINE_SEGMENTS + 1, 30_000L, "Failed to add the new segment " + "into the routing table"); // Add a new table with different broker tenant diff --git a/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerRouteInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerRouteInfo.java index 1ebb28cb2ede..dce578ce293f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerRouteInfo.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerRouteInfo.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.core.routing; -import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; @@ -27,21 +26,20 @@ * It contains the list of segments and optional segments assigned to the server. */ public class ServerRouteInfo { - private final List _segmentList; - private final List _optionalSegmentList; + private final List _segments; + private final List _optionalSegments; /** * Constructor for ServerRouteInfo. * - * @param segmentList List of segments assigned to the server. - * @param optionalSegmentList List of optional segments assigned to the server. + * @param segments List of segments assigned to the server. + * @param optionalSegments List of optional segments assigned to the server. */ - @JsonCreator public ServerRouteInfo( - @JsonProperty("segmentList") List segmentList, - @JsonProperty("optionalSegmentList") List optionalSegmentList) { - _segmentList = segmentList; - _optionalSegmentList = optionalSegmentList; + @JsonProperty("segmentList") List segments, + @JsonProperty("optionalSegmentList") List optionalSegments) { + _segments = segments; + _optionalSegments = optionalSegments; } /** @@ -49,9 +47,8 @@ public ServerRouteInfo( * * @return List of segments. */ - @JsonProperty("segmentList") - public List getSegmentList() { - return _segmentList; + public List getSegments() { + return _segments; } /** @@ -59,8 +56,7 @@ public List getSegmentList() { * * @return List of optional segments. */ - @JsonProperty("optionalSegmentList") - public List getOptionalSegmentList() { - return _optionalSegmentList; + public List getOptionalSegments() { + return _optionalSegments; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java index 8183b6070aa6..d0177e86b0f4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java @@ -220,13 +220,13 @@ private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerR if (queryOptions != null) { instanceRequest.setEnableTrace(Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE))); } - instanceRequest.setSearchSegments(segments.getSegmentList()); + instanceRequest.setSearchSegments(segments.getSegments()); instanceRequest.setBrokerId(_brokerId); - if (CollectionUtils.isNotEmpty(segments.getOptionalSegmentList())) { + if (CollectionUtils.isNotEmpty(segments.getOptionalSegments())) { // Don't set this field, i.e. leave it as null, if there is no optional segment at all, to be more backward // compatible, as there are places like in multi-stage query engine where this field is not set today when // creating the InstanceRequest. - instanceRequest.setOptionalSegments(segments.getOptionalSegmentList()); + instanceRequest.setOptionalSegments(segments.getOptionalSegments()); } return instanceRequest; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java index b24013509162..b4ef55b5458d 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java @@ -338,7 +338,7 @@ private void assignWorkersToNonPartitionedLeafFragment(DispatchablePlanMetadata serverInstanceToSegmentsMap.computeIfAbsent(serverEntry.getKey(), k -> new HashMap<>()); // TODO: support optional segments for multi-stage engine. Preconditions.checkState( - tableTypeToSegmentListMap.put(tableType, serverEntry.getValue().getSegmentList()) == null, + tableTypeToSegmentListMap.put(tableType, serverEntry.getValue().getSegments()) == null, "Entry for server {} and table type: {} already exist!", serverEntry.getKey(), tableType); } diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java index 071820b8b2dd..f4ca63a32b81 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java @@ -89,7 +89,7 @@ private void registerTableNameWithType(Schema schema, String tableNameWithType) public void registerSegment(int insertToServerPort, String tableNameWithType, String segmentName) { ServerInstance serverInstance = _serverInstances.get(toHostname(insertToServerPort)); _tableServerSegmentsMap.computeIfAbsent(tableNameWithType, k -> new HashMap<>()) - .computeIfAbsent(serverInstance, k -> new ServerRouteInfo(new ArrayList<>(), null)).getSegmentList() + .computeIfAbsent(serverInstance, k -> new ServerRouteInfo(new ArrayList<>(), null)).getSegments() .add(segmentName); } diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java index b0d76fa2182a..2217f4157004 100644 --- a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java +++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java @@ -58,7 +58,7 @@ public void assignSegmentsToPlan(BaseTimeSeriesPlanNode planNode, TimeBuckets ti Preconditions.checkNotNull(routingTable, "Failed to get routing table for table: " + sfpNode.getTableName()); for (var entry : routingTable.getServerInstanceToSegmentsMap().entrySet()) { ServerInstance serverInstance = entry.getKey(); - List segments = entry.getValue().getSegmentList(); + List segments = entry.getValue().getSegments(); context.getLeafIdToSegmentsByServer().computeIfAbsent(serverInstance, (x) -> new HashMap<>()) .put(sfpNode.getId(), segments); } From f80ec5f053c65269220113132435dbabc5c818bc Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" <17555551+Jackie-Jiang@users.noreply.github.com> Date: Tue, 28 Jan 2025 11:55:50 -0800 Subject: [PATCH 9/9] Remove JsonProperty annotation in ServerRouteInfo --- .../java/org/apache/pinot/core/routing/ServerRouteInfo.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerRouteInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerRouteInfo.java index dce578ce293f..cd2b52053bee 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerRouteInfo.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/ServerRouteInfo.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.core.routing; -import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; /** @@ -35,9 +34,7 @@ public class ServerRouteInfo { * @param segments List of segments assigned to the server. * @param optionalSegments List of optional segments assigned to the server. */ - public ServerRouteInfo( - @JsonProperty("segmentList") List segments, - @JsonProperty("optionalSegmentList") List optionalSegments) { + public ServerRouteInfo(List segments, List optionalSegments) { _segments = segments; _optionalSegments = optionalSegments; }