fix: promtail race fixes #12656

Merged · 8 commits · Apr 19, 2024

Changes from all commits:
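Most of the changes below share one shape: a slice or map that a collector goroutine appends to is also read by the test's require.Eventually closure and its final assertions, and both sides now take the same sync.Mutex. As a minimal, self-contained sketch of that pattern (the string entries and channel here are illustrative stand-ins, not promtail's types):

package example

import (
    "sync"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
)

// TestGuardedCollector sketches the locking pattern applied throughout this
// PR: every access to the shared slice, including the read inside the
// require.Eventually closure, happens under the same mutex.
func TestGuardedCollector(t *testing.T) {
    ch := make(chan string)
    defer close(ch)

    var mu sync.Mutex
    var received []string

    // Collector goroutine: the only writer, but it must still lock because
    // the test goroutine reads the slice concurrently.
    go func() {
        for e := range ch {
            mu.Lock()
            received = append(received, e)
            mu.Unlock()
        }
    }()

    lines := []string{"line 1", "line 2", "line 3"}
    for _, l := range lines {
        ch <- l
    }

    require.Eventually(t, func() bool {
        mu.Lock()
        defer mu.Unlock()
        return len(received) == len(lines)
    }, 5*time.Second, 10*time.Millisecond)

    // Final assertions also read under the lock, mirroring the diffs below.
    mu.Lock()
    defer mu.Unlock()
    for _, l := range received {
        require.Contains(t, lines, l)
    }
}

The lock inside the Eventually closure matters as much as the one around append: the race detector flags the unsynchronized read of len(received) just as readily as the concurrent write.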
7 changes: 7 additions & 0 deletions clients/pkg/promtail/client/client_writeto_test.go
@@ -29,11 +29,14 @@ func TestClientWriter_LogEntriesAreReconstructedAndForwardedCorrectly(t *testing.T) {
ch := make(chan api.Entry)
defer close(ch)

var mu sync.Mutex
var receivedEntries []api.Entry

go func() {
for e := range ch {
mu.Lock()
receivedEntries = append(receivedEntries, e)
mu.Unlock()
}
}()

@@ -72,12 +75,16 @@ func TestClientWriter_LogEntriesAreReconstructedAndForwardedCorrectly(t *testing.T) {
}

require.Eventually(t, func() bool {
mu.Lock()
defer mu.Unlock()
return len(receivedEntries) == len(lines)
}, time.Second*10, time.Second)
mu.Lock()
for _, receivedEntry := range receivedEntries {
require.Contains(t, lines, receivedEntry.Line, "entry line was not expected")
require.Equal(t, model.LabelValue("test"), receivedEntry.Labels["app"])
}
mu.Unlock()
}

func TestClientWriter_LogEntriesWithoutMatchingSeriesAreIgnored(t *testing.T) {
24 changes: 24 additions & 0 deletions clients/pkg/promtail/client/manager_test.go
@@ -6,6 +6,7 @@ import (
"net/http"
"net/url"
"os"
"sync"
"testing"
"time"

@@ -127,10 +128,13 @@ func TestManager_WALEnabled(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "wal:test-client", manager.Name())

var mu sync.Mutex
receivedRequests := []utils.RemoteWriteRequest{}
go func() {
for req := range rwReceivedReqs {
mu.Lock()
receivedRequests = append(receivedRequests, req)
mu.Unlock()
}
}()

@@ -155,17 +159,21 @@
}

require.Eventually(t, func() bool {
mu.Lock()
defer mu.Unlock()
return len(receivedRequests) == totalLines
}, 5*time.Second, time.Second, "timed out waiting for requests to be received")

var seenEntries = map[string]struct{}{}
// assert over rw client received entries
mu.Lock()
for _, req := range receivedRequests {
require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received")
require.Len(t, req.Request.Streams[0].Entries, 1, "expected 1 entry in the only stream received per request")
require.Equal(t, `{wal_enabled="true"}`, req.Request.Streams[0].Labels)
seenEntries[req.Request.Streams[0].Entries[0].Line] = struct{}{}
}
mu.Unlock()
require.Len(t, seenEntries, totalLines)
}

@@ -182,10 +190,13 @@ func TestManager_WALDisabled(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "multi:test-client", manager.Name())

var mu sync.Mutex
receivedRequests := []utils.RemoteWriteRequest{}
go func() {
for req := range rwReceivedReqs {
mu.Lock()
receivedRequests = append(receivedRequests, req)
mu.Unlock()
}
}()

@@ -209,17 +220,21 @@
}

require.Eventually(t, func() bool {
mu.Lock()
defer mu.Unlock()
return len(receivedRequests) == totalLines
}, 5*time.Second, time.Second, "timed out waiting for requests to be received")

var seenEntries = map[string]struct{}{}
// assert over rw client received entries
mu.Lock()
for _, req := range receivedRequests {
require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received")
require.Len(t, req.Request.Streams[0].Entries, 1, "expected 1 entry in the only stream received per request")
require.Equal(t, `{pizza-flavour="fugazzeta"}`, req.Request.Streams[0].Labels)
seenEntries[req.Request.Streams[0].Entries[0].Line] = struct{}{}
}
mu.Unlock()
require.Len(t, seenEntries, totalLines)
}

@@ -250,15 +265,20 @@ func TestManager_WALDisabled_MultipleConfigs(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "multi:test-client,test-client-2", manager.Name())

var mu sync.Mutex
receivedRequests := []utils.RemoteWriteRequest{}
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case req := <-rwReceivedReqs:
mu.Lock()
receivedRequests = append(receivedRequests, req)
mu.Unlock()
case req := <-rwReceivedReqs2:
mu.Lock()
receivedRequests = append(receivedRequests, req)
mu.Unlock()
case <-ctx.Done():
return
}
@@ -289,16 +309,20 @@
// times 2 due to clients being run
expectedTotalLines := totalLines * 2
require.Eventually(t, func() bool {
mu.Lock()
defer mu.Unlock()
return len(receivedRequests) == expectedTotalLines
}, 5*time.Second, time.Second, "timed out waiting for requests to be received")

var seenEntries = map[string]struct{}{}
// assert over rw client received entries
mu.Lock()
for _, req := range receivedRequests {
require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received")
require.Len(t, req.Request.Streams[0].Entries, 1, "expected 1 entry in the only stream received per request")
seenEntries[fmt.Sprintf("%s-%s", req.Request.Streams[0].Labels, req.Request.Streams[0].Entries[0].Line)] = struct{}{}
}
mu.Unlock()
require.Len(t, seenEntries, expectedTotalLines)
}

26 changes: 24 additions & 2 deletions clients/pkg/promtail/promtail_wal_test.go
@@ -59,19 +59,25 @@ func TestPromtailWithWAL_SingleTenant(t *testing.T) {
// create receive channel and start a collect routine
receivedCh := make(chan utils.RemoteWriteRequest)
received := map[string][]push.Entry{}
var mu sync.Mutex
// Create a channel for log messages
logCh := make(chan string, 100) // Buffered channel to avoid blocking
Comment on lines +63 to +64

Contributor:

Why was the logging change required for fixing a test race here?

Collaborator (Author):

There was a race related to t, essentially: the t.Logf("received request: %s", counts) call ran in a goroutine and was racing with t.Logf("started test server at URL %s", testServer.URL) on the main test goroutine.
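
For illustration, this is roughly the shape of that workaround outside of promtail; a hypothetical sketch, not this PR's code (the worker loop and message format are made up). Goroutines format their messages into a buffered channel instead of calling t.Logf themselves, and t.Log is invoked in one place:

package example

import (
    "fmt"
    "sync"
    "testing"
)

// TestChannelBackedLogging sketches the logCh idea: worker goroutines queue
// formatted messages, and t.Log only runs while draining the channel, here
// on the test goroutine after all producers have finished.
func TestChannelBackedLogging(t *testing.T) {
    logCh := make(chan string, 100) // buffered so producers do not block

    var wg sync.WaitGroup
    for w := 0; w < 3; w++ {
        wg.Add(1)
        go func(worker int) {
            defer wg.Done()
            for i := 0; i < 5; i++ {
                // Queue the message rather than calling t.Logf here.
                logCh <- fmt.Sprintf("worker %d processed item %d", worker, i)
            }
        }(w)
    }

    // Close the channel once every producer is done, then drain it so all
    // messages are logged before the test returns.
    go func() {
        wg.Wait()
        close(logCh)
    }()
    for msg := range logCh {
        t.Log(msg)
    }
}

The PR drains logCh in a separate goroutine; draining it on the test goroutine, as above, also guarantees no message is logged after the test has returned.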


wg.Add(1)
go func() {
defer wg.Done()
for req := range receivedCh {
mu.Lock()
// Add some observability to the requests received in the remote write endpoint
var counts []string
for _, str := range req.Request.Streams {
counts = append(counts, fmt.Sprint(len(str.Entries)))
}
t.Logf("received request: %s", counts)
logCh <- fmt.Sprintf("received request: %s", counts)
for _, stream := range req.Request.Streams {
received[stream.Labels] = append(received[stream.Labels], stream.Entries...)
}
mu.Unlock()
}
}()

@@ -120,14 +126,23 @@
for i := 0; i < entriesToWrite; i++ {
_, err = logsFile.WriteString(fmt.Sprintf("log line # %d\n", i))
if err != nil {
t.Logf("error writing to log file. Err: %s", err.Error())
logCh <- fmt.Sprintf("error writing to log file. Err: %s", err.Error())
}
// not overkill log file
time.Sleep(1 * time.Millisecond)
}
}()

// Goroutine to handle log messages
go func() {
for msg := range logCh {
t.Log(msg)
}
}()

require.Eventually(t, func() bool {
mu.Lock()
defer mu.Unlock()
if seen, ok := received[expectedLabelSet]; ok {
return len(seen) == entriesToWrite
}
@@ -158,11 +173,13 @@ func TestPromtailWithWAL_MultipleTenants(t *testing.T) {
receivedCh := make(chan utils.RemoteWriteRequest)
// received is a mapping from tenant, string-formatted label set to received entries
received := map[string]map[string][]push.Entry{}
var mu sync.Mutex
var totalReceived = 0
wg.Add(1)
go func() {
defer wg.Done()
for req := range receivedCh {
mu.Lock()
// start received label entries map if first time tenant is seen
if _, ok := received[req.TenantID]; !ok {
received[req.TenantID] = map[string][]push.Entry{}
@@ -173,6 +190,7 @@ func TestPromtailWithWAL_MultipleTenants(t *testing.T) {
// increment total count
totalReceived += len(stream.Entries)
}
mu.Unlock()
}
}()

@@ -250,15 +268,19 @@

// wait for all entries to be remote written
require.Eventually(t, func() bool {
mu.Lock()
defer mu.Unlock()
return totalReceived == entriesToWrite
}, time.Second*20, time.Second, "timed out waiting for entries to be remote written")

// assert over received entries
require.Len(t, received, expectedTenantCounts, "not expected tenant count")
mu.Lock()
for tenantID := 0; tenantID < expectedTenantCounts; tenantID++ {
// we should've received at least entriesToWrite / expectedTenantCounts
require.GreaterOrEqual(t, len(received[fmt.Sprint(tenantID)][expectedLabelSet]), entriesToWrite/expectedTenantCounts)
}
mu.Unlock()

pr.Shutdown()
close(receivedCh)
6 changes: 6 additions & 0 deletions clients/pkg/promtail/targets/cloudflare/util_test.go
@@ -3,6 +3,7 @@ package cloudflare
import (
"context"
"errors"
"sync"
"time"

"github.com/grafana/cloudflare-go"
@@ -13,10 +14,13 @@ var ErrorLogpullReceived = errors.New("error logpull received")

type fakeCloudflareClient struct {
mock.Mock
mu sync.Mutex
}

func (f *fakeCloudflareClient) CallCount() int {
var actualCalls int
f.mu.Lock()
defer f.mu.Unlock()
for _, call := range f.Calls {
if call.Method == "LogpullReceived" {
actualCalls++
@@ -59,7 +63,9 @@ func newFakeCloudflareClient() *fakeCloudflareClient {
}

func (f *fakeCloudflareClient) LogpullReceived(ctx context.Context, start, end time.Time) (cloudflare.LogpullReceivedIterator, error) {
f.mu.Lock()
r := f.Called(ctx, start, end)
f.mu.Unlock()
if r.Get(0) != nil {
it := r.Get(0).(cloudflare.LogpullReceivedIterator)
if it.Err() == ErrorLogpullReceived {
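The cloudflare change above guards testify's mock state in the same way; a minimal, hypothetical sketch of that idea (fakeClient and Pull are illustrative stand-ins, not the real fakeCloudflareClient API). testify records calls internally, but ranging over the embedded Calls slice from the test goroutine is not synchronized with a concurrent Called, so one extra mutex covers both sides:

package example

import (
    "sync"
    "testing"
    "time"

    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/require"
)

// fakeClient stands in for fakeCloudflareClient: the same mutex guards both
// recording a call via Called and counting calls by reading Calls.
type fakeClient struct {
    mock.Mock
    mu sync.Mutex
}

func (f *fakeClient) Pull(zone string) error {
    f.mu.Lock()
    args := f.Called(zone)
    f.mu.Unlock()
    return args.Error(0)
}

func (f *fakeClient) CallCount() int {
    f.mu.Lock()
    defer f.mu.Unlock()
    n := 0
    for _, c := range f.Calls {
        if c.Method == "Pull" {
            n++
        }
    }
    return n
}

func TestFakeClientCallCount(t *testing.T) {
    f := &fakeClient{}
    f.On("Pull", "zone-a").Return(nil)

    // The target would call Pull from its own goroutine while the test polls
    // CallCount; that is the access pattern the mutex makes race-free.
    go func() {
        _ = f.Pull("zone-a")
    }()

    require.Eventually(t, func() bool {
        return f.CallCount() == 1
    }, time.Second, 10*time.Millisecond)
}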