diff --git a/internal/extractor/plugins/shared/vm_host_residency.sql b/internal/extractor/plugins/shared/vm_host_residency.sql index 8ad5188f..9b95f3b0 100644 --- a/internal/extractor/plugins/shared/vm_host_residency.sql +++ b/internal/extractor/plugins/shared/vm_host_residency.sql @@ -21,7 +21,7 @@ WITH durations AS ( )) AS BIGINT) ) AS duration FROM openstack_migrations AS migrations - LEFT JOIN openstack_servers_v2 AS servers ON servers.id = migrations.instance_uuid + LEFT JOIN openstack_servers AS servers ON servers.id = migrations.instance_uuid LEFT JOIN openstack_flavors_v2 AS flavors ON flavors.name = servers.flavor_name ) SELECT diff --git a/internal/extractor/plugins/vmware/vrops_hostsystem_resolver.sql b/internal/extractor/plugins/vmware/vrops_hostsystem_resolver.sql index 8ab0a2c7..e2c6ad4b 100644 --- a/internal/extractor/plugins/vmware/vrops_hostsystem_resolver.sql +++ b/internal/extractor/plugins/vmware/vrops_hostsystem_resolver.sql @@ -3,5 +3,5 @@ SELECT DISTINCT m.hostsystem AS vrops_hostsystem, s.os_ext_srv_attr_host AS nova_compute_host FROM vrops_vm_metrics m -LEFT JOIN openstack_servers_v2 s ON m.instance_uuid = s.id +LEFT JOIN openstack_servers s ON m.instance_uuid = s.id WHERE s.os_ext_srv_attr_host IS NOT NULL; diff --git a/internal/extractor/plugins/vmware/vrops_project_noisiness.sql b/internal/extractor/plugins/vmware/vrops_project_noisiness.sql index 0b006779..334668b2 100644 --- a/internal/extractor/plugins/vmware/vrops_project_noisiness.sql +++ b/internal/extractor/plugins/vmware/vrops_project_noisiness.sql @@ -19,7 +19,7 @@ host_cpu_usage AS ( s.tenant_id, h.service_host, AVG(p.avg_cpu) AS avg_cpu_of_project - FROM openstack_servers_v2 s + FROM openstack_servers s JOIN vrops_vm_metrics m ON s.id = m.instance_uuid JOIN projects_avg_cpu p ON s.tenant_id = p.tenant_id JOIN openstack_hypervisors h ON s.os_ext_srv_attr_hypervisor_hostname = h.hostname diff --git a/internal/sync/openstack/nova/nova_api.go b/internal/sync/openstack/nova/nova_api.go index 07afd29c..5552c821 100644 --- a/internal/sync/openstack/nova/nova_api.go +++ b/internal/sync/openstack/nova/nova_api.go @@ -32,8 +32,8 @@ type NovaAPI interface { GetAllHypervisors(ctx context.Context) ([]Hypervisor, error) // Get all nova flavors. GetAllFlavors(ctx context.Context) ([]Flavor, error) - // Get all changed nova migrations since the timestamp. - GetChangedMigrations(ctx context.Context, changedSince *time.Time) ([]Migration, error) + // Get all nova migrations. + GetAllMigrations(ctx context.Context) ([]Migration, error) // Get all aggregates. GetAllAggregates(ctx context.Context) ([]Aggregate, error) } @@ -110,6 +110,11 @@ func (api *novaAPI) GetAllServers(ctx context.Context) ([]Server, error) { } // Get all deleted Nova servers. +// Note on Nova terminology: Nova uses "instance" internally in its database and code, +// but exposes these as "server" objects through the public API. +// Server lifecycle and cleanup: +// - In SAP Cloud Infrastructure's Nova fork, orphaned servers are purged after 3 weeks +// - This means historical server data is limited to 3 weeks func (api *novaAPI) GetDeletedServers(ctx context.Context, since time.Time) ([]DeletedServer, error) { label := DeletedServer{}.TableName() @@ -226,10 +231,19 @@ func (api *novaAPI) GetAllFlavors(ctx context.Context) ([]Flavor, error) { return data.Flavors, nil } -// Get all changed Nova migrations. -func (api *novaAPI) GetChangedMigrations(ctx context.Context, changedSince *time.Time) ([]Migration, error) { +// Get all Nova migrations from the OpenStack API. +// +// Note on Nova terminology: Nova uses "instance" internally in its database and code, +// but exposes these as "server" objects through the public API. +// +// Migration lifecycle and cleanup: +// - Migrations are automatically deleted when their associated server is deleted +// (see Nova source: https://github.com/openstack/nova/blob/1508cb39a2b12ef2d4f706b9c303a744ce40e707/nova/db/main/api.py#L1337-L1358) +// - In SAP Cloud Infrastructure's Nova fork, orphaned migrations are purged after 3 weeks +// - This means historical migration data has limited retention +func (api *novaAPI) GetAllMigrations(ctx context.Context) ([]Migration, error) { label := Migration{}.TableName() - slog.Info("fetching nova data", "label", label, "changedSince", changedSince) + slog.Info("fetching nova data", "label", label) // Note: currently we need to fetch this without gophercloud. // See: https://github.com/gophercloud/gophercloud/pull/3244 if api.mon.PipelineRequestTimer != nil { @@ -238,11 +252,6 @@ func (api *novaAPI) GetChangedMigrations(ctx context.Context, changedSince *time defer timer.ObserveDuration() } initialURL := api.sc.Endpoint + "os-migrations" - // It is important to omit the changes-since parameter if it is nil. - // Otherwise Nova may return huge amounts of data since the beginning of time. - if changedSince != nil { - initialURL += "?changes-since=" + changedSince.Format(time.RFC3339) - } var nextURL = &initialURL var migrations []Migration for nextURL != nil { diff --git a/internal/sync/openstack/nova/nova_api_test.go b/internal/sync/openstack/nova/nova_api_test.go index 26aca709..3be154c3 100644 --- a/internal/sync/openstack/nova/nova_api_test.go +++ b/internal/sync/openstack/nova/nova_api_test.go @@ -191,60 +191,45 @@ func TestNovaAPI_GetAllFlavors(t *testing.T) { } } -func TestNovaAPI_GetChangedMigrations(t *testing.T) { - tests := []struct { - name string - time *time.Time - }{ - {"nil", nil}, - {"time", &time.Time{}}, - } - for _, tt := range tests { - handler := func(w http.ResponseWriter, r *http.Request) { - if tt.time == nil { - // Check that the changes-since query parameter is not set. - if r.URL.Query().Get("changes-since") != "" { - t.Fatalf("expected no changes-since query parameter, got %s", r.URL.Query().Get("changes-since")) - } - } else { - if r.URL.Query().Get("changes-since") != tt.time.Format(time.RFC3339) { - t.Fatalf("expected changes-since query parameter to be %s, got %s", tt.time.Format(time.RFC3339), r.URL.Query().Get("changes-since")) - } - } - w.Header().Add("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - resp := struct { - Migrations []Migration `json:"migrations"` - Links []struct { - Rel string `json:"rel"` - Href string `json:"href"` - } `json:"migrations_links"` - }{ - Migrations: []Migration{{ID: 1, SourceCompute: "host1", DestCompute: "host2", Status: "completed"}}, - } - if err := json.NewEncoder(w).Encode(resp); err != nil { - t.Fatalf("failed to write response: %v", err) - } +func TestNovaAPI_GetAllMigrations(t *testing.T) { + handler := func(w http.ResponseWriter, r *http.Request) { + if r.URL.Query().Get("changes-since") != "" { + t.Fatalf("expected no changes-since query parameter, got %s", r.URL.Query().Get("changes-since")) } - server, k := setupNovaMockServer(handler) - defer server.Close() + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + resp := struct { + Migrations []Migration `json:"migrations"` + Links []struct { + Rel string `json:"rel"` + Href string `json:"href"` + } `json:"migrations_links"` + }{ + Migrations: []Migration{{ID: 1, SourceCompute: "host1", DestCompute: "host2", Status: "completed"}}, + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + t.Fatalf("failed to write response: %v", err) + } + } - mon := sync.Monitor{} - conf := NovaConf{Availability: "public"} + server, k := setupNovaMockServer(handler) + defer server.Close() - api := NewNovaAPI(mon, k, conf).(*novaAPI) - api.Init(t.Context()) + mon := sync.Monitor{} + conf := NovaConf{Availability: "public"} - ctx := t.Context() - migrations, err := api.GetChangedMigrations(ctx, tt.time) - if err != nil { - t.Fatalf("expected no error, got %v", err) - } - if len(migrations) != 1 { - t.Fatalf("expected 1 migration, got %d", len(migrations)) - } - if migrations[0].ID != 1 || migrations[0].SourceCompute != "host1" || migrations[0].DestCompute != "host2" || migrations[0].Status != "completed" { - t.Errorf("unexpected migration data: %+v", migrations[0]) - } + api := NewNovaAPI(mon, k, conf).(*novaAPI) + api.Init(t.Context()) + + ctx := t.Context() + migrations, err := api.GetAllMigrations(ctx) + if err != nil { + t.Fatalf("expected no error, got %v", err) + } + if len(migrations) != 1 { + t.Fatalf("expected 1 migration, got %d", len(migrations)) + } + if migrations[0].ID != 1 || migrations[0].SourceCompute != "host1" || migrations[0].DestCompute != "host2" || migrations[0].Status != "completed" { + t.Errorf("unexpected migration data: %+v", migrations[0]) } } diff --git a/internal/sync/openstack/nova/nova_sync.go b/internal/sync/openstack/nova/nova_sync.go index 133a0c1a..e793e952 100644 --- a/internal/sync/openstack/nova/nova_sync.go +++ b/internal/sync/openstack/nova/nova_sync.go @@ -5,7 +5,6 @@ package nova import ( "context" - "log/slog" "slices" "time" @@ -15,24 +14,6 @@ import ( "github.com/go-gorp/gorp" ) -// Table to store which sync runs where performed and when. -type novaSync struct { - // Name of the sync run. - Name string `db:"name"` - // Time when the sync run was performed. - Time time.Time `db:"time"` -} - -// Table under which the nova sync will be stored. -func (novaSync) TableName() string { - return "nova_sync" -} - -// Indexes for the nova sync table. -func (novaSync) Indexes() []db.Index { - return nil -} - // Syncer for OpenStack nova. type NovaSyncer struct { // Database to store the nova objects in. @@ -50,7 +31,7 @@ type NovaSyncer struct { // Init the OpenStack nova syncer. func (s *NovaSyncer) Init(ctx context.Context) { s.API.Init(ctx) - tables := []*gorp.TableMap{s.DB.AddTable(novaSync{})} + tables := []*gorp.TableMap{} // Only add the tables that are configured in the yaml conf. if slices.Contains(s.Conf.Types, "servers") { tables = append(tables, s.DB.AddTable(Server{})) @@ -79,22 +60,18 @@ func (s *NovaSyncer) Init(ctx context.Context) { func (s *NovaSyncer) Sync(ctx context.Context) error { // Only sync the objects that are configured in the yaml conf. if slices.Contains(s.Conf.Types, "servers") { - changedServers, err := s.SyncAllServers(ctx) + _, err := s.SyncAllServers(ctx) if err != nil { return err } - if len(changedServers) > 0 { - go s.MqttClient.Publish(TriggerNovaServersSynced, "") - } + go s.MqttClient.Publish(TriggerNovaServersSynced, "") } if slices.Contains(s.Conf.Types, "deleted_servers") { - changedDeletedServers, err := s.SyncDeletedServers(ctx) + _, err := s.SyncDeletedServers(ctx) if err != nil { return err } - if len(changedDeletedServers) > 0 { - go s.MqttClient.Publish(TriggerNovaDeletedServersSynced, "") - } + go s.MqttClient.Publish(TriggerNovaDeletedServersSynced, "") } if slices.Contains(s.Conf.Types, "hypervisors") { _, err := s.SyncAllHypervisors(ctx) @@ -104,119 +81,25 @@ func (s *NovaSyncer) Sync(ctx context.Context) error { go s.MqttClient.Publish(TriggerNovaHypervisorsSynced, "") } if slices.Contains(s.Conf.Types, "flavors") { - changedFlavors, err := s.SyncAllFlavors(ctx) + _, err := s.SyncAllFlavors(ctx) if err != nil { return err } - if len(changedFlavors) > 0 { - go s.MqttClient.Publish(TriggerNovaFlavorsSynced, "") - } + go s.MqttClient.Publish(TriggerNovaFlavorsSynced, "") } if slices.Contains(s.Conf.Types, "migrations") { - changedMigrations, err := s.SyncChangedMigrations(ctx) + _, err := s.SyncAllMigrations(ctx) if err != nil { return err } - if len(changedMigrations) > 0 { - go s.MqttClient.Publish(TriggerNovaMigrationsSynced, "") - } + go s.MqttClient.Publish(TriggerNovaMigrationsSynced, "") } if slices.Contains(s.Conf.Types, "aggregates") { - changedAggregates, err := s.SyncAllAggregates(ctx) + _, err := s.SyncAllAggregates(ctx) if err != nil { return err } - if len(changedAggregates) > 0 { - go s.MqttClient.Publish(TriggerNovaAggregatesSynced, "") - } - } - return nil -} - -// Check when the last sync run for a specific table was performed. -// If there was no sync run, return nil. -func (s *NovaSyncer) getLastSyncTime(tableName string) *time.Time { - // Check when the last sync run was performed, if there was one. - var lastSyncTime *time.Time - var lastSync novaSync - table := novaSync{}.TableName() - if err := s.DB.SelectOne(&lastSync, ` - SELECT * FROM `+table+` - WHERE name = :name ORDER BY time DESC LIMIT 1 - `, map[string]any{"name": tableName}); err == nil { - lastSyncTime = &lastSync.Time - slog.Info("last nova sync run", "time", lastSync.Time, "table", tableName) - } else { - slog.Info("no previous nova sync run", "table", tableName) - } - return lastSyncTime -} - -// Store a new sync run in the database. -func (s *NovaSyncer) setLastSyncTime(tableName string, time time.Time) { - if err := s.DB.Insert(&novaSync{Name: tableName, Time: time}); err != nil { - slog.Error("failed to insert nova sync", "error", err) - } -} - -// Upsert nova objects into the database. -func upsert[O any](s *NovaSyncer, objects []O, pk string, getpk func(O) string, tableName string) error { - nObjectsInDB := 0 - q := "SELECT COUNT(*) FROM " + tableName - if err := s.DB.SelectOne(&nObjectsInDB, q); err != nil { - return err - } - var existingObjects []O - if nObjectsInDB > 0 && len(objects) > 0 { - // Check which objects only need to be updated instead of inserted. - // Using a contains query with the object ID: - q = "SELECT " + pk + " FROM " + tableName + " WHERE " + pk + " IN (" - for i, object := range objects { - if i > 0 { - q += ", " - } - q += "'" + getpk(object) + "'" - } - q += ")" - if _, err := s.DB.Select(&existingObjects, q); err != nil { - return err - } - } - existingObjectsByID := make(map[string]O, len(existingObjects)) - for _, object := range existingObjects { - existingObjectsByID[getpk(object)] = object - } - tx, err := s.DB.Begin() - if err != nil { - return err - } - for _, object := range objects { - if _, ok := existingObjectsByID[getpk(object)]; ok { - if _, err := tx.Update(&object); err != nil { - return tx.Rollback() - } - } else { - if err := tx.Insert(&object); err != nil { - return tx.Rollback() - } - } - } - if err := tx.Commit(); err != nil { - slog.Error("failed to commit transaction", "error", err) - } - // Check how many objects we have in the database. - q = "SELECT COUNT(*) FROM " + tableName - var count int - if err := s.DB.SelectOne(&count, q); err != nil { - return err - } - if s.Mon.PipelineObjectsGauge != nil { - gauge := s.Mon.PipelineObjectsGauge.WithLabelValues(tableName) - gauge.Set(float64(count)) - } - if s.Mon.PipelineRequestProcessedCounter != nil { - counter := s.Mon.PipelineRequestProcessedCounter.WithLabelValues(tableName) - counter.Inc() + go s.MqttClient.Publish(TriggerNovaAggregatesSynced, "") } return nil } @@ -285,22 +168,19 @@ func (s *NovaSyncer) SyncAllFlavors(ctx context.Context) ([]Flavor, error) { } // Sync the OpenStack migrations into the database. -func (s *NovaSyncer) SyncChangedMigrations(ctx context.Context) ([]Migration, error) { - tableName := Migration{}.TableName() - updatedSyncTime := time.Now() - lastSyncTime := s.getLastSyncTime(tableName) - changedMigrations, err := s.API.GetChangedMigrations(ctx, lastSyncTime) +func (s *NovaSyncer) SyncAllMigrations(ctx context.Context) ([]Migration, error) { + allMigrations, err := s.API.GetAllMigrations(ctx) if err != nil { return nil, err } - err = upsert(s, changedMigrations, "uuid", func(m Migration) string { return m.UUID }, tableName) + err = db.ReplaceAll(s.DB, allMigrations...) if err != nil { return nil, err } - s.setLastSyncTime(tableName, updatedSyncTime) - return changedMigrations, nil + return allMigrations, nil } +// Sync the OpenStack aggregates into the database. func (s *NovaSyncer) SyncAllAggregates(ctx context.Context) ([]Aggregate, error) { allAggregates, err := s.API.GetAllAggregates(ctx) if err != nil { diff --git a/internal/sync/openstack/nova/nova_sync_test.go b/internal/sync/openstack/nova/nova_sync_test.go index 30e917a7..a8244e69 100644 --- a/internal/sync/openstack/nova/nova_sync_test.go +++ b/internal/sync/openstack/nova/nova_sync_test.go @@ -48,7 +48,7 @@ func (m *mockNovaAPI) GetAllFlavors(ctx context.Context) ([]Flavor, error) { return []Flavor{{ID: "1", Name: "flavor1"}}, nil } -func (m *mockNovaAPI) GetChangedMigrations(ctx context.Context, t *time.Time) ([]Migration, error) { +func (m *mockNovaAPI) GetAllMigrations(ctx context.Context) ([]Migration, error) { return []Migration{{ID: 1}}, nil } @@ -234,7 +234,7 @@ func TestNovaSyncer_SyncMigrations(t *testing.T) { ctx := t.Context() syncer.Init(ctx) - migrations, err := syncer.SyncChangedMigrations(ctx) + migrations, err := syncer.SyncAllMigrations(ctx) if err != nil { t.Fatalf("expected no error, got %v", err) } diff --git a/internal/sync/openstack/nova/nova_types.go b/internal/sync/openstack/nova/nova_types.go index ed40cecd..f4fec89c 100644 --- a/internal/sync/openstack/nova/nova_types.go +++ b/internal/sync/openstack/nova/nova_types.go @@ -165,7 +165,7 @@ func (s *Server) MarshalJSON() ([]byte, error) { } // Table in which the openstack model is stored. -func (Server) TableName() string { return "openstack_servers_v2" } +func (Server) TableName() string { return "openstack_servers" } // Index for the openstack model. func (Server) Indexes() []db.Index { return nil }