/
Throttle.h
102 lines (86 loc) · 2.18 KB
/
Throttle.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
#ifndef CEPH_THROTTLE_H
#define CEPH_THROTTLE_H
#include "Mutex.h"
#include "Cond.h"
#include <list>
#include "include/atomic.h"
class CephContext;
class PerfCounters;
class Throttle {
CephContext *cct;
std::string name;
PerfCounters *logger;
ceph::atomic_t count, max;
Mutex lock;
list<Cond*> cond;
bool use_perf;
public:
Throttle(CephContext *cct, std::string n, int64_t m = 0, bool _use_perf = true);
~Throttle();
private:
void _reset_max(int64_t m);
bool _should_wait(int64_t c) {
int64_t m = max.read();
int64_t cur = count.read();
return
m &&
((c <= m && cur + c > m) || // normally stay under max
(c >= m && cur > m)); // except for large c
}
bool _wait(int64_t c);
public:
int64_t get_current() {
return count.read();
}
int64_t get_max() { return max.read(); }
bool wait(int64_t m = 0);
int64_t take(int64_t c = 1);
bool get(int64_t c = 1, int64_t m = 0);
/**
* Returns true if it successfully got the requested amount,
* or false if it would block.
*/
bool get_or_fail(int64_t c = 1);
int64_t put(int64_t c = 1);
};
/**
 * @class SimpleThrottle
 * Minimal helper for capping how many operations run concurrently.
 *
 * The first error encountered is remembered and made available once
 * every request has completed. Call wait_for_ret() before the instance
 * is destroyed.
 *
 * The stored return value is never reset, so re-using one instance for
 * several sets of operations is not safe when each set has to be checked
 * for errors separately.
 */
class SimpleThrottle {
public:
  /**
   * @param max            stored in m_max; the bound on concurrent operations
   * @param ignore_enoent  stored in m_ignore_enoent; NOTE(review): presumably
   *                       makes -ENOENT results not count as errors -- confirm
   *                       in the implementation file
   */
  SimpleThrottle(uint64_t max, bool ignore_enoent);
  ~SimpleThrottle();

  /// Register the start of one operation.
  void start_op();

  /// Register the completion of one operation with result code @p r.
  void end_op(int r);

  /// Query whether an error has been recorded.
  bool pending_error() const;

  /// Obtain the recorded result once all requests are complete.
  int wait_for_ret();

private:
  // Declaration order is kept as-is so member initialization order does
  // not change.
  mutable Mutex m_lock;  // mutable: presumably so const members can lock it
  Cond m_cond;
  uint64_t m_max;
  uint64_t m_current;
  int m_ret;
  bool m_ignore_enoent;
};
/**
 * @class C_SimpleThrottle
 * Completion context that brackets one operation on a SimpleThrottle:
 * construction calls start_op(), and finish() calls end_op() with the
 * operation's result.
 *
 * The throttle is held through a non-owning pointer and is dereferenced in
 * finish(), so it must stay alive until this context has finished.
 */
class C_SimpleThrottle : public Context {
public:
  /**
   * Registers a new operation with @p throttle.
   *
   * Declared explicit so that a SimpleThrottle* can never be converted
   * implicitly into a temporary context, which would call start_op()
   * without a matching end_op().
   *
   * @param throttle  throttle to account against; must not be null
   */
  explicit C_SimpleThrottle(SimpleThrottle *throttle) : m_throttle(throttle) {
    m_throttle->start_op();
  }

  /**
   * Reports the operation's completion to the throttle.
   * @param r  result code forwarded to SimpleThrottle::end_op()
   */
  virtual void finish(int r) {
    m_throttle->end_op(r);
  }

private:
  SimpleThrottle *m_throttle;  // not owned
};
#endif