Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/workspace-engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"workspace-engine/svc/controllers/deploymentresourceselectoreval"
"workspace-engine/svc/controllers/desiredrelease"
"workspace-engine/svc/controllers/environmentresourceselectoreval"
"workspace-engine/svc/controllers/forcedeploy"
"workspace-engine/svc/controllers/jobdispatch"
"workspace-engine/svc/controllers/jobeligibility"
"workspace-engine/svc/controllers/jobverificationmetric"
Expand Down Expand Up @@ -48,6 +49,7 @@ func main() {
deploymentplanresult.New(WorkerID, db.GetPool(ctx)),
deploymentresourceselectoreval.New(WorkerID, db.GetPool(ctx)),
environmentresourceselectoreval.New(WorkerID, db.GetPool(ctx)),
forcedeploy.New(WorkerID, db.GetPool(ctx)),
jobdispatch.New(WorkerID, db.GetPool(ctx)),
jobeligibility.New(WorkerID, db.GetPool(ctx)),
jobverificationmetric.New(WorkerID, db.GetPool(ctx)),
Expand Down
3 changes: 3 additions & 0 deletions apps/workspace-engine/pkg/reconcile/events/forcedeploy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package events

const ForceDeployKind = "force-deploy"
103 changes: 103 additions & 0 deletions apps/workspace-engine/svc/controllers/forcedeploy/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package forcedeploy

import (
"context"
"fmt"
"time"

"github.com/charmbracelet/log"
"github.com/jackc/pgx/v5/pgxpool"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"workspace-engine/pkg/config"
"workspace-engine/pkg/reconcile"
"workspace-engine/pkg/reconcile/events"
"workspace-engine/pkg/reconcile/postgres"
"workspace-engine/svc"
)

var tracer = otel.Tracer("workspace-engine/svc/controllers/forcedeploy")
var _ reconcile.Processor = (*Controller)(nil)

type Controller struct {
getter Getter
setter Setter
}

func (c *Controller) Process(ctx context.Context, item reconcile.Item) (reconcile.Result, error) {
ctx, span := tracer.Start(ctx, "forcedeploy.Controller.Process")
defer span.End()

span.SetAttributes(
attribute.Int64("item.id", item.ID),
attribute.String("item.scope_id", item.ScopeID),
)

rt, err := NewReleaseTarget(item.ScopeID)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return reconcile.Result{}, fmt.Errorf("parse release target: %w", err)
}

exists, err := c.getter.ReleaseTargetExists(ctx, rt)
if err != nil {
return reconcile.Result{}, fmt.Errorf("check release target exists: %w", err)
}
if !exists {
return reconcile.Result{}, nil
}

result, err := Reconcile(ctx, item.WorkspaceID, c.getter, c.setter, rt)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return reconcile.Result{}, fmt.Errorf("reconcile force deploy: %w", err)
}

if result.RequeueAfter > 0 {
return reconcile.Result{RequeueAfter: result.RequeueAfter}, nil
}

return reconcile.Result{}, nil
}

// NewController creates a Controller with the given dependencies.
// Use this constructor in tests to inject mock implementations.
func NewController(getter Getter, setter Setter) *Controller {
return &Controller{getter: getter, setter: setter}
}

func New(workerID string, pgxPool *pgxpool.Pool) svc.Service {
if pgxPool == nil {
log.Fatal("Failed to get pgx pool")
panic("failed to get pgx pool")
}

kind := events.ForceDeployKind
maxConcurrency := config.GetMaxConcurrency(kind)

nodeConfig := reconcile.NodeConfig{
WorkerID: workerID,
BatchSize: 10,
PollInterval: 1 * time.Second,
LeaseDuration: 10 * time.Second,
LeaseHeartbeat: 5 * time.Second,
MaxConcurrency: maxConcurrency,
MaxRetryBackoff: 10 * time.Second,
}

queue := postgres.NewForKinds(pgxPool, kind)
controller := &Controller{
getter: &PostgresGetter{},
setter: &PostgresSetter{},
}

worker, err := reconcile.NewWorker(kind, queue, controller, nodeConfig)
if err != nil {
log.Fatal("Failed to create force deploy reconcile worker", "error", err)
}

return worker
}
18 changes: 18 additions & 0 deletions apps/workspace-engine/svc/controllers/forcedeploy/getters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package forcedeploy

import (
"context"

"github.com/google/uuid"
"workspace-engine/pkg/oapi"
)

