This repository has been archived by the owner on May 10, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 25
/
FlowController.java
88 lines (82 loc) · 2.2 KB
/
FlowController.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
// Copyright (c) 2017, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.
package com.xiaomi.infra.pegasus.tools;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A tool class to support simple flow control.
*
* <p>Usage:
*
* <pre>{@code
* FlowController cntl = new FlowController(qps);
* while (true) {
* cntl.getToken(); // call getToken before operation
* client.set(...);
* }
* cntl.stop();
* }</pre>
*/
public class FlowController {
private final int qps;
private int[] slots;
private int next;
private AtomicInteger token;
private boolean stopped;
/** @param qps_ QPS to control. should > 0. */
public FlowController(int qps_) {
this.qps = qps_;
this.slots = new int[10];
int base = qps / 10;
for (int i = 0; i < 10; i++) {
slots[i] = base;
}
int remain = qps % 10;
for (int i = 0; i < 10 && remain > 0; i++) {
slots[i]++;
remain--;
}
this.next = 1;
this.token = new AtomicInteger(slots[0]);
this.stopped = false;
new Thread(
new Runnable() {
@Override
public void run() {
while (!stopped) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
synchronized (token) {
token.set(slots[next]);
token.notifyAll();
}
next++;
if (next >= 10) next = 0;
}
}
})
.start();
}
/**
* Call getToken() to do flow control when send request. The method will block for some time if
* QPS limit is reached to control the flow.
*/
public void getToken() {
int t = token.decrementAndGet();
while (!stopped && t < 0) {
synchronized (token) {
try {
token.wait(100);
} catch (InterruptedException e) {
}
}
t = token.decrementAndGet();
}
}
/** Should call stop after use done. */
public void stop() {
this.stopped = true;
}
}