Skip to content

Commit

Permalink
Merge pull request #68 from aleksmaus/feature/resilent_fleet_server
Browse files Browse the repository at this point in the history
Make Fleet Server components resilient if the indices do not exist
  • Loading branch information
aleksmaus committed Jan 21, 2021
2 parents bd7527d + 0824f93 commit f931091
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cmd/fleet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err er
if err != nil {
return err
}
err = migrate.Migrate(ctx, sv, bulker)
err = migrate.Migrate(ctx, log.Logger, sv, bulker)
if err != nil {
return err
}
Expand Down
9 changes: 9 additions & 0 deletions internal/pkg/coordinator/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package coordinator

import (
"context"
"errors"
"net"
"os"
"runtime"
Expand Down Expand Up @@ -184,6 +185,10 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error {
leaders := map[string]model.PolicyLeader{}
policies, err := dl.QueryLatestPolicies(ctx, m.bulker, dl.WithIndexName(m.policiesIndex))
if err != nil {
if errors.Is(err, es.ErrIndexNotFound) {
m.log.Debug().Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error())
return nil
}
return err
}
if len(policies) > 0 {
Expand All @@ -193,6 +198,10 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error {
}
leaders, err = dl.SearchPolicyLeaders(ctx, m.bulker, ids, dl.WithIndexName(m.leadersIndex))
if err != nil {
if errors.Is(err, es.ErrIndexNotFound) {
m.log.Debug().Str("index", m.leadersIndex).Msg(es.ErrIndexNotFound.Error())
return nil
}
return err
}
}
Expand Down
17 changes: 9 additions & 8 deletions internal/pkg/esboot/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package esboot

import (
"context"
"github.com/elastic/fleet-server/v7/internal/pkg/es"

"github.com/elastic/go-elasticsearch/v8"
)
Expand All @@ -20,13 +19,15 @@ type indexConfig struct {
}

var indexConfigs = map[string]indexConfig{
".fleet-actions": {mapping: es.MappingAction},
".fleet-actions-results": {mapping: es.MappingActionResult, datastream: true},
".fleet-agents": {mapping: es.MappingAgent},
".fleet-enrollment-api-keys": {mapping: es.MappingEnrollmentApiKey},
".fleet-policies": {mapping: es.MappingPolicy},
".fleet-policies-leader": {mapping: es.MappingPolicyLeader},
".fleet-servers": {mapping: es.MappingServer},
// Commenting out the boostrapping for now here, just in case if it needs to be "enabled" again.
// Will remove all the boostrapping code completely later once all is fully integrated
// ".fleet-actions": {mapping: es.MappingAction},
// ".fleet-actions-results": {mapping: es.MappingActionResult, datastream: true},
// ".fleet-agents": {mapping: es.MappingAgent},
// ".fleet-enrollment-api-keys": {mapping: es.MappingEnrollmentApiKey},
// ".fleet-policies": {mapping: es.MappingPolicy},
// ".fleet-policies-leader": {mapping: es.MappingPolicyLeader},
// ".fleet-servers": {mapping: es.MappingServer},
}

// Bootstrap creates .fleet-actions data stream
Expand Down
23 changes: 18 additions & 5 deletions internal/pkg/migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ package migrate
import (
"context"
"encoding/json"
"errors"

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/saved"
"github.com/rs/zerolog"
)

type enrollmentApiKey struct {
Expand All @@ -29,11 +33,11 @@ type enrollmentApiKey struct {
// This is for development only (1 instance of fleet)
// Not safe for multiple instances of fleet
// Initially needed to migrate the enrollment-api-keys that kibana creates
func Migrate(ctx context.Context, sv saved.CRUD, bulker bulk.Bulk) error {
return MigrateEnrollmentAPIKeys(ctx, sv, bulker)
func Migrate(ctx context.Context, log zerolog.Logger, sv saved.CRUD, bulker bulk.Bulk) error {
return MigrateEnrollmentAPIKeys(ctx, log, sv, bulker)
}

func MigrateEnrollmentAPIKeys(ctx context.Context, sv saved.CRUD, bulker bulk.Bulk) error {
func MigrateEnrollmentAPIKeys(ctx context.Context, log zerolog.Logger, sv saved.CRUD, bulker bulk.Bulk) error {

// Query all enrollment keys from the new schema
raw, err := dl.RenderAllEnrollmentAPIKeysQuery(1000)
Expand All @@ -42,12 +46,21 @@ func MigrateEnrollmentAPIKeys(ctx context.Context, sv saved.CRUD, bulker bulk.Bu
}

var recs []model.EnrollmentApiKey
var resHits []es.HitT
res, err := bulker.Search(ctx, []string{dl.FleetEnrollmentAPIKeys}, raw, bulk.WithRefresh())
if err != nil {
return err
if errors.Is(err, es.ErrIndexNotFound) {
log.Debug().Str("index", dl.FleetEnrollmentAPIKeys).Msg(es.ErrIndexNotFound.Error())
// Continue with migration if the .fleet-enrollment-api-keys index is not found
err = nil
} else {
return err
}
} else {
resHits = res.Hits
}

for _, hit := range res.Hits {
for _, hit := range resHits {
var rec model.EnrollmentApiKey
err := json.Unmarshal(hit.Source, &rec)
if err != nil {
Expand Down
11 changes: 10 additions & 1 deletion internal/pkg/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"sync/atomic"
"time"

Expand Down Expand Up @@ -291,7 +292,15 @@ func (m *simpleMonitorT) search(ctx context.Context, tmpl *dsl.Tmpl, params map[
}

if res.IsError() {
return nil, es.TranslateError(res.StatusCode, esres.Error)
err = es.TranslateError(res.StatusCode, esres.Error)
}

if err != nil {
if errors.Is(err, es.ErrIndexNotFound) {
m.log.Debug().Str("index", m.index).Msg(es.ErrIndexNotFound.Error())
return nil, nil
}
return nil, err
}

return esres.Hits.Hits, nil
Expand Down
5 changes: 5 additions & 0 deletions internal/pkg/policy/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/elastic/fleet-server/v7/internal/pkg/bulk"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
)
Expand Down Expand Up @@ -125,6 +126,10 @@ LOOP:
func (m *monitorT) process(ctx context.Context) error {
policies, err := m.policyF(ctx, m.bulker, dl.WithIndexName(m.policiesIndex))
if err != nil {
if errors.Is(err, es.ErrIndexNotFound) {
m.log.Debug().Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error())
return nil
}
return err
}
if len(policies) == 0 {
Expand Down

0 comments on commit f931091

Please sign in to comment.