Skip to content

Commit

Permalink
fix(memcached): panic on send on closed channel. (grafana#7817)
Browse files Browse the repository at this point in the history
  • Loading branch information
kavirajk authored and Abuelodelanada committed Dec 1, 2022
1 parent 0dae8e4 commit bc285fd
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* [7759](https://github.com/grafana/loki/pull/7759) **kavirajk**: Improve error message for loading config with ENV variables.
* [7785](https://github.com/grafana/loki/pull/7785) **dannykopping**: Add query blocker for queries and rules.
* [7804](https://github.com/grafana/loki/pull/7804) **sandeepsukhani**: Use grpc for communicating with compactor for query time filtering of data requested for deletion.
* [7817](https://github.com/grafana/loki/pull/7817) **kavirajk**: fix(memcached): panic on send on closed channel.

##### Fixes

Expand Down
29 changes: 23 additions & 6 deletions pkg/storage/chunk/cache/memcached.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ type Memcached struct {
wg sync.WaitGroup
inputCh chan *work

// `closed` tracks if `inputCh` is closed.
// So that any writer goroutine wouldn't write to it after closing `intputCh`
closed chan struct{}

logger log.Logger
}

Expand All @@ -66,6 +70,7 @@ func NewMemcached(cfg MemcachedConfig, client MemcachedClient, name string, reg
ConstLabels: prometheus.Labels{"name": name},
}, []string{"method", "status_code"}),
),
closed: make(chan struct{}),
}

if cfg.BatchSize == 0 || cfg.Parallelism == 0 {
Expand All @@ -77,6 +82,7 @@ func NewMemcached(cfg MemcachedConfig, client MemcachedClient, name string, reg

for i := 0; i < cfg.Parallelism; i++ {
go func() {
defer c.wg.Done()
for input := range c.inputCh {
res := &result{
batchID: input.batchID,
Expand All @@ -85,7 +91,6 @@ func NewMemcached(cfg MemcachedConfig, client MemcachedClient, name string, reg
input.resultCh <- res
}

c.wg.Done()
}()
}

Expand Down Expand Up @@ -164,13 +169,17 @@ func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found
go func() {
for i, j := 0, 0; i < len(keys); i += batchSize {
batchKeys := keys[i:math.Min(i+batchSize, len(keys))]
c.inputCh <- &work{
select {
case <-c.closed:
return
case c.inputCh <- &work{
keys: batchKeys,
ctx: ctx,
resultCh: resultsCh,
batchID: j,
}:
j++
}
j++
}
}()

Expand All @@ -183,8 +192,16 @@ func (c *Memcached) fetchKeysBatched(ctx context.Context, keys []string) (found
// We need to order found by the input keys order.
results := make([]*result, numResults)
for i := 0; i < numResults; i++ {
result := <-resultsCh
results[result.batchID] = result
// NOTE: Without this check, <-resultCh may wait forever as work is
// interrupted (by other goroutine by calling `Stop()`) and there may not be `numResults`
// values to read from `resultsCh` in that case.
// Also we do close(resultsCh) in the same goroutine so <-resultCh may never return.
select {
case <-c.closed:
return
case result := <-resultsCh:
results[result.batchID] = result
}
}
close(resultsCh)

Expand Down Expand Up @@ -219,13 +236,13 @@ func (c *Memcached) Store(ctx context.Context, keys []string, bufs [][]byte) err
return err
}

// Stop does nothing.
func (c *Memcached) Stop() {
if c.inputCh == nil {
return
}

close(c.inputCh)
close(c.closed)
c.wg.Wait()
}

Expand Down
43 changes: 43 additions & 0 deletions pkg/storage/chunk/cache/memcached_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,59 @@ import (
"context"
"errors"
"fmt"
"sync"
"testing"

"github.com/bradfitz/gomemcache/memcache"
"github.com/go-kit/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/grafana/loki/pkg/storage/chunk/cache"
)

func TestMemcached_fetchKeysBatched(t *testing.T) {
// This test checks for two things
// 1. `c.inputCh` is closed when `c.Stop()` is triggered
// 2. Once `c.inputCh` is closed, no one should be writing to `c.inputCh` (thus shouldn't panic with "send to closed channel")

client := newMockMemcache()
m := cache.NewMemcached(cache.MemcachedConfig{
BatchSize: 10,
Parallelism: 5,
}, client, "test", nil, log.NewNopLogger(), "test")

var (
wg sync.WaitGroup
stopped = make(chan struct{}) // chan to make goroutine wait till `m.Stop()` is called.
ctx = context.Background()
)

wg.Add(1)

// This goroutine is going to do some real "work" (writing to `c.inputCh`). We then do `m.Stop()` closing `c.inputCh`. We assert there shouldn't be any panics.
go func() {
defer wg.Done()
<-stopped
assert.NotPanics(t, func() {
keys := []string{"1", "2"}
bufs := [][]byte{[]byte("1"), []byte("2")}
err := m.Store(ctx, keys, bufs)
require.NoError(t, err)

_, _, _, err = m.Fetch(ctx, keys) // will try to write to `intputChan` and shouldn't panic
require.NoError(t, err)

})
}()

m.Stop()
close(stopped)

wg.Wait()
}

func TestMemcached(t *testing.T) {
t.Run("unbatched", func(t *testing.T) {
client := newMockMemcache()
Expand Down

0 comments on commit bc285fd

Please sign in to comment.