Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
"github.com/pkg/errors"
)

const defaultBatchSize = 1

// TimerFn of function type
type TimerFn func() func() int

Expand All @@ -32,6 +30,7 @@ type Agent struct {
retrier *retrier
stopOnSinkError bool
timerFn TimerFn
sinkBatchSize int
}

// NewAgent returns an Agent with plugin factories.
Expand All @@ -53,6 +52,7 @@ func NewAgent(config Config) *Agent {
logger: config.Logger,
retrier: retrier,
timerFn: timerFn,
sinkBatchSize: config.SinkBatchSize,
}
}

Expand Down Expand Up @@ -313,7 +313,7 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.PluginRecipe, stream *s

r.logger.Info("Successfully published record", "sink", sr.Name, "recipe", recipeName)
return nil
}, defaultBatchSize)
}, r.sinkBatchSize)

stream.onClose(func() {
if err := sink.Close(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ func TestAgentRun(t *testing.T) {
Monitor: monitor,
MaxRetries: 2, // need to retry "at least" 2 times since Extractor returns RetryError twice
RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time
SinkBatchSize: 1,
})
run := r.Run(ctx, validRecipe)
assert.NoError(t, run.Error)
Expand Down
1 change: 1 addition & 0 deletions agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ type Config struct {
RetryInitialInterval time.Duration
StopOnSinkError bool
TimerFn TimerFn
SinkBatchSize int
}
1 change: 1 addition & 0 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func RunCmd() *cobra.Command {
MaxRetries: cfg.MaxRetries,
RetryInitialInterval: time.Duration(cfg.RetryInitialIntervalSeconds) * time.Second,
StopOnSinkError: cfg.StopOnSinkError,
SinkBatchSize: cfg.SinkBatchSize,
})

recipes, err := recipe.NewReader(lg, pathToConfig).Read(args[0])
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Config struct {
OtelEnabled bool `mapstructure:"OTEL_ENABLED" default:"false"`
OtelCollectorAddr string `mapstructure:"OTEL_COLLECTOR_ADDR" default:"localhost:4317"`
OtelTraceSampleProbability float64 `mapstructure:"OTEL_TRACE_SAMPLE_PROBABILITY" default:"1"`
SinkBatchSize int `mapstructure:"SINK_BATCH_SIZE" default:"1"`
}

func Load(configFile string) (Config, error) {
Expand Down
2 changes: 2 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestLoad(t *testing.T) {
MaxRetries: 5,
RetryInitialIntervalSeconds: 5,
StopOnSinkError: false,
SinkBatchSize: 1,
},
},
{
Expand All @@ -46,6 +47,7 @@ func TestLoad(t *testing.T) {
OtelTraceSampleProbability: 1,
MaxRetries: 5,
RetryInitialIntervalSeconds: 5,
SinkBatchSize: 1,
},
expectedErr: "",
},
Expand Down
3 changes: 2 additions & 1 deletion config/meteor.yaml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ STOP_ON_SINK_ERROR: false
APP_NAME: meteor
OTEL_ENABLED: false
OTEL_COLLECTOR_ADDR: "localhost:4317"
OTEL_TRACE_SAMPLE_PROBABILITY: 1
OTEL_TRACE_SAMPLE_PROBABILITY: 1
SINK_BATCH_SIZE: 10
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ require (
go.opentelemetry.io/otel/sdk v1.16.0
go.opentelemetry.io/otel/sdk/metric v0.39.0
golang.org/x/oauth2 v0.6.0
golang.org/x/sync v0.1.0
google.golang.org/api v0.114.0
google.golang.org/genproto v0.0.0-20230330154414-c0448cd141ea
google.golang.org/grpc v1.55.0
Expand Down Expand Up @@ -221,7 +222,6 @@ require (
golang.org/x/crypto v0.8.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
Expand Down