type Getter interface {
ReleaseTargetExists(ctx context.Context, rt *ReleaseTarget) (bool, error)
GetDesiredRelease(ctx context.Context, rt *ReleaseTarget) (*oapi.Release, error)
GetActiveJobsForReleaseTarget(ctx context.Context, rt *oapi.ReleaseTarget) ([]*oapi.Job, error)
GetDeployment(ctx context.Context, deploymentID uuid.UUID) (*oapi.Deployment, error)
GetEnvironment(ctx context.Context, environmentID uuid.UUID) (*oapi.Environment, error)
GetResource(ctx context.Context, resourceID uuid.UUID) (*oapi.Resource, error)
ListJobAgentsByWorkspaceID(ctx context.Context, workspaceID uuid.UUID) ([]oapi.JobAgent, error)
}
125 changes: 125 additions & 0 deletions apps/workspace-engine/svc/controllers/forcedeploy/getters_postgres.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package forcedeploy

import (
"context"
"errors"

"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"workspace-engine/pkg/db"
"workspace-engine/pkg/oapi"
)

var _ Getter = (*PostgresGetter)(nil)

type PostgresGetter struct{}

func (g *PostgresGetter) ReleaseTargetExists(ctx context.Context, rt *ReleaseTarget) (bool, error) {
return db.GetQueries(ctx).ReleaseTargetExists(ctx, db.ReleaseTargetExistsParams{
DeploymentID: rt.DeploymentID,
EnvironmentID: rt.EnvironmentID,
ResourceID: rt.ResourceID,
})
}

func (g *PostgresGetter) GetDesiredRelease(
ctx context.Context,
rt *ReleaseTarget,
) (*oapi.Release, error) {
row, err := db.GetQueries(ctx).
GetDesiredReleaseByReleaseTarget(ctx, db.GetDesiredReleaseByReleaseTargetParams{
ResourceID: rt.ResourceID,
EnvironmentID: rt.EnvironmentID,
DeploymentID: rt.DeploymentID,
})
if errors.Is(err, pgx.ErrNoRows) {
return nil, nil
}
if err != nil {
return nil, err
}
return db.ToOapiFullRelease(row), nil
}

func (g *PostgresGetter) GetActiveJobsForReleaseTarget(
ctx context.Context,
rt *oapi.ReleaseTarget,
) ([]*oapi.Job, error) {
deploymentID, err := uuid.Parse(rt.DeploymentId)
if err != nil {
return nil, err
}
environmentID, err := uuid.Parse(rt.EnvironmentId)
if err != nil {
return nil, err
}
resourceID, err := uuid.Parse(rt.ResourceId)
if err != nil {
return nil, err
}

rows, err := db.GetQueries(ctx).
ListJobsByReleaseTargetWithStatuses(ctx, db.ListJobsByReleaseTargetWithStatusesParams{
DeploymentID: deploymentID,
EnvironmentID: environmentID,
ResourceID: resourceID,
Statuses: []string{"in_progress", "action_required", "pending"},
})
if err != nil {
return nil, err
}

jobs := make([]*oapi.Job, len(rows))
for i, row := range rows {
jobs[i] = db.ToOapiJob(db.ListJobsByReleaseIDRow(row))
}
return jobs, nil
}

func (g *PostgresGetter) GetDeployment(
ctx context.Context,
deploymentID uuid.UUID,
) (*oapi.Deployment, error) {
row, err := db.GetQueries(ctx).GetDeploymentByID(ctx, deploymentID)
if err != nil {
return nil, err
}
return db.ToOapiDeployment(row), nil
}

func (g *PostgresGetter) GetEnvironment(
ctx context.Context,
environmentID uuid.UUID,
) (*oapi.Environment, error) {
row, err := db.GetQueries(ctx).GetEnvironmentByID(ctx, environmentID)
if err != nil {
return nil, err
}
return db.ToOapiEnvironment(row), nil
}

func (g *PostgresGetter) GetResource(
ctx context.Context,
resourceID uuid.UUID,
) (*oapi.Resource, error) {
row, err := db.GetQueries(ctx).GetResourceByID(ctx, resourceID)
if err != nil {
return nil, err
}
return db.ToOapiResource(row), nil
}

func (g *PostgresGetter) ListJobAgentsByWorkspaceID(
ctx context.Context,
workspaceID uuid.UUID,
) ([]oapi.JobAgent, error) {
rows, err := db.GetQueries(ctx).ListJobAgentsByWorkspaceID(ctx, workspaceID)
if err != nil {
return nil, err
}
agents := make([]oapi.JobAgent, len(rows))
for i, row := range rows {
agents[i] = *db.ToOapiJobAgent(row)
}
return agents, nil
}
Loading
Loading