Skip to content

Commit

Permalink
Load sources configuration from database
Browse files Browse the repository at this point in the history
The source table was extended to hold additional data, auth_username and
auth_password, which is now synchronized into a new Source type, being
hold in the RuntimeConfig.

Those two values are now being used in the Listener to enforce
authenticated API requests.
  • Loading branch information
oxzi committed Nov 27, 2023
1 parent 11d2d60 commit 6739452
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 20 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ It is required that you have created a new database and imported the [schema](sc
Additionally, it also requires you to manually insert items into the **source** table before starting the daemon.
```sql
INSERT INTO source (id, type, name) VALUES (1, 'icinga2', 'Icinga 2')
INSERT INTO source (id, type, name, auth_username, auth_password) VALUES (1, 'icinga2', 'Icinga 2', 'icinga', 'correct horse battery staple')
```

Then, you can launch the daemon with the following command.
Expand Down
6 changes: 5 additions & 1 deletion icinga2.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ if (!globals.contains("IcingaNotificationsIcingaWebUrl")) {
const IcingaNotificationsIcingaWebUrl = "http://localhost/icingaweb2"
}
if (!globals.contains("IcingaNotificationsEventSourceId")) {
// INSERT INTO source (id, type, name) VALUES (1, 'icinga2', 'Icinga 2')
// INSERT INTO source (id, type, name, auth_username, auth_password) VALUES (1, 'icinga2', 'Icinga 2', 'icinga', 'correct horse battery staple')
const IcingaNotificationsEventSourceId = 1
}
if (!globals.contains("IcingaNotificationsAuth")) {
const IcingaNotificationsAuth = "icinga:correct horse battery staple"
}

// urlencode a string loosely based on RFC 3986.
//
Expand Down Expand Up @@ -55,6 +58,7 @@ var baseBody = {
(len(macro("$event_severity$")) > 0 || len(macro("$event_type$")) > 0) ? "curl" : "true"
}}
}
"--user" = { value = IcingaNotificationsAuth }
"--fail" = { set_if = true }
"--silent" = { set_if = true }
"--show-error" = { set_if = true }
Expand Down
27 changes: 27 additions & 0 deletions internal/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"context"
"crypto/subtle"
"database/sql"
"github.com/icinga/icinga-notifications/internal/channel"
"github.com/icinga/icinga-notifications/internal/recipient"
Expand Down Expand Up @@ -44,6 +45,7 @@ type ConfigSet struct {
TimePeriods map[int64]*timeperiod.TimePeriod
Schedules map[int64]*recipient.Schedule
Rules map[int64]*rule.Rule
Sources map[int64]*Source
}

