Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
oxzi committed Jun 19, 2024
1 parent fb1264c commit 24b6233
Show file tree
Hide file tree
Showing 20 changed files with 1,108 additions and 883 deletions.
30 changes: 30 additions & 0 deletions internal/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"errors"
"fmt"
"github.com/icinga/icinga-go-library/types"
"github.com/icinga/icinga-notifications/internal/contracts"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/pkg/plugin"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"net/url"
)

Expand All @@ -18,6 +20,9 @@ type Channel struct {
Type string `db:"type"`
Config string `db:"config" json:"-"` // excluded from JSON config dump as this may contain sensitive information

ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"`
Deleted types.Bool `db:"deleted" json:"deleted"`

Logger *zap.SugaredLogger `db:"-"`

restartCh chan newConfig
Expand All @@ -27,6 +32,31 @@ type Channel struct {
pluginCtxCancel func()
}

func (c *Channel) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddInt64("id", c.ID)
encoder.AddString("name", c.Name)
encoder.AddString("type", c.Type)
encoder.AddTime("changed_at", c.ChangedAt.Time())
encoder.AddBool("deleted", c.IsDeleted())
return nil
}

func (c *Channel) GetID() int64 {
return c.ID
}

func (c *Channel) GetChangedAt() types.UnixMilli {
return c.ChangedAt
}

func (c *Channel) IsDeleted() bool {
return c.Deleted.Valid && c.Deleted.Bool
}

func (c *Channel) Validate() error {
return ValidateType(c.Type)
}

