From 90b26f861a08392e445dc67828ac30ea8bd54fff Mon Sep 17 00:00:00 2001 From: Alvar Penning Date: Mon, 17 Jun 2024 18:21:33 +0200 Subject: [PATCH] WIP --- internal/channel/channel.go | 30 ++++ internal/config/channel.go | 88 +++--------- internal/config/contact.go | 60 ++------ internal/config/contact_address.go | 57 +------- internal/config/group.go | 113 ++++++--------- internal/config/incremental_sync.go | 207 ++++++++++++++++++++++++++++ internal/config/rule.go | 2 + internal/config/runtime.go | 24 ++-- internal/recipient/contact.go | 45 ++++++ internal/recipient/group.go | 62 ++++++++- internal/rule/escalation.go | 54 ++++++++ internal/rule/rule.go | 28 ++++ internal/utils/utils.go | 6 + 13 files changed, 518 insertions(+), 258 deletions(-) create mode 100644 internal/config/incremental_sync.go diff --git a/internal/channel/channel.go b/internal/channel/channel.go index a318ac69f..dd0ba154e 100644 --- a/internal/channel/channel.go +++ b/internal/channel/channel.go @@ -4,11 +4,13 @@ import ( "context" "errors" "fmt" + "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/contracts" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/pkg/plugin" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "net/url" ) @@ -18,6 +20,9 @@ type Channel struct { Type string `db:"type"` Config string `db:"config" json:"-"` // excluded from JSON config dump as this may contain sensitive information + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` + Logger *zap.SugaredLogger `db:"-"` restartCh chan newConfig @@ -27,6 +32,31 @@ type Channel struct { pluginCtxCancel func() } +func (c *Channel) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("id", c.ID) + encoder.AddString("name", c.Name) + encoder.AddString("type", c.Type) + encoder.AddTime("changed_at", c.ChangedAt.Time()) + encoder.AddBool("deleted", c.IsDeleted()) + return nil +} + +func (c *Channel) GetID() int64 { + return c.ID +} + +func (c *Channel) GetChangedAt() types.UnixMilli { + return c.ChangedAt +} + +func (c *Channel) IsDeleted() bool { + return c.Deleted.Valid && c.Deleted.Bool +} + +func (c *Channel) Validate() error { + return ValidateType(c.Type) +} + // newConfig helps to store the channel's updated properties type newConfig struct { ctype string diff --git a/internal/config/channel.go b/internal/config/channel.go index 769a919f3..dcf882e9d 100644 --- a/internal/config/channel.go +++ b/internal/config/channel.go @@ -8,79 +8,23 @@ import ( ) func (r *RuntimeConfig) fetchChannels(ctx context.Context, tx *sqlx.Tx) error { - var channelPtr *channel.Channel - stmt := r.db.BuildSelectStmt(channelPtr, channelPtr) - r.logger.Debugf("Executing query %q", stmt) - - var channels []*channel.Channel - if err := tx.SelectContext(ctx, &channels, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - channelsById := make(map[int64]*channel.Channel) - for _, c := range channels { - channelLogger := r.logger.With( - zap.Int64("id", c.ID), - zap.String("name", c.Name), - zap.String("type", c.Type), - ) - if channelsById[c.ID] != nil { - channelLogger.Warnw("ignoring duplicate config for channel type") - } else if err := channel.ValidateType(c.Type); err != nil { - channelLogger.Errorw("Cannot load channel config", zap.Error(err)) - } else { - channelsById[c.ID] = c - - channelLogger.Debugw("loaded channel config") - } - } - - if r.Channels != nil { - // mark no longer existing channels for deletion - for id := range r.Channels { - if _, ok := channelsById[id]; !ok { - channelsById[id] = nil - } - } - } - - r.pending.Channels = channelsById - - return nil + return incrementalFetch[channel.Channel](ctx, r, tx, &r.pending.Channels) } func (r *RuntimeConfig) applyPendingChannels() { - if r.Channels == nil { - r.Channels = make(map[int64]*channel.Channel) - } - - for id, pendingChannel := range r.pending.Channels { - if pendingChannel == nil { - r.Channels[id].Logger.Info("Channel has been removed") - r.Channels[id].Stop() - - delete(r.Channels, id) - } else if currentChannel := r.Channels[id]; currentChannel != nil { - // Currently, the whole config is reloaded from the database frequently, replacing everything. - // Prevent restarting the plugin processes every time by explicitly checking for config changes. - // The if condition should no longer be necessary when https://github.com/Icinga/icinga-notifications/issues/5 - // is solved properly. - if currentChannel.Type != pendingChannel.Type || currentChannel.Name != pendingChannel.Name || currentChannel.Config != pendingChannel.Config { - currentChannel.Type = pendingChannel.Type - currentChannel.Name = pendingChannel.Name - currentChannel.Config = pendingChannel.Config - - currentChannel.Restart() - } - } else { - pendingChannel.Start(context.TODO(), r.logs.GetChildLogger("channel").With( - zap.Int64("id", pendingChannel.ID), - zap.String("name", pendingChannel.Name))) - - r.Channels[id] = pendingChannel - } - } - - r.pending.Channels = nil + incrementalApplyPending[channel.Channel]( + r, + &r.Channels, &r.pending.Channels, + func(element *channel.Channel) { element.Stop() }, + func(element, update *channel.Channel) { + element.Name = update.Name + element.Type = update.Type + element.Config = update.Config + element.Restart() + }, + func(element *channel.Channel) { + element.Start(context.TODO(), r.logs.GetChildLogger("channel").With( + zap.Int64("id", element.ID), + zap.String("name", element.Name))) + }) } diff --git a/internal/config/contact.go b/internal/config/contact.go index bb72efad4..c7fca42ec 100644 --- a/internal/config/contact.go +++ b/internal/config/contact.go @@ -4,59 +4,21 @@ import ( "context" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/jmoiron/sqlx" - "go.uber.org/zap" ) func (r *RuntimeConfig) fetchContacts(ctx context.Context, tx *sqlx.Tx) error { - var contactPtr *recipient.Contact - stmt := r.db.BuildSelectStmt(contactPtr, contactPtr) - r.logger.Debugf("Executing query %q", stmt) - - var contacts []*recipient.Contact - if err := tx.SelectContext(ctx, &contacts, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - contactsByID := make(map[int64]*recipient.Contact) - for _, c := range contacts { - contactsByID[c.ID] = c - - r.logger.Debugw("loaded contact config", - zap.Int64("id", c.ID), - zap.String("name", c.FullName)) - } - - if r.Contacts != nil { - // mark no longer existing contacts for deletion - for id := range r.Contacts { - if _, ok := contactsByID[id]; !ok { - contactsByID[id] = nil - } - } - } - - r.pending.Contacts = contactsByID - - return nil + return incrementalFetch[recipient.Contact](ctx, r, tx, &r.pending.Contacts) } func (r *RuntimeConfig) applyPendingContacts() { - if r.Contacts == nil { - r.Contacts = make(map[int64]*recipient.Contact) - } - - for id, pendingContact := range r.pending.Contacts { - if pendingContact == nil { - delete(r.Contacts, id) - } else if currentContact := r.Contacts[id]; currentContact != nil { - currentContact.FullName = pendingContact.FullName - currentContact.Username = pendingContact.Username - currentContact.DefaultChannelID = pendingContact.DefaultChannelID - } else { - r.Contacts[id] = pendingContact - } - } - - r.pending.Contacts = nil + incrementalApplyPending[recipient.Contact]( + r, + &r.Contacts, &r.pending.Contacts, + nil, + func(element, update *recipient.Contact) { + element.FullName = update.FullName + element.Username = update.Username + element.DefaultChannelID = update.DefaultChannelID + }, + nil) } diff --git a/internal/config/contact_address.go b/internal/config/contact_address.go index f89f82f0a..2fe71a93c 100644 --- a/internal/config/contact_address.go +++ b/internal/config/contact_address.go @@ -9,59 +9,16 @@ import ( ) func (r *RuntimeConfig) fetchContactAddresses(ctx context.Context, tx *sqlx.Tx) error { - var addressPtr *recipient.Address - stmt := r.db.BuildSelectStmt(addressPtr, addressPtr) - r.logger.Debugf("Executing query %q", stmt) - - var addresses []*recipient.Address - if err := tx.SelectContext(ctx, &addresses, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - addressesById := make(map[int64]*recipient.Address) - for _, a := range addresses { - addressesById[a.ID] = a - r.logger.Debugw("loaded contact_address config", - zap.Int64("id", a.ID), - zap.Int64("contact_id", a.ContactID), - zap.String("type", a.Type), - zap.String("address", a.Address), - ) - } - - if r.ContactAddresses != nil { - // mark no longer existing contacts for deletion - for id := range r.ContactAddresses { - if _, ok := addressesById[id]; !ok { - addressesById[id] = nil - } - } - } - - r.pending.ContactAddresses = addressesById - - return nil + return incrementalFetch[recipient.Address](ctx, r, tx, &r.pending.ContactAddresses) } func (r *RuntimeConfig) applyPendingContactAddresses() { - if r.ContactAddresses == nil { - r.ContactAddresses = make(map[int64]*recipient.Address) - } - - for id, pendingAddress := range r.pending.ContactAddresses { - currentAddress := r.ContactAddresses[id] - - if pendingAddress == nil { - r.removeContactAddress(currentAddress) - } else if currentAddress != nil { - r.updateContactAddress(currentAddress, pendingAddress) - } else { - r.addContactAddress(pendingAddress) - } - } - - r.pending.ContactAddresses = nil + incrementalApplyPending[recipient.Address]( + r, + &r.ContactAddresses, &r.pending.ContactAddresses, + r.removeContactAddress, + r.updateContactAddress, + r.addContactAddress) } func (r *RuntimeConfig) addContactAddress(addr *recipient.Address) { diff --git a/internal/config/group.go b/internal/config/group.go index 433162aaf..d914168c0 100644 --- a/internal/config/group.go +++ b/internal/config/group.go @@ -5,96 +5,61 @@ import ( "github.com/icinga/icinga-notifications/internal/recipient" "github.com/jmoiron/sqlx" "go.uber.org/zap" + "slices" ) func (r *RuntimeConfig) fetchGroups(ctx context.Context, tx *sqlx.Tx) error { - var groupPtr *recipient.Group - stmt := r.db.BuildSelectStmt(groupPtr, groupPtr) - r.logger.Debugf("Executing query %q", stmt) - - var groups []*recipient.Group - if err := tx.SelectContext(ctx, &groups, stmt); err != nil { - r.logger.Errorln(err) + err := incrementalFetch[recipient.Group](ctx, r, tx, &r.pending.Groups) + if err != nil { return err } - groupsById := make(map[int64]*recipient.Group) - for _, g := range groups { - groupsById[g.ID] = g - - r.logger.Debugw("loaded group config", - zap.Int64("id", g.ID), - zap.String("name", g.Name)) - } - - type ContactgroupMember struct { - GroupId int64 `db:"contactgroup_id"` - ContactId int64 `db:"contact_id"` - } - - var memberPtr *ContactgroupMember - stmt = r.db.BuildSelectStmt(memberPtr, memberPtr) - r.logger.Debugf("Executing query %q", stmt) - - var members []*ContactgroupMember - if err := tx.SelectContext(ctx, &members, stmt); err != nil { - r.logger.Errorln(err) + err = incrementalFetch[recipient.GroupMember](ctx, r, tx, &r.pending.GroupMembers) + if err != nil { return err } - for _, m := range members { - memberLogger := r.logger.With( - zap.Int64("contact_id", m.ContactId), - zap.Int64("contactgroup_id", m.GroupId), - ) - - if g := groupsById[m.GroupId]; g == nil { - memberLogger.Warnw("ignoring member for unknown contactgroup_id") - } else { - g.MemberIDs = append(g.MemberIDs, m.ContactId) - - memberLogger.Debugw("loaded contact group member", - zap.String("contactgroup_name", g.Name)) - } - } - - if r.Groups != nil { - // mark no longer existing groups for deletion - for id := range r.Groups { - if _, ok := groupsById[id]; !ok { - groupsById[id] = nil - } - } - } - - r.pending.Groups = groupsById - return nil } func (r *RuntimeConfig) applyPendingGroups() { - if r.Groups == nil { - r.Groups = make(map[int64]*recipient.Group) - } + incrementalApplyPending[recipient.Group]( + r, + &r.Groups, &r.pending.Groups, + nil, + nil, + nil) + + incrementalApplyPending[recipient.GroupMember]( + r, + &r.GroupMembers, &r.pending.GroupMembers, + func(element *recipient.GroupMember) { + group, ok := r.Groups[element.GroupId] + if !ok { + r.logger.Errorw("Cannot update group membership for unknown group", zap.Inline(element)) + return + } - for id, pendingGroup := range r.pending.Groups { - if pendingGroup == nil { - delete(r.Groups, id) - } else { - pendingGroup.Members = make([]*recipient.Contact, 0, len(pendingGroup.MemberIDs)) - for _, contactID := range pendingGroup.MemberIDs { - if c := r.Contacts[contactID]; c != nil { - pendingGroup.Members = append(pendingGroup.Members, c) - } + group.Members = slices.DeleteFunc(group.Members, func(contact *recipient.Contact) bool { + return contact.ID == element.ContactId + }) + }, + func(element, update *recipient.GroupMember) { + r.logger.Warnf("Group memberships shouldn't change, from %+v to %+v", element, update) + }, + func(element *recipient.GroupMember) { + group, ok := r.Groups[element.GroupId] + if !ok { + r.logger.Errorw("Cannot update group membership for unknown group", zap.Inline(element)) + return } - if currentGroup := r.Groups[id]; currentGroup != nil { - *currentGroup = *pendingGroup - } else { - r.Groups[id] = pendingGroup + contact, ok := r.Contacts[element.ContactId] + if !ok { + r.logger.Errorw("Cannot update group membership for unknown contact", zap.Inline(element)) + return } - } - } - r.pending.Groups = nil + group.Members = append(group.Members, contact) + }) } diff --git a/internal/config/incremental_sync.go b/internal/config/incremental_sync.go new file mode 100644 index 000000000..ee4710dd2 --- /dev/null +++ b/internal/config/incremental_sync.go @@ -0,0 +1,207 @@ +package config + +import ( + "context" + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/types" + "github.com/jmoiron/sqlx" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "strings" +) + +// IncrementalConfigurable specifies Getter methods required for types supporting incremental configuration loading. +type IncrementalConfigurable[T comparable] interface { + zapcore.ObjectMarshaler + + // GetID returns the primary key value. + GetID() T + + // GetChangedAt returns the changed_at value. + GetChangedAt() types.UnixMilli + + // IsDeleted returns if this entry was marked as deleted. + IsDeleted() bool + + // Validate and optionally return an error, resulting in aborting this entry. + Validate() error +} + +// buildSelectStmtWhereChangedAt creates a SQL SELECT for an incremental configuration synchronization. +// +// The query, which will be a prepared statement, expects a types.UnixMilli parameter to be compared against. This +// parameter might be NULL, being COALESCEd to a numeric zero, returning all rows. +// +// The rows will be ordered by `changed_at`, allowing to update the last change timestamp when iterating over it. +func (r *RuntimeConfig) buildSelectStmtWhereChangedAt(typePtr interface{}) string { + return r.db.Rebind(r.db.BuildSelectStmt(typePtr, typePtr) + + ` WHERE "changed_at" > COALESCE(?, CAST(0 AS BIGINT)) ORDER BY "changed_at"`) +} + +// selectRelationshipTableEntries constructs and execute a SELECT query for the config relationship tables. +// +// A SELECT query against the relationship tables is generated for either all entries at the initial sync (`changedAt` +// has a NULL value) or a subset specified by either all `ids` in the `idField` or all changes since `changedAt`. +// +// The rows will be ordered by `changed_at`, allowing to update the last change timestamp when iterating over it. +func (r *RuntimeConfig) selectRelationshipTableEntries( + ctx context.Context, + tx *sqlx.Tx, + typePtr interface{}, + idField string, + ids []int64, + changedAt types.UnixMilli, + dest interface{}, +) error { + stmt := r.db.BuildSelectStmt(typePtr, typePtr) + ` WHERE "changed_at" > COALESCE(?, CAST(0 AS BIGINT))` + if len(ids) > 0 { + stmt += ` OR "` + idField + `" IN (` + strings.Join(strings.Split(strings.Repeat("?", len(ids)), ""), ",") + `)` + } + stmt += ` ORDER BY "changed_at"` + stmt = r.db.Rebind(stmt) + + args := make([]interface{}, 0, 1+len(ids)) + args = append(args, changedAt) + for id := range ids { + args = append(args, id) + } + + r.logger.Debugw("Executing query to fetch relationship table", + zap.String("table", database.TableName(typePtr)), + zap.String("query", stmt), + zap.Time("changed_at_after", changedAt.Time()), + zap.Int("ids_in", len(ids))) + + return tx.SelectContext(ctx, dest, stmt, args...) +} + +func incrementalFetch[ + BaseT any, + CompT comparable, + T interface { + *BaseT + IncrementalConfigurable[CompT] + }, +](ctx context.Context, r *RuntimeConfig, tx *sqlx.Tx, pendingConfigSetField *map[CompT]T) error { + var typePtr T + + tableName := database.TableName(typePtr) + changedAt := r.pendingChangeTimestamps[tableName] + stmt := r.buildSelectStmtWhereChangedAt(typePtr) + + stmtLogger := r.logger.With( + zap.String("table", tableName), + zap.String("query", stmt), + zap.Time("changed_at", changedAt.Time())) + + stmtLogger.Debug("Executing query to fetch incremental config updates") + + var ts []T + if err := tx.SelectContext(ctx, &ts, stmt, changedAt); err != nil { + stmtLogger.Errorw("Cannot execute query to fetch incremental config updates", zap.Error(err)) + return err + } + + tsById := make(map[CompT]T) + countDel, countErr, countLoad := 0, 0, 0 + for _, t := range ts { + changedAt = t.GetChangedAt() + + logger := r.logger.With(zap.String("table", tableName), zap.Inline(t)) + if t.IsDeleted() { + countDel++ + logger.Debug("Marking entry as deleted") + tsById[t.GetID()] = nil + } else if err := t.Validate(); err != nil { + countErr++ + logger.Errorw("Cannot validate entry", zap.Error(err)) + } else { + countLoad++ + logger.Debug("Loaded entry") + tsById[t.GetID()] = t + } + } + + if countDel+countErr+countLoad > 0 { + r.logger.Debugw("Fetched incremental configuration updates", + zap.String("table", tableName), + zap.Int("deleted-elements", countDel), + zap.Int("faulty-elements", countErr), + zap.Int("loaded-elements", countLoad)) + } else { + r.logger.Debugw("No configuration updates are available", zap.String("table", tableName)) + } + + *pendingConfigSetField = tsById + r.pendingChangeTimestamps[tableName] = changedAt + + return nil +} + +func incrementalApplyPending[ + BaseT any, + CompT comparable, + T interface { + *BaseT + IncrementalConfigurable[CompT] + }, +]( + r *RuntimeConfig, + configSetField, pendingConfigSetField *map[CompT]T, + deleteFn func(element T), + updateFn func(element, update T), + createFn func(element T), +) { + if *configSetField == nil { + *configSetField = make(map[CompT]T) + } + + tableName := database.TableName(new(T)) + countDelSkip, countDel, countUpdate, countNew := 0, 0, 0, 0 + for id, newT := range *pendingConfigSetField { + oldT, oldExists := (*configSetField)[id] + + logger := r.logger.With( + zap.String("table", tableName), + zap.Any("id", id)) + + if newT == nil && !oldExists { + countDelSkip++ + logger.Warn("Skipping unknown element marked as deleted") + continue + } else if newT == nil { + countDel++ + logger.Debug("Deleting element") + if deleteFn != nil { + deleteFn(oldT) + } + delete(*configSetField, id) + } else if oldExists { + countUpdate++ + logger.Debug("Updating known element") + if updateFn != nil { + updateFn(oldT, newT) + } + } else { + countNew++ + logger.Debug("Creating new element") + if createFn != nil { + createFn(newT) + } + (*configSetField)[id] = newT + } + } + + if countDelSkip+countDel+countUpdate+countNew > 0 { + r.logger.Infow("Applied configuration updates", + zap.String("table", tableName), + zap.Int("deleted-unknown-elements", countDelSkip), + zap.Int("deleted-elements", countDel), + zap.Int("updated-elements", countUpdate), + zap.Int("new-elements", countNew)) + } else { + r.logger.Debugw("No configuration updates available to be applied", zap.String("table", tableName)) + } + + *pendingConfigSetField = nil +} diff --git a/internal/config/rule.go b/internal/config/rule.go index 10012cb58..8087491d4 100644 --- a/internal/config/rule.go +++ b/internal/config/rule.go @@ -10,6 +10,8 @@ import ( ) func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error { + // TODO + var rulePtr *rule.Rule stmt := r.db.BuildSelectStmt(rulePtr, rulePtr) r.logger.Debugf("Executing query %q", stmt) diff --git a/internal/config/runtime.go b/internal/config/runtime.go index 69b38188d..b8de2d90b 100644 --- a/internal/config/runtime.go +++ b/internal/config/runtime.go @@ -6,10 +6,12 @@ import ( "errors" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/channel" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" "github.com/icinga/icinga-notifications/internal/timeperiod" + "github.com/icinga/icinga-notifications/internal/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" "golang.org/x/crypto/bcrypt" @@ -30,7 +32,8 @@ type RuntimeConfig struct { EventStreamLaunchFunc func(source *Source) // pending contains changes to config objects that are to be applied to the embedded live config. - pending ConfigSet + pending ConfigSet + pendingChangeTimestamps map[string]types.UnixMilli logs *logging.Logging logger *logging.Logger @@ -55,14 +58,17 @@ func NewRuntimeConfig( } type ConfigSet struct { - Channels map[int64]*channel.Channel - Contacts map[int64]*recipient.Contact - ContactAddresses map[int64]*recipient.Address - Groups map[int64]*recipient.Group - TimePeriods map[int64]*timeperiod.TimePeriod - Schedules map[int64]*recipient.Schedule - Rules map[int64]*rule.Rule - Sources map[int64]*Source + Channels map[int64]*channel.Channel + Contacts map[int64]*recipient.Contact + ContactAddresses map[int64]*recipient.Address + GroupMembers map[utils.CompTuple[int64, int64]]*recipient.GroupMember + Groups map[int64]*recipient.Group + TimePeriods map[int64]*timeperiod.TimePeriod + Schedules map[int64]*recipient.Schedule + Rules map[int64]*rule.Rule + RuleEscalations map[int64]*rule.Escalation + RuleEscalationRecipients map[int64]*rule.EscalationRecipient + Sources map[int64]*Source } func (r *RuntimeConfig) UpdateFromDatabase(ctx context.Context) error { diff --git a/internal/recipient/contact.go b/internal/recipient/contact.go index 82732f1f6..a6d9d5e4c 100644 --- a/internal/recipient/contact.go +++ b/internal/recipient/contact.go @@ -2,6 +2,7 @@ package recipient import ( "database/sql" + "github.com/icinga/icinga-go-library/types" "go.uber.org/zap/zapcore" "time" ) @@ -12,6 +13,25 @@ type Contact struct { Username sql.NullString `db:"username"` DefaultChannelID int64 `db:"default_channel_id"` Addresses []*Address `db:"-"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` +} + +func (c *Contact) GetID() int64 { + return c.ID +} + +func (c *Contact) GetChangedAt() types.UnixMilli { + return c.ChangedAt +} + +func (c *Contact) IsDeleted() bool { + return c.Deleted.Valid && c.Deleted.Bool +} + +func (c *Contact) Validate() error { + return nil } func (c *Contact) String() string { @@ -37,6 +57,31 @@ type Address struct { ContactID int64 `db:"contact_id"` Type string `db:"type"` Address string `db:"address"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` +} + +func (a *Address) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("id", a.ID) + encoder.AddInt64("contact_id", a.ContactID) + return nil +} + +func (a *Address) GetID() int64 { + return a.ID +} + +func (a *Address) GetChangedAt() types.UnixMilli { + return a.ChangedAt +} + +func (a *Address) IsDeleted() bool { + return a.Deleted.Valid && a.Deleted.Bool +} + +func (a *Address) Validate() error { + return nil } func (a *Address) TableName() string { diff --git a/internal/recipient/group.go b/internal/recipient/group.go index 243dde7b9..2742765d3 100644 --- a/internal/recipient/group.go +++ b/internal/recipient/group.go @@ -1,15 +1,35 @@ package recipient import ( + "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-notifications/internal/utils" "go.uber.org/zap/zapcore" "time" ) type Group struct { - ID int64 `db:"id"` - Name string `db:"name"` - Members []*Contact `db:"-"` - MemberIDs []int64 `db:"-"` + ID int64 `db:"id"` + Name string `db:"name"` + Members []*Contact `db:"-"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` +} + +func (g *Group) GetID() int64 { + return g.ID +} + +func (g *Group) GetChangedAt() types.UnixMilli { + return g.ChangedAt +} + +func (g *Group) IsDeleted() bool { + return g.Deleted.Valid && g.Deleted.Bool +} + +func (g *Group) Validate() error { + return nil } func (g *Group) GetContactsAt(t time.Time) []*Contact { @@ -32,4 +52,38 @@ func (g *Group) MarshalLogObject(encoder zapcore.ObjectEncoder) error { return nil } +type GroupMember struct { + GroupId int64 `db:"contactgroup_id"` + ContactId int64 `db:"contact_id"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` +} + +func (g *GroupMember) TableName() string { + return "contactgroup_member" +} + +func (g *GroupMember) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("group_id", g.GroupId) + encoder.AddInt64("contact_id", g.ContactId) + return nil +} + +func (g *GroupMember) GetID() utils.CompTuple[int64, int64] { + return utils.CompTuple[int64, int64]{g.GroupId, g.ContactId} +} + +func (g *GroupMember) GetChangedAt() types.UnixMilli { + return g.ChangedAt +} + +func (g *GroupMember) IsDeleted() bool { + return g.Deleted.Valid && g.Deleted.Bool +} + +func (g *GroupMember) Validate() error { + return nil +} + var _ Recipient = (*Group)(nil) diff --git a/internal/rule/escalation.go b/internal/rule/escalation.go index 823648cf7..a1ccc1a42 100644 --- a/internal/rule/escalation.go +++ b/internal/rule/escalation.go @@ -2,6 +2,8 @@ package rule import ( "database/sql" + "fmt" + "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/filter" "github.com/icinga/icinga-notifications/internal/recipient" "go.uber.org/zap/zapcore" @@ -19,6 +21,39 @@ type Escalation struct { Fallbacks []*Escalation `db:"-"` Recipients []*EscalationRecipient `db:"-"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` +} + +func (e *Escalation) GetID() int64 { + return e.ID +} + +func (e *Escalation) GetChangedAt() types.UnixMilli { + return e.ChangedAt +} + +func (e *Escalation) IsDeleted() bool { + return e.Deleted.Valid && e.Deleted.Bool +} + +func (e *Escalation) Validate() error { + if e.ConditionExpr.Valid { + cond, err := filter.Parse(e.ConditionExpr.String) + if err != nil { + return err + } + + e.Condition = cond + } + + if e.FallbackForID.Valid { + // TODO: implement fallbacks (needs extra validation: mismatching rule_id, cycles) + return fmt.Errorf("ignoring fallback escalation (not yet implemented)") + } + + return nil } // MarshalLogObject implements the zapcore.ObjectMarshaler interface. @@ -98,6 +133,25 @@ type EscalationRecipient struct { ChannelID sql.NullInt64 `db:"channel_id"` recipient.Key `db:",inline"` Recipient recipient.Recipient `db:"-"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` +} + +func (r *EscalationRecipient) GetID() int64 { + return r.ID +} + +func (r *EscalationRecipient) GetChangedAt() types.UnixMilli { + return r.ChangedAt +} + +func (r *EscalationRecipient) IsDeleted() bool { + return r.Deleted.Valid && r.Deleted.Bool +} + +func (r *EscalationRecipient) Validate() error { + return nil } func (r *EscalationRecipient) TableName() string { diff --git a/internal/rule/rule.go b/internal/rule/rule.go index 22b3c0500..977fcb13b 100644 --- a/internal/rule/rule.go +++ b/internal/rule/rule.go @@ -18,6 +18,34 @@ type Rule struct { ObjectFilter filter.Filter `db:"-"` ObjectFilterExpr types.String `db:"object_filter"` Escalations map[int64]*Escalation `db:"-"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` +} + +func (r *Rule) GetID() int64 { + return r.ID +} + +func (r *Rule) GetChangedAt() types.UnixMilli { + return r.ChangedAt +} + +func (r *Rule) IsDeleted() bool { + return r.Deleted.Valid && r.Deleted.Bool +} + +func (r *Rule) Validate() error { + if r.ObjectFilterExpr.Valid { + f, err := filter.Parse(r.ObjectFilterExpr.String) + if err != nil { + return err + } + + r.ObjectFilter = f + } + + return nil } // MarshalLogObject implements the zapcore.ObjectMarshaler interface. diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 7d05c736b..fc4d8cac8 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -162,3 +162,9 @@ func IterateOrderedMap[K cmp.Ordered, V any](m map[K]V) func(func(K, V) bool) { } } } + +// CompTuple is a simple tuple struct of two independent comparable types. +type CompTuple[TA, TB comparable] struct { + A TA + B TB +}