Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ linters:
- third_party$
- builtin$
- examples$
- node_modules
formatters:
enable:
- gci
Expand All @@ -64,3 +65,4 @@ formatters:
- third_party$
- builtin$
- examples$
- node_modules
53 changes: 48 additions & 5 deletions apiserver/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ import (
wsWriter "github.com/cloudbase/garm/websocket"
"github.com/cloudbase/garm/workers/websocket/agent"
"github.com/cloudbase/garm/workers/websocket/events"
wsMetrics "github.com/cloudbase/garm/workers/websocket/metrics"
)

func NewAPIController(r *runner.Runner, authenticator *auth.Authenticator, hub *wsWriter.Hub, agentHub *agent.Hub, apiCfg config.APIServer) (*APIController, error) {
func NewAPIController(r *runner.Runner, authenticator *auth.Authenticator, hub *wsWriter.Hub, agentHub *agent.Hub, metricsHub *wsMetrics.MetricsHub, apiCfg config.APIServer) (*APIController, error) {
controllerInfo, err := r.GetControllerInfo(auth.GetAdminContext(context.Background()))
if err != nil {
return nil, fmt.Errorf("failed to get controller info: %w", err)
Expand Down Expand Up @@ -75,10 +76,11 @@ func NewAPIController(r *runner.Runner, authenticator *auth.Authenticator, hub *
}
}
return &APIController{
r: r,
auth: authenticator,
hub: hub,
agentHub: agentHub,
r: r,
auth: authenticator,
hub: hub,
agentHub: agentHub,
metricsHub: metricsHub,
upgrader: websocket.Upgrader{
ReadBufferSize: 4096,
WriteBufferSize: 16384,
Expand All @@ -93,6 +95,7 @@ type APIController struct {
auth *auth.Authenticator
hub *wsWriter.Hub
agentHub *agent.Hub
metricsHub *wsMetrics.MetricsHub
upgrader websocket.Upgrader
controllerID string
}
Expand Down Expand Up @@ -243,6 +246,46 @@ func (a *APIController) EventsHandler(w http.ResponseWriter, r *http.Request) {
<-eventHandler.Done()
}

// MetricsHandler serves the metrics websocket endpoint.
//
// Requests whose context is not an admin context receive a 403 and a short
// plain-text body. If the controller has no metrics hub, a bad request error
// is passed to handleError. Otherwise the request is upgraded to a websocket,
// wrapped in a websocket client, registered with the metrics hub and started;
// the handler then blocks until the client reports it is done. Failures after
// the hub check are only logged.
func (a *APIController) MetricsHandler(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	// Reject early: first on missing privileges, then on a missing hub.
	switch {
	case !auth.IsAdmin(ctx):
		w.WriteHeader(http.StatusForbidden)
		_, writeErr := w.Write([]byte("metrics are available to admin users"))
		if writeErr != nil {
			slog.With(slog.Any("error", writeErr)).ErrorContext(ctx, "failed to encode response")
		}
		return
	case a.metricsHub == nil:
		handleError(ctx, w, gErrors.NewBadRequestError("metrics hub is not enabled"))
		return
	}

	wsConn, err := a.upgrader.Upgrade(w, r, nil)
	if err != nil {
		slog.With(slog.Any("error", err)).ErrorContext(ctx, "error upgrading to websockets")
		return
	}
	// Deferred first, so it runs last: the connection is closed only after
	// the client has been unregistered from the hub.
	defer wsConn.Close()

	metricsClient, err := wsWriter.NewClient(ctx, wsConn)
	if err != nil {
		slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to create new client")
		return
	}

	if err = a.metricsHub.Register(metricsClient); err != nil {
		slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to register metrics client")
		return
	}
	// The unregister result is intentionally ignored on the way out.
	defer a.metricsHub.Unregister(metricsClient) //nolint:errcheck

	if err = metricsClient.Start(); err != nil {
		slog.With(slog.Any("error", err)).ErrorContext(ctx, "failed to start client")
		return
	}

	// Keep the handler alive for as long as the client is running.
	<-metricsClient.Done()
}

func (a *APIController) WSHandler(writer http.ResponseWriter, req *http.Request) {
ctx := req.Context()
if !auth.IsAdmin(ctx) {
Expand Down
3 changes: 3 additions & 0 deletions apiserver/routers/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,9 @@ func NewAPIRouter(han *controllers.APIController, authMiddleware, initMiddleware
// DB watcher websocket endpoint
apiRouter.Handle("/ws/events/", http.HandlerFunc(han.EventsHandler)).Methods("GET")
apiRouter.Handle("/ws/events", http.HandlerFunc(han.EventsHandler)).Methods("GET")
// Metrics websocket endpoint
apiRouter.Handle("/ws/metrics/", http.HandlerFunc(han.MetricsHandler)).Methods("GET")
apiRouter.Handle("/ws/metrics", http.HandlerFunc(han.MetricsHandler)).Methods("GET")
apiRouter.Handle("/ws/agent/{agentName}/shell", http.HandlerFunc(han.AgentShellHandler)).Methods("GET")

// NotFound handler - this should be last
Expand Down
264 changes: 264 additions & 0 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1434,6 +1434,270 @@ func (c *CacheTestSuite) TestGetEntityForInstanceWithBothPoolAndScaleSet() {
// Should retrieve entity via scale set (scale set takes precedence)
}

// Tests for pool/scaleset cleanup on delete operations.
// These cover bugs where deleted pools and scale sets remained
// in the global maps (e.pools, e.scalesets) after deletion.

// TestDeleteEntityPoolRemovesFromGlobalMap stores one pool for one entity,
// deletes it with DeleteEntityPool, and asserts the pool is no longer returned
// by GetEntityPools, GetAllPools or GetPoolByID.
func (c *CacheTestSuite) TestDeleteEntityPoolRemovesFromGlobalMap() {
	const (
		entityID = "test-entity"
		poolID   = "pool-1"
	)
	req := c.Require()

	org := params.ForgeEntity{
		ID:         entityID,
		EntityType: params.ForgeEntityTypeOrganization,
		Name:       "test",
		Owner:      "test",
	}
	orgPool := params.Pool{
		ID:        poolID,
		OrgID:     entityID,
		CreatedAt: time.Now(),
	}

	SetEntity(org)
	SetEntityPool(org.ID, orgPool)

	// Before deletion the pool is visible through the global accessors.
	allPools := GetAllPools()
	req.Len(allPools, 1)
	req.Equal(poolID, allPools[0].ID)

	_, found := GetPoolByID(poolID)
	req.True(found)

	DeleteEntityPool(org.ID, orgPool.ID)

	// After deletion the per-entity listing is empty...
	req.Len(GetEntityPools(org.ID), 0)

	// ...and so are the global listing and the by-ID lookup.
	allPools = GetAllPools()
	req.Len(allPools, 0)

	_, found = GetPoolByID(poolID)
	req.False(found)
}

// TestDeleteEntityScaleSetRemovesFromGlobalMap stores one scale set for one
// entity, deletes it with DeleteEntityScaleSet, and asserts the scale set is
// no longer returned by GetEntityScaleSets, GetAllScaleSets or
// GetScaleSetByID.
func (c *CacheTestSuite) TestDeleteEntityScaleSetRemovesFromGlobalMap() {
	const (
		entityID   = "test-entity"
		scaleSetID = 1
	)
	req := c.Require()

	org := params.ForgeEntity{
		ID:         entityID,
		EntityType: params.ForgeEntityTypeOrganization,
		Name:       "test",
		Owner:      "test",
	}
	orgScaleSet := params.ScaleSet{
		ID:    scaleSetID,
		OrgID: entityID,
	}

	SetEntity(org)
	SetEntityScaleSet(org.ID, orgScaleSet)

	// Before deletion the scale set is visible through the global accessors.
	everyScaleSet := GetAllScaleSets()
	req.Len(everyScaleSet, 1)

	_, found := GetScaleSetByID(scaleSetID)
	req.True(found)

	DeleteEntityScaleSet(org.ID, orgScaleSet.ID)

	// After deletion the per-entity listing is empty...
	req.Len(GetEntityScaleSets(org.ID), 0)

	// ...and so are the global listing and the by-ID lookup.
	everyScaleSet = GetAllScaleSets()
	req.Len(everyScaleSet, 0)

	_, found = GetScaleSetByID(scaleSetID)
	req.False(found)
}

// TestDeleteEntityCleansUpPoolsAndScaleSets stores two pools and one scale set
// under a single entity, calls DeleteEntity, and asserts that the entity, both
// pools and the scale set can no longer be retrieved.
func (c *CacheTestSuite) TestDeleteEntityCleansUpPoolsAndScaleSets() {
	const entityID = "test-entity"
	req := c.Require()

	org := params.ForgeEntity{
		ID:         entityID,
		EntityType: params.ForgeEntityTypeOrganization,
		Name:       "test",
		Owner:      "test",
	}
	firstPool := params.Pool{
		ID:        "pool-1",
		OrgID:     entityID,
		CreatedAt: time.Now(),
	}
	secondPool := params.Pool{
		ID:        "pool-2",
		OrgID:     entityID,
		CreatedAt: time.Now().Add(1 * time.Second),
	}
	orgScaleSet := params.ScaleSet{
		ID:    1,
		OrgID: entityID,
	}

	SetEntity(org)
	SetEntityPool(org.ID, firstPool)
	SetEntityPool(org.ID, secondPool)
	SetEntityScaleSet(org.ID, orgScaleSet)

	// Sanity check: everything was stored.
	req.Len(GetAllPools(), 2)
	req.Len(GetAllScaleSets(), 1)

	// Removing the entity stands in for a cascade delete coming from the DB.
	DeleteEntity(org.ID)

	_, found := GetEntity(org.ID)
	req.False(found)

	// No pool of the deleted entity may remain reachable.
	req.Len(GetAllPools(), 0)
	for _, id := range []string{"pool-1", "pool-2"} {
		_, found = GetPoolByID(id)
		req.False(found)
	}

	// The same holds for its scale set.
	req.Len(GetAllScaleSets(), 0)
	_, found = GetScaleSetByID(1)
	req.False(found)
}

// TestDeleteEntityDoesNotAffectOtherEntityPools stores one pool under each of
// two entities, deletes the first entity, and asserts that the second entity's
// pool is still returned by GetAllPools and GetEntityPools.
func (c *CacheTestSuite) TestDeleteEntityDoesNotAffectOtherEntityPools() {
	req := c.Require()

	doomed := params.ForgeEntity{
		ID:         "entity-1",
		EntityType: params.ForgeEntityTypeOrganization,
		Name:       "org1",
		Owner:      "owner1",
	}
	survivor := params.ForgeEntity{
		ID:         "entity-2",
		EntityType: params.ForgeEntityTypeOrganization,
		Name:       "org2",
		Owner:      "owner2",
	}
	doomedPool := params.Pool{
		ID:        "pool-1",
		OrgID:     "entity-1",
		CreatedAt: time.Now(),
	}
	survivorPool := params.Pool{
		ID:        "pool-2",
		OrgID:     "entity-2",
		CreatedAt: time.Now(),
	}

	SetEntity(doomed)
	SetEntity(survivor)
	SetEntityPool(doomed.ID, doomedPool)
	SetEntityPool(survivor.ID, survivorPool)

	req.Len(GetAllPools(), 2)

	// Only the pools owned by the deleted entity are expected to disappear.
	DeleteEntity(doomed.ID)

	remaining := GetAllPools()
	req.Len(remaining, 1)
	req.Equal("pool-2", remaining[0].ID)

	// The untouched entity still lists its own pool.
	survivorPools := GetEntityPools(survivor.ID)
	req.Len(survivorPools, 1)
	req.Equal("pool-2", survivorPools[0].ID)
}

// TestReplaceEntityPoolsCleansUpRemovedPools calls ReplaceEntityPools twice
// for the same entity, first with pool-1 and pool-2, then with pool-2 and
// pool-3, and asserts that afterwards only pool-2 and pool-3 are returned by
// GetAllPools and that pool-1 cannot be looked up by ID.
func (c *CacheTestSuite) TestReplaceEntityPoolsCleansUpRemovedPools() {
	const entityID = "test-entity"
	req := c.Require()

	org := params.ForgeEntity{
		ID:         entityID,
		EntityType: params.ForgeEntityTypeOrganization,
		Name:       "test",
		Owner:      "test",
	}
	dropped := params.Pool{
		ID:        "pool-1",
		OrgID:     entityID,
		CreatedAt: time.Now(),
	}
	kept := params.Pool{
		ID:        "pool-2",
		OrgID:     entityID,
		CreatedAt: time.Now().Add(1 * time.Second),
	}
	added := params.Pool{
		ID:        "pool-3",
		OrgID:     entityID,
		CreatedAt: time.Now().Add(2 * time.Second),
	}

	SetEntity(org)
	ReplaceEntityPools(org.ID, []params.Pool{dropped, kept})
	req.Len(GetAllPools(), 2)

	// Second replacement: pool-1 is left out, pool-3 is new.
	ReplaceEntityPools(org.ID, []params.Pool{kept, added})

	current := GetAllPools()
	req.Len(current, 2)

	// Index the surviving pools by ID so membership is order-independent.
	seen := make(map[string]bool, len(current))
	for i := range current {
		seen[current[i].ID] = true
	}
	for _, id := range []string{"pool-2", "pool-3"} {
		req.True(seen[id])
	}
	req.False(seen["pool-1"])

	// The dropped pool must not be reachable by ID either.
	_, found := GetPoolByID("pool-1")
	req.False(found)
}

// TestReplaceEntityScaleSetsCleansUpRemovedScaleSets calls
// ReplaceEntityScaleSets twice for the same entity, first with scale sets 1
// and 2, then with 2 and 3, and asserts that afterwards only 2 and 3 are
// returned by GetAllScaleSets and that 1 cannot be looked up by ID.
func (c *CacheTestSuite) TestReplaceEntityScaleSetsCleansUpRemovedScaleSets() {
	const entityID = "test-entity"
	req := c.Require()

	org := params.ForgeEntity{
		ID:         entityID,
		EntityType: params.ForgeEntityTypeOrganization,
		Name:       "test",
		Owner:      "test",
	}
	dropped := params.ScaleSet{
		ID:    1,
		OrgID: entityID,
	}
	kept := params.ScaleSet{
		ID:    2,
		OrgID: entityID,
	}
	added := params.ScaleSet{
		ID:    3,
		OrgID: entityID,
	}

	SetEntity(org)
	ReplaceEntityScaleSets(org.ID, []params.ScaleSet{dropped, kept})
	req.Len(GetAllScaleSets(), 2)

	// Second replacement: scale set 1 is left out, scale set 3 is new.
	ReplaceEntityScaleSets(org.ID, []params.ScaleSet{kept, added})

	current := GetAllScaleSets()
	req.Len(current, 2)

	// Index the surviving scale sets by ID so membership is order-independent.
	seen := make(map[uint]bool, len(current))
	for i := range current {
		seen[current[i].ID] = true
	}
	for _, id := range []uint{2, 3} {
		req.True(seen[id])
	}
	req.False(seen[1])

	// The dropped scale set must not be reachable by ID either.
	_, found := GetScaleSetByID(1)
	req.False(found)
}

func TestCacheTestSuite(t *testing.T) {
t.Parallel()
suite.Run(t, new(CacheTestSuite))
Expand Down
Loading
Loading