Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,23 @@ func (r *Agent) Run(ctx context.Context, recipe recipe.Recipe) (run Run) {
}
stream.Shutdown()
}()
if err := runExtractor(); err != nil {

retryNotification := func(e error, d time.Duration) {
r.logger.Warn(
fmt.Sprintf("retrying extractor in %s", d),
"retry_delay_ms", d.Milliseconds(),
"extractor", recipe.Source.Name,
"error", e,
)
}

err := r.retrier.retry(
ctx,
func() error { return runExtractor() },
retryNotification,
)

if err != nil {
run.Error = errors.Wrap(err, "failed to run extractor")
}
}()
Expand Down
108 changes: 107 additions & 1 deletion agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,58 @@ func TestAgentRun(t *testing.T) {
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
StopOnSinkError: true,
StopOnSinkError: false,
Monitor: monitor,
})

run := r.Run(ctx, validRecipe)
assert.False(t, run.Success)
assert.Error(t, run.Error)
})

t.Run("should return error when sink fails to close", func(t *testing.T) {
data := []models.Record{
models.NewRecord(&v1beta2.Asset{}),
}

extr := mocks.NewExtractor()
extr.SetEmit(data)
extr.On("Init", mockCtx, buildPluginConfig(validRecipe.Source)).Return(nil).Once()
extr.On("Extract", mockCtx, mock.AnythingOfType("plugins.Emit")).Return(nil)
ef := registry.NewExtractorFactory()
if err := ef.Register("test-extractor", newExtractor(extr)); err != nil {
t.Fatal(err)
}

proc := mocks.NewProcessor()
proc.On("Init", mockCtx, buildPluginConfig(validRecipe.Processors[0])).Return(nil).Once()
proc.On("Process", mockCtx, data[0]).Return(data[0], nil)
defer proc.AssertExpectations(t)
pf := registry.NewProcessorFactory()
if err := pf.Register("test-processor", newProcessor(proc)); err != nil {
t.Fatal(err)
}

sink := mocks.NewSink()
sink.On("Init", mockCtx, buildPluginConfig(validRecipe.Sinks[0])).Return(nil).Once()
sink.On("Sink", mockCtx, data).Return(nil)
sink.On("Close").Return(errors.New("some error"))
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
t.Fatal(err)
}

monitor := newMockMonitor()
monitor.On("RecordRun", mock.AnythingOfType("agent.Run")).Once()
defer monitor.AssertExpectations(t)

r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
StopOnSinkError: false,
Monitor: monitor,
})

Expand Down Expand Up @@ -670,6 +721,61 @@ func TestAgentRun(t *testing.T) {
assert.Equal(t, validRecipe, run.Recipe)
})

t.Run("should retry if extractor returns retry error", func(t *testing.T) {
err := errors.New("some-error")
data := []models.Record{
models.NewRecord(&v1beta2.Asset{}),
}

extr := mocks.NewExtractor()
extr.SetEmit(data)
extr.On("Init", mockCtx, buildPluginConfig(validRecipe.Source)).Return(nil).Once()
extr.On("Extract", mockCtx, mock.AnythingOfType("plugins.Emit")).Return(plugins.NewRetryError(err)).Twice()
extr.On("Extract", mockCtx, mock.AnythingOfType("plugins.Emit")).Return(nil)
ef := registry.NewExtractorFactory()
if err := ef.Register("test-extractor", newExtractor(extr)); err != nil {
t.Fatal(err)
}

proc := mocks.NewProcessor()
proc.On("Init", mockCtx, buildPluginConfig(validRecipe.Processors[0])).Return(nil).Once()
proc.On("Process", mockCtx, data[0]).Return(data[0], nil)
defer proc.AssertExpectations(t)
pf := registry.NewProcessorFactory()
if err := pf.Register("test-processor", newProcessor(proc)); err != nil {
t.Fatal(err)
}

sink := mocks.NewSink()
sink.On("Init", mockCtx, buildPluginConfig(validRecipe.Sinks[0])).Return(nil).Once()
sink.On("Sink", mockCtx, data).Return(nil)
sink.On("Close").Return(nil)
defer sink.AssertExpectations(t)

sf := registry.NewSinkFactory()
if err := sf.Register("test-sink", newSink(sink)); err != nil {
t.Fatal(err)
}

monitor := newMockMonitor()
monitor.On("RecordRun", mock.AnythingOfType("agent.Run")).Once()
monitor.On("RecordPlugin", mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("string"), mock.AnythingOfType("bool"))
defer monitor.AssertExpectations(t)

r := agent.NewAgent(agent.Config{
ExtractorFactory: ef,
ProcessorFactory: pf,
SinkFactory: sf,
Logger: utils.Logger,
Monitor: monitor,
MaxRetries: 2, // need to retry "at least" 2 times since Extractor returns RetryError twice
RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time
})
run := r.Run(ctx, validRecipe)
assert.NoError(t, run.Error)
assert.Equal(t, validRecipe, run.Recipe)
})

t.Run("should retry if sink returns retry error", func(t *testing.T) {
err := errors.New("some-error")
data := []models.Record{
Expand Down
5 changes: 4 additions & 1 deletion plugins/extractors/caramlstore/caramlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error {

// Extract checks if the table is valid and extracts the table schema
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
defer e.client.Close()

projects, err := e.client.Projects(ctx)
if err != nil {
Expand Down Expand Up @@ -136,6 +135,10 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error {
return nil
}

func (e *Extractor) Close() error {
return e.client.Close()
}

func shouldRetry(err error) bool {
switch utils.StatusCode(err) {
case codes.Canceled,
Expand Down