Skip to content

Commit

Permalink
Track unavailable segments in InstanceSelector (#5337)
Browse files Browse the repository at this point in the history
Unavailable segments are the segments that has no enabled instance or all enabled instances are in ERROR state.
We don't count segment with enabled instance in OFFLINE state as unavailable because it is a valid state when segment is new added and has not become ONLINE/CONSUMING.

Introduced RoutingTable class to wrap the a map from ServerInstance to segments and a list of unavailable segments.
Added a new property 'numUnavailableSegments' into the RequestStatistics.
  • Loading branch information
Jackie-Jiang committed May 6, 2020
1 parent 347a97f commit f0ecdf1
Show file tree
Hide file tree
Showing 9 changed files with 440 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public enum FanoutType {
}

private FanoutType _fanoutType;
private int _numUnavailableSegments;

public RequestStatistics() {
}
Expand Down Expand Up @@ -133,6 +134,14 @@ public FanoutType getFanoutType() {
return _fanoutType;
}

public void setNumUnavailableSegments(int numUnavailableSegments) {
_numUnavailableSegments = numUnavailableSegments;
}

public int getNumUnavailableSegments() {
return _numUnavailableSegments;
}

public int getErrorCode() {
return _errorCode;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.pinot.broker.routing.RoutingManager;
import org.apache.pinot.broker.routing.RoutingTable;
import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.pql.parsers.Pql2Compiler;
Expand All @@ -45,6 +46,7 @@

@Api(tags = "Debug")
@Path("/")
// TODO: Add APIs to return the RoutingTable (with unavailable segments)
public class PinotBrokerDebug {
private static final Pql2Compiler COMPILER = new Pql2Compiler();

Expand Down Expand Up @@ -79,18 +81,18 @@ public Map<String, Map<ServerInstance, List<String>>> getRoutingTable(
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
if (tableType != TableType.REALTIME) {
String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
Map<ServerInstance, List<String>> routingTable =
RoutingTable routingTable =
_routingManager.getRoutingTable(COMPILER.compileToBrokerRequest("SELECT * FROM " + offlineTableName));
if (routingTable != null) {
result.put(offlineTableName, routingTable);
result.put(offlineTableName, routingTable.getServerInstanceToSegmentsMap());
}
}
if (tableType != TableType.OFFLINE) {
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
Map<ServerInstance, List<String>> routingTable =
RoutingTable routingTable =
_routingManager.getRoutingTable(COMPILER.compileToBrokerRequest("SELECT * FROM " + realtimeTableName));
if (routingTable != null) {
result.put(realtimeTableName, routingTable);
result.put(realtimeTableName, routingTable.getServerInstanceToSegmentsMap());
}
}
if (!result.isEmpty()) {
Expand All @@ -107,10 +109,9 @@ public Map<String, Map<ServerInstance, List<String>>> getRoutingTable(
@ApiResponses(value = {@ApiResponse(code = 200, message = "Routing table"), @ApiResponse(code = 404, message = "Routing not found"), @ApiResponse(code = 500, message = "Internal server error")})
public Map<ServerInstance, List<String>> getRoutingTableForQuery(
@ApiParam(value = "Pql query (table name should have type suffix)") @QueryParam("pql") String pql) {
Map<ServerInstance, List<String>> routingTable =
_routingManager.getRoutingTable(COMPILER.compileToBrokerRequest(pql));
RoutingTable routingTable = _routingManager.getRoutingTable(COMPILER.compileToBrokerRequest(pql));
if (routingTable != null) {
return routingTable;
return routingTable.getServerInstanceToSegmentsMap();
} else {
throw new WebApplicationException("Cannot find routing for query: " + pql, Response.Status.NOT_FOUND);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -47,6 +46,7 @@
import org.apache.pinot.broker.broker.AccessControlFactory;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.routing.RoutingManager;
import org.apache.pinot.broker.routing.RoutingTable;
import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.function.AggregationFunctionType;
Expand Down Expand Up @@ -286,24 +286,39 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentit
long routingStartTimeNs = System.nanoTime();
Map<ServerInstance, List<String>> offlineRoutingTable = null;
Map<ServerInstance, List<String>> realtimeRoutingTable = null;
int numUnavailableSegments = 0;
if (offlineBrokerRequest != null) {
// NOTE: Routing table might be null if table is just removed
offlineRoutingTable = _routingManager.getRoutingTable(offlineBrokerRequest);
if (offlineRoutingTable == null || offlineRoutingTable.isEmpty()) {
LOGGER.debug("No OFFLINE server found for request {}: {}", requestId, query);
RoutingTable routingTable = _routingManager.getRoutingTable(offlineBrokerRequest);
if (routingTable != null) {
numUnavailableSegments += routingTable.getUnavailableSegments().size();
Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap();
if (!serverInstanceToSegmentsMap.isEmpty()) {
offlineRoutingTable = serverInstanceToSegmentsMap;
} else {
offlineBrokerRequest = null;
}
} else {
offlineBrokerRequest = null;
offlineRoutingTable = null;
}
}
if (realtimeBrokerRequest != null) {
// NOTE: Routing table might be null if table is just removed
realtimeRoutingTable = _routingManager.getRoutingTable(realtimeBrokerRequest);
if (realtimeRoutingTable == null || realtimeRoutingTable.isEmpty()) {
LOGGER.debug("No REALTIME server found for request {}: {}", requestId, query);
RoutingTable routingTable = _routingManager.getRoutingTable(realtimeBrokerRequest);
if (routingTable != null) {
numUnavailableSegments += routingTable.getUnavailableSegments().size();
Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap();
if (!serverInstanceToSegmentsMap.isEmpty()) {
realtimeRoutingTable = serverInstanceToSegmentsMap;
} else {
realtimeBrokerRequest = null;
}
} else {
realtimeBrokerRequest = null;
realtimeRoutingTable = null;
}
}
requestStatistics.setNumUnavailableSegments(numUnavailableSegments);

if (offlineBrokerRequest == null && realtimeBrokerRequest == null) {
LOGGER.info("No server found for request {}: {}", requestId, query);
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NO_SERVER_FOUND_EXCEPTIONS, 1);
Expand Down Expand Up @@ -361,15 +376,16 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentit
if (_queryLogRateLimiter.tryAcquire() || forceLog(brokerResponse, totalTimeMs)) {
// Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
LOGGER.info("RequestId:{}, table:{}, timeMs:{}, docs:{}/{}, entries:{}/{},"
+ " segments(queried/processed/matched/consuming):{}/{}/{}/{}, consumingFreshnessTimeMs:{},"
+ " segments(queried/processed/matched/consuming/unavailable):{}/{}/{}/{}/{}, consumingFreshnessTimeMs:{},"
+ " servers:{}/{}, groupLimitReached:{}, exceptions:{}, serverStats:{}, query:{}", requestId,
brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(),
brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(),
brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(),
brokerResponse.getNumSegmentsProcessed(), brokerResponse.getNumSegmentsMatched(),
brokerResponse.getNumConsumingSegmentsQueried(), brokerResponse.getMinConsumingFreshnessTimeMs(),
brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(),
brokerResponse.isNumGroupsLimitReached(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
brokerResponse.getNumConsumingSegmentsQueried(), numUnavailableSegments,
brokerResponse.getMinConsumingFreshnessTimeMs(), brokerResponse.getNumServersResponded(),
brokerResponse.getNumServersQueried(), brokerResponse.isNumGroupsLimitReached(),
brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
StringUtils.substring(query, 0, _queryLogLength));

// Limit the dropping log message at most once per second.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,29 +418,30 @@ public boolean routingExists(String tableNameWithType) {
}

/**
* Returns the routing table (map from server instance to list of segments hosted by the server) based on the broker
* request, or {@code null} if the routing does not exist.
* Returns the routing table (a map from server instance to list of segments hosted by the server, and a list of
* unavailable segments) based on the broker request, or {@code null} if the routing does not exist.
* <p>NOTE: The broker request should already have the table suffix (_OFFLINE or _REALTIME) appended.
*/
@Nullable
public Map<ServerInstance, List<String>> getRoutingTable(BrokerRequest brokerRequest) {
public RoutingTable getRoutingTable(BrokerRequest brokerRequest) {
String tableNameWithType = brokerRequest.getQuerySource().getTableName();
RoutingEntry routingEntry = _routingEntryMap.get(tableNameWithType);
if (routingEntry == null) {
return null;
}
Map<String, String> segmentToInstanceMap = routingEntry.calculateSegmentToInstanceMap(brokerRequest);
Map<ServerInstance, List<String>> routingTable = new HashMap<>();
InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest);
Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap();
Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = new HashMap<>();
for (Map.Entry<String, String> entry : segmentToInstanceMap.entrySet()) {
ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue());
if (serverInstance != null) {
routingTable.computeIfAbsent(serverInstance, k -> new ArrayList<>()).add(entry.getKey());
serverInstanceToSegmentsMap.computeIfAbsent(serverInstance, k -> new ArrayList<>()).add(entry.getKey());
} else {
// Should not happen in normal case unless encountered unexpected exception when updating routing entries
_brokerMetrics.addMeteredTableValue(tableNameWithType, BrokerMeter.SERVER_MISSING_FOR_ROUTING, 1L);
}
}
return routingTable;
return new RoutingTable(serverInstanceToSegmentsMap, selectionResult.getUnavailableSegments());
}

/**
Expand Down Expand Up @@ -541,15 +542,18 @@ void refreshSegment(String segment) {
}
}

Map<String, String> calculateSegmentToInstanceMap(BrokerRequest brokerRequest) {
InstanceSelector.SelectionResult calculateRouting(BrokerRequest brokerRequest) {
List<String> selectedSegments = _segmentSelector.select(brokerRequest);
if (selectedSegments.isEmpty()) {
return Collections.emptyMap();
if (!selectedSegments.isEmpty()) {
for (SegmentPruner segmentPruner : _segmentPruners) {
selectedSegments = segmentPruner.prune(brokerRequest, selectedSegments);
}
}
for (SegmentPruner segmentPruner : _segmentPruners) {
selectedSegments = segmentPruner.prune(brokerRequest, selectedSegments);
if (!selectedSegments.isEmpty()) {
return _instanceSelector.select(brokerRequest, selectedSegments);
} else {
return new InstanceSelector.SelectionResult(Collections.emptyMap(), Collections.emptyList());
}
return _instanceSelector.select(brokerRequest, selectedSegments);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.broker.routing;

import java.util.List;
import java.util.Map;
import org.apache.pinot.core.transport.ServerInstance;


public class RoutingTable {
private final Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
private final List<String> _unavailableSegments;

public RoutingTable(Map<ServerInstance, List<String>> serverInstanceToSegmentsMap, List<String> unavailableSegments) {
_serverInstanceToSegmentsMap = serverInstanceToSegmentsMap;
_unavailableSegments = unavailableSegments;
}

public Map<ServerInstance, List<String>> getServerInstanceToSegmentsMap() {
return _serverInstanceToSegmentsMap;
}

public List<String> getUnavailableSegments() {
return _unavailableSegments;
}
}

0 comments on commit f0ecdf1

Please sign in to comment.