Skip to content

Commit

Permalink
new(sdk/plugins/source): add options for setting custom event batch size
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Dellaluce <jasondellaluce@gmail.com>
  • Loading branch information
jasondellaluce authored and poiana committed Jun 13, 2022
1 parent 416a70e commit c7d7e44
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 18 deletions.
4 changes: 2 additions & 2 deletions pkg/sdk/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func BenchmarkEventWritersNext(b *testing.B) {
}

func BenchmarkEventWritersNextBatch(b *testing.B) {
writers, err := NewEventWriters(DefaultBatchSize, int64(DefaultEvtSize))
writers, err := NewEventWriters(int64(DefaultBatchSize), int64(DefaultEvtSize))
if err != nil {
println(err.Error())
b.Fail()
Expand All @@ -88,7 +88,7 @@ func BenchmarkEventWritersNextBatch(b *testing.B) {
}

func TestEventWritersNextBatch(t *testing.T) {
events, err := NewEventWriters(DefaultBatchSize, int64(DefaultEvtSize))
events, err := NewEventWriters(int64(DefaultBatchSize), int64(DefaultEvtSize))
if err != nil {
println(err.Error())
t.Fail()
Expand Down
52 changes: 44 additions & 8 deletions pkg/sdk/plugins/source/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type builtinInstance struct {
timeout time.Duration
timeoutTicker *time.Ticker
eof bool
eventSize uint32
batchSize uint32
}

func (s *builtinInstance) Close() {
Expand Down Expand Up @@ -80,6 +82,22 @@ func WithInstanceClose(close func()) func(*builtinInstance) {
}
}

// WithInstanceBatchSize sets a custom size for the pre-allocated event batch
// used by NextBatch()
func WithInstanceBatchSize(size uint32) func(*builtinInstance) {
return func(s *builtinInstance) {
s.batchSize = size
}
}

// WithInstanceEventSize sets a custom maximum size for each event returned
// by NextBatch()
func WithInstanceEventSize(size uint32) func(*builtinInstance) {
return func(s *builtinInstance) {
s.eventSize = size
}
}

// WithInstanceProgress sets a custom callback for the framework to request
// a the progress state of the opened event stream
func WithInstanceProgress(progress func() (float64, string)) func(*builtinInstance) {
Expand Down Expand Up @@ -125,10 +143,12 @@ func NewPullInstance(pull PullFunc, options ...func(*builtinInstance)) (Instance
res := &pullInstance{
pull: pull,
builtinInstance: builtinInstance{
ctx: context.Background(),
timeout: defaultInstanceTimeout,
shutdown: func() {},
eof: false,
ctx: context.Background(),
timeout: defaultInstanceTimeout,
shutdown: func() {},
eof: false,
batchSize: sdk.DefaultBatchSize,
eventSize: sdk.DefaultEvtSize,
},
}

Expand All @@ -137,6 +157,13 @@ func NewPullInstance(pull PullFunc, options ...func(*builtinInstance)) (Instance
opt(&res.builtinInstance)
}

// create custom-sized event batch
batch, err := sdk.NewEventWriters(int64(res.batchSize), int64(res.eventSize))
if err != nil {
return nil, err
}
res.SetEvents(batch)

// init timer
res.timeoutTicker = time.NewTicker(res.timeout)

Expand Down Expand Up @@ -225,10 +252,12 @@ func NewPushInstance(evtC <-chan PushEvent, options ...func(*builtinInstance)) (
res := &pushInstance{
evtC: evtC,
builtinInstance: builtinInstance{
ctx: context.Background(),
timeout: defaultInstanceTimeout,
shutdown: func() {},
eof: false,
ctx: context.Background(),
timeout: defaultInstanceTimeout,
shutdown: func() {},
eof: false,
batchSize: sdk.DefaultBatchSize,
eventSize: sdk.DefaultEvtSize,
},
}

Expand All @@ -237,6 +266,13 @@ func NewPushInstance(evtC <-chan PushEvent, options ...func(*builtinInstance)) (
opt(&res.builtinInstance)
}

// create custom-sized event batch
batch, err := sdk.NewEventWriters(int64(res.batchSize), int64(res.eventSize))
if err != nil {
return nil, err
}
res.SetEvents(batch)

// init timer
res.timeoutTicker = time.NewTicker(res.timeout)

Expand Down
14 changes: 7 additions & 7 deletions pkg/sdk/plugins/source/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ const (
benchEvtTimeout = 30 * time.Millisecond
)

func benchNextBatch(b *testing.B, inst Instance, batchSize, evtCount int) {
func benchNextBatch(b *testing.B, inst Instance, batchSize uint32, evtCount int) {
batch := &sdk.InMemoryEventWriters{}
for i := 0; i < batchSize; i++ {
for i := uint32(0); i < batchSize; i++ {
batch.Writers = append(batch.Writers, &sdk.InMemoryEventWriter{})
}
b.ResetTimer()
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestPullInstance(t *testing.T) {

// create batch
batch := &sdk.InMemoryEventWriters{}
for i := 0; i < sdk.DefaultBatchSize; i++ {
for i := uint32(0); i < sdk.DefaultBatchSize; i++ {
batch.Writers = append(batch.Writers, &sdk.InMemoryEventWriter{})
}

Expand Down Expand Up @@ -214,7 +214,7 @@ func TestPullInstance(t *testing.T) {
func TestPullInstanceCtxCanceling(t *testing.T) {
// create batch
batch := &sdk.InMemoryEventWriters{}
for i := 0; i < sdk.DefaultBatchSize; i++ {
for i := uint32(0); i < sdk.DefaultBatchSize; i++ {
batch.Writers = append(batch.Writers, &sdk.InMemoryEventWriter{})
}

Expand Down Expand Up @@ -252,7 +252,7 @@ func TestPushInstance(t *testing.T) {

// create batch
batch := &sdk.InMemoryEventWriters{}
for i := 0; i < sdk.DefaultBatchSize; i++ {
for i := uint32(0); i < sdk.DefaultBatchSize; i++ {
batch.Writers = append(batch.Writers, &sdk.InMemoryEventWriter{})
}

Expand Down Expand Up @@ -323,7 +323,7 @@ func TestPushInstance(t *testing.T) {
func TestPushInstanceChanClosing(t *testing.T) {
// create batch
batch := &sdk.InMemoryEventWriters{}
for i := 0; i < sdk.DefaultBatchSize; i++ {
for i := uint32(0); i < sdk.DefaultBatchSize; i++ {
batch.Writers = append(batch.Writers, &sdk.InMemoryEventWriter{})
}

Expand Down Expand Up @@ -356,7 +356,7 @@ func TestPushInstanceChanClosing(t *testing.T) {
func TestPushInstanceCtxCanceling(t *testing.T) {
// create batch
batch := &sdk.InMemoryEventWriters{}
for i := 0; i < sdk.DefaultBatchSize; i++ {
for i := uint32(0); i < sdk.DefaultBatchSize; i++ {
batch.Writers = append(batch.Writers, &sdk.InMemoryEventWriter{})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sdk/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ const DefaultEvtSize uint32 = 256 * 1024

// DefaultBatchSize is the default number of events in the EventWriters
// interface used by the SDK.
const DefaultBatchSize = 128
const DefaultBatchSize uint32 = 128

// The full set of values that can be returned in the ftype
// member of ss_plugin_extract_field structs.
Expand Down

0 comments on commit c7d7e44

Please sign in to comment.