Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions pkg/engine/internal/executor/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,28 @@ type Pipeline interface {
Close()
}

// WrappedPipeline represents a pipeline that wraps another pipeline.
//
// Use [Unwrap] to peel off every wrapper layer and reach the innermost,
// non-wrapped pipeline.
type WrappedPipeline interface {
// Pipeline is embedded, so every wrapper is itself a Pipeline.
Pipeline

// Unwrap returns the inner pipeline. Implementations must always return the
// same non-nil value representing the inner pipeline.
//
// [Unwrap] keeps calling this method for as long as the returned value is
// itself a WrappedPipeline, so an implementation that returns its own
// receiver would make that loop never terminate.
Unwrap() Pipeline
}

// Unwrap strips every wrapper layer from p and returns the innermost
// pipeline. While the current pipeline implements [WrappedPipeline], its
// [WrappedPipeline.Unwrap] method is called and the result becomes the new
// current pipeline; the first pipeline that is not a [WrappedPipeline] is
// returned unchanged.
//
// A p that is not wrapped at all is returned as-is.
func Unwrap(p Pipeline) Pipeline {
	innermost := p
	// Re-run the type assertion after every step: each layer may itself be
	// another wrapper.
	for outer, isWrapper := innermost.(WrappedPipeline); isWrapper; outer, isWrapper = innermost.(WrappedPipeline) {
		innermost = outer.Unwrap()
	}
	return innermost
}

// RegionProvider is an optional interface that pipelines can implement
// to expose their associated xcap region for statistics collection.
type RegionProvider interface {
Expand Down Expand Up @@ -353,6 +375,11 @@ func (p *observedPipeline) Read(ctx context.Context) (arrow.RecordBatch, error)
return rec, err
}

// Unwrap returns the pipeline wrapped by p (its inner field). It gives
// observedPipeline the Unwrap method required by [WrappedPipeline], so that
// [Unwrap] can see through the wrapper to the pipeline underneath.
func (p *observedPipeline) Unwrap() Pipeline {
return p.inner
}

