Use hit counter to track max QPS per minute for broker #4472

Closed

Changes from 1 commit
@@ -49,6 +49,7 @@
public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHandler, QueryQuotaManager {
private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedQueryQuotaManager.class);
private static final int TIME_RANGE_IN_SECOND = 1;
private static final int HIT_COUNTER_BUCKETS = 100;

private final AtomicInteger _lastKnownBrokerResourceVersion = new AtomicInteger(-1);
private final Map<String, QueryQuotaConfig> _rateLimiterMap = new ConcurrentHashMap<>();
@@ -182,7 +183,7 @@ private void createRateLimiter(String tableNameWithType, ExternalView brokerReso

double perBrokerRate = overallRate / onlineCount;
QueryQuotaConfig queryQuotaConfig =
new QueryQuotaConfig(RateLimiter.create(perBrokerRate), new HitCounter(TIME_RANGE_IN_SECOND));
new QueryQuotaConfig(RateLimiter.create(perBrokerRate), new HitCounter(TIME_RANGE_IN_SECOND, HIT_COUNTER_BUCKETS));
_rateLimiterMap.put(tableNameWithType, queryQuotaConfig);
LOGGER.info(
"Rate limiter for table: {} has been initialized. Overall rate: {}. Per-broker rate: {}. Number of online broker instances: {}",
@@ -22,23 +22,36 @@
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicLongArray;


/**
* This hit counter is for counting the number of hits within a range of time. Right now the granularity we use is second.
* In order to save the space and time, we store the number of hits over the last 100 time buckets. When the method hit
* gets called, we put the timestamp to the specified bucket. When the method getHitCount gets called, we sum all the number
* of hits within the last 100 time buckets.
* This hit counter is for counting the number of hits within a range of time.
* Right now the granularity we use is configured by the users. Currently two users
Contributor: "configured by the users"? I think we can omit this, since the ctor kind of indicates that the granularity is used by whoever calls the constructor. The use of the term "user" here could somehow indicate the user of pinot, or administrator of pinot -- neither of which is true in this case.

Contributor Author: done

* are there:
*
* (1) QueryQuota Manager {@link HelixExternalViewBasedQueryQuotaManager} which
* uses the hit counter for a 1 sec time range and 100 time buckets, thus
* each bucket covers a time width of 10ms.
*
* (2) {@link org.apache.pinot.broker.requesthandler.BrokerPeakQPSMetricsHandler}
* which uses the hit counter to keep track of peak QPS per minute for the broker.
* The hit counter is configured for a time range of 60 secs with 600 buckets, thus
* each bucket covers a time width of 100ms.
*
* In order to save space and time, we store the number of hits over the
* last configured number of time buckets. When the method hit gets called,
* we record the hit in the corresponding time bucket. When the method getHitCount
* gets called, we sum the number of hits within the last configured number of time buckets.
*/
public class HitCounter {
private static int BUCKET_COUNT = 100;
private final int _bucketCount;
private final int _timeBucketWidthMs;
private final AtomicLongArray _bucketStartTime;
private final AtomicIntegerArray _bucketHitCount;

public HitCounter(int timeRangeInSeconds) {
_timeBucketWidthMs = timeRangeInSeconds * 1000 / BUCKET_COUNT;
_bucketStartTime = new AtomicLongArray(BUCKET_COUNT);
_bucketHitCount = new AtomicIntegerArray(BUCKET_COUNT);
public HitCounter(final int timeRangeInSeconds, final int bucketCount) {
_bucketCount = bucketCount;
_timeBucketWidthMs = timeRangeInSeconds * 1000 / _bucketCount;
_bucketStartTime = new AtomicLongArray(_bucketCount);
_bucketHitCount = new AtomicIntegerArray(_bucketCount);
}

/**
@@ -51,7 +64,7 @@ public void hit() {
@VisibleForTesting
void hit(long timestamp) {
long numTimeUnits = timestamp / _timeBucketWidthMs;
int index = (int) (numTimeUnits % BUCKET_COUNT);
int index = (int) (numTimeUnits % _bucketCount);
if (_bucketStartTime.get(index) == numTimeUnits) {
_bucketHitCount.incrementAndGet(index);
} else {
@@ -77,11 +90,154 @@ public int getHitCount() {
int getHitCount(long timestamp) {
long numTimeUnits = timestamp / _timeBucketWidthMs;
int count = 0;
for (int i = 0; i < BUCKET_COUNT; i++) {
if (numTimeUnits - _bucketStartTime.get(i) < BUCKET_COUNT) {
for (int i = 0; i < _bucketCount; i++) {
if (numTimeUnits - _bucketStartTime.get(i) < _bucketCount) {
count += _bucketHitCount.get(i);
}
}
return count;
}

/*
* Explanation of algorithm to keep track of
* max QPS per minute:
Contributor: "max QPS within a one minute window"?

Contributor Author: yep, corrected it

*
* Hit counter is configured with a fixed number of buckets,
* and each bucket covers a fixed time window, to
* cover an overall range of 60 secs.
*
* Each bucket stores two values -- start time
Contributor: it seems that we only need to store the start time for the first bucket, right? the others can be derived. Not sure if we implement it that way, just making a note here.

Contributor Author: I am not quite sure what that implies so we should discuss it. Let's take an example:

Start time of bucket b1 is 100 and bucket b2 is 101. Now if there are 6 buckets, then 106 and 107 will also map to b1 and b2 respectively.

When there are only 2 hits recorded for the 100 and 101 start times, the latter can probably be derived from the former. But let's say a 3rd hit comes after a window of 1 min with a start time of 107; then b2's start time will be set to 107, and this is something that can't be derived from b1's start time of 100 (although b1's hit count has now become irrelevant as it is now out of the 60 sec window).

* and hit counter.
*
* Example:
* T = timestamp
* BST = bucket start time
* B = bucket index
* WIDTH = bucket window width = 10000ms
* BUCKETS = 6
*
* Every time hitAndUpdateLatestTime() is called,
* we do the following:
*
* T = current timestamp
* BST = T / WIDTH
* B = BST % BUCKETS
*
* if a timestamp T falls in a bucket B, then
* T, T + 0ms, T + 1ms, ..... T + 9999ms will all
* end up in the same bucket B
*
* T + WIDTHms will end up in the next bucket
* and so on
*
* T + 60000, T + 60000 + 1 ... T + 60000 + 9999
* will also end up in the same bucket B as T, but
* its BST would be the old BST + BUCKETS
*
* Now this is how bucket update rules work on every
* call to hitAndUpdateLatestTime()
*
* (1) Get T
* (2) Compute BST and B
* (3) CURR = current BST of B
* (4) if BST != CURR, it implies we have gone
* over a minute and the current hit counter value of the
* bucket along with CURR can be overwritten --
* so we set B's hit counter to 1 and B's BST to BST.
* (5) else, we increment B's hit counter by 1
*
* note that BST != CURR also implies that
* BST - CURR >= BUCKETS which further indicates we have gone
* over a minute.
*
* getMaxHitCount() -- used to update BrokerGauge with peak
* QPS per minute
*
* (1) go over all buckets
* (2) keep track of 2 max values seen so far in the loop
* (a) max_BST and (b) max_hits
* (3) consider each bucket's BST to see if max_BST
* should be updated
* (4) consider each bucket's hit counter value
* as follows to see if max_hits needs to be updated
*
* if abs(bucket's BST - max_BST) <= BUCKETS, it
* tells us that the start time of this bucket is within
* the 1 min window, so we should consider its hit
* counter value to update max_hits
*
* else if bucket's BST > max_BST, it implies
* that bucket's BST is beyond current max_BST
* by more than a minute, so we should simply
* set max_hits as bucket's hit counter value
*
* See the unit test for peakQPS in HitCounterTest
*/

/**
* Currently invoked by {@link org.apache.pinot.broker.requesthandler.BrokerPeakQPSMetricsHandler}
* every time a query comes into {@link org.apache.pinot.broker.requesthandler.BaseBrokerRequestHandler}
*/
public void hitAndUpdateLatestTime() {
hitAndUpdateLatestTime(System.currentTimeMillis());
Contributor: If it helps, you can introduce a protected method called nowMillis() which returns current time in millis. You can then override this method to return whatever time you like in tests. Yet to read through the rest of the review, but just suggesting here. Not sure if it will help us get better test coverage.

Contributor Author: For coverage, I am explicitly calling the next method with a custom timestamp to test various values at different intervals.

}

@VisibleForTesting
void hitAndUpdateLatestTime(final long timestamp) {
Contributor: Use the same as the existing method to hit()

// Since we are updating two words, it is not possible
// to do this in a single atomic instruction
// unless there is a way to do a multi-word CAS.
// Wrapping this in a synchronized block would
// hurt performance. Currently the caller of this,
// BrokerPeakQPSMetricsHandler, creates 600 buckets,
// so that should avoid extreme contention on a
// single bucket. It should be noted that without
// synchronization, we should expect some
// inconsistency at a bucket level.
final long numTimeUnits = timestamp / _timeBucketWidthMs;
final int bucketIndex = (int) (numTimeUnits % _bucketCount);
if (numTimeUnits != _bucketStartTime.get(bucketIndex)) {
_bucketHitCount.set(bucketIndex, 1);
_bucketStartTime.set(bucketIndex, numTimeUnits);
} else {
_bucketHitCount.incrementAndGet(bucketIndex);
}

/* potential alternative approach where we can miss
some updates by simply returning if CAS fails
final long numTimeUnits = timestamp / _timeBucketWidthMs;
final int bucketIndex = (int) (numTimeUnits % _bucketCount);
final int hitCount = _bucketHitCount.get(bucketIndex);
final long startTime = _bucketStartTime.get(bucketIndex);
if (numTimeUnits != startTime) {
if (_bucketStartTime.compareAndSet(bucketIndex, startTime, numTimeUnits)) {
_bucketHitCount.set(bucketIndex, 1);
}
} else {
if (_bucketHitCount.compareAndSet(bucketIndex, hitCount, hitCount + 1)) {
_bucketStartTime.set(bucketIndex, numTimeUnits);
}
}*/
}

/**
* Used to update {@link org.apache.pinot.common.metrics.BrokerGauge}
* @return max hit count in a minute
*/
public long getMaxHitCount() {
long maxWindowStartTime = _bucketStartTime.get(0);
int maxHits = _bucketHitCount.get(0);
for (int bucket = 1; bucket < _bucketCount; ++bucket) {
final long startTime = _bucketStartTime.get(bucket);
final int hits = _bucketHitCount.get(bucket);
if (Math.abs(startTime - maxWindowStartTime) <= _bucketCount) {
maxHits = Math.max(maxHits, hits);
} else if (startTime > maxWindowStartTime) {
maxHits = hits;
}
maxWindowStartTime = Math.max(maxWindowStartTime, startTime);
}
return maxHits;
}
}
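
To make the bucket arithmetic above concrete, here is a small standalone sketch (not part of the PR) that reproduces the width and index math used by HitCounter for the peak-QPS configuration; the timestamp is invented for illustration:

public class HitCounterBucketMathSketch {
  public static void main(String[] args) {
    // Peak QPS path: 60 sec range, 600 buckets -> 100 ms per bucket
    int bucketCount = 600;
    int timeBucketWidthMs = 60 * 1000 / bucketCount;   // 100

    long timestamp = 1_561_000_123_456L;               // arbitrary example timestamp in ms
    long numTimeUnits = timestamp / timeBucketWidthMs; // bucket start time (BST)
    int bucketIndex = (int) (numTimeUnits % bucketCount);
    System.out.println("width=" + timeBucketWidthMs + "ms"
        + ", BST=" + numTimeUnits + ", bucket=" + bucketIndex);

    // A timestamp exactly 60 s later maps to the same bucket index, but its
    // BST is larger by exactly bucketCount (600). hitAndUpdateLatestTime()
    // uses that BST mismatch to detect that the bucket's old contents are stale.
    long later = timestamp + 60_000;
    System.out.println((later / timeBucketWidthMs) % bucketCount == bucketIndex); // true
    System.out.println(later / timeBucketWidthMs - numTimeUnits);                 // 600
  }
}
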
@@ -87,6 +87,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
private final RateLimiter _numDroppedLogRateLimiter;
private final AtomicInteger _numDroppedLog;

protected final BrokerPeakQPSMetricsHandler _brokerPeakQPSMetricsHandler;

public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable,
TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory,
QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics) {
@@ -107,6 +109,8 @@ public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable,
_numDroppedLog = new AtomicInteger(0);
_numDroppedLogRateLimiter = RateLimiter.create(1.0);

_brokerPeakQPSMetricsHandler = new BrokerPeakQPSMetricsHandler(_brokerMetrics);

LOGGER
.info("Broker Id: {}, timeout: {}ms, query response limit: {}, query log length: {}, query log max rate: {}qps",
_brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength, _queryLogRateLimiter.getRate());
@@ -154,6 +158,7 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentit
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REQUEST_COMPILATION,
compilationEndTimeNs - compilationStartTimeNs);
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERIES, 1);
_brokerPeakQPSMetricsHandler.incrementQueryCount();

// Check table access
boolean hasAccess = _accessControlFactory.create().hasAccess(requesterIdentity, brokerRequest);
@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.broker.requesthandler;

import org.apache.pinot.broker.queryquota.HitCounter;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMetrics;

/**
* Handles the setting of QPS related metrics to the metrics registry
*/
public class BrokerPeakQPSMetricsHandler {
private static final int TIME_RANGE_IN_SECONDS = 60;
// Create 600 buckets of 100 ms width each to cover a range of 60 secs
private static final int HIT_COUNTER_BUCKETS = 600;
private final HitCounter _hitCounter;

BrokerPeakQPSMetricsHandler(final BrokerMetrics brokerMetrics) {
_hitCounter = new HitCounter(TIME_RANGE_IN_SECONDS, HIT_COUNTER_BUCKETS);
brokerMetrics.addCallbackGauge(BrokerGauge.PEAK_QPS_PER_MINUTE.getGaugeName(),
() -> _hitCounter.getMaxHitCount());
}

void incrementQueryCount() {
_hitCounter.hitAndUpdateLatestTime();
}
}
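
As an aside (not part of the PR), a minimal usage sketch of the peak-QPS path, assuming it sits in the org.apache.pinot.broker.queryquota package so it can reach the @VisibleForTesting overload; the timestamps are synthetic, and HitCounterTest in this PR remains the authoritative test:

package org.apache.pinot.broker.queryquota;

// Illustrative only: exercises hitAndUpdateLatestTime()/getMaxHitCount() directly.
public class PeakQpsUsageSketch {
  public static void main(String[] args) {
    // Same configuration as BrokerPeakQPSMetricsHandler: 60 sec range, 600 buckets (100 ms each)
    HitCounter counter = new HitCounter(60, 600);

    long t0 = 10_000_000L;                          // arbitrary base timestamp in ms
    // 5 hits that land in the same 100 ms bucket
    for (int i = 0; i < 5; i++) {
      counter.hitAndUpdateLatestTime(t0 + i * 10);
    }
    // 2 hits roughly 30 s later, i.e. a different bucket in the same one-minute window
    counter.hitAndUpdateLatestTime(t0 + 30_000);
    counter.hitAndUpdateLatestTime(t0 + 30_050);

    // getMaxHitCount() scans the buckets and, as described in the comment block
    // above, reports the largest per-bucket count among buckets whose start times
    // fall within the same one-minute window: 5 for this sequence.
    System.out.println(counter.getMaxHitCount());
  }
}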