From a4f65dcdf91241064011ca93996be241249ef483 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Wed, 20 Jan 2021 10:42:14 -0500 Subject: [PATCH 1/2] Make fleet server components resilent if the indices do not exist --- internal/pkg/coordinator/monitor.go | 13 +++++++++++-- internal/pkg/esboot/bootstrap.go | 17 +++++++++-------- internal/pkg/migrate/migrate.go | 14 ++++++++++++-- internal/pkg/monitor/monitor.go | 10 +++++++++- internal/pkg/policy/monitor.go | 6 ++++++ 5 files changed, 47 insertions(+), 13 deletions(-) diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index 6544610e0..de27570c0 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -6,6 +6,7 @@ package coordinator import ( "context" + "errors" "net" "os" "runtime" @@ -184,7 +185,11 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { leaders := map[string]model.PolicyLeader{} policies, err := dl.QueryLatestPolicies(ctx, m.bulker, dl.WithIndexName(m.policiesIndex)) if err != nil { - return err + if errors.Is(err, es.ErrIndexNotFound) { + err = nil + } else { + return err + } } if len(policies) > 0 { ids := make([]string, len(policies)) @@ -193,7 +198,11 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { } leaders, err = dl.SearchPolicyLeaders(ctx, m.bulker, ids, dl.WithIndexName(m.leadersIndex)) if err != nil { - return err + if errors.Is(err, es.ErrIndexNotFound) { + err = nil + } else { + return err + } } } diff --git a/internal/pkg/esboot/bootstrap.go b/internal/pkg/esboot/bootstrap.go index 614d500d6..c000f965d 100644 --- a/internal/pkg/esboot/bootstrap.go +++ b/internal/pkg/esboot/bootstrap.go @@ -6,7 +6,6 @@ package esboot import ( "context" - "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/go-elasticsearch/v8" ) @@ -20,13 +19,15 @@ type indexConfig struct { } var indexConfigs = map[string]indexConfig{ - ".fleet-actions": {mapping: es.MappingAction}, - ".fleet-actions-results": {mapping: es.MappingActionResult, datastream: true}, - ".fleet-agents": {mapping: es.MappingAgent}, - ".fleet-enrollment-api-keys": {mapping: es.MappingEnrollmentApiKey}, - ".fleet-policies": {mapping: es.MappingPolicy}, - ".fleet-policies-leader": {mapping: es.MappingPolicyLeader}, - ".fleet-servers": {mapping: es.MappingServer}, + // Commenting out the boostrapping for now here, just in case if it needs to be "enabled" again. + // Will remove all the boostrapping code completely later once all is fully integrated + // ".fleet-actions": {mapping: es.MappingAction}, + // ".fleet-actions-results": {mapping: es.MappingActionResult, datastream: true}, + // ".fleet-agents": {mapping: es.MappingAgent}, + // ".fleet-enrollment-api-keys": {mapping: es.MappingEnrollmentApiKey}, + // ".fleet-policies": {mapping: es.MappingPolicy}, + // ".fleet-policies-leader": {mapping: es.MappingPolicyLeader}, + // ".fleet-servers": {mapping: es.MappingServer}, } // Bootstrap creates .fleet-actions data stream diff --git a/internal/pkg/migrate/migrate.go b/internal/pkg/migrate/migrate.go index dbcc8cb83..4eed7d7ed 100644 --- a/internal/pkg/migrate/migrate.go +++ b/internal/pkg/migrate/migrate.go @@ -7,8 +7,11 @@ package migrate import ( "context" "encoding/json" + "errors" + "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/saved" ) @@ -42,12 +45,19 @@ func MigrateEnrollmentAPIKeys(ctx context.Context, sv saved.CRUD, bulker bulk.Bu } var recs []model.EnrollmentApiKey + var resHits []es.HitT res, err := bulker.Search(ctx, []string{dl.FleetEnrollmentAPIKeys}, raw, bulk.WithRefresh()) if err != nil { - return err + if errors.Is(err, es.ErrIndexNotFound) { + err = nil + } else { + return err + } + } else { + resHits = res.Hits } - for _, hit := range res.Hits { + for _, hit := range resHits { var rec model.EnrollmentApiKey err := json.Unmarshal(hit.Source, &rec) if err != nil { diff --git a/internal/pkg/monitor/monitor.go b/internal/pkg/monitor/monitor.go index 78df63db9..a6f28936b 100644 --- a/internal/pkg/monitor/monitor.go +++ b/internal/pkg/monitor/monitor.go @@ -8,6 +8,7 @@ import ( "bytes" "context" "encoding/json" + "errors" "sync/atomic" "time" @@ -291,7 +292,14 @@ func (m *simpleMonitorT) search(ctx context.Context, tmpl *dsl.Tmpl, params map[ } if res.IsError() { - return nil, es.TranslateError(res.StatusCode, esres.Error) + err = es.TranslateError(res.StatusCode, esres.Error) + } + + if err != nil { + if errors.Is(err, es.ErrIndexNotFound) { + return nil, nil + } + return nil, err } return esres.Hits.Hits, nil diff --git a/internal/pkg/policy/monitor.go b/internal/pkg/policy/monitor.go index 2c09836a0..70ff237ee 100644 --- a/internal/pkg/policy/monitor.go +++ b/internal/pkg/policy/monitor.go @@ -18,6 +18,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/bulk" "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/monitor" ) @@ -125,6 +126,11 @@ LOOP: func (m *monitorT) process(ctx context.Context) error { policies, err := m.policyF(ctx, m.bulker, dl.WithIndexName(m.policiesIndex)) if err != nil { + if errors.Is(err, es.ErrIndexNotFound) { + err = nil + } else { + err = nil + } return err } if len(policies) == 0 { From 0824f935e3eeb3cab786d9890d465fe1f073a5b6 Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Thu, 21 Jan 2021 10:14:12 -0500 Subject: [PATCH 2/2] Address code review comments --- cmd/fleet/main.go | 2 +- internal/pkg/coordinator/monitor.go | 12 ++++++------ internal/pkg/migrate/migrate.go | 9 ++++++--- internal/pkg/monitor/monitor.go | 1 + internal/pkg/policy/monitor.go | 5 ++--- 5 files changed, 16 insertions(+), 13 deletions(-) diff --git a/cmd/fleet/main.go b/cmd/fleet/main.go index 2c0e1abd2..6900e51d2 100644 --- a/cmd/fleet/main.go +++ b/cmd/fleet/main.go @@ -212,7 +212,7 @@ func (f *FleetServer) runServer(ctx context.Context, cfg *config.Config) (err er if err != nil { return err } - err = migrate.Migrate(ctx, sv, bulker) + err = migrate.Migrate(ctx, log.Logger, sv, bulker) if err != nil { return err } diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index de27570c0..31ec54179 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -186,10 +186,10 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { policies, err := dl.QueryLatestPolicies(ctx, m.bulker, dl.WithIndexName(m.policiesIndex)) if err != nil { if errors.Is(err, es.ErrIndexNotFound) { - err = nil - } else { - return err + m.log.Debug().Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error()) + return nil } + return err } if len(policies) > 0 { ids := make([]string, len(policies)) @@ -199,10 +199,10 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { leaders, err = dl.SearchPolicyLeaders(ctx, m.bulker, ids, dl.WithIndexName(m.leadersIndex)) if err != nil { if errors.Is(err, es.ErrIndexNotFound) { - err = nil - } else { - return err + m.log.Debug().Str("index", m.leadersIndex).Msg(es.ErrIndexNotFound.Error()) + return nil } + return err } } diff --git a/internal/pkg/migrate/migrate.go b/internal/pkg/migrate/migrate.go index 4eed7d7ed..f18a4bde0 100644 --- a/internal/pkg/migrate/migrate.go +++ b/internal/pkg/migrate/migrate.go @@ -14,6 +14,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" "github.com/elastic/fleet-server/v7/internal/pkg/saved" + "github.com/rs/zerolog" ) type enrollmentApiKey struct { @@ -32,11 +33,11 @@ type enrollmentApiKey struct { // This is for development only (1 instance of fleet) // Not safe for multiple instances of fleet // Initially needed to migrate the enrollment-api-keys that kibana creates -func Migrate(ctx context.Context, sv saved.CRUD, bulker bulk.Bulk) error { - return MigrateEnrollmentAPIKeys(ctx, sv, bulker) +func Migrate(ctx context.Context, log zerolog.Logger, sv saved.CRUD, bulker bulk.Bulk) error { + return MigrateEnrollmentAPIKeys(ctx, log, sv, bulker) } -func MigrateEnrollmentAPIKeys(ctx context.Context, sv saved.CRUD, bulker bulk.Bulk) error { +func MigrateEnrollmentAPIKeys(ctx context.Context, log zerolog.Logger, sv saved.CRUD, bulker bulk.Bulk) error { // Query all enrollment keys from the new schema raw, err := dl.RenderAllEnrollmentAPIKeysQuery(1000) @@ -49,6 +50,8 @@ func MigrateEnrollmentAPIKeys(ctx context.Context, sv saved.CRUD, bulker bulk.Bu res, err := bulker.Search(ctx, []string{dl.FleetEnrollmentAPIKeys}, raw, bulk.WithRefresh()) if err != nil { if errors.Is(err, es.ErrIndexNotFound) { + log.Debug().Str("index", dl.FleetEnrollmentAPIKeys).Msg(es.ErrIndexNotFound.Error()) + // Continue with migration if the .fleet-enrollment-api-keys index is not found err = nil } else { return err diff --git a/internal/pkg/monitor/monitor.go b/internal/pkg/monitor/monitor.go index a6f28936b..bdea51fce 100644 --- a/internal/pkg/monitor/monitor.go +++ b/internal/pkg/monitor/monitor.go @@ -297,6 +297,7 @@ func (m *simpleMonitorT) search(ctx context.Context, tmpl *dsl.Tmpl, params map[ if err != nil { if errors.Is(err, es.ErrIndexNotFound) { + m.log.Debug().Str("index", m.index).Msg(es.ErrIndexNotFound.Error()) return nil, nil } return nil, err diff --git a/internal/pkg/policy/monitor.go b/internal/pkg/policy/monitor.go index 70ff237ee..980529c6a 100644 --- a/internal/pkg/policy/monitor.go +++ b/internal/pkg/policy/monitor.go @@ -127,9 +127,8 @@ func (m *monitorT) process(ctx context.Context) error { policies, err := m.policyF(ctx, m.bulker, dl.WithIndexName(m.policiesIndex)) if err != nil { if errors.Is(err, es.ErrIndexNotFound) { - err = nil - } else { - err = nil + m.log.Debug().Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error()) + return nil } return err }