New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: add merge joiner benchmark and bytes usage optimization #31216

Merged
merged 2 commits into from Oct 11, 2018
Jump to file or symbol
Failed to load files and symbols.
+11 −210
Diff settings

Always

Just for now

Next

Revert "Merge #30924"

This reverts commit 472d36f, reversing
changes made to 6b912fa.

Release note: None
  • Loading branch information...
changangela committed Oct 10, 2018
commit 99b8f67152c226e1f5ac279d1a0f7d1c8067ffd1
@@ -17,7 +17,6 @@ package distsqlrun
import (
"context"
"fmt"
"math"
"testing"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -872,43 +871,4 @@ func BenchmarkMergeJoiner(b *testing.B) {
}
})
}
for _, inputSize := range []int{0, 1 << 2, 1 << 4, 1 << 8, 1 << 12, 1 << 16} {
numRepeats := inputSize
b.Run(fmt.Sprintf("OneSideRepeatInputSize=%d", inputSize), func(b *testing.B) {
leftInput := NewRepeatableRowSource(oneIntCol, makeIntRows(inputSize, numCols))
rightInput := NewRepeatableRowSource(oneIntCol, makeRepeatedIntRows(numRepeats, inputSize, numCols))
b.SetBytes(int64(8 * inputSize * numCols * 2))
b.ResetTimer()
for i := 0; i < b.N; i++ {
m, err := newMergeJoiner(flowCtx, 0 /* processorID */, spec, leftInput, rightInput, post, disposer)
if err != nil {
b.Fatal(err)
}
m.Run(context.Background(), nil /* wg */)
leftInput.Reset()
rightInput.Reset()
}
})
}
for _, inputSize := range []int{0, 1 << 2, 1 << 4, 1 << 8, 1 << 12, 1 << 16} {
numRepeats := int(math.Sqrt(float64(inputSize)))
b.Run(fmt.Sprintf("BothSidesRepeatInputSize=%d", inputSize), func(b *testing.B) {
row := makeRepeatedIntRows(100, numRepeats, numCols)
leftInput := NewRepeatableRowSource(oneIntCol, row)
rightInput := NewRepeatableRowSource(oneIntCol, row)
b.SetBytes(int64(8 * inputSize * numCols * 2))
b.ResetTimer()
for i := 0; i < b.N; i++ {
m, err := newMergeJoiner(flowCtx, 0 /* processorID */, spec, leftInput, rightInput, post, disposer)
if err != nil {
b.Fatal(err)
}
m.Run(context.Background(), nil /* wg */)
leftInput.Reset()
rightInput.Reset()
}
})
}
}
@@ -44,8 +44,7 @@ type streamGroupAccumulator struct {
rowAlloc sqlbase.EncDatumRowAlloc
memAcc mon.BoundAccount
minAllocatedSet bool
memAcc mon.BoundAccount
}
func makeStreamGroupAccumulator(
@@ -118,17 +117,7 @@ func (s *streamGroupAccumulator) nextGroup(
n := len(s.curGroup)
ret := s.curGroup[:n:n]
s.curGroup = s.curGroup[:0]
if !s.minAllocatedSet {
err := s.memAcc.SetMinAllocated(evalCtx.Context, int64(row.Size()))
if err != nil {
return nil, &ProducerMetadata{Err: err}
}
s.minAllocatedSet = true
}
s.memAcc.Empty(evalCtx.Context)
s.memAcc.Clear(evalCtx.Ctx())
s.leftoverRow = row
return ret, nil
}
@@ -311,19 +311,6 @@ func makeRandIntRows(rng *rand.Rand, numRows int, numCols int) sqlbase.EncDatumR
return rows
}
// makeRepeatedIntRows constructs a numRows x numCols table where blocks of n
// consecutive rows have the same value.
func makeRepeatedIntRows(n int, numRows int, numCols int) sqlbase.EncDatumRows {
rows := make(sqlbase.EncDatumRows, numRows)
for i := range rows {
rows[i] = make(sqlbase.EncDatumRow, numCols)
for j := 0; j < numCols; j++ {
rows[i][j] = intEncDatum(i/n + j)
}
}
return rows
}
// runProcessorTest instantiates a processor with the provided spec, runs it
// with the given inputs, and asserts that the outputted rows are as expected.
func runProcessorTest(
@@ -40,7 +40,7 @@ ALTER TABLE kw EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_series(1,
query T
SELECT url FROM [EXPLAIN ANALYZE (DISTSQL) SELECT kv.k, avg(kw.k) FROM kv JOIN kw ON kv.k=kw.k GROUP BY kv.k]
----
https://cockroachdb.github.io/distsqlplan/decode.html#eJzcmFFvo0YUhd_7K9A8pVoqMwN2HKRKbPuUdmNX2eShqqyIhFsHrW2sAXc3WuW_V4Bb2-DMZcJ4ZpI3GwP343Dn-Mz9TlZZApN4CTkJ_yKUuIQRl_jEJQFxyZDMXLLm2QPkecbLU-oLLpNvJPRckq7Wm6I8PHPJQ8aBhN9JkRYLICG5ie8XcA1xAnzgEZckUMTpoiqz5uky5k_Rl3-IS6abInSisjLPvuYOhzgJnfJrXsSLhVOkSwgdLyezZ5dkm2JbcFfn_sl5jPPHwwoRJbPnWXWPOZCQPruvAx-9AP5VEzh7EXx3n4wnwCFp3udDWbjTWUc0uAI-h9-ydAV8QBtvbwF_F2cR_fDjzzydP9Yfd3K4UdlA5TlOQ5bqWEMbl1T3aJ5aH2ydu4y_OUtYZvzJ2eSQhA7znN_TXxoK79TzZV77x_mcwzwuMj6gw9ZLccm0FnD7yj9O_rybTG_uJrefPp1FtBTg8-3VWcTKT79Obyc328_i3jjyTPTYM8l1TdCva8TiMK-fOJ9vr-4uS3n88ts1rBLgVeM4ERtEvjLJdnIMO8ixWR0T5KgWk-ynbD1gw8aZx2uPDmrT7s5DrbJMCfBzqyyTmrFM2t8yve6W6UlYpud09Uvkne_75egkful1WvztB-pjltL9gpgl1WiWr9XrJadk3Rc9s8qtJMDHVrkVM-NW7B24FfLO993q_N24lXS_IG7F3rBb-d0XvW-VW0mAX1jlVr4Zt_L7u1XQ3a2CU21Hkde-b1jjkxhWYGY7Kt01iGf5Gj2rh2Qv2VbQffUHVtmWBPjQKtsKzNhW8A5CFvLO9z3r4t2ELOl-QQwreMMhC5mhXkO-zlY5HD6hzMLeFfZKHSGZQ617nm34A_zBs4eKov46ra6rdu0J5EX9K6u_XK7--ykv4mIPJNsUsJWmrcT2YJLmX_4_VDlA9_pj1fVdcv9UQO7ksCpewUN924A6KkR1ATH9ClGJFmYnaGFx_bHq-j31aLSwBUAdFVLYwkjH6FeINYG8faBDHq95sS98mkC1QbTgxfWpp90QEKChZUDMNoWYfoUC4QJo8DQvHgovZqPD5XOCf4CR3hCFqinmUR6qJOsbCFEIkP4QhXSMfoXO9YaonjzKQ5VkfQMhCgHSH6KQjtGv0Fj4N3AhDlEXKkKUwocR86gPVbIA-iOCGMhAiEKA9CtEW1tzUYraA8LuK7PD1vKgMltuC4DUJx5pAv0RB-saA4Oi1jbbcMhBgAyMjsRA6mOPNIGBYRHSNQbmaa3t-4HP00AcdWhrs2046yBABiZIGJHy_3ZZAgNxByMykHfEg5_X5x3b5jkIkIG8Y92EByNSnn-ku8aAJrYNdRAgA3nHujEPRqQ8_0h3jQFNxJMdiox2qG2zHQTIRN6xbtqDEHXNP0GPdWDdfIedaL7DlMx3FG65ECD9eQcBMpB3MCLrNFKff9oESuY7KvvYtvkOAmQg72BE1mmkPv-0CcTzHYbMd5ht8x0EyEDewYgM_LdbN-_BiE4_A2Oq5juz5x_-DQAA___eamsU
https://cockroachdb.github.io/distsqlplan/decode.html#eJzcmFFvo0YUhd_7K9A8pVoqMwN2HKRKbPuUdmNX2eShqqyIhFsHrW2sAXc3WuW_V4Bb2-DMZcJ4ZpI3GwP343Dn-Mz9TlZZApN4CTkJ_yKUuIQRl_jEJQFxyZDMXLLm2QPkecbLU-oLLpNvJPRckq7Wm6I8PHPJQ8aBhN9JkRYLICG5ie8XcA1xAnzgEZckUMTpoiqz5uky5k_Rl3-IS6abInSisjLPvuYOhzgJnfJrXsSLhVOkSwgdLyezZ5dkm2JbcFfn_sl5jPPHwwoRJbPnWXWPOZCQPruvAx-9AP5VEzh7EXx3n4wnwCFp3udDWbjTWUc0uAI-h9-ydAV8QBtvbwF_F2cR_fDjzzydP9Yfd3K4UdlA5TlOQ5bqWEMbl1T3aJ5aH2ydu4y_OUtYZvzJ2eSQhA7znN_TXxoK79TzZV77x_mcwzwuMj6gw9ZLccm0FnD7yj9O_rybTG_uJrefPp1FtBTg8-3VWcTKT79Obyc328_i3jjyTPTYM8l1TdCva8TiMK-fOJ9vr-4uS3n88ts1rBLgVeM4ERtEvjLJdnIMO8ixWR0T5KgWk-ynbD1gw8aZx2uPDmrT7s5DrbJMCfBzqyyTmrFM2t8yve6W6UlYpud09Uvkne_75egkful1WvztB-pjltL9gpgl1WiWr9XrJadk3Rc9s8qtJMDHVrkVM-NW7B24FfLO993q_N24lXS_IG7F3rBb-d0XvW-VW0mAX1jlVr4Zt_L7u1XQ3a0CCbdC9iG-zGvfN6zxSQwrMLMdle4axLN8jZ7VQ7KXbCvovvoDq2xLAnxolW0FZmwreAchC3nn-5518W5ClnS_IIYVvOGQhcxQryFfZ6scDp9QZmHvCnuljpDModY9zzb8Af7g2UNFUX-dVtdVu_YE8qL-ldVfLlf__ZQXcbEHkm0K2ErTVmJ7MEnzL_8fqhyge_2x6vouuX8qIHdyWBWv4KG-bUAdFaK6gJh-hahEC7MTtLC4_lh1_Z56NFrYAqCOCilsYaRj9CvEmkDePtAhj9e82Bc-TaDaIFrw4vrU024ICNDQMiBmm0JMv0KBcAE0eJoXD4UXs9Hh8jnBP8BIb4hC1RTzKA9VkvUNhCgESH-IQjpGv0LnekNUTx7loUqyvoEQhQDpD1FIx-hXaCz8G7gQh6gLFSFK4cOIedSHKlkA_RFBDGQgRCFA-hWira25KEXtAWH3ldlha3lQmS23BUDqE480gf6Ig3WNgUFRa5ttOOQgQAZGR2Ig9bFHmsDAsAjpGgPztNb2_cDnaSCOOrS12TacdRAgAxMkjEj5f7ssgYG4gxEZyDviwc_r845t8xwEyEDesW7CgxEpzz_SXWNAE9uGOgiQgbxj3ZgHI1Kef6S7xoAm4skORUY71LbZDgJkIu9YN-1BiLrmn6DHOrBuvsNONN9hSuY7CrdcCJD-vIMAGcg7GJF1GqnPP20CJfMdlX1s23wHATKQdzAi6zRSn3_aBOL5DkPmO8y2-Q4CZCDvYEQG_tutm_dgRKefgTFV853Z8w__BgAA__-qGmsT
# This query verifies stats collection for the hashJoiner, distinct and sorter.
query T
@@ -449,10 +449,7 @@ type BoundAccount struct {
// reserved is a small buffer to amortize the cost of growing an account. It
// decreases as used increases (and vice-versa).
reserved int64
// minAllocated is a minimum allocated bytes size that the account reach
// before being able to release bytes.
minAllocated int64
mon *BytesMonitor
mon *BytesMonitor
}
// MakeStandaloneBudget creates a BoundAccount suitable for root
@@ -471,9 +468,7 @@ func (b BoundAccount) Monitor() *BytesMonitor {
return b.mon
}
// Allocated returns the total number of bytes which this account is using or
// reserving
func (b BoundAccount) Allocated() int64 {
func (b BoundAccount) allocated() int64 {
return b.used + b.reserved
}
@@ -482,47 +477,6 @@ func (mm *BytesMonitor) MakeBoundAccount() BoundAccount {
return BoundAccount{mon: mm}
}
// SetMinAllocated allocates a minimum Allocated size (reserved + used) for the
// account. The account would not be able to Shrink() unless it has surpassed
// this minimum Allocated value.
func (b *BoundAccount) SetMinAllocated(ctx context.Context, size int64) error {
if size < 0 {
panic(fmt.Sprintf("%s: cannot set bound account min allocated to a negative value",
b.mon.name))
}
b.minAllocated = b.mon.roundSize(size)
if b.Allocated() < b.minAllocated {
minExtra := b.mon.roundSize(b.minAllocated - b.Allocated())
if err := b.mon.reserveBytes(ctx, minExtra); err != nil {
return err
}
b.reserved += minExtra
} else {
var released int64
if b.used < b.minAllocated {
released = b.Allocated() - b.minAllocated
} else {
released = b.reserved - b.Monitor().poolAllocationSize
}
b.mon.releaseBytes(ctx, released)
b.reserved -= released
}
return nil
}
// Empty shrinks the account to use 0 bytes, while maintaining the minAllocated
// size.
func (b *BoundAccount) Empty(ctx context.Context) {
b.reserved += b.used
b.used = 0
if b.Allocated() > b.minAllocated && b.reserved > b.mon.poolAllocationSize {
b.mon.releaseBytes(ctx, b.reserved-b.mon.poolAllocationSize)
b.reserved = b.mon.poolAllocationSize
}
}
// Clear releases all the cumulated allocations of an account at once and
// primes it for reuse.
func (b *BoundAccount) Clear(ctx context.Context) {
@@ -534,7 +488,6 @@ func (b *BoundAccount) Clear(ctx context.Context) {
b.Close(ctx)
b.used = 0
b.reserved = 0
b.minAllocated = 0
}
// Close releases all the cumulated allocations of an account at once.
@@ -544,7 +497,7 @@ func (b *BoundAccount) Close(ctx context.Context) {
// monitor -- "bytes out of the aether". This needs not be closed.
return
}
if a := b.Allocated(); a > 0 {
if a := b.allocated(); a > 0 {
b.mon.releaseBytes(ctx, a)
}
}
@@ -601,7 +554,7 @@ func (b *BoundAccount) Shrink(ctx context.Context, delta int64) {
}
b.used -= delta
b.reserved += delta
if b.Allocated() > b.minAllocated && b.reserved > b.mon.poolAllocationSize {
if b.reserved >= b.mon.poolAllocationSize {
b.mon.releaseBytes(ctx, b.reserved-b.mon.poolAllocationSize)
b.reserved = b.mon.poolAllocationSize
}
@@ -658,10 +611,6 @@ func (mm *BytesMonitor) reserveBytes(ctx context.Context, x int64) error {
// releaseBytes releases bytes previously successfully registered via
// reserveBytes().
func (mm *BytesMonitor) releaseBytes(ctx context.Context, sz int64) {
if sz == 0 {
return
}
mm.mu.Lock()
defer mm.mu.Unlock()
if mm.mu.curAllocated < sz {
@@ -716,7 +665,7 @@ func (mm *BytesMonitor) roundSize(sz int64) int64 {
func (mm *BytesMonitor) releaseBudget(ctx context.Context) {
// NB: mm.mu need not be locked here, as this is only called from StopMonitor().
if log.V(2) {
log.Infof(ctx, "%s: releasing %d bytes to the pool", mm.name, mm.mu.curBudget.Allocated())
log.Infof(ctx, "%s: releasing %d bytes to the pool", mm.name, mm.mu.curBudget.allocated())
}
mm.mu.curBudget.Clear(ctx)
}
@@ -67,7 +67,7 @@ func TestMemoryAllocations(t *testing.T) {
t.Errorf("account %d went negative: %d", accI, accs[accI].used)
fail = true
}
sum += accs[accI].Allocated()
sum += accs[accI].allocated()
}
if m.mu.curAllocated < 0 {
t.Errorf("monitor current count went negative: %d", m.mu.curAllocated)
@@ -81,7 +81,7 @@ func TestMemoryAllocations(t *testing.T) {
t.Errorf("monitor current budget went negative: %d", m.mu.curBudget.used)
fail = true
}
avail := m.mu.curBudget.Allocated() + m.reserved.used
avail := m.mu.curBudget.allocated() + m.reserved.used
if sum > avail {
t.Errorf("total account sum %d greater than total monitor budget %d", sum, avail)
fail = true
@@ -90,7 +90,7 @@ func TestMemoryAllocations(t *testing.T) {
t.Errorf("pool cur %d exceeds max %d", pool.mu.curAllocated, pool.reserved.used)
fail = true
}
if m.mu.curBudget.Allocated() != pool.mu.curAllocated {
if m.mu.curBudget.allocated() != pool.mu.curAllocated {
t.Errorf("monitor budget %d different from pool cur %d", m.mu.curBudget.used, pool.mu.curAllocated)
fail = true
}
@@ -263,90 +263,6 @@ func TestBoundAccount(t *testing.T) {
t.Fatalf("monitor refused reset + allocation: %v", err)
}
a1.Clear(ctx)
a2.Clear(ctx)
if err := a1.Grow(ctx, 20); err != nil {
t.Fatalf("monitor refused allocation: %v", err)
}
if err := a2.Grow(ctx, 10); err != nil {
t.Fatalf("monitor refused allocation: %v", err)
}
if err := a1.SetMinAllocated(ctx, 60); err != nil {
t.Fatalf("monitor refused min allocation: %v", err)
}
if !(a1.reserved == 40 && a1.used == 20) {
t.Fatalf("bound account incorrect min allocation: used %d and reserved %d", a1.used, a1.reserved)
}
if err := a2.Grow(ctx, 31); err == nil {
t.Fatalf("monitor accepted excessive allocation")
}
a1.Shrink(ctx, 10)
if err := a2.Grow(ctx, 31); err == nil {
t.Fatalf("monitor accepted excessive allocation")
}
a1.Empty(ctx)
if !(a1.used == 0 && a1.reserved == 60) {
t.Fatalf("bound account incorrect empty: used %d and reserved %d", a1.used, a1.reserved)
}
if err := a2.Grow(ctx, 31); err == nil {
t.Fatalf("monitor accepted excessive allocation")
}
if err := a1.SetMinAllocated(ctx, 50); err != nil {
t.Fatalf("monitor refused min allocation: %v", err)
}
if !(a1.used == 0 && a1.reserved == 50) {
t.Fatalf("bound account incorrect min allocation: used %d and reserved %d", a1.used, a1.reserved)
}
if err := a1.Resize(ctx, 0, 20); err != nil {
t.Fatalf("monitor refused allocation: %v", err)
}
if err := a2.ResizeTo(ctx, 50); err != nil {
t.Fatalf("monitor refused allocation: %v", err)
}
if err := a1.SetMinAllocated(ctx, 20); err != nil {
t.Fatalf("monitor refused min allocation: %v", err)
}
if !(a1.used == 20 && a1.reserved == 1) {
t.Fatalf("bound account incorrect min allocation: used %d and reserved %d", a1.used, a1.reserved)
}
if err := a2.ResizeTo(ctx, 79); err != nil {
t.Fatalf("monitor refused allocation: %v", err)
}
a1.Clear(ctx)
a2.Clear(ctx)
if err := a1.Grow(ctx, 70); err != nil {
t.Fatalf("monitor refused allocation: %v", err)
}
a1.Shrink(ctx, 10)
if !(a1.used == 60 && a1.reserved == 1) {
t.Fatalf("bound account incorrect shrink: used %d and reserved %d", a1.used, a1.reserved)
}
if err := a2.Grow(ctx, 39); err != nil {
t.Fatalf("monitor refused allocation: %v", err)
}
a1.Close(ctx)
a2.Close(ctx)
ProTip! Use n and p to navigate between commits in a pull request.