// newConfig helps to store the channel's updated properties
type newConfig struct {
ctype string
Expand Down
96 changes: 20 additions & 76 deletions internal/config/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,84 +3,28 @@ package config
import (
"context"
"github.com/icinga/icinga-notifications/internal/channel"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
)

func (r *RuntimeConfig) fetchChannels(ctx context.Context, tx *sqlx.Tx) error {
var channelPtr *channel.Channel
stmt := r.db.BuildSelectStmt(channelPtr, channelPtr)
r.logger.Debugf("Executing query %q", stmt)

var channels []*channel.Channel
if err := tx.SelectContext(ctx, &channels, stmt); err != nil {
r.logger.Errorln(err)
return err
}

channelsById := make(map[int64]*channel.Channel)
for _, c := range channels {
channelLogger := r.logger.With(
zap.Int64("id", c.ID),
zap.String("name", c.Name),
zap.String("type", c.Type),
)
if channelsById[c.ID] != nil {
channelLogger.Warnw("ignoring duplicate config for channel type")
} else if err := channel.ValidateType(c.Type); err != nil {
channelLogger.Errorw("Cannot load channel config", zap.Error(err))
} else {
channelsById[c.ID] = c

channelLogger.Debugw("loaded channel config")
}
}

if r.Channels != nil {
// mark no longer existing channels for deletion
for id := range r.Channels {
if _, ok := channelsById[id]; !ok {
channelsById[id] = nil
}
}
}

r.pending.Channels = channelsById

return nil
}

func (r *RuntimeConfig) applyPendingChannels() {
if r.Channels == nil {
r.Channels = make(map[int64]*channel.Channel)
}

for id, pendingChannel := range r.pending.Channels {
if pendingChannel == nil {
r.Channels[id].Logger.Info("Channel has been removed")
r.Channels[id].Stop()

delete(r.Channels, id)
} else if currentChannel := r.Channels[id]; currentChannel != nil {
// Currently, the whole config is reloaded from the database frequently, replacing everything.
// Prevent restarting the plugin processes every time by explicitly checking for config changes.
// The if condition should no longer be necessary when https://github.com/Icinga/icinga-notifications/issues/5
// is solved properly.
if currentChannel.Type != pendingChannel.Type || currentChannel.Name != pendingChannel.Name || currentChannel.Config != pendingChannel.Config {
currentChannel.Type = pendingChannel.Type
currentChannel.Name = pendingChannel.Name
currentChannel.Config = pendingChannel.Config

currentChannel.Restart()
}
} else {
pendingChannel.Start(context.TODO(), r.logs.GetChildLogger("channel").With(
zap.Int64("id", pendingChannel.ID),
zap.String("name", pendingChannel.Name)))

r.Channels[id] = pendingChannel
}
}

r.pending.Channels = nil
incrementalApplyPending(
r,
&r.Channels, &r.configChange.Channels,
func(newElement *channel.Channel) error {
newElement.Start(context.TODO(), r.logs.GetChildLogger("channel").With(
zap.Int64("id", newElement.ID),
zap.String("name", newElement.Name)))
return nil
},
func(curElement, update *channel.Channel) error {
curElement.Name = update.Name
curElement.Type = update.Type
curElement.Config = update.Config
curElement.Restart()
return nil
},
func(delElement *channel.Channel) error {
delElement.Stop()
return nil
})
}
94 changes: 43 additions & 51 deletions internal/config/contact.go
Original file line number Diff line number Diff line change
@@ -1,62 +1,54 @@
package config

import (
"context"
"fmt"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
"slices"
)

func (r *RuntimeConfig) fetchContacts(ctx context.Context, tx *sqlx.Tx) error {
var contactPtr *recipient.Contact
stmt := r.db.BuildSelectStmt(contactPtr, contactPtr)
r.logger.Debugf("Executing query %q", stmt)

var contacts []*recipient.Contact
if err := tx.SelectContext(ctx, &contacts, stmt); err != nil {
r.logger.Errorln(err)
return err
}

contactsByID := make(map[int64]*recipient.Contact)
for _, c := range contacts {
contactsByID[c.ID] = c

r.logger.Debugw("loaded contact config",
zap.Int64("id", c.ID),
zap.String("name", c.FullName))
}

if r.Contacts != nil {
// mark no longer existing contacts for deletion
for id := range r.Contacts {
if _, ok := contactsByID[id]; !ok {
contactsByID[id] = nil
func (r *RuntimeConfig) applyPendingContacts() {
incrementalApplyPending(
r,
&r.Contacts, &r.configChange.Contacts,
nil,
func(curElement, update *recipient.Contact) error {
curElement.FullName = update.FullName
curElement.Username = update.Username
curElement.DefaultChannelID = update.DefaultChannelID
return nil
},
nil)

incrementalApplyPending(
r,
&r.ContactAddresses, &r.configChange.ContactAddresses,
func(newElement *recipient.Address) error {
contact, ok := r.Contacts[newElement.ContactID]
if !ok {
return fmt.Errorf("contact address refers unknown contact %d", newElement.ContactID)
}
}
}

r.pending.Contacts = contactsByID

return nil
}

func (r *RuntimeConfig) applyPendingContacts() {
if r.Contacts == nil {
r.Contacts = make(map[int64]*recipient.Contact)
}
contact.Addresses = append(contact.Addresses, newElement)
return nil
},
func(curElement, update *recipient.Address) error {
if curElement.ContactID != update.ContactID {
return reAddUpdateFnErr
}

for id, pendingContact := range r.pending.Contacts {
if pendingContact == nil {
delete(r.Contacts, id)
} else if currentContact := r.Contacts[id]; currentContact != nil {
currentContact.FullName = pendingContact.FullName
currentContact.Username = pendingContact.Username
currentContact.DefaultChannelID = pendingContact.DefaultChannelID
} else {
r.Contacts[id] = pendingContact
}
}
curElement.Type = update.Type
curElement.Address = update.Address
return nil
},
func(delElement *recipient.Address) error {
contact, ok := r.Contacts[delElement.ContactID]
if !ok {
return fmt.Errorf("contact address refers unknown contact %d", delElement.ContactID)
}

r.pending.Contacts = nil
contact.Addresses = slices.DeleteFunc(contact.Addresses, func(address *recipient.Address) bool {
return address.ID == delElement.ID
})
return nil
})
}
112 changes: 0 additions & 112 deletions internal/config/contact_address.go

This file was deleted.

Loading

0 comments on commit 24b6233

Please sign in to comment.