From 467add30e37f9d32a4be9a572af01f1936c25efe Mon Sep 17 00:00:00 2001 From: Aditya Choudhari Date: Tue, 14 Apr 2026 13:56:41 -0700 Subject: [PATCH 1/3] feat: redeploy controller --- apps/workspace-engine/main.go | 2 + .../pkg/reconcile/events/forcedeploy.go | 3 + .../svc/controllers/forcedeploy/controller.go | 104 ++++++ .../svc/controllers/forcedeploy/getters.go | 18 + .../forcedeploy/getters_postgres.go | 125 +++++++ .../svc/controllers/forcedeploy/reconcile.go | 149 +++++++++ .../controllers/forcedeploy/reconcile_test.go | 315 ++++++++++++++++++ .../controllers/forcedeploy/releasetarget.go | 50 +++ .../svc/controllers/forcedeploy/setters.go | 12 + .../forcedeploy/setters_postgres.go | 109 ++++++ packages/db/src/reconcilers/force-deploy.ts | 22 ++ packages/db/src/reconcilers/index.ts | 1 + packages/trpc/src/routes/redeploy.ts | 86 +---- 13 files changed, 918 insertions(+), 78 deletions(-) create mode 100644 apps/workspace-engine/pkg/reconcile/events/forcedeploy.go create mode 100644 apps/workspace-engine/svc/controllers/forcedeploy/controller.go create mode 100644 apps/workspace-engine/svc/controllers/forcedeploy/getters.go create mode 100644 apps/workspace-engine/svc/controllers/forcedeploy/getters_postgres.go create mode 100644 apps/workspace-engine/svc/controllers/forcedeploy/reconcile.go create mode 100644 apps/workspace-engine/svc/controllers/forcedeploy/reconcile_test.go create mode 100644 apps/workspace-engine/svc/controllers/forcedeploy/releasetarget.go create mode 100644 apps/workspace-engine/svc/controllers/forcedeploy/setters.go create mode 100644 apps/workspace-engine/svc/controllers/forcedeploy/setters_postgres.go create mode 100644 packages/db/src/reconcilers/force-deploy.ts diff --git a/apps/workspace-engine/main.go b/apps/workspace-engine/main.go index b0b32bffb..eb55a43ae 100644 --- a/apps/workspace-engine/main.go +++ b/apps/workspace-engine/main.go @@ -16,6 +16,7 @@ import ( "workspace-engine/svc/controllers/deploymentresourceselectoreval" "workspace-engine/svc/controllers/desiredrelease" "workspace-engine/svc/controllers/environmentresourceselectoreval" + "workspace-engine/svc/controllers/forcedeploy" "workspace-engine/svc/controllers/jobdispatch" "workspace-engine/svc/controllers/jobeligibility" "workspace-engine/svc/controllers/jobverificationmetric" @@ -48,6 +49,7 @@ func main() { deploymentplanresult.New(WorkerID, db.GetPool(ctx)), deploymentresourceselectoreval.New(WorkerID, db.GetPool(ctx)), environmentresourceselectoreval.New(WorkerID, db.GetPool(ctx)), + forcedeploy.New(WorkerID, db.GetPool(ctx)), jobdispatch.New(WorkerID, db.GetPool(ctx)), jobeligibility.New(WorkerID, db.GetPool(ctx)), jobverificationmetric.New(WorkerID, db.GetPool(ctx)), diff --git a/apps/workspace-engine/pkg/reconcile/events/forcedeploy.go b/apps/workspace-engine/pkg/reconcile/events/forcedeploy.go new file mode 100644 index 000000000..a9f6e20a9 --- /dev/null +++ b/apps/workspace-engine/pkg/reconcile/events/forcedeploy.go @@ -0,0 +1,3 @@ +package events + +const ForceDeployKind = "force-deploy" diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/controller.go b/apps/workspace-engine/svc/controllers/forcedeploy/controller.go new file mode 100644 index 000000000..85fe08ab8 --- /dev/null +++ b/apps/workspace-engine/svc/controllers/forcedeploy/controller.go @@ -0,0 +1,104 @@ +package forcedeploy + +import ( + "context" + "fmt" + "time" + + "github.com/charmbracelet/log" + "github.com/jackc/pgx/v5/pgxpool" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "workspace-engine/pkg/config" + "workspace-engine/pkg/reconcile" + "workspace-engine/pkg/reconcile/events" + "workspace-engine/pkg/reconcile/postgres" + "workspace-engine/svc" +) + +var tracer = otel.Tracer("workspace-engine/svc/controllers/forcedeploy") +var _ reconcile.Processor = (*Controller)(nil) + +type Controller struct { + getter Getter + setter Setter +} + +func (c *Controller) Process(ctx context.Context, item reconcile.Item) (reconcile.Result, error) { + ctx, span := tracer.Start(ctx, "forcedeploy.Controller.Process") + defer span.End() + + span.SetAttributes( + attribute.Int64("item.id", item.ID), + attribute.String("item.scope_id", item.ScopeID), + ) + + rt, err := NewReleaseTarget(item.ScopeID) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return reconcile.Result{}, fmt.Errorf("parse release target: %w", err) + } + + exists, err := c.getter.ReleaseTargetExists(ctx, rt) + if err != nil { + return reconcile.Result{}, fmt.Errorf("check release target exists: %w", err) + } + if !exists { + return reconcile.Result{}, nil + } + + result, err := Reconcile(ctx, item.WorkspaceID, c.getter, c.setter, rt) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return reconcile.Result{}, fmt.Errorf("reconcile force deploy: %w", err) + } + + if result.RequeueAfter > 0 { + return reconcile.Result{RequeueAfter: result.RequeueAfter}, nil + } + + return reconcile.Result{}, nil +} + +// NewController creates a Controller with the given dependencies. +// Use this constructor in tests to inject mock implementations. +func NewController(getter Getter, setter Setter) *Controller { + return &Controller{getter: getter, setter: setter} +} + +func New(workerID string, pgxPool *pgxpool.Pool) svc.Service { + if pgxPool == nil { + log.Fatal("Failed to get pgx pool") + panic("failed to get pgx pool") + } + + kind := events.ForceDeployKind + maxConcurrency := config.GetMaxConcurrency(kind) + + nodeConfig := reconcile.NodeConfig{ + WorkerID: workerID, + BatchSize: 10, + PollInterval: 1 * time.Second, + LeaseDuration: 10 * time.Second, + LeaseHeartbeat: 5 * time.Second, + MaxConcurrency: maxConcurrency, + MaxRetryBackoff: 10 * time.Second, + } + + queue := postgres.NewForKinds(pgxPool, kind) + enqueueQueue := postgres.New(pgxPool) + controller := &Controller{ + getter: &PostgresGetter{}, + setter: &PostgresSetter{Queue: enqueueQueue}, + } + + worker, err := reconcile.NewWorker(kind, queue, controller, nodeConfig) + if err != nil { + log.Fatal("Failed to create force deploy reconcile worker", "error", err) + } + + return worker +} diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/getters.go b/apps/workspace-engine/svc/controllers/forcedeploy/getters.go new file mode 100644 index 000000000..666b72771 --- /dev/null +++ b/apps/workspace-engine/svc/controllers/forcedeploy/getters.go @@ -0,0 +1,18 @@ +package forcedeploy + +import ( + "context" + + "github.com/google/uuid" + "workspace-engine/pkg/oapi" +) + +type Getter interface { + ReleaseTargetExists(ctx context.Context, rt *ReleaseTarget) (bool, error) + GetDesiredRelease(ctx context.Context, rt *ReleaseTarget) (*oapi.Release, error) + GetActiveJobsForReleaseTarget(ctx context.Context, rt *oapi.ReleaseTarget) ([]*oapi.Job, error) + GetDeployment(ctx context.Context, deploymentID uuid.UUID) (*oapi.Deployment, error) + GetEnvironment(ctx context.Context, environmentID uuid.UUID) (*oapi.Environment, error) + GetResource(ctx context.Context, resourceID uuid.UUID) (*oapi.Resource, error) + ListJobAgentsByWorkspaceID(ctx context.Context, workspaceID uuid.UUID) ([]oapi.JobAgent, error) +} diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/getters_postgres.go b/apps/workspace-engine/svc/controllers/forcedeploy/getters_postgres.go new file mode 100644 index 000000000..8ac9d1a76 --- /dev/null +++ b/apps/workspace-engine/svc/controllers/forcedeploy/getters_postgres.go @@ -0,0 +1,125 @@ +package forcedeploy + +import ( + "context" + "errors" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "workspace-engine/pkg/db" + "workspace-engine/pkg/oapi" +) + +var _ Getter = (*PostgresGetter)(nil) + +type PostgresGetter struct{} + +func (g *PostgresGetter) ReleaseTargetExists(ctx context.Context, rt *ReleaseTarget) (bool, error) { + return db.GetQueries(ctx).ReleaseTargetExists(ctx, db.ReleaseTargetExistsParams{ + DeploymentID: rt.DeploymentID, + EnvironmentID: rt.EnvironmentID, + ResourceID: rt.ResourceID, + }) +} + +func (g *PostgresGetter) GetDesiredRelease( + ctx context.Context, + rt *ReleaseTarget, +) (*oapi.Release, error) { + row, err := db.GetQueries(ctx). + GetDesiredReleaseByReleaseTarget(ctx, db.GetDesiredReleaseByReleaseTargetParams{ + ResourceID: rt.ResourceID, + EnvironmentID: rt.EnvironmentID, + DeploymentID: rt.DeploymentID, + }) + if errors.Is(err, pgx.ErrNoRows) { + return nil, nil + } + if err != nil { + return nil, err + } + return db.ToOapiFullRelease(row), nil +} + +func (g *PostgresGetter) GetActiveJobsForReleaseTarget( + ctx context.Context, + rt *oapi.ReleaseTarget, +) ([]*oapi.Job, error) { + deploymentID, err := uuid.Parse(rt.DeploymentId) + if err != nil { + return nil, err + } + environmentID, err := uuid.Parse(rt.EnvironmentId) + if err != nil { + return nil, err + } + resourceID, err := uuid.Parse(rt.ResourceId) + if err != nil { + return nil, err + } + + rows, err := db.GetQueries(ctx). + ListJobsByReleaseTargetWithStatuses(ctx, db.ListJobsByReleaseTargetWithStatusesParams{ + DeploymentID: deploymentID, + EnvironmentID: environmentID, + ResourceID: resourceID, + Statuses: []string{"in_progress", "action_required", "pending"}, + }) + if err != nil { + return nil, err + } + + jobs := make([]*oapi.Job, len(rows)) + for i, row := range rows { + jobs[i] = db.ToOapiJob(db.ListJobsByReleaseIDRow(row)) + } + return jobs, nil +} + +func (g *PostgresGetter) GetDeployment( + ctx context.Context, + deploymentID uuid.UUID, +) (*oapi.Deployment, error) { + row, err := db.GetQueries(ctx).GetDeploymentByID(ctx, deploymentID) + if err != nil { + return nil, err + } + return db.ToOapiDeployment(row), nil +} + +func (g *PostgresGetter) GetEnvironment( + ctx context.Context, + environmentID uuid.UUID, +) (*oapi.Environment, error) { + row, err := db.GetQueries(ctx).GetEnvironmentByID(ctx, environmentID) + if err != nil { + return nil, err + } + return db.ToOapiEnvironment(row), nil +} + +func (g *PostgresGetter) GetResource( + ctx context.Context, + resourceID uuid.UUID, +) (*oapi.Resource, error) { + row, err := db.GetQueries(ctx).GetResourceByID(ctx, resourceID) + if err != nil { + return nil, err + } + return db.ToOapiResource(row), nil +} + +func (g *PostgresGetter) ListJobAgentsByWorkspaceID( + ctx context.Context, + workspaceID uuid.UUID, +) ([]oapi.JobAgent, error) { + rows, err := db.GetQueries(ctx).ListJobAgentsByWorkspaceID(ctx, workspaceID) + if err != nil { + return nil, err + } + agents := make([]oapi.JobAgent, len(rows)) + for i, row := range rows { + agents[i] = *db.ToOapiJobAgent(row) + } + return agents, nil +} diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/reconcile.go b/apps/workspace-engine/svc/controllers/forcedeploy/reconcile.go new file mode 100644 index 000000000..505ee6b2a --- /dev/null +++ b/apps/workspace-engine/svc/controllers/forcedeploy/reconcile.go @@ -0,0 +1,149 @@ +package forcedeploy + +import ( + "context" + "fmt" + "time" + + "github.com/charmbracelet/log" + "github.com/google/uuid" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/selector" + "workspace-engine/pkg/workspace/jobs" +) + +const requeueDelay = 5 * time.Second + +type ReconcileResult struct { + RequeueAfter time.Duration +} + +func Reconcile( + ctx context.Context, + workspaceID string, + getter Getter, + setter Setter, + rt *ReleaseTarget, +) (*ReconcileResult, error) { + ctx, span := tracer.Start(ctx, "forcedeploy.Reconcile") + defer span.End() + + workspaceUUID, err := uuid.Parse(workspaceID) + if err != nil { + return nil, fmt.Errorf("parse workspace id: %w", err) + } + + release, err := getter.GetDesiredRelease(ctx, rt) + if err != nil { + return nil, recordErr(span, "get desired release", err) + } + if release == nil { + log.Info("no desired release for release target, skipping", "rt", rt.ToOAPI().Key()) + return &ReconcileResult{}, nil + } + + activeJobs, err := getter.GetActiveJobsForReleaseTarget(ctx, rt.ToOAPI()) + if err != nil { + return nil, recordErr(span, "get active jobs", err) + } + if len(activeJobs) > 0 { + span.SetAttributes(attribute.Int("active_jobs", len(activeJobs))) + log.Info("release target has active jobs, requeueing", + "rt", rt.ToOAPI().Key(), + "activeJobs", len(activeJobs), + ) + return &ReconcileResult{RequeueAfter: requeueDelay}, nil + } + + if err := buildAndDispatchJob(ctx, span, workspaceUUID, getter, setter, rt, release); err != nil { + return nil, recordErr(span, "build and dispatch job", err) + } + + return &ReconcileResult{}, nil +} + +func buildAndDispatchJob( + ctx context.Context, + span trace.Span, + workspaceID uuid.UUID, + getter Getter, + setter Setter, + rt *ReleaseTarget, + release *oapi.Release, +) error { + deploymentID, err := uuid.Parse(release.ReleaseTarget.DeploymentId) + if err != nil { + return fmt.Errorf("parse deployment id: %w", err) + } + + deployment, err := getter.GetDeployment(ctx, deploymentID) + if err != nil { + return fmt.Errorf("get deployment: %w", err) + } + + if deployment.JobAgentSelector == "" { + span.AddEvent("no job agent selector configured") + return fmt.Errorf("no job agent selector configured for deployment '%s'", deployment.Name) + } + + resource, err := getter.GetResource(ctx, rt.ResourceID) + if err != nil { + return fmt.Errorf("get resource: %w", err) + } + + allAgents, err := getter.ListJobAgentsByWorkspaceID(ctx, workspaceID) + if err != nil { + return fmt.Errorf("list job agents: %w", err) + } + + matchResult := selector.MatchJobAgentsWithResourceDetailed( + deployment.JobAgentSelector, + allAgents, + resource, + ) + if matchResult.Err != nil { + return fmt.Errorf("match job agents: %w", matchResult.Err) + } + + matchedAgents := matchResult.Result.Matched + span.SetAttributes(attribute.Int("matched_agents", len(matchedAgents))) + + if len(matchedAgents) == 0 { + return fmt.Errorf( + "no job agents matched selector for deployment '%s' (selector=%q)", + deployment.Name, deployment.JobAgentSelector, + ) + } + + factory := jobs.NewFactoryFromGetters(getter) + for i := range matchedAgents { + agent := &matchedAgents[i] + agent.Config = oapi.DeepMergeConfigs( + agent.Config, deployment.JobAgentConfig, release.Version.JobAgentConfig, + ) + + job, err := factory.CreateJobForRelease(ctx, release, agent) + if err != nil { + return fmt.Errorf("create job for agent %s: %w", agent.Name, err) + } + + if err := setter.CreateJob(ctx, job, release); err != nil { + return fmt.Errorf("persist job: %w", err) + } + + if err := setter.EnqueueJobDispatch(ctx, workspaceID.String(), job.Id); err != nil { + return fmt.Errorf("enqueue job dispatch: %w", err) + } + } + + return nil +} + +func recordErr(span trace.Span, msg string, err error) error { + span.RecordError(err) + span.SetStatus(codes.Error, msg+" failed") + return fmt.Errorf("%s: %w", msg, err) +} diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/reconcile_test.go b/apps/workspace-engine/svc/controllers/forcedeploy/reconcile_test.go new file mode 100644 index 000000000..3169542ef --- /dev/null +++ b/apps/workspace-engine/svc/controllers/forcedeploy/reconcile_test.go @@ -0,0 +1,315 @@ +package forcedeploy + +import ( + "context" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/reconcile" +) + +// --------------------------------------------------------------------------- +// Mock Getter +// --------------------------------------------------------------------------- + +type mockGetter struct { + rtExists bool + rtExistsErr error + + release *oapi.Release + releaseErr error + + activeJobs []*oapi.Job + activeJobsErr error + + deployment *oapi.Deployment + deploymentErr error + + environment *oapi.Environment + environmentErr error + + resource *oapi.Resource + resourceErr error + + workspaceAgents []oapi.JobAgent +} + +func (m *mockGetter) ReleaseTargetExists(_ context.Context, _ *ReleaseTarget) (bool, error) { + return m.rtExists, m.rtExistsErr +} + +func (m *mockGetter) GetDesiredRelease(_ context.Context, _ *ReleaseTarget) (*oapi.Release, error) { + return m.release, m.releaseErr +} + +func (m *mockGetter) GetActiveJobsForReleaseTarget(_ context.Context, _ *oapi.ReleaseTarget) ([]*oapi.Job, error) { + return m.activeJobs, m.activeJobsErr +} + +func (m *mockGetter) GetDeployment(_ context.Context, _ uuid.UUID) (*oapi.Deployment, error) { + return m.deployment, m.deploymentErr +} + +func (m *mockGetter) GetEnvironment(_ context.Context, _ uuid.UUID) (*oapi.Environment, error) { + return m.environment, m.environmentErr +} + +func (m *mockGetter) GetResource(_ context.Context, _ uuid.UUID) (*oapi.Resource, error) { + return m.resource, m.resourceErr +} + +func (m *mockGetter) ListJobAgentsByWorkspaceID(_ context.Context, _ uuid.UUID) ([]oapi.JobAgent, error) { + return m.workspaceAgents, nil +} + +var _ Getter = (*mockGetter)(nil) + +// --------------------------------------------------------------------------- +// Mock Setter +// --------------------------------------------------------------------------- + +type enqueueCall struct { + WorkspaceID string + JobID string +} + +type mockSetter struct { + createdJobs []*oapi.Job + createJobErr error + + enqueueCalls []enqueueCall + enqueueErr error +} + +func (m *mockSetter) CreateJob(_ context.Context, job *oapi.Job, _ *oapi.Release) error { + if m.createJobErr != nil { + return m.createJobErr + } + m.createdJobs = append(m.createdJobs, job) + return nil +} + +func (m *mockSetter) EnqueueJobDispatch(_ context.Context, workspaceID string, jobID string) error { + if m.enqueueErr != nil { + return m.enqueueErr + } + m.enqueueCalls = append(m.enqueueCalls, enqueueCall{WorkspaceID: workspaceID, JobID: jobID}) + return nil +} + +var _ Setter = (*mockSetter)(nil) + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +func testRT() *ReleaseTarget { + return &ReleaseTarget{ + WorkspaceID: uuid.New(), + DeploymentID: uuid.New(), + EnvironmentID: uuid.New(), + ResourceID: uuid.New(), + } +} + +func testRelease(rt *ReleaseTarget) *oapi.Release { + return &oapi.Release{ + Id: uuid.New(), + CreatedAt: time.Now().Format(time.RFC3339), + ReleaseTarget: oapi.ReleaseTarget{ + DeploymentId: rt.DeploymentID.String(), + EnvironmentId: rt.EnvironmentID.String(), + ResourceId: rt.ResourceID.String(), + }, + Variables: map[string]oapi.LiteralValue{}, + EncryptedVariables: []string{}, + Version: oapi.DeploymentVersion{ + Id: uuid.New().String(), + DeploymentId: rt.DeploymentID.String(), + Tag: "v1.0.0", + }, + } +} + +func testDeployment(rt *ReleaseTarget) *oapi.Deployment { + return &oapi.Deployment{ + Id: rt.DeploymentID.String(), + Name: "test-deployment", + Slug: "test-deployment", + Metadata: map[string]string{}, + JobAgentConfig: oapi.JobAgentConfig{}, + JobAgentSelector: "true", + } +} + +func testAgent() oapi.JobAgent { + return oapi.JobAgent{ + Id: uuid.New().String(), + Name: "test-agent", + Type: "test", + Config: oapi.JobAgentConfig{}, + } +} + +func defaultMocks(rt *ReleaseTarget, release *oapi.Release) (*mockGetter, *mockSetter) { + agent := testAgent() + getter := &mockGetter{ + rtExists: true, + release: release, + activeJobs: []*oapi.Job{}, + deployment: testDeployment(rt), + workspaceAgents: []oapi.JobAgent{agent}, + environment: &oapi.Environment{ + Id: rt.EnvironmentID.String(), + Name: "test-env", + Metadata: map[string]string{}, + }, + resource: &oapi.Resource{ + Id: rt.ResourceID.String(), + Name: "test-resource", + Kind: "TestKind", + Identifier: "test-resource-id", + Metadata: map[string]string{}, + Config: map[string]any{}, + }, + } + setter := &mockSetter{} + return getter, setter +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +func TestReconcile_HappyPath_CreatesJobAndEnqueuesDispatch(t *testing.T) { + rt := testRT() + release := testRelease(rt) + getter, setter := defaultMocks(rt, release) + + result, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + + require.NoError(t, err) + assert.Zero(t, result.RequeueAfter) + require.Len(t, setter.createdJobs, 1) + assert.Equal(t, release.Id.String(), setter.createdJobs[0].ReleaseId) + assert.Equal(t, oapi.JobStatusPending, setter.createdJobs[0].Status) + require.Len(t, setter.enqueueCalls, 1) + assert.Equal(t, rt.WorkspaceID.String(), setter.enqueueCalls[0].WorkspaceID) + assert.Equal(t, setter.createdJobs[0].Id, setter.enqueueCalls[0].JobID) +} + +func TestReconcile_HappyPath_WithCompletedJob(t *testing.T) { + rt := testRT() + release := testRelease(rt) + getter, setter := defaultMocks(rt, release) + + // A completed job already exists — should NOT block the redeploy. + // activeJobs only returns processing-state jobs, so completed jobs + // are not included. + getter.activeJobs = []*oapi.Job{} + + result, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + + require.NoError(t, err) + assert.Zero(t, result.RequeueAfter) + require.Len(t, setter.createdJobs, 1) +} + +func TestReconcile_NoDesiredRelease_Noop(t *testing.T) { + rt := testRT() + getter := &mockGetter{ + rtExists: true, + release: nil, + } + setter := &mockSetter{} + + result, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + + require.NoError(t, err) + assert.Zero(t, result.RequeueAfter) + assert.Empty(t, setter.createdJobs) + assert.Empty(t, setter.enqueueCalls) +} + +func TestReconcile_ActiveJobExists_Requeues(t *testing.T) { + rt := testRT() + release := testRelease(rt) + getter, setter := defaultMocks(rt, release) + + getter.activeJobs = []*oapi.Job{ + { + Id: uuid.New().String(), + ReleaseId: release.Id.String(), + Status: oapi.JobStatusInProgress, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + Metadata: map[string]string{}, + }, + } + + result, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + + require.NoError(t, err) + assert.Equal(t, requeueDelay, result.RequeueAfter) + assert.Empty(t, setter.createdJobs) + assert.Empty(t, setter.enqueueCalls) +} + +func TestReconcile_ActivePendingJob_Requeues(t *testing.T) { + rt := testRT() + release := testRelease(rt) + getter, setter := defaultMocks(rt, release) + + getter.activeJobs = []*oapi.Job{ + { + Id: uuid.New().String(), + ReleaseId: release.Id.String(), + Status: oapi.JobStatusPending, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + Metadata: map[string]string{}, + }, + } + + result, err := Reconcile(context.Background(), rt.WorkspaceID.String(), getter, setter, rt) + + require.NoError(t, err) + assert.Equal(t, requeueDelay, result.RequeueAfter) + assert.Empty(t, setter.createdJobs) +} + +func TestProcess_ActiveJob_ReturnsRequeueResult(t *testing.T) { + rt := testRT() + release := testRelease(rt) + getter, setter := defaultMocks(rt, release) + + getter.activeJobs = []*oapi.Job{ + { + Id: uuid.New().String(), + ReleaseId: release.Id.String(), + Status: oapi.JobStatusInProgress, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + Metadata: map[string]string{}, + }, + } + + ctrl := NewController(getter, setter) + item := reconcile.Item{ + ID: 1, + WorkspaceID: rt.WorkspaceID.String(), + Kind: "force-deploy", + ScopeType: "release-target", + ScopeID: rt.DeploymentID.String() + ":" + rt.EnvironmentID.String() + ":" + rt.ResourceID.String(), + } + + result, err := ctrl.Process(context.Background(), item) + + require.NoError(t, err) + assert.Equal(t, requeueDelay, result.RequeueAfter) + assert.Empty(t, setter.createdJobs) +} diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/releasetarget.go b/apps/workspace-engine/svc/controllers/forcedeploy/releasetarget.go new file mode 100644 index 000000000..40683b95e --- /dev/null +++ b/apps/workspace-engine/svc/controllers/forcedeploy/releasetarget.go @@ -0,0 +1,50 @@ +package forcedeploy + +import ( + "fmt" + "strings" + + "github.com/google/uuid" + "workspace-engine/pkg/oapi" +) + +type ReleaseTarget struct { + WorkspaceID uuid.UUID + DeploymentID uuid.UUID + EnvironmentID uuid.UUID + ResourceID uuid.UUID +} + +func NewReleaseTarget(key string) (*ReleaseTarget, error) { + split := strings.SplitN(key, ":", 3) + if len(split) != 3 { + return nil, fmt.Errorf("invalid release target key: %s", key) + } + + deploymentID, err := uuid.Parse(split[0]) + if err != nil { + return nil, fmt.Errorf("invalid deployment id: %s", split[0]) + } + environmentID, err := uuid.Parse(split[1]) + if err != nil { + return nil, fmt.Errorf("invalid environment id: %s", split[1]) + } + resourceID, err := uuid.Parse(split[2]) + if err != nil { + return nil, fmt.Errorf("invalid resource id: %s", split[2]) + } + + return &ReleaseTarget{ + DeploymentID: deploymentID, + EnvironmentID: environmentID, + ResourceID: resourceID, + }, nil +} + +func (rt *ReleaseTarget) ToOAPI() *oapi.ReleaseTarget { + return &oapi.ReleaseTarget{ + DeploymentId: rt.DeploymentID.String(), + EnvironmentId: rt.EnvironmentID.String(), + ResourceId: rt.ResourceID.String(), + } +} diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/setters.go b/apps/workspace-engine/svc/controllers/forcedeploy/setters.go new file mode 100644 index 000000000..48198b958 --- /dev/null +++ b/apps/workspace-engine/svc/controllers/forcedeploy/setters.go @@ -0,0 +1,12 @@ +package forcedeploy + +import ( + "context" + + "workspace-engine/pkg/oapi" +) + +type Setter interface { + CreateJob(ctx context.Context, job *oapi.Job, release *oapi.Release) error + EnqueueJobDispatch(ctx context.Context, workspaceID string, jobID string) error +} diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/setters_postgres.go b/apps/workspace-engine/svc/controllers/forcedeploy/setters_postgres.go new file mode 100644 index 000000000..cc51ee872 --- /dev/null +++ b/apps/workspace-engine/svc/controllers/forcedeploy/setters_postgres.go @@ -0,0 +1,109 @@ +package forcedeploy + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5/pgtype" + "workspace-engine/pkg/db" + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/reconcile" +) + +var _ Setter = (*PostgresSetter)(nil) + +type PostgresSetter struct { + Queue reconcile.Queue +} + +func (s *PostgresSetter) CreateJob( + ctx context.Context, + job *oapi.Job, + release *oapi.Release, +) error { + jobID, err := uuid.Parse(job.Id) + if err != nil { + return fmt.Errorf("parse job id: %w", err) + } + + var jobAgentIDParam pgtype.UUID + if job.JobAgentId != "" { + parsed, err := uuid.Parse(job.JobAgentId) + if err != nil { + return fmt.Errorf("parse job agent id: %w", err) + } + jobAgentIDParam = pgtype.UUID{Bytes: parsed, Valid: true} + } + + jobAgentConfig, err := json.Marshal(job.JobAgentConfig) + if err != nil { + return fmt.Errorf("marshal job agent config: %w", err) + } + + dispatchContext, err := json.Marshal(job.DispatchContext) + if err != nil { + return fmt.Errorf("marshal dispatch context: %w", err) + } + + tx, err := db.GetPool(ctx).Begin(ctx) + if err != nil { + return fmt.Errorf("begin transaction: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + + q := db.GetQueries(ctx).WithTx(tx) + + var completedAt pgtype.Timestamptz + if job.CompletedAt != nil { + completedAt = pgtype.Timestamptz{Time: *job.CompletedAt, Valid: true} + } + + if err := q.InsertJob(ctx, db.InsertJobParams{ + ID: jobID, + JobAgentID: jobAgentIDParam, + JobAgentConfig: jobAgentConfig, + Status: db.ToDBJobStatus(job.Status), + Message: toPgText(job.Message), + CreatedAt: pgtype.Timestamptz{Time: job.CreatedAt, Valid: !job.CreatedAt.IsZero()}, + UpdatedAt: pgtype.Timestamptz{Time: job.UpdatedAt, Valid: !job.UpdatedAt.IsZero()}, + CompletedAt: completedAt, + DispatchContext: dispatchContext, + }); err != nil { + return fmt.Errorf("insert job: %w", err) + } + + if err := q.InsertReleaseJob(ctx, db.InsertReleaseJobParams{ + ReleaseID: release.Id, + JobID: jobID, + }); err != nil { + return fmt.Errorf("insert release job: %w", err) + } + + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("commit transaction: %w", err) + } + + return nil +} + +func (s *PostgresSetter) EnqueueJobDispatch( + ctx context.Context, + workspaceID string, + jobID string, +) error { + return s.Queue.Enqueue(ctx, reconcile.EnqueueParams{ + WorkspaceID: workspaceID, + Kind: "job-dispatch", + ScopeType: "job", + ScopeID: jobID, + }) +} + +func toPgText(s *string) pgtype.Text { + if s == nil { + return pgtype.Text{} + } + return pgtype.Text{String: *s, Valid: true} +} diff --git a/packages/db/src/reconcilers/force-deploy.ts b/packages/db/src/reconcilers/force-deploy.ts new file mode 100644 index 000000000..de1ef5252 --- /dev/null +++ b/packages/db/src/reconcilers/force-deploy.ts @@ -0,0 +1,22 @@ +import type { Tx } from "../common.js"; +import type { ReconcileWorkScope } from "../schema/reconcile.js"; +import { enqueue } from "./enqueue.js"; + +const FORCE_DEPLOY_KIND = "force-deploy"; + +export async function enqueueForceDeploy( + db: Tx, + params: { + workspaceId: string; + deploymentId: string; + environmentId: string; + resourceId: string; + }, +): Promise { + return enqueue(db, { + workspaceId: params.workspaceId, + kind: FORCE_DEPLOY_KIND, + scopeType: "release-target", + scopeId: `${params.deploymentId}:${params.environmentId}:${params.resourceId}`, + }); +} diff --git a/packages/db/src/reconcilers/index.ts b/packages/db/src/reconcilers/index.ts index fc8e79828..6fe919688 100644 --- a/packages/db/src/reconcilers/index.ts +++ b/packages/db/src/reconcilers/index.ts @@ -1,4 +1,5 @@ export * from "./enqueue.js"; +export * from "./force-deploy.js"; export * from "./deployment-plan.js"; export * from "./deployment-selector-eval.js"; export * from "./environment-selector-eval.js"; diff --git a/packages/trpc/src/routes/redeploy.ts b/packages/trpc/src/routes/redeploy.ts index dbc08d712..41b7d456a 100644 --- a/packages/trpc/src/routes/redeploy.ts +++ b/packages/trpc/src/routes/redeploy.ts @@ -1,13 +1,7 @@ import type { Tx } from "@ctrlplane/db"; -import { v4 as uuidv4 } from "uuid"; import { z } from "zod"; -import { and, desc, eq, takeFirstOrNull } from "@ctrlplane/db"; -import { - enqueueDesiredRelease, - enqueueJobDispatch, -} from "@ctrlplane/db/reconcilers"; -import * as schema from "@ctrlplane/db/schema"; +import { enqueueForceDeploy } from "@ctrlplane/db/reconcilers"; import { protectedProcedure, router } from "../trpc.js"; @@ -17,81 +11,17 @@ type ReleaseTarget = { resourceId: string; }; -const getLatestJobForTarget = (db: Tx, releaseTarget: ReleaseTarget) => - db - .select() - .from(schema.job) - .innerJoin(schema.releaseJob, eq(schema.job.id, schema.releaseJob.jobId)) - .innerJoin( - schema.release, - eq(schema.releaseJob.releaseId, schema.release.id), - ) - .where( - and( - eq(schema.release.deploymentId, releaseTarget.deploymentId), - eq(schema.release.environmentId, releaseTarget.environmentId), - eq(schema.release.resourceId, releaseTarget.resourceId), - ), - ) - .orderBy(desc(schema.job.createdAt)) - .limit(1) - .then(takeFirstOrNull) - .then((result) => - result - ? { job: result.job, releaseId: result.release_job.releaseId } - : null, - ); - -const getJobMetadata = (db: Tx, jobId: string) => - db - .select() - .from(schema.jobMetadata) - .where(eq(schema.jobMetadata.jobId, jobId)) - .then((rows) => - Object.fromEntries(rows.map((row) => [row.key, row.value])), - ); - -const redeployReleaseTarget = async ( +const redeployReleaseTarget = ( db: Tx, workspaceId: string, releaseTarget: ReleaseTarget, -) => { - const result = await getLatestJobForTarget(db, releaseTarget); - if (result == null) - return enqueueDesiredRelease(db, { - workspaceId, - deploymentId: releaseTarget.deploymentId, - environmentId: releaseTarget.environmentId, - resourceId: releaseTarget.resourceId, - }); - - const { job, releaseId } = result; - const metadata = await getJobMetadata(db, job.id); - - const newJob = { - id: uuidv4(), - status: "pending" as const, - createdAt: new Date(), - updatedAt: new Date(), - jobAgentId: job.jobAgentId, - jobAgentConfig: job.jobAgentConfig, - dispatchContext: job.dispatchContext, - reason: "redeploy" as const, - metadata, - }; - - await db.transaction(async (tx) => { - await tx.insert(schema.job).values(newJob); - await tx.insert(schema.releaseJob).values({ - releaseId, - jobId: newJob.id, - }); - await enqueueJobDispatch(tx, { - workspaceId, - jobId: newJob.id, - }); +) => + enqueueForceDeploy(db, { + workspaceId, + deploymentId: releaseTarget.deploymentId, + environmentId: releaseTarget.environmentId, + resourceId: releaseTarget.resourceId, }); -}; export const redeployRouter = router({ releaseTarget: protectedProcedure From a38f3ebda186ce0b5bcd51dbbaec0b1ca6fffdb2 Mon Sep 17 00:00:00 2001 From: Aditya Choudhari Date: Tue, 14 Apr 2026 14:14:46 -0700 Subject: [PATCH 2/3] fmt --- .../svc/controllers/forcedeploy/reconcile.go | 10 +++++++++- .../svc/controllers/forcedeploy/reconcile_test.go | 10 ++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/reconcile.go b/apps/workspace-engine/svc/controllers/forcedeploy/reconcile.go index 505ee6b2a..c88d197bd 100644 --- a/apps/workspace-engine/svc/controllers/forcedeploy/reconcile.go +++ b/apps/workspace-engine/svc/controllers/forcedeploy/reconcile.go @@ -58,7 +58,15 @@ func Reconcile( return &ReconcileResult{RequeueAfter: requeueDelay}, nil } - if err := buildAndDispatchJob(ctx, span, workspaceUUID, getter, setter, rt, release); err != nil { + if err := buildAndDispatchJob( + ctx, + span, + workspaceUUID, + getter, + setter, + rt, + release, + ); err != nil { return nil, recordErr(span, "build and dispatch job", err) } diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/reconcile_test.go b/apps/workspace-engine/svc/controllers/forcedeploy/reconcile_test.go index 3169542ef..193c79b04 100644 --- a/apps/workspace-engine/svc/controllers/forcedeploy/reconcile_test.go +++ b/apps/workspace-engine/svc/controllers/forcedeploy/reconcile_test.go @@ -46,7 +46,10 @@ func (m *mockGetter) GetDesiredRelease(_ context.Context, _ *ReleaseTarget) (*oa return m.release, m.releaseErr } -func (m *mockGetter) GetActiveJobsForReleaseTarget(_ context.Context, _ *oapi.ReleaseTarget) ([]*oapi.Job, error) { +func (m *mockGetter) GetActiveJobsForReleaseTarget( + _ context.Context, + _ *oapi.ReleaseTarget, +) ([]*oapi.Job, error) { return m.activeJobs, m.activeJobsErr } @@ -62,7 +65,10 @@ func (m *mockGetter) GetResource(_ context.Context, _ uuid.UUID) (*oapi.Resource return m.resource, m.resourceErr } -func (m *mockGetter) ListJobAgentsByWorkspaceID(_ context.Context, _ uuid.UUID) ([]oapi.JobAgent, error) { +func (m *mockGetter) ListJobAgentsByWorkspaceID( + _ context.Context, + _ uuid.UUID, +) ([]oapi.JobAgent, error) { return m.workspaceAgents, nil } From 01bdc3db5a9f9b4f8ba51837e020027f4162dc0c Mon Sep 17 00:00:00 2001 From: Aditya Choudhari Date: Tue, 14 Apr 2026 15:52:56 -0700 Subject: [PATCH 3/3] cleanup --- .../svc/controllers/forcedeploy/controller.go | 3 +- .../svc/controllers/forcedeploy/reconcile.go | 13 +++--- .../controllers/forcedeploy/reconcile_test.go | 17 ++++---- .../svc/controllers/forcedeploy/setters.go | 8 +++- .../forcedeploy/setters_postgres.go | 42 +++++++++++-------- 5 files changed, 45 insertions(+), 38 deletions(-) diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/controller.go b/apps/workspace-engine/svc/controllers/forcedeploy/controller.go index 85fe08ab8..db0db6f80 100644 --- a/apps/workspace-engine/svc/controllers/forcedeploy/controller.go +++ b/apps/workspace-engine/svc/controllers/forcedeploy/controller.go @@ -89,10 +89,9 @@ func New(workerID string, pgxPool *pgxpool.Pool) svc.Service { } queue := postgres.NewForKinds(pgxPool, kind) - enqueueQueue := postgres.New(pgxPool) controller := &Controller{ getter: &PostgresGetter{}, - setter: &PostgresSetter{Queue: enqueueQueue}, + setter: &PostgresSetter{}, } worker, err := reconcile.NewWorker(kind, queue, controller, nodeConfig) diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/reconcile.go b/apps/workspace-engine/svc/controllers/forcedeploy/reconcile.go index c88d197bd..ae2ee6901 100644 --- a/apps/workspace-engine/svc/controllers/forcedeploy/reconcile.go +++ b/apps/workspace-engine/svc/controllers/forcedeploy/reconcile.go @@ -138,12 +138,13 @@ func buildAndDispatchJob( return fmt.Errorf("create job for agent %s: %w", agent.Name, err) } - if err := setter.CreateJob(ctx, job, release); err != nil { - return fmt.Errorf("persist job: %w", err) - } - - if err := setter.EnqueueJobDispatch(ctx, workspaceID.String(), job.Id); err != nil { - return fmt.Errorf("enqueue job dispatch: %w", err) + if err := setter.CreateJobAndEnqueueDispatch( + ctx, + job, + release, + workspaceID.String(), + ); err != nil { + return fmt.Errorf("create and enqueue job: %w", err) } } diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/reconcile_test.go b/apps/workspace-engine/svc/controllers/forcedeploy/reconcile_test.go index 193c79b04..815566b93 100644 --- a/apps/workspace-engine/svc/controllers/forcedeploy/reconcile_test.go +++ b/apps/workspace-engine/svc/controllers/forcedeploy/reconcile_test.go @@ -88,22 +88,19 @@ type mockSetter struct { createJobErr error enqueueCalls []enqueueCall - enqueueErr error } -func (m *mockSetter) CreateJob(_ context.Context, job *oapi.Job, _ *oapi.Release) error { +func (m *mockSetter) CreateJobAndEnqueueDispatch( + _ context.Context, + job *oapi.Job, + _ *oapi.Release, + workspaceID string, +) error { if m.createJobErr != nil { return m.createJobErr } m.createdJobs = append(m.createdJobs, job) - return nil -} - -func (m *mockSetter) EnqueueJobDispatch(_ context.Context, workspaceID string, jobID string) error { - if m.enqueueErr != nil { - return m.enqueueErr - } - m.enqueueCalls = append(m.enqueueCalls, enqueueCall{WorkspaceID: workspaceID, JobID: jobID}) + m.enqueueCalls = append(m.enqueueCalls, enqueueCall{WorkspaceID: workspaceID, JobID: job.Id}) return nil } diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/setters.go b/apps/workspace-engine/svc/controllers/forcedeploy/setters.go index 48198b958..ef801d702 100644 --- a/apps/workspace-engine/svc/controllers/forcedeploy/setters.go +++ b/apps/workspace-engine/svc/controllers/forcedeploy/setters.go @@ -7,6 +7,10 @@ import ( ) type Setter interface { - CreateJob(ctx context.Context, job *oapi.Job, release *oapi.Release) error - EnqueueJobDispatch(ctx context.Context, workspaceID string, jobID string) error + CreateJobAndEnqueueDispatch( + ctx context.Context, + job *oapi.Job, + release *oapi.Release, + workspaceID string, + ) error } diff --git a/apps/workspace-engine/svc/controllers/forcedeploy/setters_postgres.go b/apps/workspace-engine/svc/controllers/forcedeploy/setters_postgres.go index cc51ee872..6222fc59a 100644 --- a/apps/workspace-engine/svc/controllers/forcedeploy/setters_postgres.go +++ b/apps/workspace-engine/svc/controllers/forcedeploy/setters_postgres.go @@ -4,30 +4,35 @@ import ( "context" "encoding/json" "fmt" + "time" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgtype" "workspace-engine/pkg/db" "workspace-engine/pkg/oapi" - "workspace-engine/pkg/reconcile" + reconciledb "workspace-engine/pkg/reconcile/postgres/db" ) var _ Setter = (*PostgresSetter)(nil) -type PostgresSetter struct { - Queue reconcile.Queue -} +type PostgresSetter struct{} -func (s *PostgresSetter) CreateJob( +func (s *PostgresSetter) CreateJobAndEnqueueDispatch( ctx context.Context, job *oapi.Job, release *oapi.Release, + workspaceID string, ) error { jobID, err := uuid.Parse(job.Id) if err != nil { return fmt.Errorf("parse job id: %w", err) } + wsID, err := uuid.Parse(workspaceID) + if err != nil { + return fmt.Errorf("parse workspace id: %w", err) + } + var jobAgentIDParam pgtype.UUID if job.JobAgentId != "" { parsed, err := uuid.Parse(job.JobAgentId) @@ -81,6 +86,20 @@ func (s *PostgresSetter) CreateJob( return fmt.Errorf("insert release job: %w", err) } + now := time.Now() + rq := reconciledb.New(tx) + if err := rq.UpsertReconcileWorkItem(ctx, reconciledb.UpsertReconcileWorkItemParams{ + WorkspaceID: wsID, + Kind: "job-dispatch", + ScopeType: "job", + ScopeID: job.Id, + EventTs: pgtype.Timestamptz{Time: now, Valid: true}, + Priority: 100, + NotBefore: pgtype.Timestamptz{Time: now.Add(-1 * time.Second), Valid: true}, + }); err != nil { + return fmt.Errorf("enqueue job dispatch: %w", err) + } + if err := tx.Commit(ctx); err != nil { return fmt.Errorf("commit transaction: %w", err) } @@ -88,19 +107,6 @@ func (s *PostgresSetter) CreateJob( return nil } -func (s *PostgresSetter) EnqueueJobDispatch( - ctx context.Context, - workspaceID string, - jobID string, -) error { - return s.Queue.Enqueue(ctx, reconcile.EnqueueParams{ - WorkspaceID: workspaceID, - Kind: "job-dispatch", - ScopeType: "job", - ScopeID: jobID, - }) -} - func toPgText(s *string) pgtype.Text { if s == nil { return pgtype.Text{}