Skip to content

Commit

Permalink
lint n' stuff
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Elliott <number101010@gmail.com>
  • Loading branch information
joe-elliott committed May 24, 2024
1 parent 343fc7a commit d1dc786
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 16 deletions.
15 changes: 5 additions & 10 deletions modules/frontend/combiner/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,13 @@ func TestGenericCombinerDoesntRace(t *testing.T) {
}
}
go concurrent(func() {
err := combiner.AddResponse(newTestResponse(t))
require.NoError(t, err)
_ = combiner.AddResponse(newTestResponse(t))
})

go concurrent(func() {
// this test is going to add a failed response which cuts off certain code paths. just wait a bit to test the other paths
time.Sleep(10 * time.Millisecond)
err := combiner.AddResponse(newFailedTestResponse())
require.NoError(t, err)
_ = combiner.AddResponse(newFailedTestResponse())
})

go concurrent(func() {
Expand All @@ -144,18 +142,15 @@ func TestGenericCombinerDoesntRace(t *testing.T) {
})

go concurrent(func() {
_, err := combiner.HTTPFinal()
require.NoError(t, err)
_, _ = combiner.HTTPFinal()
})

go concurrent(func() {
_, err := combiner.GRPCFinal()
require.NoError(t, err)
_, _ = combiner.GRPCFinal()
})

go concurrent(func() {
_, err := combiner.GRPCDiff()
require.NoError(t, err)
_, _ = combiner.GRPCDiff()
})

time.Sleep(100 * time.Millisecond)
Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/pipeline/collector_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (c GRPCCollector[T]) RoundTrip(req *http.Request) error {

lastUpdate := time.Now()

err = addNextAsync(ctx, c.consumers, resps, c.next, c.combiner, func() error {
err = addNextAsync(ctx, c.consumers, resps, c.combiner, func() error {
// check if we should send an update
if time.Since(lastUpdate) > 500*time.Millisecond {
lastUpdate = time.Now()
Expand Down
4 changes: 2 additions & 2 deletions modules/frontend/pipeline/collector_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ func (r httpCollector) RoundTrip(req *http.Request) (*http.Response, error) {
return nil, err
}

err = addNextAsync(ctx, r.consumers, resps, r.next, r.combiner, nil)
err = addNextAsync(ctx, r.consumers, resps, r.combiner, nil)
if err != nil {
return nil, err
}

return r.combiner.HTTPFinal()
}

func addNextAsync(ctx context.Context, consumers int, resps Responses[combiner.PipelineResponse], next AsyncRoundTripper[combiner.PipelineResponse], c combiner.Combiner, callback func() error) error {
func addNextAsync(ctx context.Context, consumers int, resps Responses[combiner.PipelineResponse], c combiner.Combiner, callback func() error) error {
respChan := make(chan combiner.PipelineResponse)
overallErr := atomic.Error{}
wg := sync.WaitGroup{}
Expand Down
6 changes: 3 additions & 3 deletions modules/frontend/pipeline/responses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) {
bridge := &pipelineBridge{
next: tc.finalRT(cancel),
}
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](sharder{next: bridge}, 0, combiner.NewTypedSearch(0), func(sr *tempopb.SearchResponse) error { return nil })
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](sharder{next: bridge}, 0, combiner.NewTypedSearch(0), func(_ *tempopb.SearchResponse) error { return nil })

_ = grpcCollector.RoundTrip(req)

Expand All @@ -350,7 +350,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) {
}

s := sharder{next: sharder{next: bridge}, funcSharder: true}
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0), func(sr *tempopb.SearchResponse) error { return nil })
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0), func(_ *tempopb.SearchResponse) error { return nil })

_ = grpcCollector.RoundTrip(req)

Expand All @@ -373,7 +373,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) {
}

s := sharder{next: sharder{next: bridge, funcSharder: true}}
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0), func(sr *tempopb.SearchResponse) error { return nil })
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0), func(_ *tempopb.SearchResponse) error { return nil })

_ = grpcCollector.RoundTrip(req)

Expand Down

0 comments on commit d1dc786

Please sign in to comment.