*: allow fully concurrent large read #9384
Conversation
/cc @heyitsanthony hmm... cc'd the wrong person initially. haha.
mvcc/index.go
Outdated
for next {
	i := 0

	ti.RLock()
@heyitsanthony This is for reducing lock hold time. Otherwise a read will still block writes for 100ms when ranging over the index. Luckily, our index itself is MVCC, so we can release and re-acquire the lock as we wish.
Might be worthwhile to wrap 'f' higher up so it has some way to communicate a continuation so items can be requested in 10k chunks, which would avoid having to break up the lock here-- the implicit loss of atomicity on the index is a little worrying.
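The chunked-visit idea can be sketched in isolation. Below is a minimal, self-contained illustration (chunkedIndex, visitChunked, and the slice-backed toy index are invented for this sketch, not etcd code): the walk holds the read lock only for one chunk at a time and resumes from the key after the last one visited, which is exactly where the loss of a single atomic snapshot comes from.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// chunkedIndex is a toy stand-in for the mvcc tree index: a sorted set of
// keys guarded by an RWMutex. (Illustrative only; not the etcd type.)
type chunkedIndex struct {
	mu   sync.RWMutex
	keys []string // kept sorted
}

// visitChunked calls f for every key in [start, end) in chunks of at most
// chunkSize keys, releasing the read lock between chunks. Because it resumes
// from the first key not yet visited, writers interleaved between chunks are
// tolerated, at the cost of the scan not being one atomic snapshot.
func (ci *chunkedIndex) visitChunked(start, end string, chunkSize int, f func(key string)) {
	next := start
	for {
		ci.mu.RLock()
		// Find the first key >= next.
		i := sort.SearchStrings(ci.keys, next)
		n := 0
		for ; i < len(ci.keys) && ci.keys[i] < end && n < chunkSize; i++ {
			f(ci.keys[i])
			n++
		}
		more := i < len(ci.keys) && ci.keys[i] < end
		if more {
			next = ci.keys[i] // resume here on the next chunk
		}
		ci.mu.RUnlock()
		if !more {
			return
		}
	}
}

func main() {
	ci := &chunkedIndex{keys: []string{"a", "b", "c", "d", "e", "f"}}
	ci.visitChunked("b", "f", 2, func(k string) { fmt.Println("visit", k) })
}
```

The same trade-off would apply to the real tree index: writers can slip in between chunks, which is only acceptable because the index itself is multi-version.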
Codecov Report

@@            Coverage Diff            @@
##             master    #9384   +/-   ##
=========================================
  Coverage          ?   69.63%
=========================================
  Files             ?      376
  Lines             ?    35247
  Branches          ?        0
=========================================
  Hits              ?    24545
  Misses            ?     8942
  Partials          ?     1760
=========================================

Continue to review full report at Codecov.
@@ -46,7 +63,9 @@ func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult,
 }

 func (tr *storeTxnRead) End() {
-	tr.tx.Unlock()
+	if tr.txlocked {
+		tr.tx.Unlock()
tr.txlocked = false?
Can this be better encapsulated? Could tr.tx.Unlock() and tr.txlocked = false be coordinated by a function that tr provides, so we don't have to remember both correctly everywhere? Same for the tr.tx.Lock() logic below.
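One possible shape for the encapsulation being asked about, as a rough sketch (lockTx/unlockTx and the trimmed-down struct are hypothetical, not the patch's API): the boolean and the lock are only ever updated together inside two small helpers, so they cannot drift apart.

```go
package main

import "sync"

// readTx stands in for the backend read transaction; only its lock matters here.
type readTx struct{ sync.Mutex }

// storeTxnRead sketch: the txlocked flag and the tx lock are only touched
// through lockTx/unlockTx.
type storeTxnRead struct {
	tx       *readTx
	txlocked bool
}

// lockTx acquires the read tx lock if this txn is not already holding it.
func (tr *storeTxnRead) lockTx() {
	if !tr.txlocked {
		tr.tx.Lock()
		tr.txlocked = true
	}
}

// unlockTx releases the read tx lock if this txn is holding it.
func (tr *storeTxnRead) unlockTx() {
	if tr.txlocked {
		tr.tx.Unlock()
		tr.txlocked = false
	}
}

// End finishes the read txn; it no longer has to remember the flag itself.
func (tr *storeTxnRead) End() { tr.unlockTx() }

func main() {
	tr := &storeTxnRead{tx: &readTx{}}
	tr.lockTx()
	tr.End()
}
```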
some suggestions
mvcc/backend/backend.go
Outdated
// ConcurrentReadTx returns a non-blocking read tx that is suitable for large reads.
// ConcurrentReadTx call itself will not return until the current BatchTx gets committed to
// ensure consistency.
ConcurrentReadTx() ReadTx
StaleReadTx? CommittedReadTx? Concurrent doesn't really tell me what it is (or makes me think it is the thread-safe one).
will rename.
@@ -95,6 +100,8 @@ type backend struct {

	readTx *readTx

	concurrentReadTxCh chan chan ReadTx
May want to avoid the O(n) broadcast here; can do O(1) by returning a <-chan struct{} that closes on the next commit: wait on the channel close, fetch the latest committed tx, then acquire some semaphore to limit concurrency. Could return a closed channel if the backend is already fully committed, so there is no need to wait on the commit timer.
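A rough sketch of the O(1) notification pattern described above (the backend fields and method names here are invented for illustration, not the PR's code): each commit closes the current notify channel, waking every waiter at once, and a buffered channel serves as the semaphore that bounds concurrent large reads.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// backend sketch: commitNotify is closed after every batch commit, so readers
// waiting for "the next committed state" block on one channel close instead of
// each being answered individually (O(1) broadcast instead of O(n) sends).
type backend struct {
	mu           sync.Mutex
	commitNotify chan struct{}
	readSem      chan struct{} // limits concurrent large read txs
}

func newBackend(maxConcurrentReads int) *backend {
	return &backend{
		commitNotify: make(chan struct{}),
		readSem:      make(chan struct{}, maxConcurrentReads),
	}
}

// commit simulates a batch commit: it closes the current notify channel
// (waking all waiters) and installs a fresh one for the next round.
func (b *backend) commit() {
	b.mu.Lock()
	close(b.commitNotify)
	b.commitNotify = make(chan struct{})
	b.mu.Unlock()
}

// committedNotify returns the channel that will be closed on the next commit.
// If the backend were already fully committed, it could instead return an
// already-closed channel so callers never wait on the commit timer.
func (b *backend) committedNotify() <-chan struct{} {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.commitNotify
}

// committedReadTx waits for the next commit, then takes a semaphore slot for
// the duration of the large read. The returned func releases the slot.
func (b *backend) committedReadTx() (release func()) {
	<-b.committedNotify()
	b.readSem <- struct{}{}
	return func() { <-b.readSem }
}

func main() {
	b := newBackend(2)
	done := make(chan struct{})
	go func() {
		release := b.committedReadTx()
		defer release()
		fmt.Println("reading from the latest committed tx")
		close(done)
	}()
	// Simulate the periodic batch-commit loop until the reader finishes.
	for {
		b.commit()
		select {
		case <-done:
			return
		case <-time.After(10 * time.Millisecond):
		}
	}
}
```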
for i := 0; i < 100; i++ {
	select {
	case rch := <-b.concurrentReadTxCh:
		rtx, err := b.db.Begin(false)
lazily create tx following each commit?
	txlocked bool

	// for creating concurrent read tx when the read is expensive.
	b backend.Backend
it looks like there should be something like storeTxnReadLimited that wraps storeTxnRead
@@ -133,6 +157,15 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
		limit = len(revpairs)
	}

	if limit > expensiveReadLimit && !tr.txlocked && tr.ro { // first expensive read in a read only transaction
e.g., storeTxnReadLimited could impose a limit+1 on kvindex.Revisions(). If the result is >limit, load by chunk from the committed tx -- it would split up the rlock hold times and spend less memory on revpairs.
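A loose sketch of this suggestion (the limit-aware revisionsLimited call and the toy index/tx types are stand-ins, not etcd APIs): probe the index with limit+1 to detect a large result cheaply, then stream it chunk by chunk from the committed tx so neither the revision slice nor any single index pass covers the whole range.

```go
package main

import (
	"fmt"
	"sort"
)

// Toy stand-ins: the index maps keys to their latest revision, and the
// committed tx maps revisions to values. Neither is the real etcd type.
type toyIndex struct {
	keys []string         // sorted
	rev  map[string]int64 // key -> latest revision
}

type toyCommittedTx struct {
	vals map[int64]string // revision -> value
}

// revisionsLimited is the hypothetical limit-aware index call: it returns at
// most limit revisions for keys in [start, end) beginning at start, plus the
// key to resume from ("" when the range is exhausted).
func (ti *toyIndex) revisionsLimited(start, end string, limit int) (revs []int64, next string) {
	i := sort.SearchStrings(ti.keys, start)
	for ; i < len(ti.keys) && ti.keys[i] < end; i++ {
		if len(revs) == limit {
			return revs, ti.keys[i]
		}
		revs = append(revs, ti.rev[ti.keys[i]])
	}
	return revs, ""
}

// fetch loads the values for a batch of revisions from the committed state.
func (tx *toyCommittedTx) fetch(revs []int64) []string {
	out := make([]string, 0, len(revs))
	for _, r := range revs {
		out = append(out, tx.vals[r])
	}
	return out
}

// rangeLimited probes the index with limit+1; a small result is served in one
// pass, a large one is streamed chunk by chunk so revision slices and index
// lock holds stay bounded.
func rangeLimited(ti *toyIndex, tx *toyCommittedTx, start, end string, limit, chunk int, visit func(string)) {
	probe, more := ti.revisionsLimited(start, end, limit+1)
	if more == "" && len(probe) <= limit {
		for _, v := range tx.fetch(probe) {
			visit(v)
		}
		return
	}
	cur := start
	for {
		revs, next := ti.revisionsLimited(cur, end, chunk)
		for _, v := range tx.fetch(revs) {
			visit(v)
		}
		if next == "" {
			return
		}
		cur = next
	}
}

func main() {
	ti := &toyIndex{
		keys: []string{"a", "b", "c", "d", "e"},
		rev:  map[string]int64{"a": 1, "b": 2, "c": 3, "d": 4, "e": 5},
	}
	tx := &toyCommittedTx{vals: map[int64]string{1: "va", 2: "vb", 3: "vc", 4: "vd", 5: "ve"}}
	rangeLimited(ti, tx, "a", "z", 2, 2, func(v string) { fmt.Println(v) })
}
```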
force-pushed from f98b76d to 6f5716d (Compare)
Marking
/cc @jingyih
@@ -134,6 +158,15 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
		limit = len(revpairs)
	}

	if limit > expensiveReadLimit && !tr.txlocked && tr.ro { // first expensive read in a read only transaction
		// too many keys to range. upgrade the read transaction to concurrent read tx.
		tr.tx = tr.b.CommittedReadTx()
When tr.tx is reassigned, does this leak a bolt.Tx? If it is rolled back, I was unable to figure out where.
Traced through the code. The original tx is managed by the backend and is rolled back as usual when it finishes the current batch iteration. This looks like it works correctly.
@@ -134,6 +158,15 @@ func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions
		limit = len(revpairs)
	}

	if limit > expensiveReadLimit && !tr.txlocked && tr.ro { // first expensive read in a read only transaction
		// too many keys to range. upgrade the read transaction to concurrent read tx.
As an alternative to upgrading tr.tx, would it make sense to have both a tr.tx and a tr.committedTx? This would limit what is put in the committed read transaction to only expensive reads.
nvm, once we perform a CommittedReadTx there is no point having two transactions.
@jpbetz it would be great if you could take over this patch :P. Thank you!
We have also started a design doc explaining the change that we'll send out for review shortly.
Ran an ad hoc benchmark and the results look good.
Benchmark setup:
Benchmark runs:
Benchmark results (response time histograms; data omitted here):
- This PR (with large concurrent read optimization): fast read, write, and expensive read histograms
- Master (baseline): fast read, write, and expensive read histograms
- Master without expensive reads in the mix (regression check baseline): read and write histograms
- This PR, without any expensive reads in the mix (regression check): fast read and write histograms
This is also configurable. 100ms might not be a good value for a large cluster :P.
@@ -91,10 +91,11 @@ func (ti *treeIndex) keyIndex(keyi *keyIndex) *keyIndex {
 func (ti *treeIndex) visit(key, end []byte, f func(ki *keyIndex)) {
 	keyi, endi := &keyIndex{key: key}, &keyIndex{key: end}

-	ti.RLock()
-	defer ti.RUnlock()
+	ti.Lock()
@jingyih I think this improvement is still needed? Can you make a separate PR for this change?
Sounds good.
This PR fully solves the large read blocking issue.
Previously, a large read could block puts for seconds. With this patch, there is no blocking at all.
/cc @heyitsanthony