// Close implements Pipeline.
func (p *observedPipeline) Close() {
p.inner.Close()
Expand Down
19 changes: 6 additions & 13 deletions pkg/engine/internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,9 +752,9 @@ func (s *Scheduler) Start(ctx context.Context, tasks ...*workflow.Task) error {

// We set markPending *after* enqueueTasks to give tasks an opportunity to
// immediately transition into running (lowering state transition noise).
err = s.enqueueTasks(trackedTasks)
s.enqueueTasks(trackedTasks)
s.markPending(ctx, trackedTasks)
return err
return nil
}

// findTasks gets a list of [task] from workflow tasks. Returns an error if any
Expand All @@ -781,17 +781,15 @@ func (s *Scheduler) findTasks(tasks []*workflow.Task) ([]*task, error) {
return res, nil
}

func (s *Scheduler) enqueueTasks(tasks []*task) error {
func (s *Scheduler) enqueueTasks(tasks []*task) {
s.assignMut.Lock()
defer s.assignMut.Unlock()

var errs []error

for _, task := range tasks {
// Only allow to enqueue tasks in the initial state (created). This
// prevents tasks from accidentally being run multiple times.
// Ignore tasks that aren't in the initial state (created). This
// prevents us from rejecting tasks which were preemptively canceled by
// callers.
if got, want := task.status.State, workflow.TaskStateCreated; got != want {
errs = append(errs, fmt.Errorf("task %s is in state %s, not %s", task.inner.ULID, got, want))
continue
}

Expand All @@ -802,11 +800,6 @@ func (s *Scheduler) enqueueTasks(tasks []*task) error {
if len(s.readyWorkers) > 0 && len(s.taskQueue) > 0 {
nudgeSemaphore(s.assignSema)
}

if len(errs) > 0 {
return errors.Join(errs...)
}
return nil
}

func (s *Scheduler) markPending(ctx context.Context, tasks []*task) {
Expand Down
5 changes: 2 additions & 3 deletions pkg/engine/internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func TestScheduler_Start(t *testing.T) {
require.Equal(t, workflow.TaskStatePending, taskStatus.State, "Started tasks should move to pending state")
})

t.Run("Fails with existing task", func(t *testing.T) {
t.Run("Ignores already started tasks", func(t *testing.T) {
sched := newTestScheduler(t)

var (
Expand All @@ -380,8 +380,7 @@ func TestScheduler_Start(t *testing.T) {
)
require.NoError(t, sched.RegisterManifest(t.Context(), manifest), "Scheduler should accept valid manifest")
require.NoError(t, sched.Start(t.Context(), exampleTask), "Scheduler should start registered task")

require.Error(t, sched.Start(t.Context(), exampleTask), "Scheduler should reject already started tasks")
require.NoError(t, sched.Start(t.Context(), exampleTask), "Scheduler should ignore already started tasks")
})
}

Expand Down
26 changes: 18 additions & 8 deletions pkg/engine/internal/scheduler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,26 @@ var validTaskTransitions = map[workflow.TaskState][]workflow.TaskState{
func (t *task) setState(m *metrics, newStatus workflow.TaskStatus) (bool, error) {
oldState, newState := t.status.State, newStatus.State

if newState == oldState {
switch {
case newStatus != t.status && newState == oldState:
// State is the same (so we don't have to validate transitions), but
// there's a new payload about the status, so we should store it.
t.status = newStatus
return true, nil

case newState == oldState:
// Status is the exact same, no need to update.
return false, nil
}

validStates := validTaskTransitions[oldState]
if !slices.Contains(validStates, newState) {
return false, fmt.Errorf("invalid state transition from %s to %s", oldState, newState)
default:
validStates := validTaskTransitions[oldState]
if !slices.Contains(validStates, newState) {
return false, fmt.Errorf("invalid state transition from %s to %s", oldState, newState)
}

t.status = newStatus
m.tasksTotal.WithLabelValues(newState.String()).Inc()
return true, nil
}

t.status = newStatus
m.tasksTotal.WithLabelValues(newState.String()).Inc()
return true, nil
}
11 changes: 11 additions & 0 deletions pkg/engine/internal/scheduler/wire/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,13 @@ func (c *protobufCodec) taskStatusFromPbTaskStatus(ts *wirepb.TaskStatus) (workf
status.Capture = capture
}

if ts.ContributingTimeRange != nil {
status.ContributingTimeRange = workflow.ContributingTimeRange{
Timestamp: ts.ContributingTimeRange.Timestamp,
LessThan: ts.ContributingTimeRange.LessThan,
}
}

return status, nil
}

Expand Down Expand Up @@ -571,6 +578,10 @@ func (c *protobufCodec) taskToPbTask(from *workflow.Task) (*wirepb.Task, error)
func (c *protobufCodec) taskStatusToPbTaskStatus(from workflow.TaskStatus) (*wirepb.TaskStatus, error) {
ts := &wirepb.TaskStatus{
State: c.taskStateToPbTaskState(from.State),
ContributingTimeRange: &wirepb.ContributingTimeRange{
Timestamp: from.ContributingTimeRange.Timestamp,
LessThan: from.ContributingTimeRange.LessThan,
},
}

if from.Error != nil {
Expand Down
13 changes: 13 additions & 0 deletions pkg/engine/internal/scheduler/wire/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"net/netip"
"testing"
"time"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
Expand Down Expand Up @@ -126,6 +127,18 @@ func TestProtobufCodec_Messages(t *testing.T) {
},
},
},
"TaskStatusMessage with Running state and ContributingTimeRange": {
message: TaskStatusMessage{
ID: taskULID,
Status: workflow.TaskStatus{
State: workflow.TaskStateRunning,
ContributingTimeRange: workflow.ContributingTimeRange{
Timestamp: time.Now().Add(-time.Minute),
LessThan: true,
},
},
},
},
"TaskStatusMessage with Completed state": {
message: TaskStatusMessage{
ID: taskULID,
Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/internal/worker/thread.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (t *thread) runJob(ctx context.Context, job *threadJob) {
// If the root pipeline can be interested in some specific contributing time range
// then subscribe to changes.
// TODO(spiridonov): find a way to subscribe on non-root pipelines.
notifier, ok := pipeline.(executor.ContributingTimeRangeChangedNotifier)
notifier, ok := executor.Unwrap(pipeline).(executor.ContributingTimeRangeChangedNotifier)
if ok {
notifier.SubscribeToTimeRangeChanges(func(ts time.Time, lessThan bool) {
// Send a Running task status update with the current time range
Expand Down
9 changes: 8 additions & 1 deletion pkg/engine/internal/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/loki/v3/pkg/engine/internal/executor"
"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
Expand All @@ -19,6 +21,11 @@ import (
"github.com/grafana/loki/v3/pkg/xcap"
)

// shortCircuitsTotal counts tasks that were preemptively canceled by short
// circuiting. It is incremented each time a child task is added to the list of
// tasks to cancel while handling a non-terminal state change.
var shortCircuitsTotal = promauto.NewCounter(prometheus.CounterOpts{
Name: "loki_engine_v2_task_short_circuits_total",
Help: "Total number of tasks preemptively canceled by short circuiting.",
})

// Options configures a [Workflow].
type Options struct {
// MaxRunningScanTasks specifies the maximum number of scan tasks that may
Expand Down Expand Up @@ -379,8 +386,8 @@ func (wf *Workflow) handleNonTerminalStateChange(ctx context.Context, task *Task
// TODO(spiridonov): We do not check parents here right now, there is only 1 parent now,
// but in general a task can be canceled only if all its parents are in terminal states OR
// have non-intersecting contributing time range.

tasksToCancel = append(tasksToCancel, child)
shortCircuitsTotal.Inc()
}
}
wf.tasksMut.RUnlock()
Expand Down
Loading