Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
andresmgot committed May 15, 2024
1 parent cfcea0b commit 562799f
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 13 deletions.
23 changes: 14 additions & 9 deletions experimental/concurrent/concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,33 @@ type splitResponse struct {
refID string
}

// SingleQuery is a single query to be executed concurrently
// Query is a single query to be executed concurrently
// Index is the index of the query in the request
// PluginContext is the plugin context
// Headers are the HTTP headers of the request
// DataQuery is the query to be executed
type SingleQuery struct {
type Query struct {
Index int
PluginContext backend.PluginContext
Headers http.Header
DataQuery backend.DataQuery
}

// SingleQueryData is the function that plugins need to define to execute a single query
type SingleQueryData func(ctx context.Context, query SingleQuery) (res backend.DataResponse)
// QueryDataFunc is the function that plugins need to define to execute a single query
type QueryDataFunc func(ctx context.Context, query Query) (res backend.DataResponse)

// QueryData executes all queries from a request concurrently, using the provided function to execute each query.
// The concurrency limit is set by the limit parameter. A negative limit means no limit.
func QueryData(ctx context.Context, req *backend.QueryDataRequest, fn SingleQueryData, limit int) (*backend.QueryDataResponse, error) {
headers := req.GetHTTPHeaders()
func QueryData(ctx context.Context, req *backend.QueryDataRequest, fn QueryDataFunc, limit int) (*backend.QueryDataResponse, error) {
ctxLogger := log.DefaultLogger.FromContext(ctx)
ctxLogger.Debug("Concurrent QueryData", "queries", len(req.Queries))

if limit <= 0 || limit > 10 {
ctxLogger.Warn("QueryData concurrency limit is not set or is too high, setting to 10", "limit", limit)
limit = 10
}

headers := req.GetHTTPHeaders()
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(limit)
rchan := make(chan splitResponse, len(req.Queries))
Expand All @@ -50,10 +55,10 @@ func QueryData(ctx context.Context, req *backend.QueryDataRequest, fn SingleQuer
} else if theErrString, ok := r.(string); ok {
err = fmt.Errorf(theErrString)
} else {
err = fmt.Errorf("unexpected error - %v", err)
err = fmt.Errorf("unexpected error - %w", err)
}
// Due to the panic, there is no valid response for any query for this datasource. Append an error for each one.
rchan <- splitResponse{backend.ErrDataResponse(backend.StatusInternal, err.Error()), q.RefID}
rchan <- splitResponse{backend.DataResponse{Status: backend.StatusInternal, Error: err}, q.RefID}
}
}

Expand All @@ -66,7 +71,7 @@ func QueryData(ctx context.Context, req *backend.QueryDataRequest, fn SingleQuer
defer recoveryFn(iQuery)

ctxLogger.Debug("Starting single query", "query", iQuery.RefID)
res := fn(ctx, SingleQuery{
res := fn(ctx, Query{
Index: iIndex,
PluginContext: req.PluginContext,
Headers: headers,
Expand Down
8 changes: 4 additions & 4 deletions experimental/concurrent/concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func Test_QueryData(t *testing.T) {
t.Run("executes all queries concurrently", func(t *testing.T) {
secondExecuted := make(chan bool, 1)
fn := func(_ context.Context, query SingleQuery) (res backend.DataResponse) {
fn := func(_ context.Context, query Query) (res backend.DataResponse) {
if query.Index == 0 {
// Blocks until the second query is executed
<-secondExecuted
Expand All @@ -33,7 +33,7 @@ func Test_QueryData(t *testing.T) {
t.Run("executes all queries concurrently with limit", func(t *testing.T) {
secondExecutedChannel := make(chan bool, 1)
secondExecuted := false
fn := func(_ context.Context, query SingleQuery) (res backend.DataResponse) {
fn := func(_ context.Context, query Query) (res backend.DataResponse) {
if query.Index == 0 {
// Blocks until the second query is executed
secondExecuted = <-secondExecutedChannel
Expand All @@ -53,13 +53,13 @@ func Test_QueryData(t *testing.T) {
req := &backend.QueryDataRequest{}
req.Queries = append(req.Queries, backend.DataQuery{RefID: "A"})
req.Queries = append(req.Queries, backend.DataQuery{RefID: "B"})
// Limit to 1 query
// Limit to 2 queries
_, err := QueryData(context.Background(), req, fn, 2)
require.NoError(t, err)
})

t.Run("handles panics", func(t *testing.T) {
fn := func(_ context.Context, query SingleQuery) (res backend.DataResponse) {
fn := func(_ context.Context, query Query) (res backend.DataResponse) {
if query.Index == 0 {
panic("panic")
}
Expand Down

0 comments on commit 562799f

Please sign in to comment.