func (r *RuntimeConfig) UpdateFromDatabase(ctx context.Context) error {
Expand Down Expand Up @@ -137,6 +139,29 @@ func (r *RuntimeConfig) GetContact(username string) *recipient.Contact {
return nil
}

// CheckSourceCredentials verifies a credential pair for a stored Source.
//
// Debug output will be written to the passed logger.
func (r *RuntimeConfig) CheckSourceCredentials(sourceId int64, user, pass string, logger *logging.Logger) bool {
r.RLock()
defer r.RUnlock()

source, ok := r.Sources[sourceId]
if !ok {
logger.Debugw("Cannot check credentials for unknown source ID", zap.Int64("id", sourceId))
return false
}

userOk := subtle.ConstantTimeCompare([]byte(source.AuthUsername), []byte(user)) == 1
passOk := subtle.ConstantTimeCompare([]byte(source.AuthPassword), []byte(pass)) == 1
if !(userOk && passOk) {
logger.Debugw("Cannot authorize invalid credentials", zap.Int64("id", sourceId), zap.String("user", user))
return false
}

return true
}

func (r *RuntimeConfig) fetchFromDatabase(ctx context.Context) error {
r.logger.Debug("fetching configuration from database")
start := time.Now()
Expand All @@ -162,6 +187,7 @@ func (r *RuntimeConfig) fetchFromDatabase(ctx context.Context) error {
r.fetchTimePeriods,
r.fetchSchedules,
r.fetchRules,
r.fetchSources,
}
for _, f := range updateFuncs {
if err := f(ctx, tx); err != nil {
Expand All @@ -188,6 +214,7 @@ func (r *RuntimeConfig) applyPending() {
r.applyPendingTimePeriods()
r.applyPendingSchedules()
r.applyPendingRules()
r.applyPendingSources()

r.logger.Debugw("applied pending configuration", zap.Duration("took", time.Since(start)))
}
79 changes: 79 additions & 0 deletions internal/config/source.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package config

import (
"context"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
)

// Source entry within the ConfigSet to describe a source.
type Source struct {
ID int64 `db:"id"`
Type string `db:"type"`
Name string `db:"name"`

AuthUsername string `db:"auth_username"`
AuthPassword string `db:"auth_password"`
}

func (r *RuntimeConfig) fetchSources(ctx context.Context, tx *sqlx.Tx) error {
var sourcePtr *Source
stmt := r.db.BuildSelectStmt(sourcePtr, sourcePtr)
r.logger.Debugf("Executing query %q", stmt)

var sources []*Source
if err := tx.SelectContext(ctx, &sources, stmt); err != nil {
r.logger.Errorln(err)
return err
}

sourcesById := make(map[int64]*Source)
for _, s := range sources {
sourceLogger := r.logger.With(
zap.Int64("id", s.ID),
zap.String("name", s.Name),
zap.String("type", s.Type),
)
if sourcesById[s.ID] != nil {
sourceLogger.Warnw("ignoring duplicate config for source type")
} else {
sourcesById[s.ID] = s

sourceLogger.Debugw("loaded source config")
}
}

if r.Sources != nil {
// mark no longer existing sources for deletion
for id := range r.Sources {
if _, ok := sourcesById[id]; !ok {
sourcesById[id] = nil
}
}
}

r.pending.Sources = sourcesById

return nil
}

func (r *RuntimeConfig) applyPendingSources() {
if r.Sources == nil {
r.Sources = make(map[int64]*Source)
}

for id, pendingSource := range r.pending.Sources {
if pendingSource == nil {
r.logger.Infow("Source has been removed",
zap.Int64("id", r.Sources[id].ID),
zap.String("name", r.Sources[id].Name),
zap.String("type", r.Sources[id].Type))

delete(r.Sources, id)
} else {
r.Sources[id] = pendingSource
}
}

r.pending.Sources = nil
}
53 changes: 35 additions & 18 deletions internal/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,42 +79,63 @@ func (l *Listener) Run(ctx context.Context) error {
}

func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) {
// abort the current connection by sending the status code and an error both to the log and back to the client.
abort := func(statusCode int, ev event.Event, format string, a ...any) {
msg := format
if len(a) > 0 {
msg = fmt.Sprintf(format, a...)
}

http.Error(w, msg, statusCode)
l.logger.Debugw("Abort listener submitted event processing",
zap.Int("status-code", statusCode), zap.String("msg", msg), zap.String("event", ev.String()))
}

if req.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
_, _ = fmt.Fprintln(w, "POST required")
abort(http.StatusMethodNotAllowed, event.Event{}, "POST required")
return
}

authUser, authPass, authOk := req.BasicAuth()
if !authOk {
w.Header().Set("WWW-Authenticate", fmt.Sprintf(`Basic realm="icinga-notifications"`))

Check failure on line 101 in internal/listener/listener.go

View workflow job for this annotation

GitHub Actions / go

S1039: unnecessary use of fmt.Sprintf (gosimple)
abort(http.StatusUnauthorized, event.Event{}, "HTTP authorization required")
return
}

var ev event.Event
err := json.NewDecoder(req.Body).Decode(&ev)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "cannot parse JSON body: %v\n", err)
abort(http.StatusBadRequest, event.Event{}, "cannot parse JSON body: %v", err)
return
}
ev.Time = time.Now()

if !l.runtimeConfig.CheckSourceCredentials(ev.SourceId, authUser, authPass, l.logger) {
w.Header().Set("WWW-Authenticate", fmt.Sprintf(`Basic realm="icinga-notifications"`))

Check failure on line 115 in internal/listener/listener.go

View workflow job for this annotation

GitHub Actions / go

S1039: unnecessary use of fmt.Sprintf (gosimple)
abort(http.StatusUnauthorized, event.Event{}, "HTTP authorization required")
return
}

if ev.Severity == event.SeverityNone && ev.Type == "" {
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintln(w, "ignoring invalid event: must set 'type' or 'severity'")
abort(http.StatusBadRequest, ev, "ignoring invalid event: must set 'type' or 'severity'")
return
}

if ev.Severity != event.SeverityNone {
if ev.Type == "" {
ev.Type = event.TypeState
} else if ev.Type != event.TypeState {
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "ignoring invalid event: if 'severity' is set, 'type' must not be set or set to %q\n", event.TypeState)
abort(http.StatusBadRequest, ev,
"ignoring invalid event: if 'severity' is set, 'type' must not be set or set to %q", event.TypeState)
return
}
}

if ev.Severity == event.SeverityNone {
if ev.Type != event.TypeAcknowledgement {
// It's neither a state nor an acknowledgement event.
w.WriteHeader(http.StatusBadRequest)
_, _ = fmt.Fprintf(w, "received not a state/acknowledgement event, ignoring\n")
abort(http.StatusBadRequest, ev, "received not a state/acknowledgement event, ignoring")
return
}
}
Expand All @@ -123,17 +144,14 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) {
obj, err := object.FromEvent(ctx, l.db, &ev)
if err != nil {
l.logger.Errorw("Can't sync object", zap.Error(err))

w.WriteHeader(http.StatusInternalServerError)
_, _ = fmt.Fprintln(w, err.Error())
abort(http.StatusInternalServerError, ev, err.Error())
return
}

createIncident := ev.Severity != event.SeverityNone && ev.Severity != event.SeverityOK
currentIncident, created, err := incident.GetCurrent(ctx, l.db, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, createIncident)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = fmt.Fprintln(w, err)
abort(http.StatusInternalServerError, ev, err.Error())
return
}

Expand All @@ -159,12 +177,11 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) {
return
}

l.logger.Infof("Processing event")
l.logger.Infow("Processing event", zap.String("event", ev.String()))

err = currentIncident.ProcessEvent(ctx, &ev, created)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = fmt.Fprintln(w, err)
abort(http.StatusInternalServerError, ev, err.Error())
return
}

Expand Down
5 changes: 5 additions & 0 deletions schema/pgsql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ CREATE TABLE source (
-- will likely need a distinguishing value for multiple sources of the same type in the future, like for example
-- the Icinga DB environment ID for Icinga 2 sources

-- auth_username and auth_password are required to limit API access for incoming connections to the Listener.
-- This might change in the future to become "type"-dependable.
auth_username text NOT NULL,
auth_password text NOT NULL,

CONSTRAINT pk_source PRIMARY KEY (id)
);

Expand Down
7 changes: 7 additions & 0 deletions schema/pgsql/upgrades/019.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
ALTER TABLE source
ADD COLUMN auth_username text,
ADD COLUMN auth_password text;
UPDATE source SET auth_username = 'icinga', auth_password = 'correct horse battery staple';
ALTER TABLE source
ALTER COLUMN auth_username SET NOT NULL,
ALTER COLUMN auth_password SET NOT NULL;

0 comments on commit 6739452

Please sign in to comment.