Skip to content

Commit

Permalink
EVG-7292 make event processing limit configurable (#3307)
Browse files Browse the repository at this point in the history
  • Loading branch information
ybrill committed Mar 19, 2020
1 parent c45dfbf commit de3ec33
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 32 deletions.
15 changes: 13 additions & 2 deletions config_alerts_notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@ import (
"go.mongodb.org/mongo-driver/mongo/options"
)

// Fallback values for NotifyConfig fields. ValidateAndDefault substitutes each
// one when the corresponding field is zero or negative.
const (
// DefaultEventProcessingLimit is the fallback for
// NotifyConfig.EventProcessingLimit, the limit passed to
// event.FindUnprocessedEvents by the event meta job.
DefaultEventProcessingLimit = 1000
// DefaultBufferIntervalSeconds is the fallback for
// NotifyConfig.BufferIntervalSeconds.
DefaultBufferIntervalSeconds = 60
// DefaultBufferTargetPerInterval is the fallback for
// NotifyConfig.BufferTargetPerInterval.
DefaultBufferTargetPerInterval = 20
)

// NotifyConfig holds logging and email settings for the notify package, along
// with the limit on how many unprocessed events are fetched at a time.
type NotifyConfig struct {
// BufferTargetPerInterval defaults to DefaultBufferTargetPerInterval when
// non-positive. NOTE(review): presumably the target number of notification
// jobs per buffer interval; confirm against the notify package.
BufferTargetPerInterval int `bson:"buffer_target_per_interval" json:"buffer_target_per_interval" yaml:"buffer_target_per_interval"`
// BufferIntervalSeconds is the notifications rate-limit time interval, in
// seconds. Defaults to DefaultBufferIntervalSeconds when non-positive.
BufferIntervalSeconds int `bson:"buffer_interval_seconds" json:"buffer_interval_seconds" yaml:"buffer_interval_seconds"`
// EventProcessingLimit caps how many unprocessed events a single event
// meta job fetches. Defaults to DefaultEventProcessingLimit when
// non-positive.
EventProcessingLimit int `bson:"event_processing_limit" json:"event_processing_limit" yaml:"event_processing_limit"`
// SMTP holds the nested SMTP configuration.
SMTP SMTPConfig `bson:"smtp" json:"smtp" yaml:"smtp"`
}

Expand Down Expand Up @@ -50,10 +57,10 @@ func (c *NotifyConfig) Set() error {

func (c *NotifyConfig) ValidateAndDefault() error {
if c.BufferIntervalSeconds <= 0 {
c.BufferIntervalSeconds = 60
c.BufferIntervalSeconds = DefaultBufferIntervalSeconds
}
if c.BufferTargetPerInterval <= 0 {
c.BufferTargetPerInterval = 20
c.BufferTargetPerInterval = DefaultBufferTargetPerInterval
}

// cap to 100 jobs/sec per server
Expand All @@ -63,6 +70,10 @@ func (c *NotifyConfig) ValidateAndDefault() error {

}

if c.EventProcessingLimit <= 0 {
c.EventProcessingLimit = DefaultEventProcessingLimit
}

return nil
}

Expand Down
1 change: 1 addition & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,7 @@ func (s *AdminSuite) TestNotifyConfig() {

config.BufferIntervalSeconds = 1
config.BufferTargetPerInterval = 2
config.EventProcessingLimit = 1
s.NoError(config.Set())

settings, err = GetConfig()
Expand Down
5 changes: 3 additions & 2 deletions model/commitqueue/dequeue_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package commitqueue
import (
"testing"

"github.com/evergreen-ci/evergreen"
"github.com/evergreen-ci/evergreen/db"
"github.com/evergreen-ci/evergreen/model/event"
"github.com/mongodb/grip/level"
Expand Down Expand Up @@ -46,7 +47,7 @@ func TestCommitQueueDequeueLogger(t *testing.T) {
assert.NoError(err)
assert.False(q.Processing)
assert.Equal("2", q.Queue[0].Issue)
eventLog, err := event.FindUnprocessedEvents()
eventLog, err := event.FindUnprocessedEvents(evergreen.DefaultEventProcessingLimit)
assert.NoError(err)
assert.Len(eventLog, 1)

Expand All @@ -57,7 +58,7 @@ func TestCommitQueueDequeueLogger(t *testing.T) {
})
assert.Error(dequeueSender.doSend(msg))
// no additional events are logged
eventLog, err = event.FindUnprocessedEvents()
eventLog, err = event.FindUnprocessedEvents(evergreen.DefaultEventProcessingLimit)
assert.NoError(err)
assert.Len(eventLog, 1)
}
4 changes: 2 additions & 2 deletions model/event/event_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func Find(coll string, query db.Q) ([]EventLogEntry, error) {

// FindUnprocessedEvents returns at most limit unprocessed events from
// AllLogCollection, sorted by TimestampKey.
// Events are considered unprocessed if their "processed_at" time IsZero.
func FindUnprocessedEvents() ([]EventLogEntry, error) {
func FindUnprocessedEvents(limit int) ([]EventLogEntry, error) {
out := []EventLogEntry{}
err := db.FindAllQ(AllLogCollection, db.Query(unprocessedEvents()).Limit(1000), &out)
err := db.FindAllQ(AllLogCollection, db.Query(unprocessedEvents()).Sort([]string{TimestampKey}).Limit(limit), &out)
if err != nil {
return nil, errors.Wrap(err, "failed to fetch unprocssed events")
}
Expand Down
3 changes: 2 additions & 1 deletion model/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/evergreen-ci/evergreen"
"github.com/evergreen-ci/evergreen/db"
amboyRegistry "github.com/mongodb/amboy/registry"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -323,7 +324,7 @@ func (s *eventSuite) TestFindUnprocessedEvents() {
for i := range data {
s.NoError(db.Insert(AllLogCollection, data[i]))
}
events, err := FindUnprocessedEvents()
events, err := FindUnprocessedEvents(evergreen.DefaultEventProcessingLimit)
s.NoError(err)
s.Len(events, 1)

Expand Down
6 changes: 3 additions & 3 deletions model/task_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2069,7 +2069,7 @@ func TestMarkEndRequiresAllTasksToFinishToUpdateBuildStatus(t *testing.T) {
assert.NoError(err)
assert.True(complete)

e, err := event.FindUnprocessedEvents()
e, err := event.FindUnprocessedEvents(evergreen.DefaultEventProcessingLimit)
assert.NoError(err)
assert.Len(e, 7)
}
Expand Down Expand Up @@ -2158,7 +2158,7 @@ func TestMarkEndRequiresAllTasksToFinishToUpdateBuildStatusWithCompileTask(t *te
assert.True(b.IsFinished())
assert.True(b.Tasks[1].Blocked)

e, err := event.FindUnprocessedEvents()
e, err := event.FindUnprocessedEvents(evergreen.DefaultEventProcessingLimit)
assert.NoError(err)
assert.Len(e, 3)
}
Expand Down Expand Up @@ -2246,7 +2246,7 @@ func TestMarkEndWithBlockedDependenciesTriggersNotifications(t *testing.T) {
assert.NoError(err)
assert.True(b.IsFinished())

e, err := event.FindUnprocessedEvents()
e, err := event.FindUnprocessedEvents(evergreen.DefaultEventProcessingLimit)
assert.NoError(err)
assert.Len(e, 3)
}
Expand Down
3 changes: 3 additions & 0 deletions rest/model/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1056,6 +1056,7 @@ func (a *APILogBuffering) ToService() (interface{}, error) {
// APINotifyConfig is the REST API model for evergreen.NotifyConfig. Its scalar
// fields are copied to and from the service struct of the same name by
// BuildFromService and ToService.
type APINotifyConfig struct {
// BufferTargetPerInterval mirrors evergreen.NotifyConfig.BufferTargetPerInterval.
BufferTargetPerInterval int `json:"buffer_target_per_interval"`
// BufferIntervalSeconds mirrors evergreen.NotifyConfig.BufferIntervalSeconds.
BufferIntervalSeconds int `json:"buffer_interval_seconds"`
// EventProcessingLimit mirrors evergreen.NotifyConfig.EventProcessingLimit
// (events fetched per event-processing job).
EventProcessingLimit int `json:"event_processing_limit"`
// SMTP is the API model of the nested SMTP configuration.
SMTP APISMTPConfig `json:"smtp"`
}

Expand All @@ -1068,6 +1069,7 @@ func (a *APINotifyConfig) BuildFromService(h interface{}) error {
}
a.BufferTargetPerInterval = v.BufferTargetPerInterval
a.BufferIntervalSeconds = v.BufferIntervalSeconds
a.EventProcessingLimit = v.EventProcessingLimit
default:
return errors.Errorf("%T is not a supported type", h)
}
Expand All @@ -1082,6 +1084,7 @@ func (a *APINotifyConfig) ToService() (interface{}, error) {
return evergreen.NotifyConfig{
BufferTargetPerInterval: a.BufferTargetPerInterval,
BufferIntervalSeconds: a.BufferIntervalSeconds,
EventProcessingLimit: a.EventProcessingLimit,
SMTP: smtp.(evergreen.SMTPConfig),
}, nil
}
Expand Down
6 changes: 5 additions & 1 deletion service/templates/admin.html
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,7 @@ <h2 class="modal-title">[[modalTitle]]</h2>
</section>

<section layout="row" flex>
<md-card flex=50 id="notifications" style="height:180px">
<md-card flex=50 id="notifications">
<md-card-title>
<md-card-title-text>
<span>Notifications Config</span>
Expand All @@ -1247,6 +1247,10 @@ <h2 class="modal-title">[[modalTitle]]</h2>
<label>Notifications Rate Limit Time Interval (seconds)</label>
<input type="number" ng-model="Settings.notify.buffer_interval_seconds">
</md-input-container>
<md-input-container class="control" style="width:45%;">
<label>Event Processing Limit (events per job)</label>
<input type="number" ng-model="Settings.notify.event_processing_limit">
</md-input-container>
</md-card-content>
</md-card>
<md-card flex=50 id="triggers" style="height:180px">
Expand Down
4 changes: 2 additions & 2 deletions units/crons.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func PopulateHostMonitoring(env evergreen.Environment) amboy.QueueOperation {
}
}

func PopulateEventAlertProcessing(parts int) amboy.QueueOperation {
func PopulateEventAlertProcessing(env evergreen.Environment, parts int) amboy.QueueOperation {
return func(ctx context.Context, queue amboy.Queue) error {
flags, err := evergreen.GetServiceFlags()
if err != nil {
Expand All @@ -215,7 +215,7 @@ func PopulateEventAlertProcessing(parts int) amboy.QueueOperation {

ts := util.RoundPartOfHour(parts).Format(TSFormat)

return errors.Wrap(queue.Put(ctx, NewEventMetaJob(queue, ts)), "failed to queue event-metajob")
return errors.Wrap(queue.Put(ctx, NewEventMetaJob(env, queue, ts)), "failed to queue event-metajob")
}
}

Expand Down
2 changes: 1 addition & 1 deletion units/crons_remote_minute.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (j *cronsRemoteMinuteJob) Run(ctx context.Context) {
PopulateIdleHostJobs(j.env),
PopulateHostTerminationJobs(j.env),
PopulateHostMonitoring(j.env),
PopulateEventAlertProcessing(1),
PopulateEventAlertProcessing(j.env, 1),
PopulateBackgroundStatsJobs(j.env, 0),
PopulateLastContainerFinishTimeJobs(),
PopulateParentDecommissionJobs(),
Expand Down
47 changes: 36 additions & 11 deletions units/event_metajob.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type eventMetaJob struct {
q amboy.Queue
events []event.EventLogEntry
flags *evergreen.ServiceFlags
env evergreen.Environment
}

func makeEventMetaJob() *eventMetaJob {
Expand All @@ -85,9 +86,10 @@ func makeEventMetaJob() *eventMetaJob {
return j
}

func NewEventMetaJob(q amboy.Queue, ts string) amboy.Job {
func NewEventMetaJob(env evergreen.Environment, q amboy.Queue, ts string) amboy.Job {
j := makeEventMetaJob()
j.q = q
j.env = env

j.SetID(fmt.Sprintf("%s:%s", eventMetaJobName, ts))

Expand Down Expand Up @@ -256,9 +258,12 @@ func (j *eventMetaJob) Run(ctx context.Context) {
defer cancel()
defer j.MarkComplete()

if j.env == nil {
j.env = evergreen.GetEnvironment()
}

if j.q == nil {
env := evergreen.GetEnvironment()
j.q = env.RemoteQueue()
j.q = j.env.RemoteQueue()
}
if j.q == nil || !j.q.Started() {
j.AddError(errors.New("evergreen environment not setup correctly"))
Expand All @@ -281,22 +286,42 @@ func (j *eventMetaJob) Run(ctx context.Context) {

j.AddError(j.dispatchUnprocessedNotifications(ctx))

j.events, err = event.FindUnprocessedEvents()
settings := j.env.Settings()
limit := settings.Notify.EventProcessingLimit
if limit <= 0 {
limit = evergreen.DefaultEventProcessingLimit
}
j.events, err = event.FindUnprocessedEvents(limit)
if err != nil {
j.AddError(err)
return
}

j.AddError(errors.Wrap(j.logEventCount(len(j.events), limit), "can't log unprocessed event count"))

if len(j.events) == 0 {
grip.Info(message.Fields{
"job_id": j.ID(),
"job": eventMetaJobName,
"time": time.Now().String(),
"message": "no events need to be processed",
"source": "events-processing",
})
return
}

j.AddError(j.dispatchLoop(ctx))
}

// logEventCount writes an informational log entry reporting how many
// unprocessed events exist.
//
// eventCount is the number of events that were fetched and limit is the cap
// that was applied to that fetch. When the two are equal the fetch may have
// been truncated, so the reported number is taken from
// event.CountUnprocessedEvents instead. An error is returned only when that
// count lookup fails; in that case nothing is logged.
func (j *eventMetaJob) logEventCount(eventCount, limit int) error {
	total := eventCount
	if total == limit {
		// Hitting the cap exactly means more events may be waiting than were
		// fetched, so ask for the full count.
		counted, err := event.CountUnprocessedEvents()
		if err != nil {
			return errors.Wrap(err, "error getting unprocessed event count")
		}
		total = counted
	}

	fields := message.Fields{
		"job_id":  j.ID(),
		"job":     eventMetaJobName,
		"message": "unprocessed event count",
		"count":   total,
		"source":  "events-processing",
	}
	grip.Info(fields)

	return nil
}
14 changes: 9 additions & 5 deletions units/event_metajob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,12 @@ func (s *eventMetaJobSuite) TestDegradedMode() {
logger := event.NewDBEventLogger(event.AllLogCollection)
s.NoError(logger.LogEvent(&e))

job := NewEventMetaJob(evergreen.GetEnvironment().RemoteQueue(), "1")
env := evergreen.GetEnvironment()
job := NewEventMetaJob(env, env.RemoteQueue(), "1")
job.Run(s.ctx)
s.NoError(job.Error())

out, err := event.FindUnprocessedEvents()
out, err := event.FindUnprocessedEvents(evergreen.DefaultEventProcessingLimit)
s.NoError(err)
s.Len(out, 1)
}
Expand All @@ -152,7 +153,8 @@ func (s *eventMetaJobSuite) TestSenderDegradedModeDoesntDispatchJobs() {

startingStats := evergreen.GetEnvironment().RemoteQueue().Stats(ctx)

job := NewEventMetaJob(evergreen.GetEnvironment().RemoteQueue(), "1").(*eventMetaJob)
env := evergreen.GetEnvironment()
job := NewEventMetaJob(env, env.RemoteQueue(), "1").(*eventMetaJob)
job.flags = &flags
s.NoError(job.dispatch(ctx, s.n))
s.NoError(job.Error())
Expand Down Expand Up @@ -271,7 +273,8 @@ func (s *eventMetaJobSuite) TestEndToEnd() {

go httpServer(ln, handler)

job := NewEventMetaJob(evergreen.GetEnvironment().LocalQueue(), "1").(*eventMetaJob)
env := evergreen.GetEnvironment()
job := NewEventMetaJob(env, env.LocalQueue(), "1").(*eventMetaJob)
job.q = evergreen.GetEnvironment().LocalQueue()
job.Run(s.ctx)
s.NoError(job.Error())
Expand All @@ -292,7 +295,8 @@ func (s *eventMetaJobSuite) TestEndToEnd() {

func (s *eventMetaJobSuite) TestDispatchUnprocessedNotifications() {
s.NoError(notification.InsertMany(s.n...))
job := NewEventMetaJob(evergreen.GetEnvironment().LocalQueue(), "1").(*eventMetaJob)
env := evergreen.GetEnvironment()
job := NewEventMetaJob(env, env.LocalQueue(), "1").(*eventMetaJob)
flags, err := evergreen.GetServiceFlags()
s.NoError(err)
job.flags = flags
Expand Down
5 changes: 3 additions & 2 deletions units/spawnhost_expiration_warning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/evergreen-ci/evergreen"
"github.com/evergreen-ci/evergreen/db"
"github.com/evergreen-ci/evergreen/model/alertrecord"
"github.com/evergreen-ci/evergreen/model/event"
Expand Down Expand Up @@ -48,7 +49,7 @@ func (s *spawnHostExpirationSuite) SetupTest() {
func (s *spawnHostExpirationSuite) TestAlerts() {
ctx := context.Background()
s.j.Run(ctx)
events, err := event.FindUnprocessedEvents()
events, err := event.FindUnprocessedEvents(evergreen.DefaultEventProcessingLimit)
s.NoError(err)
s.Len(events, 3)
}
Expand All @@ -57,7 +58,7 @@ func (s *spawnHostExpirationSuite) TestCanceledJob() {
ctx, cancel := context.WithCancel(context.Background())
cancel()
s.j.Run(ctx)
events, err := event.FindUnprocessedEvents()
events, err := event.FindUnprocessedEvents(evergreen.DefaultEventProcessingLimit)
s.NoError(err)
s.Len(events, 0)
}

0 comments on commit de3ec33

Please sign in to comment.